[GStreamer][EME] waitingforkey event should consider decryptors' waiting status
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GStreamerCommon.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "MediaSampleGStreamer.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <gst/app/gstappsink.h>
36 #include <gst/app/gstappsrc.h>
37 #include <gst/gst.h>
38 #include <gst/pbutils/pbutils.h>
39 #include <gst/video/video.h>
40 #include <wtf/Condition.h>
41 #include <wtf/glib/GLibUtilities.h>
42 #include <wtf/glib/RunLoopSourcePriority.h>
43
44 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
45 #define GST_CAT_DEFAULT webkit_mse_debug
46
47 namespace WebCore {
48
49 GType AppendPipeline::s_endOfAppendMetaType = 0;
50 const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
51 std::once_flag AppendPipeline::s_staticInitializationFlag;
52
53 struct EndOfAppendMeta {
54     GstMeta base;
55     static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
56     static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
57     static void free(GstMeta*, GstBuffer*) { }
58 };
59
60 void AppendPipeline::staticInitialization()
61 {
62     ASSERT(WTF::isMainThread());
63
64     const char* tags[] = { nullptr };
65     s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
66     s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
67 }
68
69 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
70 {
71     switch (appendState) {
72     case AppendPipeline::AppendState::Invalid:
73         return "Invalid";
74     case AppendPipeline::AppendState::NotStarted:
75         return "NotStarted";
76     case AppendPipeline::AppendState::Ongoing:
77         return "Ongoing";
78     case AppendPipeline::AppendState::DataStarve:
79         return "DataStarve";
80     case AppendPipeline::AppendState::Sampling:
81         return "Sampling";
82     case AppendPipeline::AppendState::LastSample:
83         return "LastSample";
84     case AppendPipeline::AppendState::Aborting:
85         return "Aborting";
86     default:
87         return "(unknown)";
88     }
89 }
90
91 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
92 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
93 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
94 #if !LOG_DISABLED
95 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
96 #endif
97
98 #if ENABLE(ENCRYPTED_MEDIA)
99 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
100 #endif
101
102 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
103 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
104 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
105 static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline*);
106
107 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
108
109 static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
110 {
111     GST_TRACE("received callback");
112     appendPipeline->handleNeedContextSyncMessage(message);
113 }
114
115 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
116 {
117     appendPipeline->handleApplicationMessage(message);
118 }
119
120 static void appendPipelineStateChangeMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
121 {
122     appendPipeline->handleStateChangeMessage(message);
123 }
124
125 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
126     : m_mediaSourceClient(mediaSourceClient.get())
127     , m_sourceBufferPrivate(sourceBufferPrivate.get())
128     , m_playerPrivate(&playerPrivate)
129     , m_id(0)
130     , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
131     , m_appendState(AppendState::NotStarted)
132     , m_abortPending(false)
133     , m_streamType(Unknown)
134 {
135     ASSERT(WTF::isMainThread());
136     std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
137
138     GST_TRACE("Creating AppendPipeline (%p)", this);
139
140     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
141     // The track name is still unknown at this time, though.
142     static size_t appendPipelineCount = 0;
143     String pipelineName = String::format("append-pipeline-%s-%zu",
144         m_sourceBufferPrivate->type().containerType().replace("/", "-").utf8().data(), appendPipelineCount++);
145     m_pipeline = gst_pipeline_new(pipelineName.utf8().data());
146
147     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
148     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
149     gst_bus_enable_sync_message_emission(m_bus.get());
150
151     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
152     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
153     g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(appendPipelineStateChangeMessageCallback), this);
154
155     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
156     // below will already take the initial reference and we need an additional one for us.
157     m_appsrc = gst_element_factory_make("appsrc", nullptr);
158
159     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
160     gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
161         return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
162     }, this, nullptr);
163
164     const String& type = m_sourceBufferPrivate->type().containerType();
165     if (type.endsWith("mp4"))
166         m_demux = gst_element_factory_make("qtdemux", nullptr);
167     else if (type.endsWith("webm"))
168         m_demux = gst_element_factory_make("matroskademux", nullptr);
169     else
170         ASSERT_NOT_REACHED();
171
172     m_appsink = gst_element_factory_make("appsink", nullptr);
173
174     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
175     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
176     gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
177
178     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
179     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
180
181 #if !LOG_DISABLED
182     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
183     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
184     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
185     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
186     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
187     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
188     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
189 #endif
190
191 #if ENABLE(ENCRYPTED_MEDIA)
192     m_appsinkPadEventProbeInformation.appendPipeline = this;
193     m_appsinkPadEventProbeInformation.description = "appsink event probe";
194     m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
195 #endif
196
197     // These signals won't be connected outside of the lifetime of "this".
198     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
199     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
200     g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(appendPipelineDemuxerNoMorePads), this);
201     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
202     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
203
204     // Add_many will take ownership of a reference. That's why we used an assignment before.
205     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
206     gst_element_link(m_appsrc.get(), m_demux.get());
207
208     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
209 };
210
211 AppendPipeline::~AppendPipeline()
212 {
213     ASSERT(WTF::isMainThread());
214
215     setAppendState(AppendState::Invalid);
216
217     {
218         LockHolder locker(m_padAddRemoveLock);
219         m_playerPrivate = nullptr;
220         m_padAddRemoveCondition.notifyOne();
221     }
222
223     GST_TRACE("Destroying AppendPipeline (%p)", this);
224
225     // FIXME: Maybe notify appendComplete here?
226
227     if (m_pipeline) {
228         ASSERT(m_bus);
229         g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
230         gst_bus_disable_sync_message_emission(m_bus.get());
231         gst_bus_remove_signal_watch(m_bus.get());
232         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
233         m_pipeline = nullptr;
234     }
235
236     if (m_appsrc) {
237         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
238         m_appsrc = nullptr;
239     }
240
241     if (m_demux) {
242 #if !LOG_DISABLED
243         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
244         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
245 #endif
246
247         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
248         m_demux = nullptr;
249     }
250
251     if (m_appsink) {
252         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
253         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
254         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
255
256 #if !LOG_DISABLED
257         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
258 #endif
259
260 #if ENABLE(ENCRYPTED_MEDIA)
261         gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
262 #endif
263         m_appsink = nullptr;
264     }
265
266     m_appsinkCaps = nullptr;
267     m_demuxerSrcPadCaps = nullptr;
268 };
269
270 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
271 {
272     ASSERT(!WTF::isMainThread());
273     m_streamingThread = &WTF::Thread::current();
274
275     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
276     ASSERT(GST_IS_BUFFER(buffer));
277
278     EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
279     if (!endOfAppendMeta) {
280         // Normal buffer, nothing to do.
281         return GST_PAD_PROBE_OK;
282     }
283
284     GST_TRACE_OBJECT(m_pipeline.get(), "posting end-of-append request to bus");
285     GstStructure* structure = gst_structure_new_empty("end-of-append");
286     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
287     gst_bus_post(m_bus.get(), message);
288     return GST_PAD_PROBE_DROP;
289 }
290
291 void AppendPipeline::clearPlayerPrivate()
292 {
293     ASSERT(WTF::isMainThread());
294     GST_DEBUG("cleaning private player");
295
296     // Make sure that AppendPipeline won't process more data from now on and
297     // instruct handleNewSample to abort itself from now on as well.
298     setAppendState(AppendState::Invalid);
299
300     {
301         LockHolder locker(m_padAddRemoveLock);
302         m_playerPrivate = nullptr;
303         m_padAddRemoveCondition.notifyOne();
304     }
305
306     // And now that no handleNewSample operations will remain stalled waiting
307     // for the main thread, stop the pipeline.
308     if (m_pipeline)
309         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
310 }
311
312 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
313 {
314     const gchar* contextType = nullptr;
315     gst_message_parse_context_type(message, &contextType);
316     GST_TRACE("context type: %s", contextType);
317
318     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
319     if (m_playerPrivate)
320         m_playerPrivate->handleSyncMessage(message);
321 }
322
323 void AppendPipeline::handleApplicationMessage(GstMessage* message)
324 {
325     ASSERT(WTF::isMainThread());
326
327     const GstStructure* structure = gst_message_get_structure(message);
328
329     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
330         GRefPtr<GstPad> demuxerSrcPad;
331         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
332         ASSERT(demuxerSrcPad);
333         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
334         return;
335     }
336
337     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
338         appsinkCapsChanged();
339         return;
340     }
341
342     if (gst_structure_has_name(structure, "appsink-new-sample")) {
343         m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
344         consumeAppsinkAvailableSamples();
345         return;
346     }
347
348     if (gst_structure_has_name(structure, "appsink-eos")) {
349         appsinkEOS();
350         return;
351     }
352
353     if (gst_structure_has_name(structure, "demuxer-no-more-pads")) {
354         demuxerNoMorePads();
355         return;
356     }
357
358     if (gst_structure_has_name(structure, "end-of-append")) {
359         handleEndOfAppend();
360         return;
361     }
362
363     ASSERT_NOT_REACHED();
364 }
365
366 void AppendPipeline::demuxerNoMorePads()
367 {
368     GST_TRACE("calling didReceiveInitializationSegment");
369     didReceiveInitializationSegment();
370     GST_TRACE("set pipeline to playing");
371     gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
372 }
373
374 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
375 {
376     ASSERT(WTF::isMainThread());
377
378     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
379         GstState currentState, newState;
380         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
381         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
382             .replace("/", "_").replace(" ", "_")
383             .replace("\"", "").replace("\'", "").utf8();
384         CString dotFileName = String::format("webkit-append-%s-%s_%s",
385             sourceBufferType.data(),
386             gst_element_state_get_name(currentState),
387             gst_element_state_get_name(newState)).utf8();
388         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
389     }
390 }
391
392 gint AppendPipeline::id()
393 {
394     ASSERT(WTF::isMainThread());
395
396     if (m_id)
397         return m_id;
398
399     static gint s_totalAudio = 0;
400     static gint s_totalVideo = 0;
401     static gint s_totalText = 0;
402
403     switch (m_streamType) {
404     case Audio:
405         m_id = ++s_totalAudio;
406         break;
407     case Video:
408         m_id = ++s_totalVideo;
409         break;
410     case Text:
411         m_id = ++s_totalText;
412         break;
413     case Unknown:
414     case Invalid:
415         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
416         ASSERT_NOT_REACHED();
417         break;
418     }
419
420     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
421
422     return m_id;
423 }
424
425 void AppendPipeline::setAppendState(AppendState newAppendState)
426 {
427     ASSERT(WTF::isMainThread());
428     // Valid transitions:
429     // NotStarted-->Ongoing-->DataStarve-->NotStarted
430     //           |         |            `->Aborting-->NotStarted
431     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
432     //           |                                               `->Aborting-->NotStarted
433     //           `->Aborting-->NotStarted
434     AppendState oldAppendState = m_appendState;
435     AppendState nextAppendState = AppendState::Invalid;
436
437     if (oldAppendState != newAppendState)
438         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
439
440     bool ok = false;
441
442     switch (oldAppendState) {
443     case AppendState::NotStarted:
444         switch (newAppendState) {
445         case AppendState::Ongoing:
446             ok = true;
447             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
448             break;
449         case AppendState::NotStarted:
450             ok = true;
451             if (m_pendingBuffer) {
452                 GST_TRACE("pushing pending buffer %" GST_PTR_FORMAT, m_pendingBuffer.get());
453                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
454                 nextAppendState = AppendState::Ongoing;
455             }
456             break;
457         case AppendState::Aborting:
458             ok = true;
459             nextAppendState = AppendState::NotStarted;
460             break;
461         case AppendState::Invalid:
462             ok = true;
463             break;
464         default:
465             break;
466         }
467         break;
468     case AppendState::Ongoing:
469         switch (newAppendState) {
470         case AppendState::Sampling:
471         case AppendState::Invalid:
472             ok = true;
473             break;
474         case AppendState::DataStarve:
475             ok = true;
476             GST_DEBUG("received all pending samples");
477             m_sourceBufferPrivate->didReceiveAllPendingSamples();
478             if (m_abortPending)
479                 nextAppendState = AppendState::Aborting;
480             else
481                 nextAppendState = AppendState::NotStarted;
482             break;
483         default:
484             break;
485         }
486         break;
487     case AppendState::DataStarve:
488         switch (newAppendState) {
489         case AppendState::NotStarted:
490         case AppendState::Invalid:
491             ok = true;
492             break;
493         case AppendState::Aborting:
494             ok = true;
495             nextAppendState = AppendState::NotStarted;
496             break;
497         default:
498             break;
499         }
500         break;
501     case AppendState::Sampling:
502         switch (newAppendState) {
503         case AppendState::Sampling:
504         case AppendState::Invalid:
505             ok = true;
506             break;
507         case AppendState::LastSample:
508             ok = true;
509             GST_DEBUG("received all pending samples");
510             m_sourceBufferPrivate->didReceiveAllPendingSamples();
511             if (m_abortPending)
512                 nextAppendState = AppendState::Aborting;
513             else
514                 nextAppendState = AppendState::NotStarted;
515             break;
516         default:
517             break;
518         }
519         break;
520     case AppendState::LastSample:
521         switch (newAppendState) {
522         case AppendState::NotStarted:
523         case AppendState::Invalid:
524             ok = true;
525             break;
526         case AppendState::Aborting:
527             ok = true;
528             nextAppendState = AppendState::NotStarted;
529             break;
530         default:
531             break;
532         }
533         break;
534     case AppendState::Aborting:
535         switch (newAppendState) {
536         case AppendState::NotStarted:
537             ok = true;
538             resetPipeline();
539             m_abortPending = false;
540             nextAppendState = AppendState::NotStarted;
541             break;
542         case AppendState::Invalid:
543             ok = true;
544             break;
545         default:
546             break;
547         }
548         break;
549     case AppendState::Invalid:
550         ok = true;
551         break;
552     }
553
554     if (ok)
555         m_appendState = newAppendState;
556     else
557         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
558
559     ASSERT(ok);
560
561     if (nextAppendState != AppendState::Invalid)
562         setAppendState(nextAppendState);
563 }
564
565 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
566 {
567     ASSERT(WTF::isMainThread());
568
569     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
570     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
571
572     const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
573     if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(originalMediaType)) {
574             m_presentationSize = WebCore::FloatSize();
575             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
576     } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
577         std::optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
578         if (size.has_value())
579             m_presentationSize = size.value();
580         else
581             m_presentationSize = WebCore::FloatSize();
582
583         m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
584     } else {
585         m_presentationSize = WebCore::FloatSize();
586         if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
587             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
588         else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
589             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
590     }
591 }
592
593 void AppendPipeline::appsinkCapsChanged()
594 {
595     ASSERT(WTF::isMainThread());
596
597     if (!m_appsink)
598         return;
599
600     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
601     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
602
603     if (!caps)
604         return;
605
606     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
607     bool previousCapsWereNull = !m_appsinkCaps;
608
609     if (m_appsinkCaps != caps) {
610         m_appsinkCaps = WTFMove(caps);
611         if (m_playerPrivate)
612             m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
613         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
614     }
615 }
616
617 void AppendPipeline::handleEndOfAppend()
618 {
619     ASSERT(WTF::isMainThread());
620     GST_TRACE_OBJECT(m_pipeline.get(), "received end-of-append");
621
622     // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
623     switch (m_appendState) {
624     case AppendState::Ongoing:
625         GST_TRACE("DataStarve");
626         setAppendState(AppendState::DataStarve);
627         break;
628     case AppendState::Sampling:
629         GST_TRACE("LastSample");
630         setAppendState(AppendState::LastSample);
631         break;
632     default:
633         ASSERT_NOT_REACHED();
634         break;
635     }
636 }
637
638 void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
639 {
640     ASSERT(WTF::isMainThread());
641
642     if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
643         GST_WARNING("Received sample without buffer from appsink.");
644         return;
645     }
646
647     RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
648
649     GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
650         mediaSample->trackID().string().utf8().data(),
651         mediaSample->presentationTime().toString().utf8().data(),
652         mediaSample->decodeTime().toString().utf8().data(),
653         mediaSample->duration().toString().utf8().data(),
654         mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
655
656     // If we're beyond the duration, ignore this sample and the remaining ones.
657     MediaTime duration = m_mediaSourceClient->duration();
658     if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
659         GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
660         setAppendState(AppendState::LastSample);
661         return;
662     }
663
664     // Add a gap sample if a gap is detected before the first sample.
665     if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
666         GST_DEBUG("Adding gap offset");
667         mediaSample->applyPtsOffset(MediaTime::zeroTime());
668     }
669
670     m_sourceBufferPrivate->didReceiveSample(*mediaSample);
671     setAppendState(AppendState::Sampling);
672 }
673
674 void AppendPipeline::appsinkEOS()
675 {
676     ASSERT(WTF::isMainThread());
677
678     switch (m_appendState) {
679     case AppendState::Aborting:
680         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
681         return;
682     case AppendState::Ongoing:
683         // Finish Ongoing and Sampling states.
684         setAppendState(AppendState::DataStarve);
685         break;
686     case AppendState::Sampling:
687         setAppendState(AppendState::LastSample);
688         break;
689     default:
690         GST_DEBUG("Unexpected EOS");
691         break;
692     }
693 }
694
695 void AppendPipeline::didReceiveInitializationSegment()
696 {
697     ASSERT(WTF::isMainThread());
698
699     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
700
701     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
702     initializationSegment.duration = m_mediaSourceClient->duration();
703
704     switch (m_streamType) {
705     case Audio: {
706         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
707         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
708         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
709         initializationSegment.audioTracks.append(info);
710         break;
711     }
712     case Video: {
713         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
714         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
715         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
716         initializationSegment.videoTracks.append(info);
717         break;
718     }
719     default:
720         GST_ERROR("Unsupported stream type or codec");
721         break;
722     }
723
724     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
725 }
726
727 AtomicString AppendPipeline::trackId()
728 {
729     ASSERT(WTF::isMainThread());
730
731     if (!m_track)
732         return AtomicString();
733
734     return m_track->id();
735 }
736
737 void AppendPipeline::consumeAppsinkAvailableSamples()
738 {
739     ASSERT(WTF::isMainThread());
740
741     GRefPtr<GstSample> sample;
742     int batchedSampleCount = 0;
743     while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
744         appsinkNewSample(WTFMove(sample));
745         batchedSampleCount++;
746     }
747
748     GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
749 }
750
751 void AppendPipeline::resetPipeline()
752 {
753     ASSERT(WTF::isMainThread());
754     GST_DEBUG("resetting pipeline");
755
756     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
757     gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
758
759 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
760     {
761         static unsigned i = 0;
762         // This is here for debugging purposes. It does not make sense to have it as class member.
763         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
764         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
765     }
766 #endif
767
768 }
769
770 void AppendPipeline::abort()
771 {
772     ASSERT(WTF::isMainThread());
773     GST_DEBUG("aborting");
774
775     m_pendingBuffer = nullptr;
776
777     // Abort already ongoing.
778     if (m_abortPending)
779         return;
780
781     m_abortPending = true;
782     if (m_appendState == AppendState::NotStarted)
783         setAppendState(AppendState::Aborting);
784     // Else, the automatic state transitions will take care when the ongoing append finishes.
785 }
786
787 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
788 {
789     if (m_abortPending) {
790         m_pendingBuffer = adoptGRef(buffer);
791         return GST_FLOW_OK;
792     }
793
794     setAppendState(AppendPipeline::AppendState::Ongoing);
795
796     GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer);
797     GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer);
798     // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
799     // be true at this point.
800     g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
801
802     // Push an additional empty buffer that marks the end of the append.
803     // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
804     // completion of the append.
805     //
806     // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
807     // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
808     // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
809     // buffer has completed.
810
811     GstBuffer* endOfAppendBuffer = gst_buffer_new();
812     gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
813
814     GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
815     GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
816     g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);
817
818     return GST_FLOW_OK;
819 }
820
821 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
822 {
823     ASSERT(!WTF::isMainThread());
824     if (&WTF::Thread::current() != m_streamingThread) {
825         // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
826         // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
827         // This error will only raise if someone modifies the pipeline to include more than one streaming thread or
828         // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
829         // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an appends has
830         // been demuxed completely.;
831         g_critical("Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
832         ASSERT_NOT_REACHED();
833     }
834
835     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
836         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
837         return GST_FLOW_ERROR;
838     }
839
840     if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
841         GstStructure* structure = gst_structure_new_empty("appsink-new-sample");
842         GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
843         gst_bus_post(m_bus.get(), message);
844         GST_TRACE("appsink-new-sample message posted to bus");
845     }
846
847     return GST_FLOW_OK;
848 }
849
850 static GRefPtr<GstElement>
851 createOptionalParserForFormat(GstPad* demuxerSrcPad)
852 {
853     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
854     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
855     const char* mediaType = gst_structure_get_name(structure);
856
857     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
858     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
859
860     if (!g_strcmp0(mediaType, "audio/x-opus")) {
861         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
862         ASSERT(opusparse);
863         g_return_val_if_fail(opusparse, nullptr);
864         return GRefPtr<GstElement>(opusparse);
865     }
866     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
867         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
868         ASSERT(vorbisparse);
869         g_return_val_if_fail(vorbisparse, nullptr);
870         return GRefPtr<GstElement>(vorbisparse);
871     }
872     if (!g_strcmp0(mediaType, "video/x-h264")) {
873         GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get());
874         ASSERT(h264parse);
875         g_return_val_if_fail(h264parse, nullptr);
876         return GRefPtr<GstElement>(h264parse);
877     }
878
879     return nullptr;
880 }
881
882 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
883 {
884     if (!m_appsink)
885         return;
886
887     GST_DEBUG("connecting to appsink");
888
889     if (m_demux->numsrcpads > 1) {
890         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
891         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
892         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
893         return;
894     }
895
896     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
897
898     // Only one stream per demuxer is supported.
899     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
900
901     gint64 timeLength = 0;
902     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
903         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
904         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
905     else
906         m_initialDuration = MediaTime::positiveInfiniteTime();
907
908     if (WTF::isMainThread())
909         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
910     else {
911         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
912         LockHolder locker(m_padAddRemoveLock);
913         if (!m_playerPrivate)
914             return;
915
916         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
917         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
918         gst_bus_post(m_bus.get(), message);
919         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
920
921         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
922
923         if (!m_playerPrivate)
924             return;
925     }
926
927     // Must be done in the thread we were called from (usually streaming thread).
928     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
929         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
930         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
931
932     if (isData) {
933         // FIXME: Only add appsink one time. This method can be called several times.
934         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
935         if (!parent)
936             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
937
938         // Current head of the pipeline being built.
939         GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
940
941         // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
942         // the contained audio streams in order to know the duration of the frames.
943         // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
944         m_parser = createOptionalParserForFormat(currentSrcPad.get());
945         if (m_parser) {
946             gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
947             gst_element_sync_state_with_parent(m_parser.get());
948
949             GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
950             GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
951
952             gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
953             currentSrcPad = parserSrcPad;
954         }
955
956         gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
957
958         gst_element_sync_state_with_parent(m_appsink.get());
959
960         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
961         gst_element_sync_state_with_parent(m_appsink.get());
962
963         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
964     }
965 }
966
967 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
968 {
969     ASSERT(WTF::isMainThread());
970     GST_DEBUG("Connecting to appsink");
971
972     const String& type = m_sourceBufferPrivate->type().containerType();
973     if (type.endsWith("webm"))
974         gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
975
976     LockHolder locker(m_padAddRemoveLock);
977     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
978
979     // Only one stream per demuxer is supported.
980     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
981
982     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
983
984     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
985         m_padAddRemoveCondition.notifyOne();
986         return;
987     }
988
989 #ifndef GST_DISABLE_GST_DEBUG
990     {
991         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
992         GST_DEBUG("%s", strcaps.get());
993     }
994 #endif
995
996     if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime())
997         m_mediaSourceClient->durationChanged(m_initialDuration);
998
999     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
1000
1001     switch (m_streamType) {
1002     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
1003         if (m_playerPrivate)
1004             m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
1005         break;
1006     case WebCore::MediaSourceStreamTypeGStreamer::Video:
1007         if (m_playerPrivate)
1008             m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
1009         break;
1010     case WebCore::MediaSourceStreamTypeGStreamer::Text:
1011         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
1012         break;
1013     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
1014         {
1015             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1016             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1017         }
1018         // This is going to cause an error which will detach the SourceBuffer and tear down this
1019         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1020         m_track = nullptr;
1021         m_padAddRemoveCondition.notifyOne();
1022         locker.unlockEarly();
1023         didReceiveInitializationSegment();
1024         return;
1025     default:
1026         // No useful data.
1027         break;
1028     }
1029
1030     m_appsinkCaps = WTFMove(caps);
1031     if (m_playerPrivate)
1032         m_playerPrivate->trackDetected(this, m_track, true);
1033
1034     m_padAddRemoveCondition.notifyOne();
1035 }
1036
1037 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
1038 {
1039     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
1040
1041     GST_DEBUG("Disconnecting appsink");
1042
1043     if (m_parser) {
1044         gst_element_set_state(m_parser.get(), GST_STATE_NULL);
1045         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
1046         m_parser = nullptr;
1047     }
1048
1049     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
1050 }
1051
1052 void AppendPipeline::appendPipelineDemuxerNoMorePadsFromAnyThread()
1053 {
1054     GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread");
1055     GstStructure* structure = gst_structure_new_empty("demuxer-no-more-pads");
1056     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
1057     gst_bus_post(m_bus.get(), message);
1058     GST_TRACE("appendPipelineDemuxerNoMorePadsFromAnyThread - posted to bus");
1059 }
1060
1061 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1062 {
1063     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1064     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1065     gst_bus_post(appendPipeline->bus(), message);
1066     GST_TRACE("appsink-caps-changed message posted to bus");
1067 }
1068
1069 #if !LOG_DISABLED
1070 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1071 {
1072     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1073     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1074     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1075     return GST_PAD_PROBE_OK;
1076 }
1077 #endif
1078
1079 #if ENABLE(ENCRYPTED_MEDIA)
1080 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
1081 {
1082     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1083     GstEvent* event = gst_pad_probe_info_get_event(info);
1084     GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
1085     WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
1086
1087     switch (GST_EVENT_TYPE(event)) {
1088     case GST_EVENT_PROTECTION:
1089         if (appendPipeline && appendPipeline->playerPrivate())
1090             appendPipeline->playerPrivate()->handleProtectionEvent(event);
1091         return GST_PAD_PROBE_DROP;
1092     default:
1093         break;
1094     }
1095
1096     return GST_PAD_PROBE_OK;
1097 }
1098 #endif
1099
1100 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1101 {
1102     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1103     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1104     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1105     return GST_PAD_PROBE_DROP;
1106 }
1107
1108 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1109 {
1110     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1111 }
1112
1113 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1114 {
1115     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1116 }
1117
1118 static void appendPipelineDemuxerNoMorePads(GstElement*, AppendPipeline* appendPipeline)
1119 {
1120     appendPipeline->appendPipelineDemuxerNoMorePadsFromAnyThread();
1121 }
1122
1123 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1124 {
1125     return appendPipeline->handleNewAppsinkSample(appsink);
1126 }
1127
1128 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1129 {
1130     if (WTF::isMainThread())
1131         appendPipeline->appsinkEOS();
1132     else {
1133         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1134         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1135         gst_bus_post(appendPipeline->bus(), message);
1136         GST_TRACE("appsink-eos message posted to bus");
1137     }
1138
1139     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1140 }
1141
1142 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
1143 {
1144     // matroskademux sets GstSegment.start to the PTS of the first frame.
1145     //
1146     // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
1147     // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
1148     // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
1149     //
1150     // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
1151     // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
1152     // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
1153     // audiobasefilter, such as opusparse.
1154     //
1155     // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
1156     // frame is zero.
1157
1158     ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1159     GstEvent* event = static_cast<GstEvent*>(info->data);
1160     if (event->type == GST_EVENT_SEGMENT) {
1161         GstSegment segment;
1162         gst_event_copy_segment(event, &segment);
1163
1164         segment.start = 0;
1165
1166         GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
1167         gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
1168     }
1169     return GST_PAD_PROBE_OK;
1170 }
1171
1172 } // namespace WebCore.
1173
1174 #endif // USE(GSTREAMER)