Skip to content

Commit

Permalink
Merge pull request #221 from Dash-Industry-Forum/preencrypted-content
Browse files Browse the repository at this point in the history
Support pre-encrypted content
  • Loading branch information
tobbee authored Nov 1, 2024
2 parents e9d80d1 + 9b0d88e commit 21bce56
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 69 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Nothing yet
### Added

- Better logging when loading asset representation data
- Check that pre-encrypted content has the same duration for all representations

### Fixed

- Pre-encrypted content is not re-fragmented, but left as it

## [1.5.0] - 2024-10-02

Expand Down
97 changes: 56 additions & 41 deletions cmd/livesim2/app/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (am *assetMgr) addAsset(assetPath string) *asset {
}

// discoverAssets walks the file tree and finds all directories containing MPD files.
func (am *assetMgr) discoverAssets() error {
func (am *assetMgr) discoverAssets(logger *slog.Logger) error {
err := fs.WalkDir(am.vodFS, ".", func(p string, d fs.DirEntry, err error) error {
if path.Ext(p) == ".mpd" {
err := am.loadAsset(p)
err := am.loadAsset(logger, p)
if err != nil {
slog.Warn("Asset loading problem. Skipping", "asset", p, "err", err.Error())
logger.Warn("Asset loading problem. Skipping", "asset", p, "err", err.Error())
}
}
return nil
Expand All @@ -86,22 +86,24 @@ func (am *assetMgr) discoverAssets() error {
}

for aID, a := range am.assets {
err := a.consolidateAsset()
logger := logger.With("assetPath", a.AssetPath)
err := a.consolidateAsset(logger)
if err != nil {
slog.Warn("Asset consolidation problem. Skipping", "error", err.Error())
logger.Warn("Asset consolidation problem. Skipping", "error", err.Error())
delete(am.assets, aID) // This deletion should be safe
continue
}
slog.Info("Asset consolidated", "asset", a.AssetPath, "loopDurMS", a.LoopDurMS)
logger.Info("Asset consolidated", "loopDurMS", a.LoopDurMS)
}
return nil
}

func (am *assetMgr) loadAsset(mpdPath string) error {
func (am *assetMgr) loadAsset(logger *slog.Logger, mpdPath string) error {
assetPath, mpdName := path.Split(mpdPath)
if assetPath != "" {
assetPath = assetPath[:len(assetPath)-1]
}
logger = logger.With("assetPath", assetPath, "mpdName", mpdName)
asset := am.addAsset(assetPath)
md := internal.ReadMPDData(am.vodFS, mpdPath)

Expand Down Expand Up @@ -144,10 +146,10 @@ func (am *assetMgr) loadAsset(mpdPath string) error {
return fmt.Errorf("segmentTemplate on Representation level. Only supported on AdaptationSet level")
}
if _, ok := asset.Reps[rep.Id]; ok {
slog.Debug("Representation already loaded", "rep", rep.Id, "asset", mpdPath)
logger.Debug("Representation already loaded", "rep", rep.Id)
continue
}
r, err := am.loadRep(assetPath, as, rep)
r, err := am.loadRep(logger, assetPath, as, rep)
if err != nil {
return fmt.Errorf("getRep: %w", err)
}
Expand All @@ -166,24 +168,24 @@ func (am *assetMgr) loadAsset(mpdPath string) error {
}
}
}
slog.Info("asset MPD loaded", "asset", assetPath, "mpdName", mpdPath)
logger.Info("Asset MPD loaded")
return nil
}

func (am *assetMgr) loadRep(assetPath string, as *m.AdaptationSetType, rep *m.RepresentationType) (*RepData, error) {
func (am *assetMgr) loadRep(logger *slog.Logger, assetPath string, as *m.AdaptationSetType, rep *m.RepresentationType) (*RepData, error) {
logger = logger.With("rep", rep.Id, "assetPath", assetPath)
rp := RepData{ID: rep.Id,
ContentType: string(as.ContentType),
Codecs: as.Codecs,
MpdTimescale: 1,
}
if !am.writeRepData {
ok, err := rp.readFromJSON(am.vodFS, am.repDataDir, assetPath)
ok, err := rp.readFromJSON(logger, am.vodFS, am.repDataDir, assetPath)
if ok {
slog.Info("Representation loaded from JSON", "rep", rp.ID, "asset", assetPath)
return &rp, err
}
}
slog.Debug("Loading full representation", "rep", rp.ID, "asset", assetPath)
logger.Debug("Loading full representation", "rep", rp.ID, "asset", assetPath)
st := as.SegmentTemplate
if rep.SegmentTemplate != nil {
st = rep.SegmentTemplate
Expand All @@ -199,7 +201,7 @@ func (am *assetMgr) loadRep(assetPath string, as *m.AdaptationSetType, rep *m.Re
if st.Timescale != nil {
rp.MpdTimescale = int(*st.Timescale)
}
err := rp.addRegExpAndInit(am.vodFS, assetPath)
err := rp.addRegExpAndInit(logger, am.vodFS, assetPath)
if err != nil {
return nil, fmt.Errorf("addRegExpAndInit: %w", err)
}
Expand Down Expand Up @@ -292,15 +294,13 @@ segLoop:
if !am.writeRepData {
return &rp, nil
}
err = rp.writeToJSON(am.repDataDir, assetPath)
if err == nil {
slog.Info("Representation data written to JSON file", "rep", rp.ID, "asset", assetPath)
}
err = rp.writeToJSON(logger, am.repDataDir, assetPath)
return &rp, err
}

// readFromJSON reads the representation data from a gzipped or plain JSON file.
func (rp *RepData) readFromJSON(vodFS fs.FS, repDataDir, assetPath string) (bool, error) {
func (rp *RepData) readFromJSON(logger *slog.Logger, vodFS fs.FS, repDataDir, assetPath string) (bool, error) {
logger = logger.With("rep", rp.ID, "assetPath", assetPath)
if repDataDir == "" {
return false, nil
}
Expand All @@ -323,7 +323,7 @@ func (rp *RepData) readFromJSON(vodFS fs.FS, repDataDir, assetPath string) (bool
if err != nil {
return true, err
}
slog.Debug("Read repdata", "path", gzipPath)
logger.Info("Read gzipped repdata", "path", gzipPath)
}
if len(data) == 0 {
_, err := os.Stat(repDataPath)
Expand All @@ -332,7 +332,7 @@ func (rp *RepData) readFromJSON(vodFS fs.FS, repDataDir, assetPath string) (bool
if err != nil {
return true, err
}
slog.Debug("Read repdata", "path", repDataPath)
logger.Info("Read repdata", "path", repDataPath)
}
}
if len(data) == 0 {
Expand All @@ -341,14 +341,14 @@ func (rp *RepData) readFromJSON(vodFS fs.FS, repDataDir, assetPath string) (bool
if err := json.Unmarshal(data, &rp); err != nil {
return true, err
}
err = rp.addRegExpAndInit(vodFS, assetPath)
err = rp.addRegExpAndInit(logger, vodFS, assetPath)
if err != nil {
return true, fmt.Errorf("addRegExpAndInit: %w", err)
}
return true, nil
}

func (rp *RepData) addRegExpAndInit(vodFS fs.FS, assetPath string) error {
func (rp *RepData) addRegExpAndInit(logger *slog.Logger, vodFS fs.FS, assetPath string) error {
switch {
case strings.Contains(rp.MediaURI, "$Number$"):
rexStr := strings.ReplaceAll(rp.MediaURI, "$Number$", `(\d+)`)
Expand All @@ -361,7 +361,7 @@ func (rp *RepData) addRegExpAndInit(vodFS fs.FS, assetPath string) error {
}

if rp.ContentType != "image" {
err := rp.readInit(vodFS, assetPath)
err := rp.readInit(logger, vodFS, assetPath)
if err != nil {
return err
}
Expand All @@ -370,7 +370,8 @@ func (rp *RepData) addRegExpAndInit(vodFS fs.FS, assetPath string) error {
}

// writeToJSON writes the representation data to a gzipped JSON file.
func (rp *RepData) writeToJSON(repDataDir, assetPath string) error {
func (rp *RepData) writeToJSON(logger *slog.Logger, repDataDir, assetPath string) error {
logger = logger.With("rep", rp.ID, "assetPath", assetPath)
if repDataDir == "" {
return nil
}
Expand All @@ -397,7 +398,7 @@ func (rp *RepData) writeToJSON(repDataDir, assetPath string) error {
if err != nil {
return err
}
slog.Debug("Wrote repData", "path", gzipPath)
logger.Debug("Wrote repData", "path", gzipPath)
return nil
}

Expand Down Expand Up @@ -592,7 +593,8 @@ func (a *asset) setReferenceRep() error {
}

// consolidateAsset sets up reference track and loop duration if possible
func (a *asset) consolidateAsset() error {
func (a *asset) consolidateAsset(logger *slog.Logger) error {
logger = logger.With("assetPath", a.AssetPath)
err := a.setReferenceRep()
if err != nil {
return fmt.Errorf("setReferenceRep: %w", err)
Expand All @@ -603,15 +605,20 @@ func (a *asset) consolidateAsset() error {
// This is not an integral number of milliseconds, so we should drop this asset
return fmt.Errorf("cannot match loop duration %d for asset %s rep %s", a.LoopDurMS, a.AssetPath, refRep.ID)
}
badPreEncrypted := false
for _, rep := range a.Reps {
if rep.ContentType != refRep.ContentType {
if rep.ContentType != refRep.ContentType && !rep.PreEncrypted {
continue
}
repDurMS := 1000 * rep.duration() / rep.MediaTimescale
if repDurMS != a.LoopDurMS {
slog.Info("Rep duration differs from loop duration", "rep", rep.ID, "asset", a.AssetPath)
logger.Warn("Duration differs", "representation", rep.ID, "referenceRepresentation", refRep.ID, "refDurMS", a.LoopDurMS, "repDurMS", repDurMS)
badPreEncrypted = true
}
}
if badPreEncrypted {
return fmt.Errorf("pre-encrypted representations do not all have same duration")
}
return nil
}

Expand Down Expand Up @@ -741,7 +748,7 @@ func prepareForEncryption(codec string) bool {
return strings.HasPrefix(codec, "avc") || strings.HasPrefix(codec, "mp4a.40")
}

func (r *RepData) readInit(vodFS fs.FS, assetPath string) error {
func (r *RepData) readInit(logger *slog.Logger, vodFS fs.FS, assetPath string) error {
data, err := fs.ReadFile(vodFS, path.Join(assetPath, r.InitURI))
if err != nil {
return fmt.Errorf("read initURI %q: %w", r.InitURI, err)
Expand All @@ -757,7 +764,7 @@ func (r *RepData) readInit(vodFS fs.FS, assetPath string) error {

if prepareForEncryption(r.Codecs) {
assetName := path.Base(assetPath)
err = r.addEncryption(assetName, data)
err = r.addEncryption(logger, assetName, data)
if err != nil {
return fmt.Errorf("addEncryption: %w", err)
}
Expand All @@ -774,25 +781,33 @@ func (r *RepData) readInit(vodFS fs.FS, assetPath string) error {
return nil
}

func (r *RepData) addEncryption(assetName string, data []byte) error {
func (r *RepData) addEncryption(logger *slog.Logger, assetName string, data []byte) error {
// Set up the encryption data for this representation given asset
ed := repEncData{}
ed.keyID = kidFromString(assetName)
ed.key = kidToKey(ed.keyID)
ed.iv = defaultIV

// Generate cbcs data or exit if already encrypted
// Generate cbcs data, but exit if already encrypted
initSeg, err := getInitSeg(data)
if err != nil {
return fmt.Errorf("decode init: %w", err)
}
stsd := initSeg.Moov.Trak.Mdia.Minf.Stbl.Stsd
for _, c := range stsd.Children {
switch c.Type() {
case "encv", "enca":
slog.Info("asset", assetName, "repID", r.ID, "Init segment already encrypted")
r.PreEncrypted = true
return nil
switch box := c.(type) {
case *mp4.VisualSampleEntryBox:
if box.Type() == "encv" && box.Sinf != nil && box.Sinf.Schm != nil {
logger.Info("Video pre-encrypted", "repID", r.ID, "scheme", box.Sinf.Schm.SchemeType, "init", r.InitURI)
r.PreEncrypted = true
return nil
}
case *mp4.AudioSampleEntryBox:
if box.Type() == "enca" && box.Sinf != nil && box.Sinf.Schm != nil {
logger.Info("Video pre-encrypted", "repID", r.ID, "scheme", box.Sinf.Schm.SchemeType, "init", r.InitURI)
r.PreEncrypted = true
return nil
}
}
}
kid, err := mp4.NewUUIDFromHex(hex.EncodeToString(ed.keyID[:]))
Expand All @@ -803,7 +818,7 @@ func (r *RepData) addEncryption(assetName string, data []byte) error {
if err != nil {
return fmt.Errorf("init protect cbcs: %w", err)
}
slog.Info("Encrypted init segment with cbcs", "asset", assetName, "repID", r.ID)
logger.Info("Generate init segment for encryption", "scheme", "cbcs", "repID", r.ID)
ed.cbcsPD = ipd
ed.cbcsInitSeg = initSeg
ed.cbcsInitBytes, err = getInitBytes(initSeg)
Expand All @@ -820,7 +835,7 @@ func (r *RepData) addEncryption(assetName string, data []byte) error {
if err != nil {
return fmt.Errorf("init protect cenc: %w", err)
}
slog.Info("Encrypted init segment with cenc", "asset", assetName, "repID", r.ID)
logger.Info("Generate init segment for encryption", "scheme", "cenc", "repID", r.ID)
ed.cencPD = ipd
ed.cencInitSeg = initSeg
ed.cencInitBytes, err = getInitBytes(initSeg)
Expand Down
6 changes: 4 additions & 2 deletions cmd/livesim2/app/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package app

import (
"log/slog"
"os"
"testing"

Expand All @@ -16,7 +17,8 @@ func TestLoadAsset(t *testing.T) {
vodFS := os.DirFS("testdata")
tmpDir := t.TempDir()
am := newAssetMgr(vodFS, tmpDir, true)
err := am.discoverAssets()
logger := slog.Default()
err := am.discoverAssets(logger)
require.NoError(t, err)
// This was first time
asset, ok := am.findAsset("assets/testpic_2s")
Expand All @@ -31,7 +33,7 @@ func TestLoadAsset(t *testing.T) {
assert.Equal(t, 8000, asset.LoopDurMS)
// Second time we load using gzipped repData files
am = newAssetMgr(vodFS, tmpDir, true)
err = am.discoverAssets()
err = am.discoverAssets(logger)
require.NoError(t, err)
asset, ok = am.findAsset("assets/testpic_2s")
require.True(t, ok)
Expand Down
Loading

0 comments on commit 21bce56

Please sign in to comment.