Skip to content

Commit

Permalink
Implement proper heartbeat for video streams
Browse files Browse the repository at this point in the history
Signed-off-by: Jacki <[email protected]>
  • Loading branch information
TheJackiMonster committed Aug 16, 2024
1 parent 234c159 commit 96b277f
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 11 deletions.
131 changes: 121 additions & 10 deletions src/discourse.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,

buffer = NULL;

if ((info->last_timestamp == timestamp) ||
GDateTime *dt = g_date_time_new_now_local();

if ((!payload_len) || (info->last_timestamp == timestamp) ||
((!(info->last_timestamp)) && (!(info->position))))
goto skip_buffer;

Expand Down Expand Up @@ -437,7 +439,6 @@ discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,

info->position += duration;

GDateTime *dt = g_date_time_new_now_local();
pthread_mutex_lock(&(info->mutex));

if (info->end_datetime)
Expand All @@ -447,15 +448,27 @@ discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,
}

if (dt)
{
info->end_datetime = g_date_time_add_seconds(dt, (gdouble) duration / clockrate);
g_date_time_unref(dt);
}
info->end_datetime = g_date_time_add_seconds(dt, 0.1 + (gdouble) duration / clockrate);

pthread_mutex_unlock(&(info->mutex));

skip_buffer:
info->last_timestamp = timestamp;
if (payload_len)
info->last_timestamp = timestamp;
else if (dt)
{
pthread_mutex_lock(&(info->mutex));

if (info->end_datetime)
g_date_time_unref(info->end_datetime);

info->end_datetime = g_date_time_add_seconds(dt, 0.1);

pthread_mutex_unlock(&(info->mutex));
}

if (dt)
g_date_time_unref(dt);

if (buffer)
gst_buffer_unref(buffer);
Expand All @@ -464,6 +477,46 @@ discourse_subscription_stream_message(MESSENGER_DiscourseSubscriptionInfo *info,
return;
}

static gboolean
_discourse_video_heartbeat(gpointer user_data)
{
MESSENGER_DiscourseInfo *info = (MESSENGER_DiscourseInfo*) user_data;

info->heartbeat = 0;

GstBuffer *buffer = gst_buffer_new();

if (!buffer)
return FALSE;

GstFlowReturn ret = GST_FLOW_ERROR;

gst_rtp_buffer_allocate_data(
buffer,
0,
0,
0
);

g_signal_emit_by_name(
info->video_heartbeat_source,
"push-buffer",
buffer,
&ret
);

if (buffer)
gst_buffer_unref(buffer);

info->heartbeat = util_timeout_add(
100,
G_SOURCE_FUNC(_discourse_video_heartbeat),
info
);

return FALSE;
}

static gboolean
discourse_subscription_link_widget(MESSENGER_DiscourseSubscriptionInfo *info,
GtkContainer *container)
Expand Down Expand Up @@ -602,9 +655,7 @@ _setup_video_gst_pipelines(MESSENGER_DiscourseInfo *info)
"video/x-raw,width=1280,height=720 ! videoconvert ! video/x-raw,format=I420 ! "
"x264enc bitrate=1000 speed-preset=fast bframes=0 key-int-max=30 tune=zerolatency byte-stream=true ! "
"video/x-h264,profile=baseline ! rtph264pay aggregate-mode=zero-latency mtu=45000 ! "
"tee ! queue ! capsfilter name=filter ! fdsink name=sink "
"fakesrc num-buffers=-1 sizetype=empty filltype=zero ! "
"video/x-h264,stream-format=avc,alignment=au ! rtph264pay ! mux.",
"tee ! queue ! rtpmux name=mux ! capsfilter name=filter ! fdsink name=sink",
NULL
);

Expand All @@ -616,6 +667,28 @@ _setup_video_gst_pipelines(MESSENGER_DiscourseInfo *info)
GST_BIN(info->video_record_pipeline), "sink"
);

GstElement *mux = gst_bin_get_by_name(
GST_BIN(info->video_record_pipeline),
"mux"
);

info->video_heartbeat_source = gst_element_factory_make("appsrc", NULL);

{
gst_bin_add(
GST_BIN(info->video_record_pipeline),
info->video_heartbeat_source
);

GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u");

GstPad *pad = gst_element_get_static_pad(
info->video_heartbeat_source, "src"
);

gst_pad_link(pad, mux_pad);
}

GstElement *filter = gst_bin_get_by_name(
GST_BIN(info->video_record_pipeline), "filter"
);
Expand All @@ -635,6 +708,7 @@ _setup_video_gst_pipelines(MESSENGER_DiscourseInfo *info)
NULL
);

g_object_set(info->video_heartbeat_source, "caps", caps, NULL);
g_object_set(filter, "caps", caps, NULL);
gst_caps_unref(caps);

Expand Down Expand Up @@ -665,13 +739,15 @@ discourse_create_info(struct GNUNET_CHAT_Discourse *discourse)
info->video_record_pipeline = NULL;
info->video_record_source = NULL;
info->video_record_sink = NULL;
info->video_heartbeat_source = NULL;

info->audio_mix_pipeline = NULL;
info->audio_mix_element = NULL;
info->audio_volume_element = NULL;

pthread_mutex_init(&(info->mutex), NULL);

info->heartbeat = 0;
info->subscriptions = NULL;

const struct GNUNET_ShortHashCode *id = GNUNET_CHAT_discourse_get_id(
Expand Down Expand Up @@ -715,9 +791,30 @@ discourse_destroy_info(struct GNUNET_CHAT_Discourse *discourse)

pthread_mutex_unlock(&(info->mutex));

if (info->heartbeat)
util_source_remove(info->heartbeat);

if (info->video_record_pipeline)
{
gst_element_set_state(info->video_record_pipeline, GST_STATE_NULL);

GstElement *mux = gst_bin_get_by_name(
GST_BIN(info->video_record_pipeline),
"mux"
);

GstPad *mux_pad = gst_element_request_pad_simple(mux, "sink_%u");
GstPad *pad = gst_element_get_static_pad(
info->video_heartbeat_source, "src"
);

gst_pad_unlink(pad, mux_pad);

gst_bin_remove(
GST_BIN(info->video_record_pipeline),
info->video_heartbeat_source
);

gst_object_unref(GST_OBJECT(info->video_record_pipeline));
}

Expand Down Expand Up @@ -923,13 +1020,27 @@ discourse_set_mute(struct GNUNET_CHAT_Discourse *discourse,
if (!info)
return;

if ((mute) && (info->heartbeat))
{
util_source_remove(info->heartbeat);
info->heartbeat = 0;
}

const GstState state = mute? GST_STATE_NULL : GST_STATE_PLAYING;

if (info->audio_record_pipeline)
gst_element_set_state(info->audio_record_pipeline, state);

if (info->video_record_pipeline)
{
gst_element_set_state(info->video_record_pipeline, state);

if ((!mute) && (!(info->heartbeat)))
info->heartbeat = util_idle_add(
G_SOURCE_FUNC(_discourse_video_heartbeat),
info
);
}
}

bool
Expand Down
4 changes: 3 additions & 1 deletion src/discourse.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ typedef struct MESSENGER_DiscourseInfo
GstElement *video_record_pipeline;
GstElement *video_record_source;
GstElement *video_record_sink;
GstElement *video_heartbeat_source;

GstElement *audio_mix_pipeline;
GstElement *audio_mix_element;
GstElement *audio_volume_element;

pthread_mutex_t mutex;

guint heartbeat;

GList *subscriptions;
} MESSENGER_DiscourseInfo;

Expand Down

0 comments on commit 96b277f

Please sign in to comment.