REGRESSION(r207879-207891): [GStreamer] Introduced many layout test failures and...
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016 Metrological Group B.V.
3  * Copyright (C) 2016 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * along with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GRefPtrGStreamer.h"
28 #include "GStreamerMediaDescription.h"
29 #include "GStreamerMediaSample.h"
30 #include "GStreamerUtilities.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35
36 #include <gst/app/gstappsink.h>
37 #include <gst/app/gstappsrc.h>
38 #include <gst/gst.h>
39 #include <gst/pbutils/pbutils.h>
40 #include <gst/video/video.h>
41 #include <wtf/Condition.h>
42 #include <wtf/glib/GLibUtilities.h>
43
44 namespace WebCore {
45
46 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
47 {
48     switch (appendState) {
49     case AppendPipeline::AppendState::Invalid:
50         return "Invalid";
51     case AppendPipeline::AppendState::NotStarted:
52         return "NotStarted";
53     case AppendPipeline::AppendState::Ongoing:
54         return "Ongoing";
55     case AppendPipeline::AppendState::KeyNegotiation:
56         return "KeyNegotiation";
57     case AppendPipeline::AppendState::DataStarve:
58         return "DataStarve";
59     case AppendPipeline::AppendState::Sampling:
60         return "Sampling";
61     case AppendPipeline::AppendState::LastSample:
62         return "LastSample";
63     case AppendPipeline::AppendState::Aborting:
64         return "Aborting";
65     default:
66         return "(unknown)";
67     }
68 }
69
70 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
71 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
72 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
73 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
74 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
75 #if !LOG_DISABLED
76 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
77 #endif
78 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
79 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
80 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
81
82 static void appendPipelineElementMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
83 {
84     appendPipeline->handleElementMessage(message);
85 }
86
87 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
88 {
89     appendPipeline->handleApplicationMessage(message);
90 }
91
92 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
93     : m_mediaSourceClient(mediaSourceClient.get())
94     , m_sourceBufferPrivate(sourceBufferPrivate.get())
95     , m_playerPrivate(&playerPrivate)
96     , m_id(0)
97     , m_appsrcAtLeastABufferLeft(false)
98     , m_appsrcNeedDataReceived(false)
99     , m_appsrcDataLeavingProbeId(0)
100     , m_appendState(AppendState::NotStarted)
101     , m_abortPending(false)
102     , m_streamType(Unknown)
103 {
104     ASSERT(WTF::isMainThread());
105
106     GST_TRACE("Creating AppendPipeline (%p)", this);
107
108     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
109     // The track name is still unknown at this time, though.
110     m_pipeline = adoptGRef(gst_pipeline_new(nullptr));
111
112     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
113     gst_bus_add_signal_watch(m_bus.get());
114     gst_bus_enable_sync_message_emission(m_bus.get());
115
116     g_signal_connect(m_bus.get(), "sync-message::element", G_CALLBACK(appendPipelineElementMessageCallback), this);
117     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
118
119     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
120     // below will already take the initial reference and we need an additional one for us.
121     m_appsrc = gst_element_factory_make("appsrc", nullptr);
122     m_demux = gst_element_factory_make("qtdemux", nullptr);
123     m_appsink = gst_element_factory_make("appsink", nullptr);
124
125     g_object_set(G_OBJECT(m_demux.get()), "always-honor-tfdt", TRUE, nullptr);
126     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
127     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
128
129     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
130     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
131
132     setAppsrcDataLeavingProbe();
133
134 #if !LOG_DISABLED
135     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
136     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
137     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
138     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
139     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
140     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
141     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
142 #endif
143
144     // These signals won't be connected outside of the lifetime of "this".
145     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
146     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
147     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
148     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
149     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
150
151     // Add_many will take ownership of a reference. That's why we used an assignment before.
152     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
153     gst_element_link(m_appsrc.get(), m_demux.get());
154
155     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
156 };
157
158 AppendPipeline::~AppendPipeline()
159 {
160     ASSERT(WTF::isMainThread());
161
162     {
163         LockHolder locker(m_newSampleLock);
164         setAppendState(AppendState::Invalid);
165         m_newSampleCondition.notifyOne();
166     }
167
168     {
169         LockHolder locker(m_padAddRemoveLock);
170         m_playerPrivate = nullptr;
171         m_padAddRemoveCondition.notifyOne();
172     }
173
174     GST_TRACE("Destroying AppendPipeline (%p)", this);
175
176     // FIXME: Maybe notify appendComplete here?
177
178     if (m_pipeline) {
179         ASSERT(m_bus);
180         gst_bus_remove_signal_watch(m_bus.get());
181         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
182         m_pipeline = nullptr;
183     }
184
185     if (m_appsrc) {
186         removeAppsrcDataLeavingProbe();
187         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
188         m_appsrc = nullptr;
189     }
190
191     if (m_demux) {
192 #if !LOG_DISABLED
193         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
194         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
195 #endif
196
197         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
198         m_demux = nullptr;
199     }
200
201     if (m_appsink) {
202         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
203         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
204         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
205
206 #if !LOG_DISABLED
207         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
208 #endif
209
210         m_appsink = nullptr;
211     }
212
213     m_appsinkCaps = nullptr;
214     m_demuxerSrcPadCaps = nullptr;
215 };
216
217 void AppendPipeline::clearPlayerPrivate()
218 {
219     ASSERT(WTF::isMainThread());
220     GST_DEBUG("cleaning private player");
221
222     {
223         LockHolder locker(m_newSampleLock);
224         // Make sure that AppendPipeline won't process more data from now on and
225         // instruct handleNewSample to abort itself from now on as well.
226         setAppendState(AppendState::Invalid);
227
228         // Awake any pending handleNewSample operation in the streaming thread.
229         m_newSampleCondition.notifyOne();
230     }
231
232     {
233         LockHolder locker(m_padAddRemoveLock);
234         m_playerPrivate = nullptr;
235         m_padAddRemoveCondition.notifyOne();
236     }
237
238     // And now that no handleNewSample operations will remain stalled waiting
239     // for the main thread, stop the pipeline.
240     if (m_pipeline)
241         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
242 }
243
244 void AppendPipeline::handleElementMessage(GstMessage* message)
245 {
246     ASSERT(WTF::isMainThread());
247
248     const GstStructure* structure = gst_message_get_structure(message);
249     if (gst_structure_has_name(structure, "drm-key-needed"))
250         setAppendState(AppendPipeline::AppendState::KeyNegotiation);
251
252     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
253     if (m_playerPrivate)
254         m_playerPrivate->handleSyncMessage(message);
255 }
256
257 void AppendPipeline::handleApplicationMessage(GstMessage* message)
258 {
259     ASSERT(WTF::isMainThread());
260
261     const GstStructure* structure = gst_message_get_structure(message);
262
263     if (gst_structure_has_name(structure, "appsrc-need-data")) {
264         handleAppsrcNeedDataReceived();
265         return;
266     }
267
268     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
269         handleAppsrcAtLeastABufferLeft();
270         return;
271     }
272
273     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
274         GRefPtr<GstPad> demuxerSrcPad;
275         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
276         ASSERT(demuxerSrcPad);
277         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
278         return;
279     }
280
281     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
282         appsinkCapsChanged();
283         return;
284     }
285
286     if (gst_structure_has_name(structure, "appsink-new-sample")) {
287         GRefPtr<GstSample> newSample;
288         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
289
290         appsinkNewSample(newSample.get());
291         return;
292     }
293
294     if (gst_structure_has_name(structure, "appsink-eos")) {
295         appsinkEOS();
296         return;
297     }
298
299     ASSERT_NOT_REACHED();
300 }
301
302 void AppendPipeline::handleAppsrcNeedDataReceived()
303 {
304     if (!m_appsrcAtLeastABufferLeft) {
305         GST_TRACE("discarding until at least a buffer leaves appsrc");
306         return;
307     }
308
309     ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
310     ASSERT(!m_appsrcNeedDataReceived);
311
312     GST_TRACE("received need-data from appsrc");
313
314     m_appsrcNeedDataReceived = true;
315     checkEndOfAppend();
316 }
317
318 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
319 {
320     m_appsrcAtLeastABufferLeft = true;
321     GST_TRACE("received buffer-left from appsrc");
322 #if LOG_DISABLED
323     removeAppsrcDataLeavingProbe();
324 #endif
325 }
326
327 gint AppendPipeline::id()
328 {
329     ASSERT(WTF::isMainThread());
330
331     if (m_id)
332         return m_id;
333
334     static gint s_totalAudio = 0;
335     static gint s_totalVideo = 0;
336     static gint s_totalText = 0;
337
338     switch (m_streamType) {
339     case Audio:
340         m_id = ++s_totalAudio;
341         break;
342     case Video:
343         m_id = ++s_totalVideo;
344         break;
345     case Text:
346         m_id = ++s_totalText;
347         break;
348     case Unknown:
349     case Invalid:
350         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
351         ASSERT_NOT_REACHED();
352         break;
353     }
354
355     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
356
357     return m_id;
358 }
359
360 void AppendPipeline::setAppendState(AppendState newAppendState)
361 {
362     ASSERT(WTF::isMainThread());
363     // Valid transitions:
364     // NotStarted-->Ongoing-->DataStarve-->NotStarted
365     //           |         |            `->Aborting-->NotStarted
366     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
367     //           |         |                                     `->Aborting-->NotStarted
368     //           |         `->KeyNegotiation-->Ongoing-->[...]
369     //           `->Aborting-->NotStarted
370     AppendState oldAppendState = m_appendState;
371     AppendState nextAppendState = AppendState::Invalid;
372
373     if (oldAppendState != newAppendState)
374         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
375
376     bool ok = false;
377
378     switch (oldAppendState) {
379     case AppendState::NotStarted:
380         switch (newAppendState) {
381         case AppendState::Ongoing:
382             ok = true;
383             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
384             break;
385         case AppendState::NotStarted:
386             ok = true;
387             if (m_pendingBuffer) {
388                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
389                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
390                 nextAppendState = AppendState::Ongoing;
391             }
392             break;
393         case AppendState::Aborting:
394             ok = true;
395             nextAppendState = AppendState::NotStarted;
396             break;
397         case AppendState::Invalid:
398             ok = true;
399             break;
400         default:
401             break;
402         }
403         break;
404     case AppendState::KeyNegotiation:
405         switch (newAppendState) {
406         case AppendState::Ongoing:
407         case AppendState::Invalid:
408             ok = true;
409             break;
410         default:
411             break;
412         }
413         break;
414     case AppendState::Ongoing:
415         switch (newAppendState) {
416         case AppendState::KeyNegotiation:
417         case AppendState::Sampling:
418         case AppendState::Invalid:
419             ok = true;
420             break;
421         case AppendState::DataStarve:
422             ok = true;
423             GST_DEBUG("received all pending samples");
424             m_sourceBufferPrivate->didReceiveAllPendingSamples();
425             if (m_abortPending)
426                 nextAppendState = AppendState::Aborting;
427             else
428                 nextAppendState = AppendState::NotStarted;
429             break;
430         default:
431             break;
432         }
433         break;
434     case AppendState::DataStarve:
435         switch (newAppendState) {
436         case AppendState::NotStarted:
437         case AppendState::Invalid:
438             ok = true;
439             break;
440         case AppendState::Aborting:
441             ok = true;
442             nextAppendState = AppendState::NotStarted;
443             break;
444         default:
445             break;
446         }
447         break;
448     case AppendState::Sampling:
449         switch (newAppendState) {
450         case AppendState::Sampling:
451         case AppendState::Invalid:
452             ok = true;
453             break;
454         case AppendState::LastSample:
455             ok = true;
456             GST_DEBUG("received all pending samples");
457             m_sourceBufferPrivate->didReceiveAllPendingSamples();
458             if (m_abortPending)
459                 nextAppendState = AppendState::Aborting;
460             else
461                 nextAppendState = AppendState::NotStarted;
462             break;
463         default:
464             break;
465         }
466         break;
467     case AppendState::LastSample:
468         switch (newAppendState) {
469         case AppendState::NotStarted:
470         case AppendState::Invalid:
471             ok = true;
472             break;
473         case AppendState::Aborting:
474             ok = true;
475             nextAppendState = AppendState::NotStarted;
476             break;
477         default:
478             break;
479         }
480         break;
481     case AppendState::Aborting:
482         switch (newAppendState) {
483         case AppendState::NotStarted:
484             ok = true;
485             resetPipeline();
486             m_abortPending = false;
487             nextAppendState = AppendState::NotStarted;
488             break;
489         case AppendState::Invalid:
490             ok = true;
491             break;
492         default:
493             break;
494         }
495         break;
496     case AppendState::Invalid:
497         ok = true;
498         break;
499     }
500
501     if (ok)
502         m_appendState = newAppendState;
503     else
504         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
505
506     ASSERT(ok);
507
508     if (nextAppendState != AppendState::Invalid)
509         setAppendState(nextAppendState);
510 }
511
512 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
513 {
514     ASSERT(WTF::isMainThread());
515
516     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
517     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
518
519     GstStructure* structure = gst_caps_get_structure(m_demuxerSrcPadCaps.get(), 0);
520     bool sizeConfigured = false;
521
522 #if GST_CHECK_VERSION(1, 5, 3) && ENABLE(LEGACY_ENCRYPTED_MEDIA)
523     if (gst_structure_has_name(structure, "application/x-cenc")) {
524         // Any previous decryptor should have been removed from the pipeline by disconnectFromAppSinkFromStreamingThread()
525         ASSERT(!m_decryptor);
526
527         m_decryptor = adoptGRef(WebCore::createGstDecryptor(gst_structure_get_string(structure, "protection-system")));
528         if (!m_decryptor) {
529             GST_ERROR("decryptor not found for caps: %" GST_PTR_FORMAT, m_demuxerSrcPadCaps.get());
530             return;
531         }
532
533         const gchar* originalMediaType = gst_structure_get_string(structure, "original-media-type");
534
535         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(originalMediaType)) {
536             m_presentationSize = WebCore::FloatSize();
537             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
538         } else if (g_str_has_prefix(originalMediaType, "video/")) {
539             int width = 0;
540             int height = 0;
541             float finalHeight = 0;
542
543             if (gst_structure_get_int(structure, "width", &width) && gst_structure_get_int(structure, "height", &height)) {
544                 int ratioNumerator = 1;
545                 int ratioDenominator = 1;
546
547                 gst_structure_get_fraction(structure, "pixel-aspect-ratio", &ratioNumerator, &ratioDenominator);
548                 finalHeight = height * ((float) ratioDenominator / (float) ratioNumerator);
549             }
550
551             m_presentationSize = WebCore::FloatSize(width, finalHeight);
552             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
553         } else {
554             m_presentationSize = WebCore::FloatSize();
555             if (g_str_has_prefix(originalMediaType, "audio/"))
556                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
557             else if (g_str_has_prefix(originalMediaType, "text/"))
558                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
559         }
560         sizeConfigured = true;
561     }
562 #endif
563
564     if (!sizeConfigured) {
565         const char* structureName = gst_structure_get_name(structure);
566         GstVideoInfo info;
567
568         if (!MediaPlayerPrivateGStreamerMSE::supportsCodecs(structureName)) {
569             m_presentationSize = WebCore::FloatSize();
570             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
571         } else if (g_str_has_prefix(structureName, "video/") && gst_video_info_from_caps(&info, demuxerSrcPadCaps)) {
572             float width, height;
573
574             width = info.width;
575             height = info.height * ((float) info.par_d / (float) info.par_n);
576
577             m_presentationSize = WebCore::FloatSize(width, height);
578             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
579         } else {
580             m_presentationSize = WebCore::FloatSize();
581             if (g_str_has_prefix(structureName, "audio/"))
582                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
583             else if (g_str_has_prefix(structureName, "text/"))
584                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
585         }
586     }
587 }
588
589 void AppendPipeline::appsinkCapsChanged()
590 {
591     ASSERT(WTF::isMainThread());
592
593     if (!m_appsink)
594         return;
595
596     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
597     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
598
599     if (!caps)
600         return;
601
602     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
603     bool previousCapsWereNull = !m_appsinkCaps;
604
605     if (gst_caps_replace(&m_appsinkCaps.outPtr(), caps.get())) {
606         if (m_playerPrivate && previousCapsWereNull)
607             m_playerPrivate->trackDetected(this, m_oldTrack, m_track);
608         didReceiveInitializationSegment();
609         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
610     }
611 }
612
613 void AppendPipeline::checkEndOfAppend()
614 {
615     ASSERT(WTF::isMainThread());
616
617     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
618         return;
619
620     GST_TRACE("end of append data mark was received");
621
622     switch (m_appendState) {
623     case AppendState::Ongoing:
624         GST_TRACE("DataStarve");
625         m_appsrcNeedDataReceived = false;
626         setAppendState(AppendState::DataStarve);
627         break;
628     case AppendState::Sampling:
629         GST_TRACE("LastSample");
630         m_appsrcNeedDataReceived = false;
631         setAppendState(AppendState::LastSample);
632         break;
633     default:
634         ASSERT_NOT_REACHED();
635         break;
636     }
637 }
638
639 void AppendPipeline::appsinkNewSample(GstSample* sample)
640 {
641     ASSERT(WTF::isMainThread());
642
643     {
644         LockHolder locker(m_newSampleLock);
645
646         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
647         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
648             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
649             // FIXME: Return ERROR and find a more robust way to detect that all the
650             // data has been processed, so we don't need to resort to these hacks.
651             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
652             m_flowReturn = GST_FLOW_OK;
653             m_newSampleCondition.notifyOne();
654             return;
655         }
656
657         RefPtr<GStreamerMediaSample> mediaSample = WebCore::GStreamerMediaSample::create(sample, m_presentationSize, trackId());
658
659         GST_TRACE("append: trackId=%s PTS=%f presentationSize=%.0fx%.0f", mediaSample->trackID().string().utf8().data(), mediaSample->presentationTime().toFloat(), mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
660
661         // If we're beyond the duration, ignore this sample and the remaining ones.
662         MediaTime duration = m_mediaSourceClient->duration();
663         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
664             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
665             setAppendState(AppendState::LastSample);
666             m_flowReturn = GST_FLOW_OK;
667             m_newSampleCondition.notifyOne();
668             return;
669         }
670
671         // Add a gap sample if a gap is detected before the first sample.
672         if (mediaSample->decodeTime() == MediaTime::zeroTime()
673             && mediaSample->presentationTime() > MediaTime::zeroTime()
674             && mediaSample->presentationTime() <= MediaTime::createWithDouble(0.1)) {
675             GST_DEBUG("Adding gap offset");
676             mediaSample->applyPtsOffset(MediaTime::zeroTime());
677         }
678
679         m_sourceBufferPrivate->didReceiveSample(mediaSample);
680         setAppendState(AppendState::Sampling);
681         m_flowReturn = GST_FLOW_OK;
682         m_newSampleCondition.notifyOne();
683     }
684
685     checkEndOfAppend();
686 }
687
688 void AppendPipeline::appsinkEOS()
689 {
690     ASSERT(WTF::isMainThread());
691
692     switch (m_appendState) {
693     case AppendState::Aborting:
694         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
695         return;
696     case AppendState::Ongoing:
697         // Finish Ongoing and Sampling states.
698         setAppendState(AppendState::DataStarve);
699         break;
700     case AppendState::Sampling:
701         setAppendState(AppendState::LastSample);
702         break;
703     default:
704         GST_DEBUG("Unexpected EOS");
705         break;
706     }
707 }
708
709 void AppendPipeline::didReceiveInitializationSegment()
710 {
711     ASSERT(WTF::isMainThread());
712
713     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
714
715     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
716     initializationSegment.duration = m_mediaSourceClient->duration();
717
718     switch (m_streamType) {
719     case Audio: {
720         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
721         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
722         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
723         initializationSegment.audioTracks.append(info);
724         break;
725     }
726     case Video: {
727         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
728         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
729         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
730         initializationSegment.videoTracks.append(info);
731         break;
732     }
733     default:
734         GST_ERROR("Unsupported stream type or codec");
735         break;
736     }
737
738     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
739 }
740
741 AtomicString AppendPipeline::trackId()
742 {
743     ASSERT(WTF::isMainThread());
744
745     if (!m_track)
746         return AtomicString();
747
748     return m_track->id();
749 }
750
751 void AppendPipeline::resetPipeline()
752 {
753     ASSERT(WTF::isMainThread());
754     GST_DEBUG("resetting pipeline");
755     m_appsrcAtLeastABufferLeft = false;
756     setAppsrcDataLeavingProbe();
757
758     {
759         LockHolder locker(m_newSampleLock);
760         m_newSampleCondition.notifyOne();
761         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
762         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
763     }
764
765 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
766     {
767         static unsigned i = 0;
768         // This is here for debugging purposes. It does not make sense to have it as class member.
769         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
770         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
771     }
772 #endif
773
774 }
775
776 void AppendPipeline::setAppsrcDataLeavingProbe()
777 {
778     if (m_appsrcDataLeavingProbeId)
779         return;
780
781     GST_TRACE("setting appsrc data leaving probe");
782
783     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
784     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
785 }
786
787 void AppendPipeline::removeAppsrcDataLeavingProbe()
788 {
789     if (!m_appsrcDataLeavingProbeId)
790         return;
791
792     GST_TRACE("removing appsrc data leaving probe");
793
794     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
795     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
796     m_appsrcDataLeavingProbeId = 0;
797 }
798
799 void AppendPipeline::abort()
800 {
801     ASSERT(WTF::isMainThread());
802     GST_DEBUG("aborting");
803
804     m_pendingBuffer = nullptr;
805
806     // Abort already ongoing.
807     if (m_abortPending)
808         return;
809
810     m_abortPending = true;
811     if (m_appendState == AppendState::NotStarted)
812         setAppendState(AppendState::Aborting);
813     // Else, the automatic state transitions will take care when the ongoing append finishes.
814 }
815
816 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
817 {
818     GstFlowReturn result;
819
820     if (m_abortPending) {
821         m_pendingBuffer = adoptGRef(buffer);
822         result = GST_FLOW_OK;
823     } else {
824         setAppendState(AppendPipeline::AppendState::Ongoing);
825         GST_TRACE("pushing new buffer %p", buffer);
826         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
827     }
828
829     return result;
830 }
831
832 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
833 {
834     GST_TRACE("buffer left appsrc, reposting to bus");
835     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
836     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
837     gst_bus_post(m_bus.get(), message);
838 }
839
840 void AppendPipeline::reportAppsrcNeedDataReceived()
841 {
842     GST_TRACE("received need-data signal at appsrc, reposting to bus");
843     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
844     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
845     gst_bus_post(m_bus.get(), message);
846 }
847
848 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
849 {
850     ASSERT(!WTF::isMainThread());
851
852     // Even if we're disabled, it's important to pull the sample out anyway to
853     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
854     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
855     LockHolder locker(m_newSampleLock);
856
857     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
858         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
859         return GST_FLOW_ERROR;
860     }
861
862     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
863     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
864     gst_bus_post(m_bus.get(), message);
865     GST_TRACE("appsink-new-sample message posted to bus");
866
867     m_newSampleCondition.wait(m_newSampleLock);
868     // We've been awaken because the sample was processed or because of
869     // an exceptional condition (entered in Invalid state, destructor, etc.).
870     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
871
872     return m_flowReturn;
873 }
874
875 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
876 {
877     if (!m_appsink)
878         return;
879
880     GST_DEBUG("connecting to appsink");
881
882     if (m_demux->numsrcpads > 1) {
883         GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
884         gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
885         g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
886         return;
887     }
888
889     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
890
891     // Only one stream per demuxer is supported.
892     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
893
894     gint64 timeLength = 0;
895     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
896         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
897         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
898     else
899         m_initialDuration = MediaTime::positiveInfiniteTime();
900
901     if (WTF::isMainThread())
902         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
903     else {
904         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
905         LockHolder locker(m_padAddRemoveLock);
906         if (!m_playerPrivate)
907             return;
908
909         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
910         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
911         gst_bus_post(m_bus.get(), message);
912         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
913
914         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
915     }
916
917     // Must be done in the thread we were called from (usually streaming thread).
918     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
919         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
920         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
921
922     if (isData) {
923         // FIXME: Only add appsink one time. This method can be called several times.
924         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
925         if (!parent)
926             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
927
928 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
929         if (m_decryptor) {
930             gst_object_ref(m_decryptor.get());
931             gst_bin_add(GST_BIN(m_pipeline.get()), m_decryptor.get());
932
933             GRefPtr<GstPad> decryptorSinkPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "sink"));
934             gst_pad_link(demuxerSrcPad, decryptorSinkPad.get());
935
936             GRefPtr<GstPad> decryptorSrcPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "src"));
937             gst_pad_link(decryptorSrcPad.get(), appsinkSinkPad.get());
938
939             gst_element_sync_state_with_parent(m_appsink.get());
940             gst_element_sync_state_with_parent(m_decryptor.get());
941         } else {
942 #endif
943             gst_pad_link(demuxerSrcPad, appsinkSinkPad.get());
944             gst_element_sync_state_with_parent(m_appsink.get());
945 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
946         }
947 #endif
948         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
949     }
950 }
951
952 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
953 {
954     ASSERT(WTF::isMainThread());
955     GST_DEBUG("Connecting to appsink");
956
957     LockHolder locker(m_padAddRemoveLock);
958     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
959
960     // Only one stream per demuxer is supported.
961     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
962
963     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
964
965     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
966         m_padAddRemoveCondition.notifyOne();
967         return;
968     }
969
970 #ifndef GST_DISABLE_GST_DEBUG
971     {
972         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
973         GST_DEBUG("%s", strcaps.get());
974     }
975 #endif
976
977     if (m_initialDuration > m_mediaSourceClient->duration()
978         || (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime()))
979         m_mediaSourceClient->durationChanged(m_initialDuration);
980
981     m_oldTrack = m_track;
982
983     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
984
985     switch (m_streamType) {
986     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
987         if (m_playerPrivate)
988             m_track = WebCore::AudioTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
989         break;
990     case WebCore::MediaSourceStreamTypeGStreamer::Video:
991         if (m_playerPrivate)
992             m_track = WebCore::VideoTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
993         break;
994     case WebCore::MediaSourceStreamTypeGStreamer::Text:
995         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
996         break;
997     case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
998         {
999             GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
1000             GST_DEBUG("Unsupported track codec: %s", strcaps.get());
1001         }
1002         // This is going to cause an error which will detach the SourceBuffer and tear down this
1003         // AppendPipeline, so we need the padAddRemove lock released before continuing.
1004         m_track = nullptr;
1005         m_padAddRemoveCondition.notifyOne();
1006         locker.unlockEarly();
1007         didReceiveInitializationSegment();
1008         return;
1009     default:
1010         // No useful data, but notify anyway to complete the append operation.
1011         GST_DEBUG("Received all pending samples (no data)");
1012         m_sourceBufferPrivate->didReceiveAllPendingSamples();
1013         break;
1014     }
1015
1016     m_padAddRemoveCondition.notifyOne();
1017 }
1018
1019 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad* demuxerSrcPad)
1020 {
1021     // Must be done in the thread we were called from (usually streaming thread).
1022     if (!gst_pad_is_linked(demuxerSrcPad)) {
1023         gulong probeId = GPOINTER_TO_ULONG(g_object_get_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId"));
1024         if (probeId) {
1025             GST_DEBUG("Disconnecting black hole probe.");
1026             g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", nullptr);
1027             gst_pad_remove_probe(demuxerSrcPad, probeId);
1028         } else
1029             GST_WARNING("Not disconnecting demuxer src pad because it wasn't linked");
1030         return;
1031     }
1032
1033     GST_DEBUG("Disconnecting appsink");
1034
1035 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
1036     if (m_decryptor) {
1037         gst_element_unlink(m_decryptor.get(), m_appsink.get());
1038         gst_element_unlink(m_demux.get(), m_decryptor.get());
1039         gst_element_set_state(m_decryptor.get(), GST_STATE_NULL);
1040         gst_bin_remove(GST_BIN(m_pipeline.get()), m_decryptor.get());
1041     } else
1042 #endif
1043         gst_element_unlink(m_demux.get(), m_appsink.get());
1044 }
1045
1046 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1047 {
1048     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1049     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1050     gst_bus_post(appendPipeline->bus(), message);
1051     GST_TRACE("appsink-caps-changed message posted to bus");
1052 }
1053
1054 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1055 {
1056     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1057
1058     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1059     gsize bufferSize = gst_buffer_get_size(buffer);
1060
1061     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1062
1063     appendPipeline->reportAppsrcAtLeastABufferLeft();
1064
1065     return GST_PAD_PROBE_OK;
1066 }
1067
1068 #if !LOG_DISABLED
1069 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1070 {
1071     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1072     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1073     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1074     return GST_PAD_PROBE_OK;
1075 }
1076 #endif
1077
1078 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1079 {
1080     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1081     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1082     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1083     return GST_PAD_PROBE_DROP;
1084 }
1085
1086 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1087 {
1088     appendPipeline->reportAppsrcNeedDataReceived();
1089 }
1090
1091 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1092 {
1093     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1094 }
1095
1096 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1097 {
1098     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
1099 }
1100
1101 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1102 {
1103     return appendPipeline->handleNewAppsinkSample(appsink);
1104 }
1105
1106 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1107 {
1108     if (WTF::isMainThread())
1109         appendPipeline->appsinkEOS();
1110     else {
1111         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1112         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1113         gst_bus_post(appendPipeline->bus(), message);
1114         GST_TRACE("appsink-eos message posted to bus");
1115     }
1116
1117     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1118 }
1119
1120
1121
1122 } // namespace WebCore.
1123
1124 #endif // USE(GSTREAMER)