Skip to content

Commit

Permalink
playback: improve /list response time (#3637)
Browse files Browse the repository at this point in the history
Response times of the /list endpoint were slow because the duration of
each segment was computed from scratch by summing the duration of each
of its parts.

This is improved by storing the duration of the overall segment in the
header and using that, if available.
  • Loading branch information
aler9 committed Jan 2, 2025
1 parent fc803da commit a2dc7fc
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 34 deletions.
4 changes: 2 additions & 2 deletions internal/playback/on_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func seekAndMux(
}
defer f.Close()

firstInit, err = segmentFMP4ReadInit(f)
firstInit, _, err = segmentFMP4ReadHeader(f)
if err != nil {
return err
}
Expand All @@ -81,7 +81,7 @@ func seekAndMux(
defer f.Close()

var init *fmp4.Init
init, err = segmentFMP4ReadInit(f)
init, _, err = segmentFMP4ReadHeader(f)
if err != nil {
return err
}
Expand Down
30 changes: 17 additions & 13 deletions internal/playback/on_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type listEntry struct {
URL string `json:"url"`
}

func computeDurationAndConcatenate(
func readDurationAndConcatenate(
recordFormat conf.RecordFormat,
segments []*recordstore.Segment,
) ([]listEntry, error) {
Expand All @@ -45,19 +45,23 @@ func computeDurationAndConcatenate(
}
defer f.Close()

init, err := segmentFMP4ReadInit(f)
init, duration, err := segmentFMP4ReadHeader(f)
if err != nil {
return err
}

_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}

maxDuration, err := segmentFMP4ReadDuration(f, init)
if err != nil {
return err
// if duration is not present in the header, compute it
// by parsing each part
if duration == 0 {
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}

Check warning on line 59 in internal/playback/on_list.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/on_list.go#L58-L59

Added lines #L58 - L59 were not covered by tests

duration, err = segmentFMP4ReadDurationFromParts(f, init)
if err != nil {
return err
}

Check warning on line 64 in internal/playback/on_list.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/on_list.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

if len(out) != 0 && segmentFMP4CanBeConcatenated(
Expand All @@ -66,12 +70,12 @@ func computeDurationAndConcatenate(
init,
seg.Start) {
prevStart := out[len(out)-1].Start
curEnd := seg.Start.Add(maxDuration)
curEnd := seg.Start.Add(duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: seg.Start,
Duration: listEntryDuration(maxDuration),
Duration: listEntryDuration(duration),
})
}

Expand Down Expand Up @@ -137,7 +141,7 @@ func (s *Server) onList(ctx *gin.Context) {
return
}

entries, err := computeDurationAndConcatenate(pathConf.RecordFormat, segments)
entries, err := readDurationAndConcatenate(pathConf.RecordFormat, segments)
if err != nil {
s.writeError(ctx, http.StatusInternalServerError, err)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/playback/on_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestOnListFiltered(t *testing.T) {

s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
ReadTimeout: conf.Duration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Name: "mypath",
Expand Down
42 changes: 27 additions & 15 deletions internal/playback/segment_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,61 +60,73 @@ func segmentFMP4CanBeConcatenated(
!curStart.After(prevEnd.Add(concatenationTolerance))
}

func segmentFMP4ReadInit(r io.ReadSeeker) (*fmp4.Init, error) {
func segmentFMP4ReadHeader(r io.ReadSeeker) (*fmp4.Init, time.Duration, error) {
buf := make([]byte, 8)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 67 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L67

Added line #L67 was not covered by tests
}

// find ftyp
// find and skip ftyp

if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return nil, fmt.Errorf("ftyp box not found")
return nil, 0, fmt.Errorf("ftyp box not found")

Check warning on line 73 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L73

Added line #L73 was not covered by tests
}

ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

_, err = r.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 80 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L80

Added line #L80 was not covered by tests
}

// find moov
// find and skip moov

_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 87 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L87

Added line #L87 was not covered by tests
}

if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return nil, fmt.Errorf("moov box not found")
return nil, 0, fmt.Errorf("moov box not found")

Check warning on line 91 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L91

Added line #L91 was not covered by tests
}

moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

_, err = r.Seek(0, io.SeekStart)
_, err = r.Seek(8, io.SeekCurrent)
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 98 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L98

Added line #L98 was not covered by tests
}

buf = make([]byte, ftypSize+moovSize)
// read mvhd

var mvhd mp4.Mvhd
mvhdSize, err := mp4.Unmarshal(r, uint64(moovSize-8), &mvhd, mp4.Context{})
if err != nil {
return nil, 0, err
}

Check warning on line 107 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L106-L107

Added lines #L106 - L107 were not covered by tests

d := time.Duration(mvhd.DurationV0) * time.Second / time.Duration(mvhd.Timescale)

// read tracks

buf = make([]byte, uint64(moovSize)-16-mvhdSize)

_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 117 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L117

Added line #L117 was not covered by tests
}

var init fmp4.Init
err = init.Unmarshal(bytes.NewReader(buf))
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 123 in internal/playback/segment_fmp4.go

View check run for this annotation

Codecov / codecov/patch

internal/playback/segment_fmp4.go#L123

Added line #L123 was not covered by tests
}

