[GStreamer][MSE] Trim space between codecs
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * along with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GRefPtrGStreamer.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "GStreamerMediaSample.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <gst/app/gstappsink.h>
36 #include <gst/app/gstappsrc.h>
37 #include <gst/gst.h>
38 #include <gst/pbutils/pbutils.h>
39 #include <gst/video/video.h>
40 #include <wtf/Condition.h>
41 #include <wtf/glib/GLibUtilities.h>
42 #include <wtf/glib/RunLoopSourcePriority.h>
43
44 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
45 #define GST_CAT_DEFAULT webkit_mse_debug
46
47 namespace WebCore {
48
49 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
50 {
51     switch (appendState) {
52     case AppendPipeline::AppendState::Invalid:
53         return "Invalid";
54     case AppendPipeline::AppendState::NotStarted:
55         return "NotStarted";
56     case AppendPipeline::AppendState::Ongoing:
57         return "Ongoing";
58     case AppendPipeline::AppendState::KeyNegotiation:
59         return "KeyNegotiation";
60     case AppendPipeline::AppendState::DataStarve:
61         return "DataStarve";
62     case AppendPipeline::AppendState::Sampling:
63         return "Sampling";
64     case AppendPipeline::AppendState::LastSample:
65         return "LastSample";
66     case AppendPipeline::AppendState::Aborting:
67         return "Aborting";
68     default:
69         return "(unknown)";
70     }
71 }
72
73 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
74 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
75 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
76 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
77 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
78 #if !LOG_DISABLED
79 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
80 #endif
81 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
82 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
83 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
84
85 static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
86 {
87     GST_TRACE("received callback");
88     appendPipeline->handleNeedContextSyncMessage(message);
89 }
90
91 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
92 {
93     appendPipeline->handleApplicationMessage(message);
94 }
95
96 #if ENABLE(ENCRYPTED_MEDIA)
97 static void appendPipelineElementMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
98 {
99     appendPipeline->handleElementMessage(message);
100 }
101 #endif
102
103 static void appendPipelineStateChangeMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
104 {
105     appendPipeline->handleStateChangeMessage(message);
106 }
107
108 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
109     : m_mediaSourceClient(mediaSourceClient.get())
110     , m_sourceBufferPrivate(sourceBufferPrivate.get())
111     , m_playerPrivate(&playerPrivate)
112     , m_id(0)
113     , m_appsrcAtLeastABufferLeft(false)
114     , m_appsrcNeedDataReceived(false)
115     , m_appsrcDataLeavingProbeId(0)
116     , m_appendState(AppendState::NotStarted)
117     , m_abortPending(false)
118     , m_streamType(Unknown)
119 {
120     ASSERT(WTF::isMainThread());
121
122     GST_TRACE("Creating AppendPipeline (%p)", this);
123
124     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
125     // The track name is still unknown at this time, though.
126     m_pipeline = gst_pipeline_new(nullptr);
127
128     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
129     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
130     gst_bus_enable_sync_message_emission(m_bus.get());
131
132     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
133     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
134 #if ENABLE(ENCRYPTED_MEDIA)
135     g_signal_connect(m_bus.get(), "message::element", G_CALLBACK(appendPipelineElementMessageCallback), this);
136 #endif
137     g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(appendPipelineStateChangeMessageCallback), this);
138
139     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
140     // below will already take the initial reference and we need an additional one for us.
141     m_appsrc = gst_element_factory_make("appsrc", nullptr);
142
143     const String& type = m_sourceBufferPrivate->type().containerType();
144     if (type.endsWith("mp4"))
145         m_demux = gst_element_factory_make("qtdemux", nullptr);
146     else if (type.endsWith("webm"))
147         m_demux = gst_element_factory_make("matroskademux", nullptr);
148     else
149         ASSERT_NOT_REACHED();
150
151     m_appsink = gst_element_factory_make("appsink", nullptr);
152
153     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
154     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
155
156     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
157     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
158
159     setAppsrcDataLeavingProbe();
160
161 #if !LOG_DISABLED
162     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
163     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
164     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
165     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
166     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
167     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
168     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
169 #endif
170
171     // These signals won't be connected outside of the lifetime of "this".
172     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
173     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
174     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
175     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
176     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
177
178     // Add_many will take ownership of a reference. That's why we used an assignment before.
179     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
180     gst_element_link(m_appsrc.get(), m_demux.get());
181
182     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
183 };
184
185 AppendPipeline::~AppendPipeline()
186 {
187     ASSERT(WTF::isMainThread());
188
189     {
190         LockHolder locker(m_newSampleLock);
191         setAppendState(AppendState::Invalid);
192         m_newSampleCondition.notifyOne();
193     }
194
195     {
196         LockHolder locker(m_padAddRemoveLock);
197         m_playerPrivate = nullptr;
198         m_padAddRemoveCondition.notifyOne();
199     }
200
201     GST_TRACE("Destroying AppendPipeline (%p)", this);
202
203     // FIXME: Maybe notify appendComplete here?
204
205     if (m_pipeline) {
206         ASSERT(m_bus);
207         g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
208         gst_bus_disable_sync_message_emission(m_bus.get());
209         gst_bus_remove_signal_watch(m_bus.get());
210         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
211         m_pipeline = nullptr;
212     }
213
214     if (m_appsrc) {
215         removeAppsrcDataLeavingProbe();
216         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
217         m_appsrc = nullptr;
218     }
219
220     if (m_demux) {
221 #if !LOG_DISABLED
222         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
223         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
224 #endif
225
226         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
227         m_demux = nullptr;
228     }
229
230     if (m_appsink) {
231         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
232         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
233         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
234
235 #if !LOG_DISABLED
236         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
237 #endif
238
239         m_appsink = nullptr;
240     }
241
242     m_appsinkCaps = nullptr;
243     m_demuxerSrcPadCaps = nullptr;
244 };
245
246 void AppendPipeline::clearPlayerPrivate()
247 {
248     ASSERT(WTF::isMainThread());
249     GST_DEBUG("cleaning private player");
250
251     {
252         LockHolder locker(m_newSampleLock);
253         // Make sure that AppendPipeline won't process more data from now on and
254         // instruct handleNewSample to abort itself from now on as well.
255         setAppendState(AppendState::Invalid);
256
257         // Awake any pending handleNewSample operation in the streaming thread.
258         m_newSampleCondition.notifyOne();
259     }
260
261     {
262         LockHolder locker(m_padAddRemoveLock);
263         m_playerPrivate = nullptr;
264         m_padAddRemoveCondition.notifyOne();
265     }
266
267     // And now that no handleNewSample operations will remain stalled waiting
268     // for the main thread, stop the pipeline.
269     if (m_pipeline)
270         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
271 }
272
273 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
274 {
275     const gchar* contextType = nullptr;
276     gst_message_parse_context_type(message, &contextType);
277     GST_TRACE("context type: %s", contextType);
278     if (!g_strcmp0(contextType, "drm-preferred-decryption-system-id")
279         && m_appendState != AppendPipeline::AppendState::KeyNegotiation)
280         setAppendState(AppendPipeline::AppendState::KeyNegotiation);
281
282     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
283     if (m_playerPrivate)
284         m_playerPrivate->handleSyncMessage(message);
285 }
286
287 void AppendPipeline::handleApplicationMessage(GstMessage* message)
288 {
289     ASSERT(WTF::isMainThread());
290
291     const GstStructure* structure = gst_message_get_structure(message);
292
293     if (gst_structure_has_name(structure, "appsrc-need-data")) {
294         handleAppsrcNeedDataReceived();
295         return;
296     }
297
298     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
299         handleAppsrcAtLeastABufferLeft();
300         return;
301     }
302
303     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
304         GRefPtr<GstPad> demuxerSrcPad;
305         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
306         ASSERT(demuxerSrcPad);
307         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
308         return;
309     }
310
311     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
312         appsinkCapsChanged();
313         return;
314     }
315
316     if (gst_structure_has_name(structure, "appsink-new-sample")) {
317         GRefPtr<GstSample> newSample;
318         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
319
320         appsinkNewSample(newSample.get());
321         return;
322     }
323
324     if (gst_structure_has_name(structure, "appsink-eos")) {
325         appsinkEOS();
326         return;
327     }
328
329     ASSERT_NOT_REACHED();
330 }
331
332 #if ENABLE(ENCRYPTED_MEDIA)
333 void AppendPipeline::handleElementMessage(GstMessage* message)
334 {
335     ASSERT(WTF::isMainThread());
336
337     const GstStructure* structure = gst_message_get_structure(message);
338     GST_TRACE("%s message from %s", gst_structure_get_name(structure), GST_MESSAGE_SRC_NAME(message));
339     if (m_playerPrivate && gst_structure_has_name(structure, "drm-key-needed")) {
340         if (m_appendState != AppendPipeline::AppendState::KeyNegotiation)
341             setAppendState(AppendPipeline::AppendState::KeyNegotiation);
342
343         GST_DEBUG("sending drm-key-needed message from %s to the player", GST_MESSAGE_SRC_NAME(message));
344         GRefPtr<GstEvent> event;
345         gst_structure_get(structure, "event", GST_TYPE_EVENT, &event.outPtr(), nullptr);
346         m_playerPrivate->handleProtectionEvent(event.get());
347     }
348 }
349 #endif
350
351 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
352 {
353     ASSERT(WTF::isMainThread());
354
355     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
356         GstState currentState, newState;
357         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
358         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
359             .replace("/", "_").replace(" ", "_")
360             .replace("\"", "").replace("\'", "").utf8();
361         CString dotFileName = String::format("webkit-append-%s-%s_%s",
362             sourceBufferType.data(),
363             gst_element_state_get_name(currentState),
364             gst_element_state_get_name(newState)).utf8();
365         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
366     }
367 }
368
369 void AppendPipeline::handleAppsrcNeedDataReceived()
370 {
371     if (!m_appsrcAtLeastABufferLeft) {
372         GST_TRACE("discarding until at least a buffer leaves appsrc");
373         return;
374     }
375
376     ASSERT(m_appendState == AppendState::KeyNegotiation || m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
377     ASSERT(!m_appsrcNeedDataReceived);
378
379     GST_TRACE("received need-data from appsrc");
380
381     m_appsrcNeedDataReceived = true;
382     checkEndOfAppend();
383 }
384
385 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
386 {
387     m_appsrcAtLeastABufferLeft = true;
388     GST_TRACE("received buffer-left from appsrc");
389 #if LOG_DISABLED
390     removeAppsrcDataLeavingProbe();
391 #endif
392 }
393
394 gint AppendPipeline::id()
395 {
396     ASSERT(WTF::isMainThread());
397
398     if (m_id)
399         return m_id;
400
401     static gint s_totalAudio = 0;
402     static gint s_totalVideo = 0;
403     static gint s_totalText = 0;
404
405     switch (m_streamType) {
406     case Audio:
407         m_id = ++s_totalAudio;
408         break;
409     case Video:
410         m_id = ++s_totalVideo;
411         break;
412     case Text:
413         m_id = ++s_totalText;
414         break;
415     case Unknown:
416     case Invalid:
417         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
418         ASSERT_NOT_REACHED();
419         break;
420     }
421
422     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
423
424     return m_id;
425 }
426
427 void AppendPipeline::setAppendState(AppendState newAppendState)
428 {
429     ASSERT(WTF::isMainThread());
430     // Valid transitions:
431     // NotStarted-->Ongoing-->DataStarve-->NotStarted
432     //           |         |            `->Aborting-->NotStarted
433     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
434     //           |         |                                     `->Aborting-->NotStarted
435     //           |         `->KeyNegotiation-->Ongoing-->[...]
436     //           `->Aborting-->NotStarted
437     AppendState oldAppendState = m_appendState;
438     AppendState nextAppendState = AppendState::Invalid;
439
440     if (oldAppendState != newAppendState)
441         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
442
443     bool ok = false;
444
445     switch (oldAppendState) {
446     case AppendState::NotStarted:
447         switch (newAppendState) {
448         case AppendState::Ongoing:
449             ok = true;
450             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
451             break;
452         case AppendState::NotStarted:
453             ok = true;
454             if (m_pendingBuffer) {
455                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
456                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
457                 nextAppendState = AppendState::Ongoing;
458             }
459             break;
460         case AppendState::Aborting:
461             ok = true;
462             nextAppendState = AppendState::NotStarted;
463             break;
464         case AppendState::Invalid:
465             ok = true;
466             break;
467         default:
468             break;
469         }
470         break;
471     case AppendState::KeyNegotiation:
472         switch (newAppendState) {
473         case AppendState::Ongoing:
474         case AppendState::Invalid:
475             ok = true;
476             break;
477         default:
478             break;
479         }
480         break;
481     case AppendState::Ongoing:
482         switch (newAppendState) {
483         case AppendState::KeyNegotiation:
484         case AppendState::Sampling:
485         case AppendState::Invalid:
486             ok = true;
487             break;
488         case AppendState::DataStarve:
489             ok = true;
490             GST_DEBUG("received all pending samples");
491             m_sourceBufferPrivate->didReceiveAllPendingSamples();
492             if (m_abortPending)
493                 nextAppendState = AppendState::Aborting;
494             else
495                 nextAppendState = AppendState::NotStarted;
496             break;
497         default:
498             break;
499         }
500         break;
501     case AppendState::DataStarve:
502         switch (newAppendState) {
503         case AppendState::NotStarted:
504         case AppendState::Invalid:
505             ok = true;
506             break;
507         case AppendState::Aborting:
508             ok = true;
509             nextAppendState = AppendState::NotStarted;
510             break;
511         default:
512             break;
513         }
514         break;
515     case AppendState::Sampling:
516         switch (newAppendState) {
517         case AppendState::Sampling:
518         case AppendState::Invalid:
519             ok = true;
520             break;
521         case AppendState::LastSample:
522             ok = true;
523             GST_DEBUG("received all pending samples");
524             m_sourceBufferPrivate->didReceiveAllPendingSamples();
525             if (m_abortPending)
526                 nextAppendState = AppendState::Aborting;
527             else
528                 nextAppendState = AppendState::NotStarted;
529             break;
530         default:
531             break;
532         }
533         break;
534     case AppendState::LastSample:
535         switch (newAppendState) {
536         case AppendState::NotStarted:
537         case AppendState::Invalid:
538             ok = true;
539             break;
540         case AppendState::Aborting:
541             ok = true;
542             nextAppendState = AppendState::NotStarted;
543             break;
544         default:
545             break;
546         }
547         break;
548     case AppendState::Aborting:
549         switch (newAppendState) {
550         case AppendState::NotStarted:
551             ok = true;
552             resetPipeline();
553             m_abortPending = false;
554             nextAppendState = AppendState::NotStarted;
555             break;
556         case AppendState::Invalid:
557             ok = true;
558             break;
559         default:
560             break;
561         }
562         break;
563     case AppendState::Invalid:
564         ok = true;
565         break;
566     }
567
568     if (ok)
569         m_appendState = newAppendState;
570     else
571         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
572
573     ASSERT(ok);
574
575     if (nextAppendState != AppendState::Invalid)
576         setAppendState(nextAppendState);
577 }
578
579 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
580 {
581     ASSERT(WTF::isMainThread());
582
583     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
584     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
585
586     GstStructure* structure = gst_caps_get_structure(m_demuxerSrcPadCaps.get(), 0);
587     bool sizeConfigured = false;
588
589 #if GST_CHECK_VERSION(1, 5, 3) && ENABLE(ENCRYPTED_MEDIA)
590     if (gst_structure_has_name(structure, "application/x-cenc")) {
591         // Any previous decryptor should have been removed from the pipeline by disconnectFromAppSinkFromStreamingThread()
592         ASSERT(!m_decryptor);
593
594         m_decryptor = GStreamerEMEUtilities::createDecryptor(gst_structure_get_string(structure, "protection-system"));
595         if (!m_decryptor) {
596             GST_ERROR("decryptor not found for caps: %" GST_PTR_FORMAT, m_demuxerSrcPadCaps.get());
597             return;
598         }
599
600         const gchar* originalMediaType = gst_structure_get_string(structure, "original-media-type");
601
602         if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(originalMediaType)) {
603             m_presentationSize = WebCore::FloatSize();
604             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
605         } else if (g_str_has_prefix(originalMediaType, "video/")) {
606             int width = 0;
607             int height = 0;
608             float finalHeight = 0;
609
610             if (gst_structure_get_int(structure, "width", &width) && gst_structure_get_int(structure, "height", &height)) {
611                 int ratioNumerator = 1;
612                 int ratioDenominator = 1;
613
614                 gst_structure_get_fraction(structure, "pixel-aspect-ratio", &ratioNumerator, &ratioDenominator);
615                 finalHeight = height * ((float) ratioDenominator / (float) ratioNumerator);
616             }
617
618             m_presentationSize = WebCore::FloatSize(width, finalHeight);
619             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
620         } else {
621             m_presentationSize = WebCore::FloatSize();
622             if (g_str_has_prefix(originalMediaType, "audio/"))
623                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
624             else if (g_str_has_prefix(originalMediaType, "text/"))
625                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
626         }
627         sizeConfigured = true;
628     }
629 #endif
630
631     if (!sizeConfigured) {
632         const char* structureName = gst_structure_get_name(structure);
633         GstVideoInfo info;
634
635         if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(structureName)) {
636             m_presentationSize = WebCore::FloatSize();
637             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
638         } else if (g_str_has_prefix(structureName, "video/") && gst_video_info_from_caps(&info, demuxerSrcPadCaps)) {
639             float width, height;
640
641             width = info.width;
642             height = info.height * ((float) info.par_d / (float) info.par_n);
643
644             m_presentationSize = WebCore::FloatSize(width, height);
645             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
646         } else {
647             m_presentationSize = WebCore::FloatSize();
648             if (g_str_has_prefix(structureName, "audio/"))
649                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
650             else if (g_str_has_prefix(structureName, "text/"))
651                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
652         }
653     }
654 }
655
656 void AppendPipeline::appsinkCapsChanged()
657 {
658     ASSERT(WTF::isMainThread());
659
660     if (!m_appsink)
661         return;
662
663     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
664     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
665
666     if (!caps)
667         return;
668
669     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
670     bool previousCapsWereNull = !m_appsinkCaps;
671
672     if (m_appsinkCaps != caps) {
673         m_appsinkCaps = WTFMove(caps);
674         if (m_playerPrivate)
675             m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
676         didReceiveInitializationSegment();
677         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
678     }
679 }
680
681 void AppendPipeline::checkEndOfAppend()
682 {
683     ASSERT(WTF::isMainThread());
684
685     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
686         return;
687
688     GST_TRACE("end of append data mark was received");
689
690     switch (m_appendState) {
691     case AppendState::Ongoing:
692         GST_TRACE("DataStarve");
693         m_appsrcNeedDataReceived = false;
694         setAppendState(AppendState::DataStarve);
695         break;
696     case AppendState::Sampling:
697         GST_TRACE("LastSample");
698         m_appsrcNeedDataReceived = false;
699         setAppendState(AppendState::LastSample);
700         break;
701     default:
702         ASSERT_NOT_REACHED();
703         break;
704     }
705 }
706
707 void AppendPipeline::appsinkNewSample(GstSample* sample)
708 {
709     ASSERT(WTF::isMainThread());
710
711     {
712         LockHolder locker(m_newSampleLock);
713
714         // If we were in KeyNegotiation but samples are coming, assume we're already OnGoing
715         if (m_appendState == AppendState::KeyNegotiation)
716             setAppendState(AppendState::Ongoing);
717
718         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
719         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
720             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
721             // FIXME: Return ERROR and find a more robust way to detect that all the
722             // data has been processed, so we don't need to resort to these hacks.
723             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
724             m_flowReturn = GST_FLOW_OK;
725             m_newSampleCondition.notifyOne();
726             return;
727         }
728
729         RefPtr<GStreamerMediaSample> mediaSample = WebCore::GStreamerMediaSample::create(sample, m_presentationSize, trackId());
730
731         GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
732             mediaSample->trackID().string().utf8().data(),
733             mediaSample->presentationTime().toString().utf8().data(),
734             mediaSample->decodeTime().toString().utf8().data(),
735             mediaSample->duration().toString().utf8().data(),
736             mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
737
738         // If we're beyond the duration, ignore this sample and the remaining ones.
739         MediaTime duration = m_mediaSourceClient->duration();
740         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
741             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
742             setAppendState(AppendState::LastSample);
743             m_flowReturn = GST_FLOW_OK;
744             m_newSampleCondition.notifyOne();
745             return;
746         }
747
748         // Add a gap sample if a gap is detected before the first sample.
749         if (mediaSample->decodeTime() == MediaTime::zeroTime()
750             && mediaSample->presentationTime() > MediaTime::zeroTime()
751             && mediaSample->presentationTime() <= MediaTime(1, 10)) {
752             GST_DEBUG("Adding gap offset");
753             mediaSample->applyPtsOffset(MediaTime::zeroTime());
754         }
755
756         m_sourceBufferPrivate->didReceiveSample(*mediaSample);
757         setAppendState(AppendState::Sampling);
758         m_flowReturn = GST_FLOW_OK;
759         m_newSampleCondition.notifyOne();
760     }
761
762     checkEndOfAppend();
763 }
764
765 void AppendPipeline::appsinkEOS()
766 {
767     ASSERT(WTF::isMainThread());
768
769     switch (m_appendState) {
770     case AppendState::Aborting:
771         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
772         return;
773     case AppendState::Ongoing:
774         // Finish Ongoing and Sampling states.
775         setAppendState(AppendState::DataStarve);
776         break;
777     case AppendState::Sampling:
778         setAppendState(AppendState::LastSample);
779         break;
780     default:
781         GST_DEBUG("Unexpected EOS");
782         break;
783     }
784 }
785
786 void AppendPipeline::didReceiveInitializationSegment()
787 {
788     ASSERT(WTF::isMainThread());
789
790     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
791
792     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
793     initializationSegment.duration = m_mediaSourceClient->duration();
794
795     switch (m_streamType) {
796     case Audio: {
797         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
798         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
799         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
800         initializationSegment.audioTracks.append(info);
801         break;
802     }
803     case Video: {
804         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
805         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
806         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
807         initializationSegment.videoTracks.append(info);
808         break;
809     }
810     default:
811         GST_ERROR("Unsupported stream type or codec");
812         break;
813     }
814
815     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
816 }
817
818 AtomicString AppendPipeline::trackId()
819 {
820     ASSERT(WTF::isMainThread());
821
822     if (!m_track)
823         return AtomicString();
824
825     return m_track->id();
826 }
827
828 void AppendPipeline::resetPipeline()
829 {
830     ASSERT(WTF::isMainThread());
831     GST_DEBUG("resetting pipeline");
832     m_appsrcAtLeastABufferLeft = false;
833     setAppsrcDataLeavingProbe();
834
835     {
836         LockHolder locker(m_newSampleLock);
837         m_newSampleCondition.notifyOne();
838         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
839         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
840     }
841
842 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
843     {
844         static unsigned i = 0;
845         // This is here for debugging purposes. It does not make sense to have it as class member.
846         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
847         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
848     }
849 #endif
850
851 }
852
853 void AppendPipeline::setAppsrcDataLeavingProbe()
854 {
855     if (m_appsrcDataLeavingProbeId)
856         return;
857
858     GST_TRACE("setting appsrc data leaving probe");
859
860     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
861     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
862 }
863
864 void AppendPipeline::removeAppsrcDataLeavingProbe()
865 {
866     if (!m_appsrcDataLeavingProbeId)
867         return;
868
869     GST_TRACE("removing appsrc data leaving probe");
870
871     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
872     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
873     m_appsrcDataLeavingProbeId = 0;
874 }
875
876 void AppendPipeline::abort()
877 {
878     ASSERT(WTF::isMainThread());
879     GST_DEBUG("aborting");
880
881     m_pendingBuffer = nullptr;
882
883     // Abort already ongoing.
884     if (m_abortPending)
885         return;
886
887     m_abortPending = true;
888     if (m_appendState == AppendState::NotStarted)
889         setAppendState(AppendState::Aborting);
890     // Else, the automatic state transitions will take care when the ongoing append finishes.
891 }
892
893 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
894 {
895     GstFlowReturn result;
896
897     if (m_abortPending) {
898         m_pendingBuffer = adoptGRef(buffer);
899         result = GST_FLOW_OK;
900     } else {
901         setAppendState(AppendPipeline::AppendState::Ongoing);
902         GST_TRACE("pushing new buffer %p", buffer);
903         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
904     }
905
906     return result;
907 }
908
909 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
910 {
911     GST_TRACE("buffer left appsrc, reposting to bus");
912     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
913     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
914     gst_bus_post(m_bus.get(), message);
915 }
916
917 void AppendPipeline::reportAppsrcNeedDataReceived()
918 {
919     GST_TRACE("received need-data signal at appsrc, reposting to bus");
920     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
921     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
922     gst_bus_post(m_bus.get(), message);
923 }
924
925 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
926 {
927     ASSERT(!WTF::isMainThread());
928
929     // Even if we're disabled, it's important to pull the sample out anyway to
930     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
931     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
932     LockHolder locker(m_newSampleLock);
933
934     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
935         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
936         return GST_FLOW_ERROR;
937     }
938
939     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
940     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
941     gst_bus_post(m_bus.get(), message);
942     GST_TRACE("appsink-new-sample message posted to bus");
943
944     m_newSampleCondition.wait(m_newSampleLock);
945     // We've been awaken because the sample was processed or because of
946     // an exceptional condition (entered in Invalid state, destructor, etc.).
947     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
948
949     return m_flowReturn;
950 }
951
952 static GRefPtr<GstElement>
953 createOptionalParserForFormat(GstPad* demuxerSrcPad)
954 {
955     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
956     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
957     const char* mediaType = gst_structure_get_name(structure);
958
959     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
960     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
961
962     if (!g_strcmp0(mediaType, "audio/x-opus")) {
963         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
964         RELEASE_ASSERT(opusparse);
965         return GRefPtr<GstElement>(opusparse);
966     }
967     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
968         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
969         RELEASE_ASSERT(vorbisparse);
970         return GRefPtr<GstElement>(vorbisparse);
971     }
972
973     return nullptr;
974 }
975
// Reacts to a new demuxer source pad, possibly from a non-main thread. Pads beyond the first
// only get a buffer-dropping probe. Otherwise the demuxer duration is recorded,
// connectDemuxerSrcPadToAppsink() is run on the main thread (directly, or through a
// "demuxer-connect-to-appsink" bus message followed by a wait on m_padAddRemoveCondition),
// and the pad is then linked through the optional parser and decryptor to the appsink from
// the calling thread.
976 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
977 {
978     if (!m_appsink)
979         return;
980
981     GST_DEBUG("connecting to appsink");
982
        // Extra pads are not connected: a probe using appendPipelineDemuxerBlackHolePadProbe is
        // attached and its id is stored on the pad under the "blackHoleProbeId" key.
983     if (m_demux->numsrcpads > 1) {
984         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
985         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
986         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
987         return;
988     }
989
990     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
991
992     // Only one stream per demuxer is supported.
993     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
994
        // Record the duration reported by the demuxer; positive infinity is used when the query
        // fails or yields GST_CLOCK_TIME_NONE.
995     gint64 timeLength = 0;
996     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
997         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
998         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
999     else
1000         m_initialDuration = MediaTime::positiveInfiniteTime();
1001
1002     if (WTF::isMainThread())
1003         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
1004     else {
1005         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
1006         LockHolder locker(m_padAddRemoveLock);
1007         if (!m_playerPrivate)
1008             return;
1009
1010         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
1011         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
1012         gst_bus_post(m_bus.get(), message);
1013         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
1014
1015         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
1016
             // m_playerPrivate is checked again once the wait returns.
             // NOTE(review): presumably it can be cleared while this thread was blocked - confirm.
1017         if (!m_playerPrivate)
1018             return;
1019     }
1020
1021     // Must be done in the thread we were called from (usually streaming thread).
1022     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
1023         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
1024         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
1025
1026     if (isData) {
1027         // FIXME: Only add appsink one time. This method can be called several times.
             // For now the appsink is added to the pipeline only when it has no parent yet.
1028         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
1029         if (!parent)
1030             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
1031
1032         // Current head of the pipeline being built.
1033         GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
1034
1035         // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
1036         // the contained audio streams in order to know the duration of the frames.
1037         // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
1038         m_parser = createOptionalParserForFormat(currentSrcPad.get());
1039         if (m_parser) {
1040             gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
1041             gst_element_sync_state_with_parent(m_parser.get());
1042
1043             GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
1044             GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
1045
                 // NOTE(review): the results of the gst_pad_link() calls in this function are not checked.
1046             gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
1047             currentSrcPad = parserSrcPad;
1048         }
1049
1050 #if ENABLE(ENCRYPTED_MEDIA)
1051         if (m_decryptor) {
                 // NOTE(review): an explicit extra reference is taken before gst_bin_add(); confirm
                 // where it is released.
1052             gst_object_ref(m_decryptor.get());
1053             gst_bin_add(GST_BIN(m_pipeline.get()), m_decryptor.get());
1054             gst_element_sync_state_with_parent(m_decryptor.get());
1055
1056             GRefPtr<GstPad> decryptorSinkPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "sink"));
1057             GRefPtr<GstPad> decryptorSrcPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "src"));
1058
1059             gst_pad_link(currentSrcPad.get(), decryptorSinkPad.get());
1060             currentSrcPad = decryptorSrcPad;
1061         }
1062 #endif
1063
             // Whatever ended up at the head of the chain (demuxer, parser or decryptor) feeds the appsink.
