Skip to content

Commit

Permalink
feat: optimized cmaf-ingest to send content-length if possible
Browse files Browse the repository at this point in the history
  • Loading branch information
tobbee committed Jan 13, 2025
1 parent 341cd80 commit c3baf65
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- CMAF ingest of full segments now send Content-Length header

### Fixed

- endNumber in live MPD (Issue #235)
Expand Down
124 changes: 93 additions & 31 deletions cmd/livesim2/app/cmaf-ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/http/httptest"
"path"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -50,6 +51,7 @@ type cmafIngester struct {
dur *int
nrSegsToSend *int // calculate from dur and segDur
streamsURLs bool
useChunked bool
cfg *ResponseConfig
asset *asset
repsData []cmafRepData
Expand Down Expand Up @@ -169,6 +171,7 @@ func (cm *cmafIngesterMgr) NewCmafIngester(req CmafIngesterSetup) (nr uint64, er
testNowMS: req.TestNowMS,
dur: req.Duration,
streamsURLs: req.StreamsURLs,
useChunked: cfg.ChunkDurS != nil,
log: log,
cfg: cfg,
asset: asset,
Expand Down Expand Up @@ -463,13 +466,16 @@ func (c *cmafIngester) sendInitSegment(ctx context.Context, rd cmafRepData, rawI
if err != nil {
return fmt.Errorf("error creating request: %w", err)
}
setIngestHeader(req)
req.Header.Set("Content-Type", rd.mimeType)
req.Header.Set("Connection", "keep-alive")
if c.user != "" || c.passWord != "" {
req.SetBasicAuth(c.user, c.passWord)
}
slog.Info("Sending init segment", "fileName", fileName, "url", url)
setReqHeaders(req, rd.contentType, c.user, c.passWord)
err = c.sendRequest(ctx, req, url)
if err != nil {
return fmt.Errorf("error sending request: %w", err)
}
return nil
}

func (c *cmafIngester) sendRequest(ctx context.Context, req *http.Request, url string) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("Error sending request: %w", err)
Expand All @@ -483,15 +489,32 @@ func (c *cmafIngester) sendInitSegment(ctx context.Context, rd cmafRepData, rawI
return fmt.Errorf("bad status code: %d", resp.StatusCode)
}
defer func() {
slog.Debug("Closing body", "filename", fileName)
resp.Body.Close()
}()
return nil
}

func setReqHeaders(req *http.Request, contentType, user, password string) {
req.Header.Set("Connection", "keep-alive")
if user != "" && password != "" {
req.SetBasicAuth(user, password)
}
switch contentType {
case "video":
req.Header.Set("Content-Type", "video/mp4")
case "audio":
req.Header.Set("Content-Type", "audio/mp4")
case "text":
req.Header.Set("Content-Type", "application/mp4")
default:
slog.Warn("unknown content type", "type", contentType)
}
setIngestHeader(req)
}

// sendMediaSegments sends all media segments for all representations. isLast triggers lmsg insertion.
func (c *cmafIngester) sendMediaSegments(ctx context.Context, nextSegNr, nowMS int, isLast bool) error {
c.log.Debug("Start media segment", "nr", nextSegNr, "nowMS", nowMS)
c.log.Debug("Start media segment", "nr", nextSegNr, "nowMS", nowMS, "useChunked", c.useChunked)
wTimes := calcWrapTimes(c.asset, c.cfg, nowMS+50, m.Duration(100*time.Millisecond))
wg := sync.WaitGroup{}
if c.cfg.SegTimelineFlag {
Expand Down Expand Up @@ -544,7 +567,7 @@ func (c *cmafIngester) sendMediaSegment(ctx context.Context, wg *sync.WaitGroup,
defer wg.Done()

u := fmt.Sprintf("%s/%s", c.dest(), segPath)
c.log.Info("send media segment", "path", segPath, "segNr", segNr, "nowMS", nowMS, "url", u)
c.log.Info("send media segment", "path", segPath, "segNr", segNr, "nowMS", nowMS, "url", u, "chunked", c.useChunked)

nrBytesCh := make(chan int)
defer close(nrBytesCh)
Expand All @@ -553,10 +576,12 @@ func (c *cmafIngester) sendMediaSegment(ctx context.Context, wg *sync.WaitGroup,
finishedSendCh := make(chan struct{})
defer close(finishedSendCh)

src := newCmafSource(nrBytesCh, writeMoreCh, c.log, u, contentType, c.user, c.passWord)
src := newCmafSource(nrBytesCh, writeMoreCh, c.log, u, contentType, c.user, c.passWord, c.useChunked)

// Create media segment based on number and send it to segPath
go src.startReadAndSend(ctx, finishedSendCh)
if c.useChunked {
go src.startReadAndSendChunked(ctx, finishedSendCh)
}
code, err := writeSegment(ctx, src, c.log, c.cfg, c.mgr.s.Cfg.DrmCfg, c.mgr.s.assetMgr.vodFS,
c.asset, segPart, nowMS, c.mgr.s.textTemplates, isLast)
c.log.Info("writeSegment", "code", code, "err", err)
Expand All @@ -579,13 +604,32 @@ func (c *cmafIngester) sendMediaSegment(ctx context.Context, wg *sync.WaitGroup,
return
}
}
<-writeMoreCh // Capture final message
nrBytesCh <- -1 // Signal that we are done to Read (that reads and pushes to remote)
<-finishedSendCh
if c.useChunked {
<-writeMoreCh // Capture final message
nrBytesCh <- -1 // Signal that we are done to Read (that reads and pushes to remote)
<-finishedSendCh
} else {
// Write should have written everything to a c.buffer
req, err := http.NewRequestWithContext(ctx, "PUT", u, src.buffer)
if err != nil {
c.log.Error("creating request", "err", err)
http.Error(src, "writeSegment", http.StatusInternalServerError)
return
}
src.setReqHeaders(req)
slog.Info("Sending media segment", "url", u)
err = c.sendRequest(ctx, req, u)
if err != nil {
c.log.Error("sending request", "err", err)
http.Error(src, "writeSegment", http.StatusInternalServerError)
return
}
}
}

// cmafSource intermediates HTTP response writer and client push writer
// It provides a Read method that the client can use to read the data.
// If useChunked, the data is sent in chunks, otherwise as a whole using Content-Length.
type cmafSource struct {
ctx context.Context
req *http.Request
Expand All @@ -597,28 +641,33 @@ type cmafSource struct {
status int
log *slog.Logger
buf []byte
buffer *bytes.Buffer
bufLevel int // Keeping track of local buffer
offset int // Offset in local buffer
user string
password string
useChunked bool
}

func newCmafSource(nrBytesCh chan int, writeMoreCh chan struct{}, log *slog.Logger, url string, contentType, user, password string) *cmafSource {
func newCmafSource(nrBytesCh chan int, writeMoreCh chan struct{}, log *slog.Logger, url string, contentType, user, password string, useChunked bool) *cmafSource {
cs := cmafSource{
url: url,
contentType: contentType,
h: make(http.Header),
log: log,
buf: make([]byte, 1024*64),
nrBytesCh: nrBytesCh,
writeMoreCh: writeMoreCh,
user: user,
password: password,
useChunked: useChunked,
}
if useChunked {
cs.buf = make([]byte, 64*1024)
}
return &cs
}

func (cs *cmafSource) startReadAndSend(ctx context.Context, finishedCh chan struct{}) {
func (cs *cmafSource) startReadAndSendChunked(ctx context.Context, finishedCh chan struct{}) {
cs.writeMoreCh <- struct{}{} // Get the writer going
cs.ctx = ctx
req, err := http.NewRequestWithContext(ctx, "PUT", cs.url, cs)
Expand Down Expand Up @@ -649,21 +698,8 @@ func (cs *cmafSource) startReadAndSend(ctx context.Context, finishedCh chan stru
}

func (cs *cmafSource) setReqHeaders(req *http.Request) {
setReqHeaders(req, cs.contentType, cs.user, cs.password)
req.Header.Set("Connection", "keep-alive")
if cs.user != "" || cs.password != "" {
req.SetBasicAuth(cs.user, cs.password)
}
setIngestHeader(req)
switch cs.contentType {
case "video":
req.Header.Set("Content-Type", "video/mp4")
case "audio":
req.Header.Set("Content-Type", "audio/mp4")
case "text":
req.Header.Set("Content-Type", "application/mp4")
default:
cs.log.Warn("unknown content type", "type", cs.contentType)
}
}

func (cs *cmafSource) Header() http.Header {
Expand All @@ -675,6 +711,32 @@ func (cs *cmafSource) Flush() {
}

func (cs *cmafSource) Write(b []byte) (int, error) {
if !cs.useChunked {
contentLength := -1 // Set to -1 to signal that we have checked once
if cl, ok := cs.h["Content-Length"]; ok {
cl, err := strconv.Atoi(cl[0])
if err != nil {
cs.log.Error("Content-Length", "err", err)
}
contentLength = cl
}
if contentLength <= 0 {
return 0, fmt.Errorf("bad content length: %d", contentLength)
}
if len(b) != contentLength {
cs.log.Warn("Content-Length mismatch", "length", len(b), "contentLength", contentLength)
}
if cs.buffer == nil {
cs.buffer = bytes.NewBuffer(make([]byte, 0, contentLength))
} else {
cs.buffer.Reset()
}
n, err := cs.buffer.Write(b)
if err != nil {
cs.log.Error("Write", "err", err)
}
return n, err
}
<-cs.writeMoreCh
if cs.offset != 0 || cs.bufLevel != 0 {
cs.log.Warn("bad write levels", "url", cs.url, "offset", cs.offset, "bufLevel", cs.bufLevel)
Expand Down

0 comments on commit c3baf65

Please sign in to comment.