return &init, nil
return &init, d, nil
}

func segmentFMP4ReadDuration(
func segmentFMP4ReadDurationFromParts(
r io.ReadSeeker,
init *fmp4.Init,
) (time.Duration, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/playback/segment_fmp4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func BenchmarkFMP4ReadInit(b *testing.B) {
}
defer f.Close()

_, err = segmentFMP4ReadInit(f)
_, _, err = segmentFMP4ReadHeader(f)
if err != nil {
panic(err)
}
Expand Down
77 changes: 75 additions & 2 deletions internal/recorder/format_fmp4_segment.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package recorder

import (
"bytes"
"fmt"
"io"
"os"
"time"

"github.com/abema/go-mp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"

Expand All @@ -31,6 +34,70 @@ func writeInit(f io.Writer, tracks []*formatFMP4Track) error {
return err
}

func writeDuration(f *os.File, d time.Duration) error {
_, err := f.Seek(0, io.SeekStart)
if err != nil {
return err
}

Check warning on line 41 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L40-L41

Added lines #L40 - L41 were not covered by tests

// find and skip ftyp

buf := make([]byte, 8)
_, err = io.ReadFull(f, buf)
if err != nil {
return err
}

Check warning on line 49 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L48-L49

Added lines #L48 - L49 were not covered by tests

if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return fmt.Errorf("ftyp box not found")
}

Check warning on line 53 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L52-L53

Added lines #L52 - L53 were not covered by tests

ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

_, err = f.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return err
}

Check warning on line 60 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L59-L60

Added lines #L59 - L60 were not covered by tests

// find and skip moov

_, err = io.ReadFull(f, buf)
if err != nil {
return err
}

Check warning on line 67 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L66-L67

Added lines #L66 - L67 were not covered by tests

if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return fmt.Errorf("moov box not found")
}

Check warning on line 71 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L70-L71

Added lines #L70 - L71 were not covered by tests

moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])

moovPos, err := f.Seek(8, io.SeekCurrent)
if err != nil {
return err
}

Check warning on line 78 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L77-L78

Added lines #L77 - L78 were not covered by tests

var mvhd mp4.Mvhd
_, err = mp4.Unmarshal(f, uint64(moovSize-8), &mvhd, mp4.Context{})
if err != nil {
return err
}

Check warning on line 84 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L83-L84

Added lines #L83 - L84 were not covered by tests

mvhd.DurationV0 = uint32(d / time.Millisecond)

_, err = f.Seek(moovPos, io.SeekStart)
if err != nil {
return err
}

Check warning on line 91 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L90-L91

Added lines #L90 - L91 were not covered by tests

_, err = mp4.Marshal(f, &mvhd, mp4.Context{})
if err != nil {
return err
}

Check warning on line 96 in internal/recorder/format_fmp4_segment.go

View check run for this annotation

Codecov / codecov/patch

internal/recorder/format_fmp4_segment.go#L95-L96

Added lines #L95 - L96 were not covered by tests

return nil
}

type formatFMP4Segment struct {
f *formatFMP4
startDTS time.Duration
Expand All @@ -55,13 +122,19 @@ func (s *formatFMP4Segment) close() error {

if s.fi != nil {
s.f.ri.Log(logger.Debug, "closing segment %s", s.path)
err2 := s.fi.Close()

duration := s.lastDTS - s.startDTS
err2 := writeDuration(s.fi, duration)
if err == nil {
err = err2
}

err2 = s.fi.Close()
if err == nil {
err = err2
}

if err2 == nil {
duration := s.lastDTS - s.startDTS
s.f.ri.rec.OnSegmentComplete(s.path, duration)
}
}
Expand Down

0 comments on commit a2dc7fc

Please sign in to comment.