1064         gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
1065
1066         gst_element_sync_state_with_parent(m_appsink.get());
1067
1068 #if ENABLE(ENCRYPTED_MEDIA)
             // A decryption structure that was stored earlier is dispatched now.
1069         if (m_pendingDecryptionStructure)
1070             dispatchPendingDecryptionStructure();
1071 #endif
             // The whole pipeline is set to PAUSED and the appsink is synced with its parent once more.
1072         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
1073         gst_element_sync_state_with_parent(m_appsink.get());
1074
1075         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
1076     }
1077 }
1078
// Main-thread part of connecting a demuxer source pad. Bails out when the pad has no current
// caps, the append state is Invalid or there is no player; otherwise raises the media source
// duration when needed, parses the pad caps and creates the track object matching m_streamType.
// Every exit path notifies m_padAddRemoveCondition, which
// connectDemuxerSrcPadToAppsinkFromAnyThread() waits on when called off the main thread.
1079 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
1080 {
1081     ASSERT(WTF::isMainThread());
1082     GST_DEBUG("Connecting to appsink");
1083
1084     LockHolder locker(m_padAddRemoveLock);
1085     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
1086
1087     // Only one stream per demuxer is supported.
1088     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
1089
1090     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
1091
         // Nothing can be set up without caps, in the Invalid state or without a player; the
         // waiter is still woken up.
1092     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
1093         m_padAddRemoveCondition.notifyOne();
1094         return;
1095     }
1096
1097 #ifndef GST_DISABLE_GST_DEBUG
1098     {
1099         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1100         GST_DEBUG("%s", strcaps.get());
1101     }
1102 #endif
1103
         // Report the demuxer-provided duration when it exceeds the current one, or when the
         // current one is invalid and the new one is positive.
