Unreviewed, rolling out r234489.
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GStreamerCommon.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "MediaSampleGStreamer.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <gst/app/gstappsink.h>
36 #include <gst/app/gstappsrc.h>
37 #include <gst/gst.h>
38 #include <gst/pbutils/pbutils.h>
39 #include <gst/video/video.h>
40 #include <wtf/Condition.h>
41 #include <wtf/glib/GLibUtilities.h>
42 #include <wtf/glib/RunLoopSourcePriority.h>
43
44 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
45 #define GST_CAT_DEFAULT webkit_mse_debug
46
47 namespace WebCore {
48
49 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
50 {
51     switch (appendState) {
52     case AppendPipeline::AppendState::Invalid:
53         return "Invalid";
54     case AppendPipeline::AppendState::NotStarted:
55         return "NotStarted";
56     case AppendPipeline::AppendState::Ongoing:
57         return "Ongoing";
58     case AppendPipeline::AppendState::DataStarve:
59         return "DataStarve";
60     case AppendPipeline::AppendState::Sampling:
61         return "Sampling";
62     case AppendPipeline::AppendState::LastSample:
63         return "LastSample";
64     case AppendPipeline::AppendState::Aborting:
65         return "Aborting";
66     default:
67         return "(unknown)";
68     }
69 }
70
71 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
72 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
73 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
74 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
75 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
76 #if !LOG_DISABLED
77 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
78 #endif
79
80 #if ENABLE(ENCRYPTED_MEDIA)
81 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
82 #endif
83
84 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
85 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
86 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
87
88 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
89
90 static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
91 {
92     GST_TRACE("received callback");
93     appendPipeline->handleNeedContextSyncMessage(message);
94 }
95
96 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
97 {
98     appendPipeline->handleApplicationMessage(message);
99 }
100
101 static void appendPipelineStateChangeMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
102 {
103     appendPipeline->handleStateChangeMessage(message);
104 }
105
106 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
107     : m_mediaSourceClient(mediaSourceClient.get())
108     , m_sourceBufferPrivate(sourceBufferPrivate.get())
109     , m_playerPrivate(&playerPrivate)
110     , m_id(0)
111     , m_appsrcAtLeastABufferLeft(false)
112     , m_appsrcNeedDataReceived(false)
113     , m_appsrcDataLeavingProbeId(0)
114     , m_appendState(AppendState::NotStarted)
115     , m_abortPending(false)
116     , m_streamType(Unknown)
117 {
118     ASSERT(WTF::isMainThread());
119
120     GST_TRACE("Creating AppendPipeline (%p)", this);
121
122     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
123     // The track name is still unknown at this time, though.
124     m_pipeline = gst_pipeline_new(nullptr);
125
126     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
127     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
128     gst_bus_enable_sync_message_emission(m_bus.get());
129
130     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
131     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
132     g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(appendPipelineStateChangeMessageCallback), this);
133
134     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
135     // below will already take the initial reference and we need an additional one for us.
136     m_appsrc = gst_element_factory_make("appsrc", nullptr);
137
138     const String& type = m_sourceBufferPrivate->type().containerType();
139     if (type.endsWith("mp4"))
140         m_demux = gst_element_factory_make("qtdemux", nullptr);
141     else if (type.endsWith("webm"))
142         m_demux = gst_element_factory_make("matroskademux", nullptr);
143     else
144         ASSERT_NOT_REACHED();
145
146     m_appsink = gst_element_factory_make("appsink", nullptr);
147
148     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
149     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
150
151     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
152     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
153
154     setAppsrcDataLeavingProbe();
155
156 #if !LOG_DISABLED
157     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
158     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
159     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
160     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
161     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
162     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
163     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
164 #endif
165
166 #if ENABLE(ENCRYPTED_MEDIA)
167     m_appsinkPadEventProbeInformation.appendPipeline = this;
168     m_appsinkPadEventProbeInformation.description = "appsink event probe";
169     m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
170 #endif
171
172     // These signals won't be connected outside of the lifetime of "this".
173     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
174     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
175     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
176     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
177     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
178
179     // Add_many will take ownership of a reference. That's why we used an assignment before.
180     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
181     gst_element_link(m_appsrc.get(), m_demux.get());
182
183     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
184 };
185
186 AppendPipeline::~AppendPipeline()
187 {
188     ASSERT(WTF::isMainThread());
189
190     {
191         LockHolder locker(m_newSampleLock);
192         setAppendState(AppendState::Invalid);
193         m_newSampleCondition.notifyOne();
194     }
195
196     {
197         LockHolder locker(m_padAddRemoveLock);
198         m_playerPrivate = nullptr;
199         m_padAddRemoveCondition.notifyOne();
200     }
201
202     GST_TRACE("Destroying AppendPipeline (%p)", this);
203
204     // FIXME: Maybe notify appendComplete here?
205
206     if (m_pipeline) {
207         ASSERT(m_bus);
208         g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
209         gst_bus_disable_sync_message_emission(m_bus.get());
210         gst_bus_remove_signal_watch(m_bus.get());
211         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
212         m_pipeline = nullptr;
213     }
214
215     if (m_appsrc) {
216         removeAppsrcDataLeavingProbe();
217         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
218         m_appsrc = nullptr;
219     }
220
221     if (m_demux) {
222 #if !LOG_DISABLED
223         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
224         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
225 #endif
226
227         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
228         m_demux = nullptr;
229     }
230
231     if (m_appsink) {
232         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
233         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
234         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
235
236 #if !LOG_DISABLED
237         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
238 #endif
239
240 #if ENABLE(ENCRYPTED_MEDIA)
241         gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
242 #endif
243         m_appsink = nullptr;
244     }
245
246     m_appsinkCaps = nullptr;
247     m_demuxerSrcPadCaps = nullptr;
248 };
249
250 void AppendPipeline::clearPlayerPrivate()
251 {
252     ASSERT(WTF::isMainThread());
253     GST_DEBUG("cleaning private player");
254
255     {
256         LockHolder locker(m_newSampleLock);
257         // Make sure that AppendPipeline won't process more data from now on and
258         // instruct handleNewSample to abort itself from now on as well.
259         setAppendState(AppendState::Invalid);
260
261         // Awake any pending handleNewSample operation in the streaming thread.
262         m_newSampleCondition.notifyOne();
263     }
264
265     {
266         LockHolder locker(m_padAddRemoveLock);
267         m_playerPrivate = nullptr;
268         m_padAddRemoveCondition.notifyOne();
269     }
270
271     // And now that no handleNewSample operations will remain stalled waiting
272     // for the main thread, stop the pipeline.
273     if (m_pipeline)
274         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
275 }
276
277 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
278 {
279     const gchar* contextType = nullptr;
280     gst_message_parse_context_type(message, &contextType);
281     GST_TRACE("context type: %s", contextType);
282
283     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
284     if (m_playerPrivate)
285         m_playerPrivate->handleSyncMessage(message);
286 }
287
288 void AppendPipeline::handleApplicationMessage(GstMessage* message)
289 {
290     ASSERT(WTF::isMainThread());
291
292     const GstStructure* structure = gst_message_get_structure(message);
293
294     if (gst_structure_has_name(structure, "appsrc-need-data")) {
295         handleAppsrcNeedDataReceived();
296         return;
297     }
298
299     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
300         handleAppsrcAtLeastABufferLeft();
301         return;
302     }
303
304     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
305         GRefPtr<GstPad> demuxerSrcPad;
306         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
307         ASSERT(demuxerSrcPad);
308         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
309         return;
310     }
311
312     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
313         appsinkCapsChanged();
314         return;
315     }
316
317     if (gst_structure_has_name(structure, "appsink-new-sample")) {
318         GRefPtr<GstSample> newSample;
319         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
320
321         appsinkNewSample(newSample.get());
322         return;
323     }
324
325     if (gst_structure_has_name(structure, "appsink-eos")) {
326         appsinkEOS();
327         return;
328     }
329
330     ASSERT_NOT_REACHED();
331 }
332
333 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
334 {
335     ASSERT(WTF::isMainThread());
336
337     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
338         GstState currentState, newState;
339         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
340         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
341             .replace("/", "_").replace(" ", "_")
342             .replace("\"", "").replace("\'", "").utf8();
343         CString dotFileName = String::format("webkit-append-%s-%s_%s",
344             sourceBufferType.data(),
345             gst_element_state_get_name(currentState),
346             gst_element_state_get_name(newState)).utf8();
347         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
348     }
349 }
350
351 void AppendPipeline::handleAppsrcNeedDataReceived()
352 {
353     if (!m_appsrcAtLeastABufferLeft) {
354         GST_TRACE("discarding until at least a buffer leaves appsrc");
355         return;
356     }
357
358     ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
359     ASSERT(!m_appsrcNeedDataReceived);
360
361     GST_TRACE("received need-data from appsrc");
362
363     m_appsrcNeedDataReceived = true;
364     checkEndOfAppend();
365 }
366
367 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
368 {
369     m_appsrcAtLeastABufferLeft = true;
370     GST_TRACE("received buffer-left from appsrc");
371 #if LOG_DISABLED
372     removeAppsrcDataLeavingProbe();
373 #endif
374 }
375
376 gint AppendPipeline::id()
377 {
378     ASSERT(WTF::isMainThread());
379
380     if (m_id)
381         return m_id;
382
383     static gint s_totalAudio = 0;
384     static gint s_totalVideo = 0;
385     static gint s_totalText = 0;
386
387     switch (m_streamType) {
388     case Audio:
389         m_id = ++s_totalAudio;
390         break;
391     case Video:
392         m_id = ++s_totalVideo;
393         break;
394     case Text:
395         m_id = ++s_totalText;
396         break;
397     case Unknown:
398     case Invalid:
399         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
400         ASSERT_NOT_REACHED();
401         break;
402     }
403
404     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
405
406     return m_id;
407 }
408
409 void AppendPipeline::setAppendState(AppendState newAppendState)
410 {
411     ASSERT(WTF::isMainThread());
412     // Valid transitions:
413     // NotStarted-->Ongoing-->DataStarve-->NotStarted
414     //           |         |            `->Aborting-->NotStarted
415     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
416     //           |                                               `->Aborting-->NotStarted
417     //           `->Aborting-->NotStarted
418     AppendState oldAppendState = m_appendState;
419     AppendState nextAppendState = AppendState::Invalid;
420
421     if (oldAppendState != newAppendState)
422         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
423
424     bool ok = false;
425
426     switch (oldAppendState) {
427     case AppendState::NotStarted:
428         switch (newAppendState) {
429         case AppendState::Ongoing:
430             ok = true;
431             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
432             break;
433         case AppendState::NotStarted:
434             ok = true;
435             if (m_pendingBuffer) {
436                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
437                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
438                 nextAppendState = AppendState::Ongoing;
439             }
440             break;
441         case AppendState::Aborting:
442             ok = true;
443             nextAppendState = AppendState::NotStarted;
444             break;
445         case AppendState::Invalid:
446             ok = true;
447             break;
448         default:
449             break;
450         }
451         break;
452     case AppendState::Ongoing:
453         switch (newAppendState) {
454         case AppendState::Sampling:
455         case AppendState::Invalid:
456             ok = true;
457             break;
458         case AppendState::DataStarve:
459             ok = true;
460             GST_DEBUG("received all pending samples");
461             m_sourceBufferPrivate->didReceiveAllPendingSamples();
462             if (m_abortPending)
463                 nextAppendState = AppendState::Aborting;
464             else
465                 nextAppendState = AppendState::NotStarted;
466             break;
467         default:
468             break;
469         }
470         break;
471     case AppendState::DataStarve:
472         switch (newAppendState) {
473         case AppendState::NotStarted:
474         case AppendState::Invalid:
475             ok = true;
476             break;
477         case AppendState::Aborting:
478             ok = true;
479             nextAppendState = AppendState::NotStarted;
480             break;
481         default:
482             break;
483         }
484         break;
485     case AppendState::Sampling:
486         switch (newAppendState) {
487         case AppendState::Sampling:
488         case AppendState::Invalid:
489             ok = true;
490             break;
491         case AppendState::LastSample:
492             ok = true;
493             GST_DEBUG("received all pending samples");
494             m_sourceBufferPrivate->didReceiveAllPendingSamples();
495             if (m_abortPending)
496                 nextAppendState = AppendState::Aborting;
497             else
498                 nextAppendState = AppendState::NotStarted;
499             break;
500         default:
501             break;
502         }
503         break;
504     case AppendState::LastSample:
505         switch (newAppendState) {
506         case AppendState::NotStarted:
507         case AppendState::Invalid:
508             ok = true;
509             break;
510         case AppendState::Aborting:
511             ok = true;
512             nextAppendState = AppendState::NotStarted;
513             break;
514         default:
515             break;
516         }
517         break;
518     case AppendState::Aborting:
519         switch (newAppendState) {
520         case AppendState::NotStarted:
521             ok = true;
522             resetPipeline();
523             m_abortPending = false;
524             nextAppendState = AppendState::NotStarted;
525             break;
526         case AppendState::Invalid:
527             ok = true;
528             break;
529         default:
530             break;
531         }
532         break;
533     case AppendState::Invalid:
534         ok = true;
535         break;
536     }
537
538     if (ok)
539         m_appendState = newAppendState;
540     else
541         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
542
543     ASSERT(ok);
544
545     if (nextAppendState != AppendState::Invalid)
546         setAppendState(nextAppendState);
547 }
548
549 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
550 {
551     ASSERT(WTF::isMainThread());
552
553     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
554     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
555
556     const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
557     if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(originalMediaType)) {
558             m_presentationSize = WebCore::FloatSize();
559             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
560     } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
561         std::optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
562         if (size.has_value())
563             m_presentationSize = size.value();
564         else
565             m_presentationSize = WebCore::FloatSize();
566
567         m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
568     } else {
569         m_presentationSize = WebCore::FloatSize();
570         if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
571             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
572         else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
573             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
574     }
575 }
576
577 void AppendPipeline::appsinkCapsChanged()
578 {
579     ASSERT(WTF::isMainThread());
580
581     if (!m_appsink)
582         return;
583
584     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
585     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
586
587     if (!caps)
588         return;
589
590     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
591     bool previousCapsWereNull = !m_appsinkCaps;
592
593     if (m_appsinkCaps != caps) {
594         m_appsinkCaps = WTFMove(caps);
595         if (m_playerPrivate)
596             m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
597         didReceiveInitializationSegment();
598         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
599     }
600 }
601
602 void AppendPipeline::checkEndOfAppend()
603 {
604     ASSERT(WTF::isMainThread());
605
606     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
607         return;
608
609     GST_TRACE("end of append data mark was received");
610
611     switch (m_appendState) {
612     case AppendState::Ongoing:
613         GST_TRACE("DataStarve");
614         m_appsrcNeedDataReceived = false;
615         setAppendState(AppendState::DataStarve);
616         break;
617     case AppendState::Sampling:
618         GST_TRACE("LastSample");
619         m_appsrcNeedDataReceived = false;
620         setAppendState(AppendState::LastSample);
621         break;
622     default:
623         ASSERT_NOT_REACHED();
624         break;
625     }
626 }
627
628 void AppendPipeline::appsinkNewSample(GstSample* sample)
629 {
630     ASSERT(WTF::isMainThread());
631
632     {
633         LockHolder locker(m_newSampleLock);
634
635         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
636         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
637             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
638             // FIXME: Return ERROR and find a more robust way to detect that all the
639             // data has been processed, so we don't need to resort to these hacks.
640             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
641             m_flowReturn = GST_FLOW_OK;
642             m_newSampleCondition.notifyOne();
643             return;
644         }
645
646         RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(sample, m_presentationSize, trackId());
647
648         GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
649             mediaSample->trackID().string().utf8().data(),
650             mediaSample->presentationTime().toString().utf8().data(),
651             mediaSample->decodeTime().toString().utf8().data(),
652             mediaSample->duration().toString().utf8().data(),
653             mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
654
655         // If we're beyond the duration, ignore this sample and the remaining ones.
656         MediaTime duration = m_mediaSourceClient->duration();
657         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
658             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
659             setAppendState(AppendState::LastSample);
660             m_flowReturn = GST_FLOW_OK;
661             m_newSampleCondition.notifyOne();
662             return;
663         }
664
665         // Add a gap sample if a gap is detected before the first sample.
666         if (mediaSample->decodeTime() == MediaTime::zeroTime()
667             && mediaSample->presentationTime() > MediaTime::zeroTime()
668             && mediaSample->presentationTime() <= MediaTime(1, 10)) {
669             GST_DEBUG("Adding gap offset");
670             mediaSample->applyPtsOffset(MediaTime::zeroTime());
671         }
672
673         m_sourceBufferPrivate->didReceiveSample(*mediaSample);
674         setAppendState(AppendState::Sampling);
675         m_flowReturn = GST_FLOW_OK;
676         m_newSampleCondition.notifyOne();
677     }
678
679     checkEndOfAppend();
680 }
681
682 void AppendPipeline::appsinkEOS()
683 {
684     ASSERT(WTF::isMainThread());
685
686     switch (m_appendState) {
687     case AppendState::Aborting:
688         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
689         return;
690     case AppendState::Ongoing:
691         // Finish Ongoing and Sampling states.
692         setAppendState(AppendState::DataStarve);
693         break;
694     case AppendState::Sampling:
695         setAppendState(AppendState::LastSample);
696         break;
697     default:
698         GST_DEBUG("Unexpected EOS");
699         break;
700     }
701 }
702
703 void AppendPipeline::didReceiveInitializationSegment()
704 {
705     ASSERT(WTF::isMainThread());
706
707     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
708
709     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
710     initializationSegment.duration = m_mediaSourceClient->duration();
711
712     switch (m_streamType) {
713     case Audio: {
714         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
715         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
716         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
717         initializationSegment.audioTracks.append(info);
718         break;
719     }
720     case Video: {
721         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
722         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
723         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
724         initializationSegment.videoTracks.append(info);
725         break;
726     }
727     default:
728         GST_ERROR("Unsupported stream type or codec");
729         break;
730     }
731
732     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
733 }
734
735 AtomicString AppendPipeline::trackId()
736 {
737     ASSERT(WTF::isMainThread());
738
739     if (!m_track)
740         return AtomicString();
741
742     return m_track->id();
743 }
744
745 void AppendPipeline::resetPipeline()
746 {
747     ASSERT(WTF::isMainThread());
748     GST_DEBUG("resetting pipeline");
749     m_appsrcAtLeastABufferLeft = false;
750     setAppsrcDataLeavingProbe();
751
752     {
753         LockHolder locker(m_newSampleLock);
754         m_newSampleCondition.notifyOne();
755         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
756         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
757     }
758
759 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
760     {
761         static unsigned i = 0;
762         // This is here for debugging purposes. It does not make sense to have it as class member.
763         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
764         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
765     }
766 #endif
767
768 }
769
770 void AppendPipeline::setAppsrcDataLeavingProbe()
771 {
772     if (m_appsrcDataLeavingProbeId)
773         return;
774
775     GST_TRACE("setting appsrc data leaving probe");
776
777     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
778     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
779 }
780
781 void AppendPipeline::removeAppsrcDataLeavingProbe()
782 {
783     if (!m_appsrcDataLeavingProbeId)
784         return;
785
786     GST_TRACE("removing appsrc data leaving probe");
787
788     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
789     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
790     m_appsrcDataLeavingProbeId = 0;
791 }
792
793 void AppendPipeline::abort()
794 {
795     ASSERT(WTF::isMainThread());
796     GST_DEBUG("aborting");
797
798     m_pendingBuffer = nullptr;
799
800     // Abort already ongoing.
801     if (m_abortPending)
802         return;
803
804     m_abortPending = true;
805     if (m_appendState == AppendState::NotStarted)
806         setAppendState(AppendState::Aborting);
807     // Else, the automatic state transitions will take care when the ongoing append finishes.
808 }
809
810 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
811 {
812     GstFlowReturn result;
813
814     if (m_abortPending) {
815         m_pendingBuffer = adoptGRef(buffer);
816         result = GST_FLOW_OK;
817     } else {
818         setAppendState(AppendPipeline::AppendState::Ongoing);
819         GST_TRACE("pushing new buffer %p", buffer);
820         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
821     }
822
823     return result;
824 }
825
826 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
827 {
828     GST_TRACE("buffer left appsrc, reposting to bus");
829     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
830     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
831     gst_bus_post(m_bus.get(), message);
832 }
833
834 void AppendPipeline::reportAppsrcNeedDataReceived()
835 {
836     GST_TRACE("received need-data signal at appsrc, reposting to bus");
837     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
838     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
839     gst_bus_post(m_bus.get(), message);
840 }
841
842 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
843 {
844     ASSERT(!WTF::isMainThread());
845
846     // Even if we're disabled, it's important to pull the sample out anyway to
847     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
848     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
849     LockHolder locker(m_newSampleLock);
850
851     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
852         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
853         return GST_FLOW_ERROR;
854     }
855
856     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
857     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
858     gst_bus_post(m_bus.get(), message);
859     GST_TRACE("appsink-new-sample message posted to bus");
860
861     m_newSampleCondition.wait(m_newSampleLock);
862     // We've been awaken because the sample was processed or because of
863     // an exceptional condition (entered in Invalid state, destructor, etc.).
864     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
865
866     return m_flowReturn;
867 }
868
869 static GRefPtr<GstElement>
870 createOptionalParserForFormat(GstPad* demuxerSrcPad)
871 {
872     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
873     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
874     const char* mediaType = gst_structure_get_name(structure);
875
876     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
877     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
878
879     if (!g_strcmp0(mediaType, "audio/x-opus")) {
880         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
881         RELEASE_ASSERT(opusparse);
882         return GRefPtr<GstElement>(opusparse);
883     }
884     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
885         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
886         RELEASE_ASSERT(vorbisparse);
887         return GRefPtr<GstElement>(vorbisparse);
888     }
889
890     return nullptr;
891 }
892
893 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
894 {
895     if (!m_appsink)
896         return;
897
898     GST_DEBUG("connecting to appsink");
899
900     if (m_demux->numsrcpads > 1) {
901         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
902         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
903         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
904         return;
905     }
906
907     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
908
909     // Only one stream per demuxer is supported.
910     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
911
912     gint64 timeLength = 0;
913     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
914         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
915         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
916     else
917         m_initialDuration = MediaTime::positiveInfiniteTime();
918
919     if (WTF::isMainThread())
920         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
921     else {
922         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
923         LockHolder locker(m_padAddRemoveLock);
924         if (!m_playerPrivate)
925             return;
926
927         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
928         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
929         gst_bus_post(m_bus.get(), message);
930         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
931
932         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
933
934         if (!m_playerPrivate)
935             return;
936     }
937
938     // Must be done in the thread we were called from (usually streaming thread).
939     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
940         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
941         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
942
943     if (isData) {
944         // FIXME: Only add appsink one time. This method can be called several times.
945         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
946         if (!parent)
947             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
948
949         // Current head of the pipeline being built.
950         GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
951
952         // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
953         // the contained audio streams in order to know the duration of the frames.
954         // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
955         m_parser = createOptionalParserForFormat(currentSrcPad.get());
956         if (m_parser) {
957             gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
958             gst_element_sync_state_with_parent(m_parser.get());
959
960             GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
961             GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
962
963             gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
964             currentSrcPad = parserSrcPad;
965         }
966
967         gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
968
969         gst_element_sync_state_with_parent(m_appsink.get());
970
971         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
972         gst_element_sync_state_with_parent(m_appsink.get());
973
974         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
975     }
976 }
977
978 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
979 {
980     ASSERT(WTF::isMainThread());
981     GST_DEBUG("Connecting to appsink");
982
983     const String& type = m_sourceBufferPrivate->type().containerType();
984     if (type.endsWith("webm"))
985         gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
986
987     LockHolder locker(m_padAddRemoveLock);
988     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
989
990     // Only one stream per demuxer is supported.
991     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
992
993     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
994
995     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
996         m_padAddRemoveCondition.notifyOne();
997         return;
998     }
999
1000 #ifndef GST_DISABLE_GST_DEBUG
1001     {
1002         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1003         GST_DEBUG("%s", strcaps.get());
1004     }
1005 #endif
1006
1007     if (m_initialDuration > m_mediaSourceClient->duration()
1008         || (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()))
1009         m_mediaSourceClient->durationChanged(m_initialDuration);
1010
1011     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
1012
1013     switch (m_streamType) {
1014     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
1015         if (m_playerPrivate)
1016             m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
1017         break;
1018     case WebCore::MediaSourceStreamTypeGStreamer::Video:
1019         if (m_playerPrivate)
1020             m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
1021         break;
1022     case WebCore::MediaSourceStreamTypeGStreamer::Text:
1023         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
1024         break;
1025     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
1026         {
1027             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1028             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1029         }
1030         // This is going to cause an error which will detach the SourceBuffer and tear down this
1031         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1032         m_track = nullptr;
1033         m_padAddRemoveCondition.notifyOne();
1034         locker.unlockEarly();
1035         didReceiveInitializationSegment();
1036         return;
1037     default:
1038         // No useful data, but notify anyway to complete the append operation.
1039         GST_DEBUG("Received all pending samples (no data)");
1040         m_sourceBufferPrivate->didReceiveAllPendingSamples();
1041         break;
1042     }
1043
1044     m_padAddRemoveCondition.notifyOne();
1045 }
1046
1047 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
1048 {
1049     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
1050
1051     GST_DEBUG("Disconnecting appsink");
1052
1053     if (m_parser) {
1054         gst_element_set_state(m_parser.get(), GST_STATE_NULL);
1055         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
1056         m_parser = nullptr;
1057     }
1058
1059     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
1060 }
1061
1062 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1063 {
1064     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1065     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1066     gst_bus_post(appendPipeline->bus(), message);
1067     GST_TRACE("appsink-caps-changed message posted to bus");
1068 }
1069
1070 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1071 {
1072     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1073
1074     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1075     gsize bufferSize = gst_buffer_get_size(buffer);
1076
1077     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1078
1079     appendPipeline->reportAppsrcAtLeastABufferLeft();
1080
1081     return GST_PAD_PROBE_OK;
1082 }
1083
1084 #if !LOG_DISABLED
1085 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1086 {
1087     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1088     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1089     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1090     return GST_PAD_PROBE_OK;
1091 }
1092 #endif
1093
1094 #if ENABLE(ENCRYPTED_MEDIA)
1095 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
1096 {
1097     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1098     GstEvent* event = gst_pad_probe_info_get_event(info);
1099     GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
1100     WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
1101
1102     switch (GST_EVENT_TYPE(event)) {
1103     case GST_EVENT_PROTECTION:
1104         if (appendPipeline && appendPipeline->playerPrivate())
1105             appendPipeline->playerPrivate()->handleProtectionEvent(event);
1106         return GST_PAD_PROBE_DROP;
1107     default:
1108         break;
1109     }
1110
1111     return GST_PAD_PROBE_OK;
1112 }
1113 #endif
1114
1115 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1116 {
1117     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1118     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1119     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1120     return GST_PAD_PROBE_DROP;
1121 }
1122
1123 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1124 {
1125     appendPipeline->reportAppsrcNeedDataReceived();
1126 }
1127
1128 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1129 {
1130     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1131 }
1132
1133 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1134 {
1135     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1136 }
1137
1138 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1139 {
1140     return appendPipeline->handleNewAppsinkSample(appsink);
1141 }
1142
1143 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1144 {
1145     if (WTF::isMainThread())
1146         appendPipeline->appsinkEOS();
1147     else {
1148         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1149         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1150         gst_bus_post(appendPipeline->bus(), message);
1151         GST_TRACE("appsink-eos message posted to bus");
1152     }
1153
1154     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1155 }
1156
1157 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
1158 {
1159     // matroskademux sets GstSegment.start to the PTS of the first frame.
1160     //
1161     // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
1162     // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
1163     // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
1164     //
1165     // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
1166     // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
1167     // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
1168     // audiobasefilter, such as opusparse.
1169     //
1170     // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
1171     // frame is zero.
1172
1173     ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1174     GstEvent* event = static_cast<GstEvent*>(info->data);
1175     if (event->type == GST_EVENT_SEGMENT) {
1176         GstSegment segment;
1177         gst_event_copy_segment(event, &segment);
1178
1179         segment.start = 0;
1180
1181         GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
1182         gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
1183     }
1184     return GST_PAD_PROBE_OK;
1185 }
1186
1187 } // namespace WebCore.
1188
1189 #endif // USE(GSTREAMER)