[GStreamer] AppendPipeline: support dispatch of decryption-specific GstStructure...
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016 Metrological Group B.V.
3  * Copyright (C) 2016 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GRefPtrGStreamer.h"
28 #include "GStreamerMediaDescription.h"
29 #include "GStreamerMediaSample.h"
30 #include "GStreamerUtilities.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <gst/app/gstappsink.h>
36 #include <gst/app/gstappsrc.h>
37 #include <gst/gst.h>
38 #include <gst/pbutils/pbutils.h>
39 #include <gst/video/video.h>
40 #include <wtf/Condition.h>
41 #include <wtf/glib/GLibUtilities.h>
42 #include <wtf/glib/RunLoopSourcePriority.h>
43
44 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
45 #define GST_CAT_DEFAULT webkit_mse_debug
46
47 namespace WebCore {
48
49 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
50 {
51     switch (appendState) {
52     case AppendPipeline::AppendState::Invalid:
53         return "Invalid";
54     case AppendPipeline::AppendState::NotStarted:
55         return "NotStarted";
56     case AppendPipeline::AppendState::Ongoing:
57         return "Ongoing";
58     case AppendPipeline::AppendState::KeyNegotiation:
59         return "KeyNegotiation";
60     case AppendPipeline::AppendState::DataStarve:
61         return "DataStarve";
62     case AppendPipeline::AppendState::Sampling:
63         return "Sampling";
64     case AppendPipeline::AppendState::LastSample:
65         return "LastSample";
66     case AppendPipeline::AppendState::Aborting:
67         return "Aborting";
68     default:
69         return "(unknown)";
70     }
71 }
72
73 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
74 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
75 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
76 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
77 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
78 #if !LOG_DISABLED
79 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
80 #endif
81 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
82 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
83 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
84
85 static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
86 {
87     GST_TRACE("received callback");
88     appendPipeline->handleNeedContextSyncMessage(message);
89 }
90
91 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
92 {
93     appendPipeline->handleApplicationMessage(message);
94 }
95
96 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
97     : m_mediaSourceClient(mediaSourceClient.get())
98     , m_sourceBufferPrivate(sourceBufferPrivate.get())
99     , m_playerPrivate(&playerPrivate)
100     , m_id(0)
101     , m_appsrcAtLeastABufferLeft(false)
102     , m_appsrcNeedDataReceived(false)
103     , m_appsrcDataLeavingProbeId(0)
104     , m_appendState(AppendState::NotStarted)
105     , m_abortPending(false)
106     , m_streamType(Unknown)
107 {
108     ASSERT(WTF::isMainThread());
109
110     GST_TRACE("Creating AppendPipeline (%p)", this);
111
112     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
113     // The track name is still unknown at this time, though.
114     m_pipeline = gst_pipeline_new(nullptr);
115
116     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
117     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
118     gst_bus_enable_sync_message_emission(m_bus.get());
119
120     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
121     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
122
123     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
124     // below will already take the initial reference and we need an additional one for us.
125     m_appsrc = gst_element_factory_make("appsrc", nullptr);
126     m_demux = gst_element_factory_make("qtdemux", nullptr);
127     m_appsink = gst_element_factory_make("appsink", nullptr);
128
129     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
130     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
131
132     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
133     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
134
135     setAppsrcDataLeavingProbe();
136
137 #if !LOG_DISABLED
138     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
139     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
140     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
141     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
142     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
143     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
144     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
145 #endif
146
147     // These signals won't be connected outside of the lifetime of "this".
148     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
149     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
150     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
151     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
152     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
153
154     // Add_many will take ownership of a reference. That's why we used an assignment before.
155     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
156     gst_element_link(m_appsrc.get(), m_demux.get());
157
158     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
159 };
160
161 AppendPipeline::~AppendPipeline()
162 {
163     ASSERT(WTF::isMainThread());
164
165     {
166         LockHolder locker(m_newSampleLock);
167         setAppendState(AppendState::Invalid);
168         m_newSampleCondition.notifyOne();
169     }
170
171     {
172         LockHolder locker(m_padAddRemoveLock);
173         m_playerPrivate = nullptr;
174         m_padAddRemoveCondition.notifyOne();
175     }
176
177     GST_TRACE("Destroying AppendPipeline (%p)", this);
178
179     // FIXME: Maybe notify appendComplete here?
180
181     if (m_pipeline) {
182         ASSERT(m_bus);
183         gst_bus_remove_signal_watch(m_bus.get());
184         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
185         m_pipeline = nullptr;
186     }
187
188     if (m_appsrc) {
189         removeAppsrcDataLeavingProbe();
190         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
191         m_appsrc = nullptr;
192     }
193
194     if (m_demux) {
195 #if !LOG_DISABLED
196         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
197         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
198 #endif
199
200         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
201         m_demux = nullptr;
202     }
203
204     if (m_appsink) {
205         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
206         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
207         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
208
209 #if !LOG_DISABLED
210         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
211 #endif
212
213         m_appsink = nullptr;
214     }
215
216     m_appsinkCaps = nullptr;
217     m_demuxerSrcPadCaps = nullptr;
218 };
219
220 void AppendPipeline::clearPlayerPrivate()
221 {
222     ASSERT(WTF::isMainThread());
223     GST_DEBUG("cleaning private player");
224
225     {
226         LockHolder locker(m_newSampleLock);
227         // Make sure that AppendPipeline won't process more data from now on and
228         // instruct handleNewSample to abort itself from now on as well.
229         setAppendState(AppendState::Invalid);
230
231         // Awake any pending handleNewSample operation in the streaming thread.
232         m_newSampleCondition.notifyOne();
233     }
234
235     {
236         LockHolder locker(m_padAddRemoveLock);
237         m_playerPrivate = nullptr;
238         m_padAddRemoveCondition.notifyOne();
239     }
240
241     // And now that no handleNewSample operations will remain stalled waiting
242     // for the main thread, stop the pipeline.
243     if (m_pipeline)
244         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
245 }
246
247 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
248 {
249     const gchar* contextType = nullptr;
250     gst_message_parse_context_type(message, &contextType);
251     GST_TRACE("context type: %s", contextType);
252     if (!g_strcmp0(contextType, "drm-preferred-decryption-system-id"))
253         setAppendState(AppendPipeline::AppendState::KeyNegotiation);
254
255     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
256     if (m_playerPrivate)
257         m_playerPrivate->handleSyncMessage(message);
258 }
259
260 void AppendPipeline::handleApplicationMessage(GstMessage* message)
261 {
262     ASSERT(WTF::isMainThread());
263
264     const GstStructure* structure = gst_message_get_structure(message);
265
266     if (gst_structure_has_name(structure, "appsrc-need-data")) {
267         handleAppsrcNeedDataReceived();
268         return;
269     }
270
271     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
272         handleAppsrcAtLeastABufferLeft();
273         return;
274     }
275
276     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
277         GRefPtr<GstPad> demuxerSrcPad;
278         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
279         ASSERT(demuxerSrcPad);
280         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
281         return;
282     }
283
284     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
285         appsinkCapsChanged();
286         return;
287     }
288
289     if (gst_structure_has_name(structure, "appsink-new-sample")) {
290         GRefPtr<GstSample> newSample;
291         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
292
293         appsinkNewSample(newSample.get());
294         return;
295     }
296
297     if (gst_structure_has_name(structure, "appsink-eos")) {
298         appsinkEOS();
299         return;
300     }
301
302     ASSERT_NOT_REACHED();
303 }
304
305 void AppendPipeline::handleAppsrcNeedDataReceived()
306 {
307     if (!m_appsrcAtLeastABufferLeft) {
308         GST_TRACE("discarding until at least a buffer leaves appsrc");
309         return;
310     }
311
312     ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
313     ASSERT(!m_appsrcNeedDataReceived);
314
315     GST_TRACE("received need-data from appsrc");
316
317     m_appsrcNeedDataReceived = true;
318     checkEndOfAppend();
319 }
320
321 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
322 {
323     m_appsrcAtLeastABufferLeft = true;
324     GST_TRACE("received buffer-left from appsrc");
325 #if LOG_DISABLED
326     removeAppsrcDataLeavingProbe();
327 #endif
328 }
329
330 gint AppendPipeline::id()
331 {
332     ASSERT(WTF::isMainThread());
333
334     if (m_id)
335         return m_id;
336
337     static gint s_totalAudio = 0;
338     static gint s_totalVideo = 0;
339     static gint s_totalText = 0;
340
341     switch (m_streamType) {
342     case Audio:
343         m_id = ++s_totalAudio;
344         break;
345     case Video:
346         m_id = ++s_totalVideo;
347         break;
348     case Text:
349         m_id = ++s_totalText;
350         break;
351     case Unknown:
352     case Invalid:
353         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
354         ASSERT_NOT_REACHED();
355         break;
356     }
357
358     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
359
360     return m_id;
361 }
362
363 void AppendPipeline::setAppendState(AppendState newAppendState)
364 {
365     ASSERT(WTF::isMainThread());
366     // Valid transitions:
367     // NotStarted-->Ongoing-->DataStarve-->NotStarted
368     //           |         |            `->Aborting-->NotStarted
369     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
370     //           |         |                                     `->Aborting-->NotStarted
371     //           |         `->KeyNegotiation-->Ongoing-->[...]
372     //           `->Aborting-->NotStarted
373     AppendState oldAppendState = m_appendState;
374     AppendState nextAppendState = AppendState::Invalid;
375
376     if (oldAppendState != newAppendState)
377         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
378
379     bool ok = false;
380
381     switch (oldAppendState) {
382     case AppendState::NotStarted:
383         switch (newAppendState) {
384         case AppendState::Ongoing:
385             ok = true;
386             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
387             break;
388         case AppendState::NotStarted:
389             ok = true;
390             if (m_pendingBuffer) {
391                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
392                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
393                 nextAppendState = AppendState::Ongoing;
394             }
395             break;
396         case AppendState::Aborting:
397             ok = true;
398             nextAppendState = AppendState::NotStarted;
399             break;
400         case AppendState::Invalid:
401             ok = true;
402             break;
403         default:
404             break;
405         }
406         break;
407     case AppendState::KeyNegotiation:
408         switch (newAppendState) {
409         case AppendState::Ongoing:
410         case AppendState::Invalid:
411             ok = true;
412             break;
413         default:
414             break;
415         }
416         break;
417     case AppendState::Ongoing:
418         switch (newAppendState) {
419         case AppendState::KeyNegotiation:
420         case AppendState::Sampling:
421         case AppendState::Invalid:
422             ok = true;
423             break;
424         case AppendState::DataStarve:
425             ok = true;
426             GST_DEBUG("received all pending samples");
427             m_sourceBufferPrivate->didReceiveAllPendingSamples();
428             if (m_abortPending)
429                 nextAppendState = AppendState::Aborting;
430             else
431                 nextAppendState = AppendState::NotStarted;
432             break;
433         default:
434             break;
435         }
436         break;
437     case AppendState::DataStarve:
438         switch (newAppendState) {
439         case AppendState::NotStarted:
440         case AppendState::Invalid:
441             ok = true;
442             break;
443         case AppendState::Aborting:
444             ok = true;
445             nextAppendState = AppendState::NotStarted;
446             break;
447         default:
448             break;
449         }
450         break;
451     case AppendState::Sampling:
452         switch (newAppendState) {
453         case AppendState::Sampling:
454         case AppendState::Invalid:
455             ok = true;
456             break;
457         case AppendState::LastSample:
458             ok = true;
459             GST_DEBUG("received all pending samples");
460             m_sourceBufferPrivate->didReceiveAllPendingSamples();
461             if (m_abortPending)
462                 nextAppendState = AppendState::Aborting;
463             else
464                 nextAppendState = AppendState::NotStarted;
465             break;
466         default:
467             break;
468         }
469         break;
470     case AppendState::LastSample:
471         switch (newAppendState) {
472         case AppendState::NotStarted:
473         case AppendState::Invalid:
474             ok = true;
475             break;
476         case AppendState::Aborting:
477             ok = true;
478             nextAppendState = AppendState::NotStarted;
479             break;
480         default:
481             break;
482         }
483         break;
484     case AppendState::Aborting:
485         switch (newAppendState) {
486         case AppendState::NotStarted:
487             ok = true;
488             resetPipeline();
489             m_abortPending = false;
490             nextAppendState = AppendState::NotStarted;
491             break;
492         case AppendState::Invalid:
493             ok = true;
494             break;
495         default:
496             break;
497         }
498         break;
499     case AppendState::Invalid:
500         ok = true;
501         break;
502     }
503
504     if (ok)
505         m_appendState = newAppendState;
506     else
507         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
508
509     ASSERT(ok);
510
511     if (nextAppendState != AppendState::Invalid)
512         setAppendState(nextAppendState);
513 }
514
515 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
516 {
517     ASSERT(WTF::isMainThread());
518
519     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
520     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
521
522     GstStructure* structure = gst_caps_get_structure(m_demuxerSrcPadCaps.get(), 0);
523     bool sizeConfigured = false;
524
525 #if GST_CHECK_VERSION(1, 5, 3) && ENABLE(ENCRYPTED_MEDIA)
526     if (gst_structure_has_name(structure, "application/x-cenc")) {
527         // Any previous decryptor should have been removed from the pipeline by disconnectFromAppSinkFromStreamingThread()
528         ASSERT(!m_decryptor);
529
530         m_decryptor = WebCore::createGstDecryptor(gst_structure_get_string(structure, "protection-system"));
531         if (!m_decryptor) {
532             GST_ERROR("decryptor not found for caps: %" GST_PTR_FORMAT, m_demuxerSrcPadCaps.get());
533             return;
534         }
535
536         const gchar* originalMediaType = gst_structure_get_string(structure, "original-media-type");
537
538         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(originalMediaType)) {
539             m_presentationSize = WebCore::FloatSize();
540             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
541         } else if (g_str_has_prefix(originalMediaType, "video/")) {
542             int width = 0;
543             int height = 0;
544             float finalHeight = 0;
545
546             if (gst_structure_get_int(structure, "width", &width) && gst_structure_get_int(structure, "height", &height)) {
547                 int ratioNumerator = 1;
548                 int ratioDenominator = 1;
549
550                 gst_structure_get_fraction(structure, "pixel-aspect-ratio", &ratioNumerator, &ratioDenominator);
551                 finalHeight = height * ((float) ratioDenominator / (float) ratioNumerator);
552             }
553
554             m_presentationSize = WebCore::FloatSize(width, finalHeight);
555             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
556         } else {
557             m_presentationSize = WebCore::FloatSize();
558             if (g_str_has_prefix(originalMediaType, "audio/"))
559                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
560             else if (g_str_has_prefix(originalMediaType, "text/"))
561                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
562         }
563         sizeConfigured = true;
564     }
565 #endif
566
567     if (!sizeConfigured) {
568         const char* structureName = gst_structure_get_name(structure);
569         GstVideoInfo info;
570
571         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(structureName)) {
572             m_presentationSize = WebCore::FloatSize();
573             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
574         } else if (g_str_has_prefix(structureName, "video/") && gst_video_info_from_caps(&info, demuxerSrcPadCaps)) {
575             float width, height;
576
577             width = info.width;
578             height = info.height * ((float) info.par_d / (float) info.par_n);
579
580             m_presentationSize = WebCore::FloatSize(width, height);
581             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
582         } else {
583             m_presentationSize = WebCore::FloatSize();
584             if (g_str_has_prefix(structureName, "audio/"))
585                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
586             else if (g_str_has_prefix(structureName, "text/"))
587                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
588         }
589     }
590 }
591
592 void AppendPipeline::appsinkCapsChanged()
593 {
594     ASSERT(WTF::isMainThread());
595
596     if (!m_appsink)
597         return;
598
599     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
600     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
601
602     if (!caps)
603         return;
604
605     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
606     bool previousCapsWereNull = !m_appsinkCaps;
607
608     if (m_appsinkCaps != caps) {
609         m_appsinkCaps = WTFMove(caps);
610         if (m_playerPrivate && previousCapsWereNull)
611             m_playerPrivate->trackDetected(this, m_oldTrack, m_track);
612         didReceiveInitializationSegment();
613         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
614     }
615 }
616
617 void AppendPipeline::checkEndOfAppend()
618 {
619     ASSERT(WTF::isMainThread());
620
621     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
622         return;
623
624     GST_TRACE("end of append data mark was received");
625
626     switch (m_appendState) {
627     case AppendState::Ongoing:
628         GST_TRACE("DataStarve");
629         m_appsrcNeedDataReceived = false;
630         setAppendState(AppendState::DataStarve);
631         break;
632     case AppendState::Sampling:
633         GST_TRACE("LastSample");
634         m_appsrcNeedDataReceived = false;
635         setAppendState(AppendState::LastSample);
636         break;
637     default:
638         ASSERT_NOT_REACHED();
639         break;
640     }
641 }
642
643 void AppendPipeline::appsinkNewSample(GstSample* sample)
644 {
645     ASSERT(WTF::isMainThread());
646
647     {
648         LockHolder locker(m_newSampleLock);
649
650         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
651         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
652             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
653             // FIXME: Return ERROR and find a more robust way to detect that all the
654             // data has been processed, so we don't need to resort to these hacks.
655             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
656             m_flowReturn = GST_FLOW_OK;
657             m_newSampleCondition.notifyOne();
658             return;
659         }
660
661         RefPtr<GStreamerMediaSample> mediaSample = WebCore::GStreamerMediaSample::create(sample, m_presentationSize, trackId());
662
663         GST_TRACE("append: trackId=%s PTS=%f presentationSize=%.0fx%.0f", mediaSample->trackID().string().utf8().data(), mediaSample->presentationTime().toFloat(), mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
664
665         // If we're beyond the duration, ignore this sample and the remaining ones.
666         MediaTime duration = m_mediaSourceClient->duration();
667         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
668             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
669             setAppendState(AppendState::LastSample);
670             m_flowReturn = GST_FLOW_OK;
671             m_newSampleCondition.notifyOne();
672             return;
673         }
674
675         // Add a gap sample if a gap is detected before the first sample.
676         if (mediaSample->decodeTime() == MediaTime::zeroTime()
677             && mediaSample->presentationTime() > MediaTime::zeroTime()
678             && mediaSample->presentationTime() <= MediaTime::createWithDouble(0.1)) {
679             GST_DEBUG("Adding gap offset");
680             mediaSample->applyPtsOffset(MediaTime::zeroTime());
681         }
682
683         m_sourceBufferPrivate->didReceiveSample(*mediaSample);
684         setAppendState(AppendState::Sampling);
685         m_flowReturn = GST_FLOW_OK;
686         m_newSampleCondition.notifyOne();
687     }
688
689     checkEndOfAppend();
690 }
691
692 void AppendPipeline::appsinkEOS()
693 {
694     ASSERT(WTF::isMainThread());
695
696     switch (m_appendState) {
697     case AppendState::Aborting:
698         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
699         return;
700     case AppendState::Ongoing:
701         // Finish Ongoing and Sampling states.
702         setAppendState(AppendState::DataStarve);
703         break;
704     case AppendState::Sampling:
705         setAppendState(AppendState::LastSample);
706         break;
707     default:
708         GST_DEBUG("Unexpected EOS");
709         break;
710     }
711 }
712
713 void AppendPipeline::didReceiveInitializationSegment()
714 {
715     ASSERT(WTF::isMainThread());
716
717     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
718
719     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
720     initializationSegment.duration = m_mediaSourceClient->duration();
721
722     switch (m_streamType) {
723     case Audio: {
724         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
725         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
726         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
727         initializationSegment.audioTracks.append(info);
728         break;
729     }
730     case Video: {
731         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
732         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
733         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
734         initializationSegment.videoTracks.append(info);
735         break;
736     }
737     default:
738         GST_ERROR("Unsupported stream type or codec");
739         break;
740     }
741
742     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
743 }
744
745 AtomicString AppendPipeline::trackId()
746 {
747     ASSERT(WTF::isMainThread());
748
749     if (!m_track)
750         return AtomicString();
751
752     return m_track->id();
753 }
754
755 void AppendPipeline::resetPipeline()
756 {
757     ASSERT(WTF::isMainThread());
758     GST_DEBUG("resetting pipeline");
759     m_appsrcAtLeastABufferLeft = false;
760     setAppsrcDataLeavingProbe();
761
762     {
763         LockHolder locker(m_newSampleLock);
764         m_newSampleCondition.notifyOne();
765         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
766         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
767     }
768
769 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
770     {
771         static unsigned i = 0;
772         // This is here for debugging purposes. It does not make sense to have it as class member.
773         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
774         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
775     }
776 #endif
777
778 }
779
780 void AppendPipeline::setAppsrcDataLeavingProbe()
781 {
782     if (m_appsrcDataLeavingProbeId)
783         return;
784
785     GST_TRACE("setting appsrc data leaving probe");
786
787     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
788     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
789 }
790
791 void AppendPipeline::removeAppsrcDataLeavingProbe()
792 {
793     if (!m_appsrcDataLeavingProbeId)
794         return;
795
796     GST_TRACE("removing appsrc data leaving probe");
797
798     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
799     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
800     m_appsrcDataLeavingProbeId = 0;
801 }
802
803 void AppendPipeline::abort()
804 {
805     ASSERT(WTF::isMainThread());
806     GST_DEBUG("aborting");
807
808     m_pendingBuffer = nullptr;
809
810     // Abort already ongoing.
811     if (m_abortPending)
812         return;
813
814     m_abortPending = true;
815     if (m_appendState == AppendState::NotStarted)
816         setAppendState(AppendState::Aborting);
817     // Else, the automatic state transitions will take care when the ongoing append finishes.
818 }
819
820 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
821 {
822     GstFlowReturn result;
823
824     if (m_abortPending) {
825         m_pendingBuffer = adoptGRef(buffer);
826         result = GST_FLOW_OK;
827     } else {
828         setAppendState(AppendPipeline::AppendState::Ongoing);
829         GST_TRACE("pushing new buffer %p", buffer);
830         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
831     }
832
833     return result;
834 }
835
836 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
837 {
838     GST_TRACE("buffer left appsrc, reposting to bus");
839     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
840     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
841     gst_bus_post(m_bus.get(), message);
842 }
843
844 void AppendPipeline::reportAppsrcNeedDataReceived()
845 {
846     GST_TRACE("received need-data signal at appsrc, reposting to bus");
847     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
848     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
849     gst_bus_post(m_bus.get(), message);
850 }
851
852 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
853 {
854     ASSERT(!WTF::isMainThread());
855
856     // Even if we're disabled, it's important to pull the sample out anyway to
857     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
858     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
859     LockHolder locker(m_newSampleLock);
860
861     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
862         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
863         return GST_FLOW_ERROR;
864     }
865
866     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
867     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
868     gst_bus_post(m_bus.get(), message);
869     GST_TRACE("appsink-new-sample message posted to bus");
870
871     m_newSampleCondition.wait(m_newSampleLock);
872     // We've been awaken because the sample was processed or because of
873     // an exceptional condition (entered in Invalid state, destructor, etc.).
874     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
875
876     return m_flowReturn;
877 }
878
879 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
880 {
881     if (!m_appsink)
882         return;
883
884     GST_DEBUG("connecting to appsink");
885
886     if (m_demux->numsrcpads > 1) {
887         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
888         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
889         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
890         return;
891     }
892
893     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
894
895     // Only one stream per demuxer is supported.
896     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
897
898     gint64 timeLength = 0;
899     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
900         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
901         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
902     else
903         m_initialDuration = MediaTime::positiveInfiniteTime();
904
905     if (WTF::isMainThread())
906         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
907     else {
908         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
909         LockHolder locker(m_padAddRemoveLock);
910         if (!m_playerPrivate)
911             return;
912
913         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
914         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
915         gst_bus_post(m_bus.get(), message);
916         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
917
918         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
919     }
920
921     // Must be done in the thread we were called from (usually streaming thread).
922     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
923         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
924         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
925
926     if (isData) {
927         // FIXME: Only add appsink one time. This method can be called several times.
928         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
929         if (!parent)
930             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
931
932 #if ENABLE(ENCRYPTED_MEDIA)
933         if (m_decryptor) {
934             gst_object_ref(m_decryptor.get());
935             gst_bin_add(GST_BIN(m_pipeline.get()), m_decryptor.get());
936
937             GRefPtr<GstPad> decryptorSinkPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "sink"));
938             gst_pad_link(demuxerSrcPad, decryptorSinkPad.get());
939
940             GRefPtr<GstPad> decryptorSrcPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "src"));
941             gst_pad_link(decryptorSrcPad.get(), appsinkSinkPad.get());
942
943             gst_element_sync_state_with_parent(m_appsink.get());
944             gst_element_sync_state_with_parent(m_decryptor.get());
945
946             if (m_pendingDecryptionStructure)
947                 dispatchPendingDecryptionStructure();
948         } else {
949 #endif
950             gst_pad_link(demuxerSrcPad, appsinkSinkPad.get());
951             gst_element_sync_state_with_parent(m_appsink.get());
952 #if ENABLE(ENCRYPTED_MEDIA)
953         }
954 #endif
955         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
956     }
957 }
958
959 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
960 {
961     ASSERT(WTF::isMainThread());
962     GST_DEBUG("Connecting to appsink");
963
964     LockHolder locker(m_padAddRemoveLock);
965     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
966
967     // Only one stream per demuxer is supported.
968     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
969
970     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
971
972     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
973         m_padAddRemoveCondition.notifyOne();
974         return;
975     }
976
977 #ifndef GST_DISABLE_GST_DEBUG
978     {
979         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
980         GST_DEBUG("%s", strcaps.get());
981     }
982 #endif
983
984     if (m_initialDuration > m_mediaSourceClient->duration()
985         || (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()))
986         m_mediaSourceClient->durationChanged(m_initialDuration);
987
988     m_oldTrack = m_track;
989
990     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
991
992     switch (m_streamType) {
993     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
994         if (m_playerPrivate)
995             m_track = WebCore::AudioTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
996         break;
997     case WebCore::MediaSourceStreamTypeGStreamer::Video:
998         if (m_playerPrivate)
999             m_track = WebCore::VideoTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
1000         break;
1001     case WebCore::MediaSourceStreamTypeGStreamer::Text:
1002         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
1003         break;
1004     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
1005         {
1006             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1007             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1008         }
1009         // This is going to cause an error which will detach the SourceBuffer and tear down this
1010         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1011         m_track = nullptr;
1012         m_padAddRemoveCondition.notifyOne();
1013         locker.unlockEarly();
1014         didReceiveInitializationSegment();
1015         return;
1016     default:
1017         // No useful data, but notify anyway to complete the append operation.
1018         GST_DEBUG("Received all pending samples (no data)");
1019         m_sourceBufferPrivate->didReceiveAllPendingSamples();
1020         break;
1021     }
1022
1023     m_padAddRemoveCondition.notifyOne();
1024 }
1025
1026 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad* demuxerSrcPad)
1027 {
1028     // Must be done in the thread we were called from (usually streaming thread).
1029     if (!gst_pad_is_linked(demuxerSrcPad)) {
1030         gulong probeId = GPOINTER_TO_ULONG(g_object_get_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId"));
1031         if (probeId) {
1032             GST_DEBUG("Disconnecting black hole probe.");
1033             g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", nullptr);
1034             gst_pad_remove_probe(demuxerSrcPad, probeId);
1035         } else
1036             GST_WARNING("Not disconnecting demuxer src pad because it wasn't linked");
1037         return;
1038     }
1039
1040     GST_DEBUG("Disconnecting appsink");
1041
1042 #if ENABLE(ENCRYPTED_MEDIA)
1043     if (m_decryptor) {
1044         gst_element_unlink(m_decryptor.get(), m_appsink.get());
1045         gst_element_unlink(m_demux.get(), m_decryptor.get());
1046         gst_element_set_state(m_decryptor.get(), GST_STATE_NULL);
1047         gst_bin_remove(GST_BIN(m_pipeline.get()), m_decryptor.get());
1048     } else
1049 #endif
1050         gst_element_unlink(m_demux.get(), m_appsink.get());
1051 }
1052
1053 #if ENABLE(ENCRYPTED_MEDIA)
1054 void AppendPipeline::dispatchPendingDecryptionStructure()
1055 {
1056     ASSERT(m_decryptor);
1057     ASSERT(m_pendingDecryptionStructure);
1058     ASSERT(m_appendState == KeyNegotiation);
1059     GST_TRACE("dispatching key to append pipeline %p", this);
1060
1061     // Release the m_pendingDecryptionStructure object since
1062     // gst_event_new_custom() takes over ownership of it.
1063     gst_element_send_event(m_pipeline.get(), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_OOB, m_pendingDecryptionStructure.release()));
1064
1065     setAppendState(AppendState::Ongoing);
1066 }
1067
1068 void AppendPipeline::dispatchDecryptionStructure(GUniquePtr<GstStructure>&& structure)
1069 {
1070     if (m_appendState == AppendState::KeyNegotiation) {
1071         GST_TRACE("append pipeline %p in key negotiation", this);
1072         m_pendingDecryptionStructure = WTFMove(structure);
1073         if (m_decryptor)
1074             dispatchPendingDecryptionStructure();
1075         else
1076             GST_TRACE("no decryptor yet, waiting for it");
1077     } else
1078         GST_TRACE("append pipeline %p not in key negotiation", this);
1079 }
1080 #endif
1081
1082 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1083 {
1084     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1085     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1086     gst_bus_post(appendPipeline->bus(), message);
1087     GST_TRACE("appsink-caps-changed message posted to bus");
1088 }
1089
1090 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1091 {
1092     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1093
1094     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1095     gsize bufferSize = gst_buffer_get_size(buffer);
1096
1097     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1098
1099     appendPipeline->reportAppsrcAtLeastABufferLeft();
1100
1101     return GST_PAD_PROBE_OK;
1102 }
1103
1104 #if !LOG_DISABLED
1105 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1106 {
1107     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1108     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1109     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1110     return GST_PAD_PROBE_OK;
1111 }
1112 #endif
1113
1114 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1115 {
1116     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1117     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1118     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1119     return GST_PAD_PROBE_DROP;
1120 }
1121
1122 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1123 {
1124     appendPipeline->reportAppsrcNeedDataReceived();
1125 }
1126
1127 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1128 {
1129     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1130 }
1131
1132 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1133 {
1134     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1135 }
1136
1137 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1138 {
1139     return appendPipeline->handleNewAppsinkSample(appsink);
1140 }
1141
1142 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1143 {
1144     if (WTF::isMainThread())
1145         appendPipeline->appsinkEOS();
1146     else {
1147         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1148         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1149         gst_bus_post(appendPipeline->bus(), message);
1150         GST_TRACE("appsink-eos message posted to bus");
1151     }
1152
1153     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1154 }
1155
1156
1157
1158 } // namespace WebCore.
1159
1160 #endif // USE(GSTREAMER)