1104     if (m_initialDuration > m_mediaSourceClient->duration()
1105         || (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()))
1106         m_mediaSourceClient->durationChanged(m_initialDuration);
1107
         // NOTE(review): an extra caps reference is passed here; presumably
         // parseDemuxerSrcPadCaps() takes ownership of it - confirm.
1108     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
1109
1110     switch (m_streamType) {
1111     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
1112         if (m_playerPrivate)
1113             m_track = WebCore::AudioTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
1114         break;
1115     case WebCore::MediaSourceStreamTypeGStreamer::Video:
1116         if (m_playerPrivate)
1117             m_track = WebCore::VideoTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
1118         break;
1119     case WebCore::MediaSourceStreamTypeGStreamer::Text:
1120         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
1121         break;
1122     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
1123         {
1124             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1125             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1126         }
1127         // This is going to cause an error which will detach the SourceBuffer and tear down this
1128         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1129         m_track = nullptr;
1130         m_padAddRemoveCondition.notifyOne();
1131         locker.unlockEarly();
1132         didReceiveInitializationSegment();
1133         return;
1134     default:
1135         // No useful data, but notify anyway to complete the append operation.
1136         GST_DEBUG("Received all pending samples (no data)");
1137         m_sourceBufferPrivate->didReceiveAllPendingSamples();
1138         break;
1139     }
1140
         // Wake up the thread blocked in connectDemuxerSrcPadToAppsinkFromAnyThread(), if any.
