Skip to content

Commit

Permalink
fix(concat): fix stream align + input validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkness4 committed Aug 21, 2024
1 parent 1ad2e67 commit 9cbf9fc
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
36 changes: 31 additions & 5 deletions video/concat/concat.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ int concat(void *ctx, const char *output_file, size_t input_files_count,
// stream_mapping is mapping from input stream index to output stream index.
int **stream_mapping = NULL;
int *stream_mapping_size = NULL;
enum AVMediaType *stream_mapping_codec = NULL;

// last_pts and last_dts used for concatenation. Size is
// input_files_count*stream_mapping_size.
Expand Down Expand Up @@ -175,6 +176,15 @@ int concat(void *ctx, const char *output_file, size_t input_files_count,
ret = AVERROR(ENOMEM);
goto end;
}
if (input_idx == 0) {
stream_mapping_codec =
arena_alloc(&arena, stream_mapping_size[input_idx] *
sizeof(*stream_mapping_codec));
if (!stream_mapping_codec) {
ret = AVERROR(ENOMEM);
goto end;
}
}
dts_offset = arena_alloc(&arena, stream_mapping_size[input_idx] *
sizeof(*dts_offset));
if (!dts_offset) {
Expand Down Expand Up @@ -217,7 +227,22 @@ int concat(void *ctx, const char *output_file, size_t input_files_count,
continue;
}

stream_mapping[input_idx][i] = stream_index++;
if (input_idx == 0) { // Input 0 gets to choose the mapping.
stream_mapping[input_idx][i] = stream_index;
stream_mapping_codec[stream_index] = in_codecpar->codec_type;
stream_index++;
} else {
// Find the stream in the first input that matches the current stream.
for (unsigned int j = 0; j < stream_mapping_size[0]; j++) {
if (stream_mapping[0][j] >= 0 &&
stream_mapping_codec[stream_mapping[0][j]] ==
in_codecpar->codec_type) {
stream_mapping[input_idx][i] = stream_mapping[0][j];
break;
}
}
}

const int out_stream_index = stream_mapping[input_idx][i];
fprintf(stderr, "Input %zu, mapping stream %d (%s) to output stream %d\n",
input_idx, i, av_get_media_type_string(in_codecpar->codec_type),
Expand Down Expand Up @@ -303,14 +328,15 @@ int concat(void *ctx, const char *output_file, size_t input_files_count,

fix_ts(dts_offset, prev_dts, prev_duration, input_idx, pkt);

if ((ret = av_interleaved_write_frame(ofmt_ctx, pkt)) < 0) {
ret = av_interleaved_write_frame(ofmt_ctx, pkt);
/* pkt is now blank (av_interleaved_write_frame() takes ownership of
* its contents and resets pkt), so that no unreferencing is
* necessary. This would be different if one used av_write_frame(). */
if (ret < 0) {
fprintf(stderr, "Error writing packet to output file: %s\n",
av_err2str(ret));
av_packet_unref(pkt);
break;
}

av_packet_unref(pkt);
} // while packets.

goTraceProcessInputEnd(span);
Expand Down
39 changes: 31 additions & 8 deletions video/concat/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,22 @@ func applyOptions(opts []Option) *Options {
func Do(ctx context.Context, output string, inputs []string, opts ...Option) error {
o := applyOptions(opts)

attrs := make([]attribute.KeyValue, 0, len(inputs))
for idx, input := range inputs {
// Check if all files are valid
validInputs := make([]string, 0, len(inputs))
for _, input := range inputs {
if err := probe.Do([]string{input}); err != nil {
log.Err(err).Str("input", input).Msg("input is invalid")
}
validInputs = append(validInputs, input)
}

if len(validInputs) == 0 {
log.Warn().Msg("no valid inputs")
return nil
}

attrs := make([]attribute.KeyValue, 0, len(validInputs))
for idx, input := range validInputs {
attrs = append(attrs, attribute.String(fmt.Sprintf("input%d", idx), input))
}
attrs = append(attrs, attribute.String("output", output))
Expand All @@ -119,14 +133,15 @@ func Do(ctx context.Context, output string, inputs []string, opts ...Option) err
log.Info().Str("output", output).Strs("inputs", inputs).Any("options", o).Msg("concat")

// If mixed formats (adts vs asc), we should remux the others first using intermediates or FIFO
if areFormatMixed(inputs) {
i, useFIFO, err := remuxMixedTS(ctx, inputs, opts...)
if areFormatMixed(validInputs) {
log.Warn().Msg("mixed formats detected, using intermediates or FIFO to remux files first")
i, useFIFO, err := remuxMixedTS(ctx, validInputs, opts...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
inputs = i
validInputs = i

if !useFIFO {
// Delete intermediates
Expand All @@ -144,12 +159,12 @@ func Do(ctx context.Context, output string, inputs []string, opts ...Option) err
}
}

inputsC := C.malloc(C.size_t(len(inputs)) * C.size_t(unsafe.Sizeof(uintptr(0))))
inputsC := C.malloc(C.size_t(len(validInputs)) * C.size_t(unsafe.Sizeof(uintptr(0))))
defer C.free(inputsC)
// convert the C array to a Go Array so we can index it
inputsCIndexable := (*[1<<30 - 1]*C.char)(inputsC)

for idx, input := range inputs {
for idx, input := range validInputs {
cInput := C.CString(input)
defer C.free(unsafe.Pointer(cInput))
inputsCIndexable[idx] = cInput
Expand All @@ -161,7 +176,7 @@ func Do(ctx context.Context, output string, inputs []string, opts ...Option) err
cOutput := C.CString(output)
defer C.free(unsafe.Pointer(cOutput))

if err := C.concat(ctxp, cOutput, C.size_t(len(inputs)), (**C.char)(inputsC), C.int(o.audioOnly)); err != 0 {
if err := C.concat(ctxp, cOutput, C.size_t(len(validInputs)), (**C.char)(inputsC), C.int(o.audioOnly)); err != 0 {
if err == C.AVERROR_EOF {
return nil
}
Expand Down Expand Up @@ -266,6 +281,14 @@ func WithPrefix(ctx context.Context, remuxFormat string, prefix string, opts ...
continue
}

// Ignore files without video or audio
if ok, err := probe.ContainsVideoOrAudio(filepath.Join(path, de.Name())); err != nil {
log.Err(err).Msg("failed to probe file to determine format")
continue
} else if !ok {
continue
}

names = append(names, de.Name())
}

Expand Down
2 changes: 1 addition & 1 deletion video/concat/concat_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestDo(t *testing.T) {
ctx := context.Background()
shut, err := telemetry.SetupOTELSDK(ctx, telemetry.WithStdout())
shut, err := telemetry.SetupOTELSDK(ctx)
defer shut(ctx)
require.NoError(t, err)
err = concat.Do(ctx, "output.mp4", []string{"input.ts", "input.mp4"})
Expand Down

0 comments on commit 9cbf9fc

Please sign in to comment.