[GStreamer] Refactor media player to use MediaTime consistently
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GRefPtrGStreamer.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "GStreamerMediaSample.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <gst/app/gstappsink.h>
36 #include <gst/app/gstappsrc.h>
37 #include <gst/gst.h>
38 #include <gst/pbutils/pbutils.h>
39 #include <gst/video/video.h>
40 #include <wtf/Condition.h>
41 #include <wtf/glib/GLibUtilities.h>
42 #include <wtf/glib/RunLoopSourcePriority.h>
43
44 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
45 #define GST_CAT_DEFAULT webkit_mse_debug
46
47 namespace WebCore {
48
49 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
50 {
51     switch (appendState) {
52     case AppendPipeline::AppendState::Invalid:
53         return "Invalid";
54     case AppendPipeline::AppendState::NotStarted:
55         return "NotStarted";
56     case AppendPipeline::AppendState::Ongoing:
57         return "Ongoing";
58     case AppendPipeline::AppendState::KeyNegotiation:
59         return "KeyNegotiation";
60     case AppendPipeline::AppendState::DataStarve:
61         return "DataStarve";
62     case AppendPipeline::AppendState::Sampling:
63         return "Sampling";
64     case AppendPipeline::AppendState::LastSample:
65         return "LastSample";
66     case AppendPipeline::AppendState::Aborting:
67         return "Aborting";
68     default:
69         return "(unknown)";
70     }
71 }
72
73 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
74 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
75 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
76 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
77 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
78 #if !LOG_DISABLED
79 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
80 #endif
81 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
82 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
83 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
84
85 static void appendPipelineNeedContextMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
86 {
87     GST_TRACE("received callback");
88     appendPipeline->handleNeedContextSyncMessage(message);
89 }
90
91 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
92 {
93     appendPipeline->handleApplicationMessage(message);
94 }
95
96 #if ENABLE(ENCRYPTED_MEDIA)
97 static void appendPipelineElementMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
98 {
99     appendPipeline->handleElementMessage(message);
100 }
101 #endif
102
103 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
104     : m_mediaSourceClient(mediaSourceClient.get())
105     , m_sourceBufferPrivate(sourceBufferPrivate.get())
106     , m_playerPrivate(&playerPrivate)
107     , m_id(0)
108     , m_appsrcAtLeastABufferLeft(false)
109     , m_appsrcNeedDataReceived(false)
110     , m_appsrcDataLeavingProbeId(0)
111     , m_appendState(AppendState::NotStarted)
112     , m_abortPending(false)
113     , m_streamType(Unknown)
114 {
115     ASSERT(WTF::isMainThread());
116
117     GST_TRACE("Creating AppendPipeline (%p)", this);
118
119     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
120     // The track name is still unknown at this time, though.
121     m_pipeline = gst_pipeline_new(nullptr);
122
123     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
124     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
125     gst_bus_enable_sync_message_emission(m_bus.get());
126
127     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(appendPipelineNeedContextMessageCallback), this);
128     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
129 #if ENABLE(ENCRYPTED_MEDIA)
130     g_signal_connect(m_bus.get(), "message::element", G_CALLBACK(appendPipelineElementMessageCallback), this);
131 #endif
132
133     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
134     // below will already take the initial reference and we need an additional one for us.
135     m_appsrc = gst_element_factory_make("appsrc", nullptr);
136
137     const String& type = m_sourceBufferPrivate->type().containerType();
138     if (type.endsWith("mp4"))
139         m_demux = gst_element_factory_make("qtdemux", nullptr);
140     else if (type.endsWith("webm"))
141         m_demux = gst_element_factory_make("matroskademux", nullptr);
142     else
143         ASSERT_NOT_REACHED();
144
145     m_appsink = gst_element_factory_make("appsink", nullptr);
146
147     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
148     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
149
150     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
151     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
152
153     setAppsrcDataLeavingProbe();
154
155 #if !LOG_DISABLED
156     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
157     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
158     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
159     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
160     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
161     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
162     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
163 #endif
164
165     // These signals won't be connected outside of the lifetime of "this".
166     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
167     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
168     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
169     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
170     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
171
172     // Add_many will take ownership of a reference. That's why we used an assignment before.
173     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
174     gst_element_link(m_appsrc.get(), m_demux.get());
175
176     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
177 };
178
179 AppendPipeline::~AppendPipeline()
180 {
181     ASSERT(WTF::isMainThread());
182
183     {
184         LockHolder locker(m_newSampleLock);
185         setAppendState(AppendState::Invalid);
186         m_newSampleCondition.notifyOne();
187     }
188
189     {
190         LockHolder locker(m_padAddRemoveLock);
191         m_playerPrivate = nullptr;
192         m_padAddRemoveCondition.notifyOne();
193     }
194
195     GST_TRACE("Destroying AppendPipeline (%p)", this);
196
197     // FIXME: Maybe notify appendComplete here?
198
199     if (m_pipeline) {
200         ASSERT(m_bus);
201         g_signal_handlers_disconnect_by_func(m_bus.get(), reinterpret_cast<gpointer>(appendPipelineNeedContextMessageCallback), this);
202         gst_bus_disable_sync_message_emission(m_bus.get());
203         gst_bus_remove_signal_watch(m_bus.get());
204         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
205         m_pipeline = nullptr;
206     }
207
208     if (m_appsrc) {
209         removeAppsrcDataLeavingProbe();
210         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
211         m_appsrc = nullptr;
212     }
213
214     if (m_demux) {
215 #if !LOG_DISABLED
216         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
217         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
218 #endif
219
220         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
221         m_demux = nullptr;
222     }
223
224     if (m_appsink) {
225         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
226         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
227         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
228
229 #if !LOG_DISABLED
230         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
231 #endif
232
233         m_appsink = nullptr;
234     }
235
236     m_appsinkCaps = nullptr;
237     m_demuxerSrcPadCaps = nullptr;
238 };
239
240 void AppendPipeline::clearPlayerPrivate()
241 {
242     ASSERT(WTF::isMainThread());
243     GST_DEBUG("cleaning private player");
244
245     {
246         LockHolder locker(m_newSampleLock);
247         // Make sure that AppendPipeline won't process more data from now on and
248         // instruct handleNewSample to abort itself from now on as well.
249         setAppendState(AppendState::Invalid);
250
251         // Awake any pending handleNewSample operation in the streaming thread.
252         m_newSampleCondition.notifyOne();
253     }
254
255     {
256         LockHolder locker(m_padAddRemoveLock);
257         m_playerPrivate = nullptr;
258         m_padAddRemoveCondition.notifyOne();
259     }
260
261     // And now that no handleNewSample operations will remain stalled waiting
262     // for the main thread, stop the pipeline.
263     if (m_pipeline)
264         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
265 }
266
267 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
268 {
269     const gchar* contextType = nullptr;
270     gst_message_parse_context_type(message, &contextType);
271     GST_TRACE("context type: %s", contextType);
272     if (!g_strcmp0(contextType, "drm-preferred-decryption-system-id")
273         && m_appendState != AppendPipeline::AppendState::KeyNegotiation)
274         setAppendState(AppendPipeline::AppendState::KeyNegotiation);
275
276     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
277     if (m_playerPrivate)
278         m_playerPrivate->handleSyncMessage(message);
279 }
280
281 void AppendPipeline::handleApplicationMessage(GstMessage* message)
282 {
283     ASSERT(WTF::isMainThread());
284
285     const GstStructure* structure = gst_message_get_structure(message);
286
287     if (gst_structure_has_name(structure, "appsrc-need-data")) {
288         handleAppsrcNeedDataReceived();
289         return;
290     }
291
292     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
293         handleAppsrcAtLeastABufferLeft();
294         return;
295     }
296
297     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
298         GRefPtr<GstPad> demuxerSrcPad;
299         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
300         ASSERT(demuxerSrcPad);
301         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
302         return;
303     }
304
305     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
306         appsinkCapsChanged();
307         return;
308     }
309
310     if (gst_structure_has_name(structure, "appsink-new-sample")) {
311         GRefPtr<GstSample> newSample;
312         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
313
314         appsinkNewSample(newSample.get());
315         return;
316     }
317
318     if (gst_structure_has_name(structure, "appsink-eos")) {
319         appsinkEOS();
320         return;
321     }
322
323     ASSERT_NOT_REACHED();
324 }
325
326 #if ENABLE(ENCRYPTED_MEDIA)
327 void AppendPipeline::handleElementMessage(GstMessage* message)
328 {
329     ASSERT(WTF::isMainThread());
330
331     const GstStructure* structure = gst_message_get_structure(message);
332     GST_TRACE("%s message from %s", gst_structure_get_name(structure), GST_MESSAGE_SRC_NAME(message));
333     if (m_playerPrivate && gst_structure_has_name(structure, "drm-key-needed")) {
334         if (m_appendState != AppendPipeline::AppendState::KeyNegotiation)
335             setAppendState(AppendPipeline::AppendState::KeyNegotiation);
336
337         GST_DEBUG("sending drm-key-needed message from %s to the player", GST_MESSAGE_SRC_NAME(message));
338         GRefPtr<GstEvent> event;
339         gst_structure_get(structure, "event", GST_TYPE_EVENT, &event.outPtr(), nullptr);
340         m_playerPrivate->handleProtectionEvent(event.get());
341     }
342 }
343 #endif
344
345 void AppendPipeline::handleAppsrcNeedDataReceived()
346 {
347     if (!m_appsrcAtLeastABufferLeft) {
348         GST_TRACE("discarding until at least a buffer leaves appsrc");
349         return;
350     }
351
352     ASSERT(m_appendState == AppendState::KeyNegotiation || m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
353     ASSERT(!m_appsrcNeedDataReceived);
354
355     GST_TRACE("received need-data from appsrc");
356
357     m_appsrcNeedDataReceived = true;
358     checkEndOfAppend();
359 }
360
361 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
362 {
363     m_appsrcAtLeastABufferLeft = true;
364     GST_TRACE("received buffer-left from appsrc");
365 #if LOG_DISABLED
366     removeAppsrcDataLeavingProbe();
367 #endif
368 }
369
370 gint AppendPipeline::id()
371 {
372     ASSERT(WTF::isMainThread());
373
374     if (m_id)
375         return m_id;
376
377     static gint s_totalAudio = 0;
378     static gint s_totalVideo = 0;
379     static gint s_totalText = 0;
380
381     switch (m_streamType) {
382     case Audio:
383         m_id = ++s_totalAudio;
384         break;
385     case Video:
386         m_id = ++s_totalVideo;
387         break;
388     case Text:
389         m_id = ++s_totalText;
390         break;
391     case Unknown:
392     case Invalid:
393         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
394         ASSERT_NOT_REACHED();
395         break;
396     }
397
398     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
399
400     return m_id;
401 }
402
403 void AppendPipeline::setAppendState(AppendState newAppendState)
404 {
405     ASSERT(WTF::isMainThread());
406     // Valid transitions:
407     // NotStarted-->Ongoing-->DataStarve-->NotStarted
408     //           |         |            `->Aborting-->NotStarted
409     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
410     //           |         |                                     `->Aborting-->NotStarted
411     //           |         `->KeyNegotiation-->Ongoing-->[...]
412     //           `->Aborting-->NotStarted
413     AppendState oldAppendState = m_appendState;
414     AppendState nextAppendState = AppendState::Invalid;
415
416     if (oldAppendState != newAppendState)
417         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
418
419     bool ok = false;
420
421     switch (oldAppendState) {
422     case AppendState::NotStarted:
423         switch (newAppendState) {
424         case AppendState::Ongoing:
425             ok = true;
426             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
427             break;
428         case AppendState::NotStarted:
429             ok = true;
430             if (m_pendingBuffer) {
431                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
432                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
433                 nextAppendState = AppendState::Ongoing;
434             }
435             break;
436         case AppendState::Aborting:
437             ok = true;
438             nextAppendState = AppendState::NotStarted;
439             break;
440         case AppendState::Invalid:
441             ok = true;
442             break;
443         default:
444             break;
445         }
446         break;
447     case AppendState::KeyNegotiation:
448         switch (newAppendState) {
449         case AppendState::Ongoing:
450         case AppendState::Invalid:
451             ok = true;
452             break;
453         default:
454             break;
455         }
456         break;
457     case AppendState::Ongoing:
458         switch (newAppendState) {
459         case AppendState::KeyNegotiation:
460         case AppendState::Sampling:
461         case AppendState::Invalid:
462             ok = true;
463             break;
464         case AppendState::DataStarve:
465             ok = true;
466             GST_DEBUG("received all pending samples");
467             m_sourceBufferPrivate->didReceiveAllPendingSamples();
468             if (m_abortPending)
469                 nextAppendState = AppendState::Aborting;
470             else
471                 nextAppendState = AppendState::NotStarted;
472             break;
473         default:
474             break;
475         }
476         break;
477     case AppendState::DataStarve:
478         switch (newAppendState) {
479         case AppendState::NotStarted:
480         case AppendState::Invalid:
481             ok = true;
482             break;
483         case AppendState::Aborting:
484             ok = true;
485             nextAppendState = AppendState::NotStarted;
486             break;
487         default:
488             break;
489         }
490         break;
491     case AppendState::Sampling:
492         switch (newAppendState) {
493         case AppendState::Sampling:
494         case AppendState::Invalid:
495             ok = true;
496             break;
497         case AppendState::LastSample:
498             ok = true;
499             GST_DEBUG("received all pending samples");
500             m_sourceBufferPrivate->didReceiveAllPendingSamples();
501             if (m_abortPending)
502                 nextAppendState = AppendState::Aborting;
503             else
504                 nextAppendState = AppendState::NotStarted;
505             break;
506         default:
507             break;
508         }
509         break;
510     case AppendState::LastSample:
511         switch (newAppendState) {
512         case AppendState::NotStarted:
513         case AppendState::Invalid:
514             ok = true;
515             break;
516         case AppendState::Aborting:
517             ok = true;
518             nextAppendState = AppendState::NotStarted;
519             break;
520         default:
521             break;
522         }
523         break;
524     case AppendState::Aborting:
525         switch (newAppendState) {
526         case AppendState::NotStarted:
527             ok = true;
528             resetPipeline();
529             m_abortPending = false;
530             nextAppendState = AppendState::NotStarted;
531             break;
532         case AppendState::Invalid:
533             ok = true;
534             break;
535         default:
536             break;
537         }
538         break;
539     case AppendState::Invalid:
540         ok = true;
541         break;
542     }
543
544     if (ok)
545         m_appendState = newAppendState;
546     else
547         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
548
549     ASSERT(ok);
550
551     if (nextAppendState != AppendState::Invalid)
552         setAppendState(nextAppendState);
553 }
554
555 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
556 {
557     ASSERT(WTF::isMainThread());
558
559     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
560     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
561
562     GstStructure* structure = gst_caps_get_structure(m_demuxerSrcPadCaps.get(), 0);
563     bool sizeConfigured = false;
564
565 #if GST_CHECK_VERSION(1, 5, 3) && ENABLE(ENCRYPTED_MEDIA)
566     if (gst_structure_has_name(structure, "application/x-cenc")) {
567         // Any previous decryptor should have been removed from the pipeline by disconnectFromAppSinkFromStreamingThread()
568         ASSERT(!m_decryptor);
569
570         m_decryptor = GStreamerEMEUtilities::createDecryptor(gst_structure_get_string(structure, "protection-system"));
571         if (!m_decryptor) {
572             GST_ERROR("decryptor not found for caps: %" GST_PTR_FORMAT, m_demuxerSrcPadCaps.get());
573             return;
574         }
575
576         const gchar* originalMediaType = gst_structure_get_string(structure, "original-media-type");
577
578         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(originalMediaType)) {
579             m_presentationSize = WebCore::FloatSize();
580             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
581         } else if (g_str_has_prefix(originalMediaType, "video/")) {
582             int width = 0;
583             int height = 0;
584             float finalHeight = 0;
585
586             if (gst_structure_get_int(structure, "width", &width) && gst_structure_get_int(structure, "height", &height)) {
587                 int ratioNumerator = 1;
588                 int ratioDenominator = 1;
589
590                 gst_structure_get_fraction(structure, "pixel-aspect-ratio", &ratioNumerator, &ratioDenominator);
591                 finalHeight = height * ((float) ratioDenominator / (float) ratioNumerator);
592             }
593
594             m_presentationSize = WebCore::FloatSize(width, finalHeight);
595             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
596         } else {
597             m_presentationSize = WebCore::FloatSize();
598             if (g_str_has_prefix(originalMediaType, "audio/"))
599                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
600             else if (g_str_has_prefix(originalMediaType, "text/"))
601                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
602         }
603         sizeConfigured = true;
604     }
605 #endif
606
607     if (!sizeConfigured) {
608         const char* structureName = gst_structure_get_name(structure);
609         GstVideoInfo info;
610
611         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(structureName)) {
612             m_presentationSize = WebCore::FloatSize();
613             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
614         } else if (g_str_has_prefix(structureName, "video/") && gst_video_info_from_caps(&info, demuxerSrcPadCaps)) {
615             float width, height;
616
617             width = info.width;
618             height = info.height * ((float) info.par_d / (float) info.par_n);
619
620             m_presentationSize = WebCore::FloatSize(width, height);
621             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
622         } else {
623             m_presentationSize = WebCore::FloatSize();
624             if (g_str_has_prefix(structureName, "audio/"))
625                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
626             else if (g_str_has_prefix(structureName, "text/"))
627                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
628         }
629     }
630 }
631
632 void AppendPipeline::appsinkCapsChanged()
633 {
634     ASSERT(WTF::isMainThread());
635
636     if (!m_appsink)
637         return;
638
639     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
640     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
641
642     if (!caps)
643         return;
644
645     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
646     bool previousCapsWereNull = !m_appsinkCaps;
647
648     if (m_appsinkCaps != caps) {
649         m_appsinkCaps = WTFMove(caps);
650         if (m_playerPrivate)
651             m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
652         didReceiveInitializationSegment();
653         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
654     }
655 }
656
657 void AppendPipeline::checkEndOfAppend()
658 {
659     ASSERT(WTF::isMainThread());
660
661     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
662         return;
663
664     GST_TRACE("end of append data mark was received");
665
666     switch (m_appendState) {
667     case AppendState::Ongoing:
668         GST_TRACE("DataStarve");
669         m_appsrcNeedDataReceived = false;
670         setAppendState(AppendState::DataStarve);
671         break;
672     case AppendState::Sampling:
673         GST_TRACE("LastSample");
674         m_appsrcNeedDataReceived = false;
675         setAppendState(AppendState::LastSample);
676         break;
677     default:
678         ASSERT_NOT_REACHED();
679         break;
680     }
681 }
682
683 void AppendPipeline::appsinkNewSample(GstSample* sample)
684 {
685     ASSERT(WTF::isMainThread());
686
687     {
688         LockHolder locker(m_newSampleLock);
689
690         // If we were in KeyNegotiation but samples are coming, assume we're already OnGoing
691         if (m_appendState == AppendState::KeyNegotiation)
692             setAppendState(AppendState::Ongoing);
693
694         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
695         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
696             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
697             // FIXME: Return ERROR and find a more robust way to detect that all the
698             // data has been processed, so we don't need to resort to these hacks.
699             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
700             m_flowReturn = GST_FLOW_OK;
701             m_newSampleCondition.notifyOne();
702             return;
703         }
704
705         RefPtr<GStreamerMediaSample> mediaSample = WebCore::GStreamerMediaSample::create(sample, m_presentationSize, trackId());
706
707         GST_TRACE("append: trackId=%s PTS=%f presentationSize=%.0fx%.0f", mediaSample->trackID().string().utf8().data(), mediaSample->presentationTime().toFloat(), mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
708
709         // If we're beyond the duration, ignore this sample and the remaining ones.
710         MediaTime duration = m_mediaSourceClient->duration();
711         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
712             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
713             setAppendState(AppendState::LastSample);
714             m_flowReturn = GST_FLOW_OK;
715             m_newSampleCondition.notifyOne();
716             return;
717         }
718
719         // Add a gap sample if a gap is detected before the first sample.
720         if (mediaSample->decodeTime() == MediaTime::zeroTime()
721             && mediaSample->presentationTime() > MediaTime::zeroTime()
722             && mediaSample->presentationTime() <= MediaTime(1, 10)) {
723             GST_DEBUG("Adding gap offset");
724             mediaSample->applyPtsOffset(MediaTime::zeroTime());
725         }
726
727         m_sourceBufferPrivate->didReceiveSample(*mediaSample);
728         setAppendState(AppendState::Sampling);
729         m_flowReturn = GST_FLOW_OK;
730         m_newSampleCondition.notifyOne();
731     }
732
733     checkEndOfAppend();
734 }
735
736 void AppendPipeline::appsinkEOS()
737 {
738     ASSERT(WTF::isMainThread());
739
740     switch (m_appendState) {
741     case AppendState::Aborting:
742         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
743         return;
744     case AppendState::Ongoing:
745         // Finish Ongoing and Sampling states.
746         setAppendState(AppendState::DataStarve);
747         break;
748     case AppendState::Sampling:
749         setAppendState(AppendState::LastSample);
750         break;
751     default:
752         GST_DEBUG("Unexpected EOS");
753         break;
754     }
755 }
756
757 void AppendPipeline::didReceiveInitializationSegment()
758 {
759     ASSERT(WTF::isMainThread());
760
761     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
762
763     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
764     initializationSegment.duration = m_mediaSourceClient->duration();
765
766     switch (m_streamType) {
767     case Audio: {
768         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
769         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
770         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
771         initializationSegment.audioTracks.append(info);
772         break;
773     }
774     case Video: {
775         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
776         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
777         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
778         initializationSegment.videoTracks.append(info);
779         break;
780     }
781     default:
782         GST_ERROR("Unsupported stream type or codec");
783         break;
784     }
785
786     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
787 }
788
789 AtomicString AppendPipeline::trackId()
790 {
791     ASSERT(WTF::isMainThread());
792
793     if (!m_track)
794         return AtomicString();
795
796     return m_track->id();
797 }
798
799 void AppendPipeline::resetPipeline()
800 {
801     ASSERT(WTF::isMainThread());
802     GST_DEBUG("resetting pipeline");
803     m_appsrcAtLeastABufferLeft = false;
804     setAppsrcDataLeavingProbe();
805
806     {
807         LockHolder locker(m_newSampleLock);
808         m_newSampleCondition.notifyOne();
809         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
810         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
811     }
812
813 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
814     {
815         static unsigned i = 0;
816         // This is here for debugging purposes. It does not make sense to have it as class member.
817         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
818         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
819     }
820 #endif
821
822 }
823
824 void AppendPipeline::setAppsrcDataLeavingProbe()
825 {
826     if (m_appsrcDataLeavingProbeId)
827         return;
828
829     GST_TRACE("setting appsrc data leaving probe");
830
831     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
832     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
833 }
834
835 void AppendPipeline::removeAppsrcDataLeavingProbe()
836 {
837     if (!m_appsrcDataLeavingProbeId)
838         return;
839
840     GST_TRACE("removing appsrc data leaving probe");
841
842     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
843     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
844     m_appsrcDataLeavingProbeId = 0;
845 }
846
847 void AppendPipeline::abort()
848 {
849     ASSERT(WTF::isMainThread());
850     GST_DEBUG("aborting");
851
852     m_pendingBuffer = nullptr;
853
854     // Abort already ongoing.
855     if (m_abortPending)
856         return;
857
858     m_abortPending = true;
859     if (m_appendState == AppendState::NotStarted)
860         setAppendState(AppendState::Aborting);
861     // Else, the automatic state transitions will take care when the ongoing append finishes.
862 }
863
864 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
865 {
866     GstFlowReturn result;
867
868     if (m_abortPending) {
869         m_pendingBuffer = adoptGRef(buffer);
870         result = GST_FLOW_OK;
871     } else {
872         setAppendState(AppendPipeline::AppendState::Ongoing);
873         GST_TRACE("pushing new buffer %p", buffer);
874         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
875     }
876
877     return result;
878 }
879
880 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
881 {
882     GST_TRACE("buffer left appsrc, reposting to bus");
883     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
884     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
885     gst_bus_post(m_bus.get(), message);
886 }
887
888 void AppendPipeline::reportAppsrcNeedDataReceived()
889 {
890     GST_TRACE("received need-data signal at appsrc, reposting to bus");
891     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
892     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
893     gst_bus_post(m_bus.get(), message);
894 }
895
896 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
897 {
898     ASSERT(!WTF::isMainThread());
899
900     // Even if we're disabled, it's important to pull the sample out anyway to
901     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
902     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
903     LockHolder locker(m_newSampleLock);
904
905     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
906         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
907         return GST_FLOW_ERROR;
908     }
909
910     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
911     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
912     gst_bus_post(m_bus.get(), message);
913     GST_TRACE("appsink-new-sample message posted to bus");
914
915     m_newSampleCondition.wait(m_newSampleLock);
916     // We've been awaken because the sample was processed or because of
917     // an exceptional condition (entered in Invalid state, destructor, etc.).
918     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
919
920     return m_flowReturn;
921 }
922
923 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
924 {
925     if (!m_appsink)
926         return;
927
928     GST_DEBUG("connecting to appsink");
929
930     if (m_demux->numsrcpads > 1) {
931         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
932         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
933         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
934         return;
935     }
936
937     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
938
939     // Only one stream per demuxer is supported.
940     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
941
942     gint64 timeLength = 0;
943     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
944         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
945         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
946     else
947         m_initialDuration = MediaTime::positiveInfiniteTime();
948
949     if (WTF::isMainThread())
950         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
951     else {
952         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
953         LockHolder locker(m_padAddRemoveLock);
954         if (!m_playerPrivate)
955             return;
956
957         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
958         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
959         gst_bus_post(m_bus.get(), message);
960         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
961
962         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
963     }
964
965     // Must be done in the thread we were called from (usually streaming thread).
966     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
967         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
968         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
969
970     if (isData) {
971         // FIXME: Only add appsink one time. This method can be called several times.
972         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
973         if (!parent)
974             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
975
976 #if ENABLE(ENCRYPTED_MEDIA)
977         if (m_decryptor) {
978             gst_object_ref(m_decryptor.get());
979             gst_bin_add(GST_BIN(m_pipeline.get()), m_decryptor.get());
980
981             GRefPtr<GstPad> decryptorSinkPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "sink"));
982             gst_pad_link(demuxerSrcPad, decryptorSinkPad.get());
983
984             GRefPtr<GstPad> decryptorSrcPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "src"));
985             gst_pad_link(decryptorSrcPad.get(), appsinkSinkPad.get());
986
987             gst_element_sync_state_with_parent(m_appsink.get());
988             gst_element_sync_state_with_parent(m_decryptor.get());
989
990             if (m_pendingDecryptionStructure)
991                 dispatchPendingDecryptionStructure();
992         } else {
993 #endif
994             gst_pad_link(demuxerSrcPad, appsinkSinkPad.get());
995             gst_element_sync_state_with_parent(m_appsink.get());
996 #if ENABLE(ENCRYPTED_MEDIA)
997         }
998 #endif
999         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
1000     }
1001 }
1002
1003 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
1004 {
1005     ASSERT(WTF::isMainThread());
1006     GST_DEBUG("Connecting to appsink");
1007
1008     LockHolder locker(m_padAddRemoveLock);
1009     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
1010
1011     // Only one stream per demuxer is supported.
1012     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
1013
1014     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
1015
1016     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
1017         m_padAddRemoveCondition.notifyOne();
1018         return;
1019     }
1020
1021 #ifndef GST_DISABLE_GST_DEBUG
1022     {
1023         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1024         GST_DEBUG("%s", strcaps.get());
1025     }
1026 #endif
1027
1028     if (m_initialDuration > m_mediaSourceClient->duration()
1029         || (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()))
1030         m_mediaSourceClient->durationChanged(m_initialDuration);
1031
1032     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
1033
1034     switch (m_streamType) {
1035     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
1036         if (m_playerPrivate)
1037             m_track = WebCore::AudioTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
1038         break;
1039     case WebCore::MediaSourceStreamTypeGStreamer::Video:
1040         if (m_playerPrivate)
1041             m_track = WebCore::VideoTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
1042         break;
1043     case WebCore::MediaSourceStreamTypeGStreamer::Text:
1044         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
1045         break;
1046     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
1047         {
1048             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1049             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1050         }
1051         // This is going to cause an error which will detach the SourceBuffer and tear down this
1052         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1053         m_track = nullptr;
1054         m_padAddRemoveCondition.notifyOne();
1055         locker.unlockEarly();
1056         didReceiveInitializationSegment();
1057         return;
1058     default:
1059         // No useful data, but notify anyway to complete the append operation.
1060         GST_DEBUG("Received all pending samples (no data)");
1061         m_sourceBufferPrivate->didReceiveAllPendingSamples();
1062         break;
1063     }
1064
1065     m_padAddRemoveCondition.notifyOne();
1066 }
1067
1068 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad* demuxerSrcPad)
1069 {
1070     // Must be done in the thread we were called from (usually streaming thread).
1071     if (!gst_pad_is_linked(demuxerSrcPad)) {
1072         gulong probeId = GPOINTER_TO_ULONG(g_object_get_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId"));
1073         if (probeId) {
1074             GST_DEBUG("Disconnecting black hole probe.");
1075             g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", nullptr);
1076             gst_pad_remove_probe(demuxerSrcPad, probeId);
1077         } else
1078             GST_WARNING("Not disconnecting demuxer src pad because it wasn't linked");
1079         return;
1080     }
1081
1082     GST_DEBUG("Disconnecting appsink");
1083
1084 #if ENABLE(ENCRYPTED_MEDIA)
1085     if (m_decryptor) {
1086         gst_element_unlink(m_decryptor.get(), m_appsink.get());
1087         gst_element_unlink(m_demux.get(), m_decryptor.get());
1088         gst_element_set_state(m_decryptor.get(), GST_STATE_NULL);
1089         gst_bin_remove(GST_BIN(m_pipeline.get()), m_decryptor.get());
1090     } else
1091 #endif
1092         gst_element_unlink(m_demux.get(), m_appsink.get());
1093 }
1094
1095 #if ENABLE(ENCRYPTED_MEDIA)
1096 void AppendPipeline::dispatchPendingDecryptionStructure()
1097 {
1098     ASSERT(m_decryptor);
1099     ASSERT(m_pendingDecryptionStructure);
1100     ASSERT(m_appendState == AppendState::KeyNegotiation);
1101     GST_TRACE("dispatching key to append pipeline %p", this);
1102
1103     // Release the m_pendingDecryptionStructure object since
1104     // gst_event_new_custom() takes over ownership of it.
1105     gst_element_send_event(m_pipeline.get(), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_OOB, m_pendingDecryptionStructure.release()));
1106
1107     setAppendState(AppendState::Ongoing);
1108 }
1109
1110 void AppendPipeline::dispatchDecryptionStructure(GUniquePtr<GstStructure>&& structure)
1111 {
1112     if (m_appendState == AppendState::KeyNegotiation) {
1113         GST_TRACE("append pipeline %p in key negotiation", this);
1114         m_pendingDecryptionStructure = WTFMove(structure);
1115         if (m_decryptor)
1116             dispatchPendingDecryptionStructure();
1117         else
1118             GST_TRACE("no decryptor yet, waiting for it");
1119     } else
1120         GST_TRACE("append pipeline %p not in key negotiation", this);
1121 }
1122 #endif
1123
1124 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1125 {
1126     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1127     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1128     gst_bus_post(appendPipeline->bus(), message);
1129     GST_TRACE("appsink-caps-changed message posted to bus");
1130 }
1131
1132 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1133 {
1134     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1135
1136     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1137     gsize bufferSize = gst_buffer_get_size(buffer);
1138
1139     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1140
1141     appendPipeline->reportAppsrcAtLeastABufferLeft();
1142
1143     return GST_PAD_PROBE_OK;
1144 }
1145
1146 #if !LOG_DISABLED
1147 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1148 {
1149     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1150     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1151     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1152     return GST_PAD_PROBE_OK;
1153 }
1154 #endif
1155
1156 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1157 {
1158     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1159     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1160     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1161     return GST_PAD_PROBE_DROP;
1162 }
1163
1164 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1165 {
1166     appendPipeline->reportAppsrcNeedDataReceived();
1167 }
1168
1169 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1170 {
1171     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1172 }
1173
1174 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1175 {
1176     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1177 }
1178
1179 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1180 {
1181     return appendPipeline->handleNewAppsinkSample(appsink);
1182 }
1183
1184 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1185 {
1186     if (WTF::isMainThread())
1187         appendPipeline->appsinkEOS();
1188     else {
1189         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1190         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1191         gst_bus_post(appendPipeline->bus(), message);
1192         GST_TRACE("appsink-eos message posted to bus");
1193     }
1194
1195     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1196 }
1197
1198
1199
1200 } // namespace WebCore.
1201
1202 #endif // USE(GSTREAMER)