1141     m_padAddRemoveCondition.notifyOne();
1142 }
1143
1144 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
1145 {
1146     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
1147
1148     GST_DEBUG("Disconnecting appsink");
1149
1150 #if ENABLE(ENCRYPTED_MEDIA)
1151     if (m_decryptor) {
1152         gst_element_set_state(m_decryptor.get(), GST_STATE_NULL);
1153         gst_bin_remove(GST_BIN(m_pipeline.get()), m_decryptor.get());
1154         m_decryptor = nullptr;
1155     }
1156 #endif
1157
1158     if (m_parser) {
1159         gst_element_set_state(m_parser.get(), GST_STATE_NULL);
1160         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
1161         m_parser = nullptr;
1162     }
1163
1164     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
1165 }
1166
1167 #if ENABLE(ENCRYPTED_MEDIA)
1168 void AppendPipeline::dispatchPendingDecryptionStructure()
1169 {
1170     ASSERT(m_decryptor);
1171     ASSERT(m_pendingDecryptionStructure);
1172     ASSERT(m_appendState == AppendState::KeyNegotiation);
1173     GST_TRACE("dispatching key to append pipeline %p", this);
1174
1175     // Release the m_pendingDecryptionStructure object since
1176     // gst_event_new_custom() takes over ownership of it.
1177     gst_element_send_event(m_pipeline.get(), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_OOB, m_pendingDecryptionStructure.release()));
1178
1179     setAppendState(AppendState::Ongoing);
1180 }
1181
1182 void AppendPipeline::dispatchDecryptionStructure(GUniquePtr<GstStructure>&& structure)
1183 {
1184     if (m_appendState == AppendState::KeyNegotiation) {
1185         GST_TRACE("append pipeline %p in key negotiation", this);
1186         m_pendingDecryptionStructure = WTFMove(structure);
1187         if (m_decryptor)
1188             dispatchPendingDecryptionStructure();
1189         else
1190             GST_TRACE("no decryptor yet, waiting for it");
1191     } else
1192         GST_TRACE("append pipeline %p not in key negotiation", this);
1193 }
1194 #endif
1195
1196 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1197 {
1198     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1199     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1200     gst_bus_post(appendPipeline->bus(), message);
1201     GST_TRACE("appsink-caps-changed message posted to bus");
1202 }
1203
1204 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1205 {
1206     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1207
1208     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1209     gsize bufferSize = gst_buffer_get_size(buffer);
1210
1211     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1212
1213     appendPipeline->reportAppsrcAtLeastABufferLeft();
1214
1215     return GST_PAD_PROBE_OK;
1216 }
1217
1218 #if !LOG_DISABLED
1219 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1220 {
1221     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1222     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1223     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1224     return GST_PAD_PROBE_OK;
1225 }
1226 #endif
1227
1228 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1229 {
1230     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1231     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1232     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1233     return GST_PAD_PROBE_DROP;
1234 }
1235
1236 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1237 {
1238     appendPipeline->reportAppsrcNeedDataReceived();
1239 }
1240
1241 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1242 {
1243     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1244 }
1245
1246 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1247 {
1248     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1249 }
1250
1251 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1252 {
1253     return appendPipeline->handleNewAppsinkSample(appsink);
1254 }
1255
1256 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1257 {
1258     if (WTF::isMainThread())
1259         appendPipeline->appsinkEOS();
1260     else {
1261         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1262         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1263         gst_bus_post(appendPipeline->bus(), message);
1264         GST_TRACE("appsink-eos message posted to bus");
1265     }
1266
1267     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1268 }
1269
1270
1271
1272 } // namespace WebCore.
1273
1274 #endif // USE(GSTREAMER)