597964e266d9fc12059755bb8870b09eab507f48
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016 Metrological Group B.V.
3  * Copyright (C) 2016 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * aint with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GRefPtrGStreamer.h"
28 #include "GStreamerMediaDescription.h"
29 #include "GStreamerMediaSample.h"
30 #include "GStreamerUtilities.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35
36 #include <gst/app/gstappsink.h>
37 #include <gst/app/gstappsrc.h>
38 #include <gst/gst.h>
39 #include <gst/pbutils/pbutils.h>
40 #include <gst/video/video.h>
41 #include <wtf/Condition.h>
42
43 namespace WebCore {
44
45 static const char* dumpAppendState(AppendPipeline::AppendState appendState)
46 {
47     switch (appendState) {
48     case AppendPipeline::AppendState::Invalid:
49         return "Invalid";
50     case AppendPipeline::AppendState::NotStarted:
51         return "NotStarted";
52     case AppendPipeline::AppendState::Ongoing:
53         return "Ongoing";
54     case AppendPipeline::AppendState::KeyNegotiation:
55         return "KeyNegotiation";
56     case AppendPipeline::AppendState::DataStarve:
57         return "DataStarve";
58     case AppendPipeline::AppendState::Sampling:
59         return "Sampling";
60     case AppendPipeline::AppendState::LastSample:
61         return "LastSample";
62     case AppendPipeline::AppendState::Aborting:
63         return "Aborting";
64     default:
65         return "(unknown)";
66     }
67 }
68
69 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline*);
70 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad*, AppendPipeline*);
71 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline*);
72 static void appendPipelineAppsinkCapsChanged(GObject*, GParamSpec*, AppendPipeline*);
73 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo*, AppendPipeline*);
74 #if !LOG_DISABLED
75 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
76 #endif
77 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement*, AppendPipeline*);
78 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline*);
79
80 static void appendPipelineElementMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
81 {
82     appendPipeline->handleElementMessage(message);
83 }
84
85 static void appendPipelineApplicationMessageCallback(GstBus*, GstMessage* message, AppendPipeline* appendPipeline)
86 {
87     appendPipeline->handleApplicationMessage(message);
88 }
89
90 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
91     : m_mediaSourceClient(mediaSourceClient.get())
92     , m_sourceBufferPrivate(sourceBufferPrivate.get())
93     , m_playerPrivate(&playerPrivate)
94     , m_id(0)
95     , m_appsrcAtLeastABufferLeft(false)
96     , m_appsrcNeedDataReceived(false)
97     , m_appsrcDataLeavingProbeId(0)
98     , m_appendState(AppendState::NotStarted)
99     , m_abortPending(false)
100     , m_streamType(Unknown)
101 {
102     ASSERT(WTF::isMainThread());
103
104     GST_TRACE("Creating AppendPipeline (%p)", this);
105
106     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
107     // The track name is still unknown at this time, though.
108     m_pipeline = adoptGRef(gst_pipeline_new(nullptr));
109
110     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
111     gst_bus_add_signal_watch(m_bus.get());
112     gst_bus_enable_sync_message_emission(m_bus.get());
113
114     g_signal_connect(m_bus.get(), "sync-message::element", G_CALLBACK(appendPipelineElementMessageCallback), this);
115     g_signal_connect(m_bus.get(), "message::application", G_CALLBACK(appendPipelineApplicationMessageCallback), this);
116
117     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
118     // below will already take the initial reference and we need an additional one for us.
119     m_appsrc = gst_element_factory_make("appsrc", nullptr);
120     m_demux = gst_element_factory_make("qtdemux", nullptr);
121     m_appsink = gst_element_factory_make("appsink", nullptr);
122
123     g_object_set(G_OBJECT(m_demux.get()), "always-honor-tfdt", TRUE, nullptr);
124     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
125     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
126
127     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
128     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(appendPipelineAppsinkCapsChanged), this);
129
130     setAppsrcDataLeavingProbe();
131
132 #if !LOG_DISABLED
133     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
134     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
135     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
136     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
137     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
138     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
139     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
140 #endif
141
142     // These signals won't be connected outside of the lifetime of "this".
143     g_signal_connect(m_appsrc.get(), "need-data", G_CALLBACK(appendPipelineAppsrcNeedData), this);
144     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(appendPipelineDemuxerPadAdded), this);
145     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(appendPipelineDemuxerPadRemoved), this);
146     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(appendPipelineAppsinkNewSample), this);
147     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(appendPipelineAppsinkEOS), this);
148
149     // Add_many will take ownership of a reference. That's why we used an assignment before.
150     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
151     gst_element_link(m_appsrc.get(), m_demux.get());
152
153     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
154 };
155
156 AppendPipeline::~AppendPipeline()
157 {
158     ASSERT(WTF::isMainThread());
159
160     {
161         LockHolder locker(m_newSampleLock);
162         setAppendState(AppendState::Invalid);
163         m_newSampleCondition.notifyOne();
164     }
165
166     {
167         LockHolder locker(m_padAddRemoveLock);
168         m_playerPrivate = nullptr;
169         m_padAddRemoveCondition.notifyOne();
170     }
171
172     GST_TRACE("Destroying AppendPipeline (%p)", this);
173
174     // FIXME: Maybe notify appendComplete here?
175
176     if (m_pipeline) {
177         ASSERT(m_bus);
178         gst_bus_remove_signal_watch(m_bus.get());
179         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
180         m_pipeline = nullptr;
181     }
182
183     if (m_appsrc) {
184         removeAppsrcDataLeavingProbe();
185         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
186         m_appsrc = nullptr;
187     }
188
189     if (m_demux) {
190 #if !LOG_DISABLED
191         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
192         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
193 #endif
194
195         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
196         m_demux = nullptr;
197     }
198
199     if (m_appsink) {
200         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
201         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
202         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
203
204 #if !LOG_DISABLED
205         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
206 #endif
207
208         m_appsink = nullptr;
209     }
210
211     m_appsinkCaps = nullptr;
212     m_demuxerSrcPadCaps = nullptr;
213 };
214
215 void AppendPipeline::clearPlayerPrivate()
216 {
217     ASSERT(WTF::isMainThread());
218     GST_DEBUG("cleaning private player");
219
220     {
221         LockHolder locker(m_newSampleLock);
222         // Make sure that AppendPipeline won't process more data from now on and
223         // instruct handleNewSample to abort itself from now on as well.
224         setAppendState(AppendState::Invalid);
225
226         // Awake any pending handleNewSample operation in the streaming thread.
227         m_newSampleCondition.notifyOne();
228     }
229
230     {
231         LockHolder locker(m_padAddRemoveLock);
232         m_playerPrivate = nullptr;
233         m_padAddRemoveCondition.notifyOne();
234     }
235
236     // And now that no handleNewSample operations will remain stalled waiting
237     // for the main thread, stop the pipeline.
238     if (m_pipeline)
239         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
240 }
241
242 void AppendPipeline::handleElementMessage(GstMessage* message)
243 {
244     ASSERT(WTF::isMainThread());
245
246     const GstStructure* structure = gst_message_get_structure(message);
247     if (gst_structure_has_name(structure, "drm-key-needed"))
248         setAppendState(AppendPipeline::AppendState::KeyNegotiation);
249
250     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
251     if (m_playerPrivate)
252         m_playerPrivate->handleSyncMessage(message);
253 }
254
255 void AppendPipeline::handleApplicationMessage(GstMessage* message)
256 {
257     ASSERT(WTF::isMainThread());
258
259     const GstStructure* structure = gst_message_get_structure(message);
260
261     if (gst_structure_has_name(structure, "appsrc-need-data")) {
262         handleAppsrcNeedDataReceived();
263         return;
264     }
265
266     if (gst_structure_has_name(structure, "appsrc-buffer-left")) {
267         handleAppsrcAtLeastABufferLeft();
268         return;
269     }
270
271     if (gst_structure_has_name(structure, "demuxer-connect-to-appsink")) {
272         GRefPtr<GstPad> demuxerSrcPad;
273         gst_structure_get(structure, "demuxer-src-pad", G_TYPE_OBJECT, &demuxerSrcPad.outPtr(), nullptr);
274         ASSERT(demuxerSrcPad);
275         connectDemuxerSrcPadToAppsink(demuxerSrcPad.get());
276         return;
277     }
278
279     if (gst_structure_has_name(structure, "appsink-caps-changed")) {
280         appsinkCapsChanged();
281         return;
282     }
283
284     if (gst_structure_has_name(structure, "appsink-new-sample")) {
285         GRefPtr<GstSample> newSample;
286         gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);
287
288         appsinkNewSample(newSample.get());
289         return;
290     }
291
292     if (gst_structure_has_name(structure, "appsink-eos")) {
293         appsinkEOS();
294         return;
295     }
296
297     ASSERT_NOT_REACHED();
298 }
299
300 void AppendPipeline::handleAppsrcNeedDataReceived()
301 {
302     if (!m_appsrcAtLeastABufferLeft) {
303         GST_TRACE("discarding until at least a buffer leaves appsrc");
304         return;
305     }
306
307     ASSERT(m_appendState == AppendState::Ongoing || m_appendState == AppendState::Sampling);
308     ASSERT(!m_appsrcNeedDataReceived);
309
310     GST_TRACE("received need-data from appsrc");
311
312     m_appsrcNeedDataReceived = true;
313     checkEndOfAppend();
314 }
315
316 void AppendPipeline::handleAppsrcAtLeastABufferLeft()
317 {
318     m_appsrcAtLeastABufferLeft = true;
319     GST_TRACE("received buffer-left from appsrc");
320 #if LOG_DISABLED
321     removeAppsrcDataLeavingProbe();
322 #endif
323 }
324
325 gint AppendPipeline::id()
326 {
327     ASSERT(WTF::isMainThread());
328
329     if (m_id)
330         return m_id;
331
332     static gint s_totalAudio = 0;
333     static gint s_totalVideo = 0;
334     static gint s_totalText = 0;
335
336     switch (m_streamType) {
337     case Audio:
338         m_id = ++s_totalAudio;
339         break;
340     case Video:
341         m_id = ++s_totalVideo;
342         break;
343     case Text:
344         m_id = ++s_totalText;
345         break;
346     case Unknown:
347     case Invalid:
348         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
349         ASSERT_NOT_REACHED();
350         break;
351     }
352
353     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
354
355     return m_id;
356 }
357
358 void AppendPipeline::setAppendState(AppendState newAppendState)
359 {
360     ASSERT(WTF::isMainThread());
361     // Valid transitions:
362     // NotStarted-->Ongoing-->DataStarve-->NotStarted
363     //           |         |            `->Aborting-->NotStarted
364     //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
365     //           |         |                                     `->Aborting-->NotStarted
366     //           |         `->KeyNegotiation-->Ongoing-->[...]
367     //           `->Aborting-->NotStarted
368     AppendState oldAppendState = m_appendState;
369     AppendState nextAppendState = AppendState::Invalid;
370
371     if (oldAppendState != newAppendState)
372         GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
373
374     bool ok = false;
375
376     switch (oldAppendState) {
377     case AppendState::NotStarted:
378         switch (newAppendState) {
379         case AppendState::Ongoing:
380             ok = true;
381             gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
382             break;
383         case AppendState::NotStarted:
384             ok = true;
385             if (m_pendingBuffer) {
386                 GST_TRACE("pushing pending buffer %p", m_pendingBuffer.get());
387                 gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
388                 nextAppendState = AppendState::Ongoing;
389             }
390             break;
391         case AppendState::Aborting:
392             ok = true;
393             nextAppendState = AppendState::NotStarted;
394             break;
395         case AppendState::Invalid:
396             ok = true;
397             break;
398         default:
399             break;
400         }
401         break;
402     case AppendState::KeyNegotiation:
403         switch (newAppendState) {
404         case AppendState::Ongoing:
405         case AppendState::Invalid:
406             ok = true;
407             break;
408         default:
409             break;
410         }
411         break;
412     case AppendState::Ongoing:
413         switch (newAppendState) {
414         case AppendState::KeyNegotiation:
415         case AppendState::Sampling:
416         case AppendState::Invalid:
417             ok = true;
418             break;
419         case AppendState::DataStarve:
420             ok = true;
421             GST_DEBUG("received all pending samples");
422             m_sourceBufferPrivate->didReceiveAllPendingSamples();
423             if (m_abortPending)
424                 nextAppendState = AppendState::Aborting;
425             else
426                 nextAppendState = AppendState::NotStarted;
427             break;
428         default:
429             break;
430         }
431         break;
432     case AppendState::DataStarve:
433         switch (newAppendState) {
434         case AppendState::NotStarted:
435         case AppendState::Invalid:
436             ok = true;
437             break;
438         case AppendState::Aborting:
439             ok = true;
440             nextAppendState = AppendState::NotStarted;
441             break;
442         default:
443             break;
444         }
445         break;
446     case AppendState::Sampling:
447         switch (newAppendState) {
448         case AppendState::Sampling:
449         case AppendState::Invalid:
450             ok = true;
451             break;
452         case AppendState::LastSample:
453             ok = true;
454             GST_DEBUG("received all pending samples");
455             m_sourceBufferPrivate->didReceiveAllPendingSamples();
456             if (m_abortPending)
457                 nextAppendState = AppendState::Aborting;
458             else
459                 nextAppendState = AppendState::NotStarted;
460             break;
461         default:
462             break;
463         }
464         break;
465     case AppendState::LastSample:
466         switch (newAppendState) {
467         case AppendState::NotStarted:
468         case AppendState::Invalid:
469             ok = true;
470             break;
471         case AppendState::Aborting:
472             ok = true;
473             nextAppendState = AppendState::NotStarted;
474             break;
475         default:
476             break;
477         }
478         break;
479     case AppendState::Aborting:
480         switch (newAppendState) {
481         case AppendState::NotStarted:
482             ok = true;
483             resetPipeline();
484             m_abortPending = false;
485             nextAppendState = AppendState::NotStarted;
486             break;
487         case AppendState::Invalid:
488             ok = true;
489             break;
490         default:
491             break;
492         }
493         break;
494     case AppendState::Invalid:
495         ok = true;
496         break;
497     }
498
499     if (ok)
500         m_appendState = newAppendState;
501     else
502         GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));
503
504     ASSERT(ok);
505
506     if (nextAppendState != AppendState::Invalid)
507         setAppendState(nextAppendState);
508 }
509
510 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
511 {
512     ASSERT(WTF::isMainThread());
513
514     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
515     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
516
517     GstStructure* structure = gst_caps_get_structure(m_demuxerSrcPadCaps.get(), 0);
518     bool sizeConfigured = false;
519
520 #if GST_CHECK_VERSION(1, 5, 3) && ENABLE(LEGACY_ENCRYPTED_MEDIA)
521     if (gst_structure_has_name(structure, "application/x-cenc")) {
522         // Any previous decryptor should have been removed from the pipeline by disconnectFromAppSinkFromStreamingThread()
523         ASSERT(!m_decryptor);
524
525         m_decryptor = adoptGRef(WebCore::createGstDecryptor(gst_structure_get_string(structure, "protection-system")));
526         if (!m_decryptor) {
527             GST_ERROR("decryptor not found for caps: %" GST_PTR_FORMAT, m_demuxerSrcPadCaps.get());
528             return;
529         }
530
531         const gchar* originalMediaType = gst_structure_get_string(structure, "original-media-type");
532
533         if (g_str_has_prefix(originalMediaType, "video/")) {
534             int width = 0;
535             int height = 0;
536             float finalHeight = 0;
537
538             if (gst_structure_get_int(structure, "width", &width) && gst_structure_get_int(structure, "height", &height)) {
539                 int ratioNumerator = 1;
540                 int ratioDenominator = 1;
541
542                 gst_structure_get_fraction(structure, "pixel-aspect-ratio", &ratioNumerator, &ratioDenominator);
543                 finalHeight = height * ((float) ratioDenominator / (float) ratioNumerator);
544             }
545
546             m_presentationSize = WebCore::FloatSize(width, finalHeight);
547             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
548         } else {
549             m_presentationSize = WebCore::FloatSize();
550             if (g_str_has_prefix(originalMediaType, "audio/"))
551                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
552             else if (g_str_has_prefix(originalMediaType, "text/"))
553                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
554         }
555         sizeConfigured = true;
556     }
557 #endif
558
559     if (!sizeConfigured) {
560         const char* structureName = gst_structure_get_name(structure);
561         GstVideoInfo info;
562
563         if (g_str_has_prefix(structureName, "video/") && gst_video_info_from_caps(&info, demuxerSrcPadCaps)) {
564             float width, height;
565
566             width = info.width;
567             height = info.height * ((float) info.par_d / (float) info.par_n);
568
569             m_presentationSize = WebCore::FloatSize(width, height);
570             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
571         } else {
572             m_presentationSize = WebCore::FloatSize();
573             if (g_str_has_prefix(structureName, "audio/"))
574                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
575             else if (g_str_has_prefix(structureName, "text/"))
576                 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
577         }
578     }
579 }
580
581 void AppendPipeline::appsinkCapsChanged()
582 {
583     ASSERT(WTF::isMainThread());
584
585     if (!m_appsink)
586         return;
587
588     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
589     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
590
591     if (!caps)
592         return;
593
594     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
595     bool previousCapsWereNull = !m_appsinkCaps;
596
597     if (gst_caps_replace(&m_appsinkCaps.outPtr(), caps.get())) {
598         if (m_playerPrivate && previousCapsWereNull)
599             m_playerPrivate->trackDetected(this, m_oldTrack, m_track);
600         didReceiveInitializationSegment();
601         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
602     }
603 }
604
605 void AppendPipeline::checkEndOfAppend()
606 {
607     ASSERT(WTF::isMainThread());
608
609     if (!m_appsrcNeedDataReceived || (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling))
610         return;
611
612     GST_TRACE("end of append data mark was received");
613
614     switch (m_appendState) {
615     case AppendState::Ongoing:
616         GST_TRACE("DataStarve");
617         m_appsrcNeedDataReceived = false;
618         setAppendState(AppendState::DataStarve);
619         break;
620     case AppendState::Sampling:
621         GST_TRACE("LastSample");
622         m_appsrcNeedDataReceived = false;
623         setAppendState(AppendState::LastSample);
624         break;
625     default:
626         ASSERT_NOT_REACHED();
627         break;
628     }
629 }
630
631 void AppendPipeline::appsinkNewSample(GstSample* sample)
632 {
633     ASSERT(WTF::isMainThread());
634
635     {
636         LockHolder locker(m_newSampleLock);
637
638         // Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
639         if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
640             GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
641             // FIXME: Return ERROR and find a more robust way to detect that all the
642             // data has been processed, so we don't need to resort to these hacks.
643             // All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
644             m_flowReturn = GST_FLOW_OK;
645             m_newSampleCondition.notifyOne();
646             return;
647         }
648
649         RefPtr<GStreamerMediaSample> mediaSample = WebCore::GStreamerMediaSample::create(sample, m_presentationSize, trackId());
650
651         GST_TRACE("append: trackId=%s PTS=%f presentationSize=%.0fx%.0f", mediaSample->trackID().string().utf8().data(), mediaSample->presentationTime().toFloat(), mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
652
653         // If we're beyond the duration, ignore this sample and the remaining ones.
654         MediaTime duration = m_mediaSourceClient->duration();
655         if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
656             GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
657             setAppendState(AppendState::LastSample);
658             m_flowReturn = GST_FLOW_OK;
659             m_newSampleCondition.notifyOne();
660             return;
661         }
662
663         // Add a gap sample if a gap is detected before the first sample.
664         if (mediaSample->decodeTime() == MediaTime::zeroTime()
665             && mediaSample->presentationTime() > MediaTime::zeroTime()
666             && mediaSample->presentationTime() <= MediaTime::createWithDouble(0.1)) {
667             GST_DEBUG("Adding gap offset");
668             mediaSample->applyPtsOffset(MediaTime::zeroTime());
669         }
670
671         m_sourceBufferPrivate->didReceiveSample(mediaSample);
672         setAppendState(AppendState::Sampling);
673         m_flowReturn = GST_FLOW_OK;
674         m_newSampleCondition.notifyOne();
675     }
676
677     checkEndOfAppend();
678 }
679
680 void AppendPipeline::appsinkEOS()
681 {
682     ASSERT(WTF::isMainThread());
683
684     switch (m_appendState) {
685     case AppendState::Aborting:
686         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
687         return;
688     case AppendState::Ongoing:
689         // Finish Ongoing and Sampling states.
690         setAppendState(AppendState::DataStarve);
691         break;
692     case AppendState::Sampling:
693         setAppendState(AppendState::LastSample);
694         break;
695     default:
696         GST_DEBUG("Unexpected EOS");
697         break;
698     }
699 }
700
701 void AppendPipeline::didReceiveInitializationSegment()
702 {
703     ASSERT(WTF::isMainThread());
704
705     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
706
707     GST_DEBUG("Notifying SourceBuffer for track %s", m_track->id().string().utf8().data());
708     initializationSegment.duration = m_mediaSourceClient->duration();
709     switch (m_streamType) {
710     case Audio: {
711         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
712         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
713         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
714         initializationSegment.audioTracks.append(info);
715         break;
716     }
717     case Video: {
718         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
719         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
720         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
721         initializationSegment.videoTracks.append(info);
722         break;
723     }
724     default:
725         GST_ERROR("Unsupported or unknown stream type");
726         ASSERT_NOT_REACHED();
727         break;
728     }
729
730     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
731 }
732
733 AtomicString AppendPipeline::trackId()
734 {
735     ASSERT(WTF::isMainThread());
736
737     if (!m_track)
738         return AtomicString();
739
740     return m_track->id();
741 }
742
743 void AppendPipeline::resetPipeline()
744 {
745     ASSERT(WTF::isMainThread());
746     GST_DEBUG("resetting pipeline");
747     m_appsrcAtLeastABufferLeft = false;
748     setAppsrcDataLeavingProbe();
749
750     {
751         LockHolder locker(m_newSampleLock);
752         m_newSampleCondition.notifyOne();
753         gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
754         gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
755     }
756
757 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
758     {
759         static unsigned i = 0;
760         // This is here for debugging purposes. It does not make sense to have it as class member.
761         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
762         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
763     }
764 #endif
765
766 }
767
768 void AppendPipeline::setAppsrcDataLeavingProbe()
769 {
770     if (m_appsrcDataLeavingProbeId)
771         return;
772
773     GST_TRACE("setting appsrc data leaving probe");
774
775     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
776     m_appsrcDataLeavingProbeId = gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsrcDataLeaving), this, nullptr);
777 }
778
779 void AppendPipeline::removeAppsrcDataLeavingProbe()
780 {
781     if (!m_appsrcDataLeavingProbeId)
782         return;
783
784     GST_TRACE("removing appsrc data leaving probe");
785
786     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
787     gst_pad_remove_probe(appsrcPad.get(), m_appsrcDataLeavingProbeId);
788     m_appsrcDataLeavingProbeId = 0;
789 }
790
791 void AppendPipeline::abort()
792 {
793     ASSERT(WTF::isMainThread());
794     GST_DEBUG("aborting");
795
796     m_pendingBuffer = nullptr;
797
798     // Abort already ongoing.
799     if (m_abortPending)
800         return;
801
802     m_abortPending = true;
803     if (m_appendState == AppendState::NotStarted)
804         setAppendState(AppendState::Aborting);
805     // Else, the automatic state transitions will take care when the ongoing append finishes.
806 }
807
808 GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
809 {
810     GstFlowReturn result;
811
812     if (m_abortPending) {
813         m_pendingBuffer = adoptGRef(buffer);
814         result = GST_FLOW_OK;
815     } else {
816         setAppendState(AppendPipeline::AppendState::Ongoing);
817         GST_TRACE("pushing new buffer %p", buffer);
818         result = gst_app_src_push_buffer(GST_APP_SRC(appsrc()), buffer);
819     }
820
821     return result;
822 }
823
824 void AppendPipeline::reportAppsrcAtLeastABufferLeft()
825 {
826     GST_TRACE("buffer left appsrc, reposting to bus");
827     GstStructure* structure = gst_structure_new_empty("appsrc-buffer-left");
828     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
829     gst_bus_post(m_bus.get(), message);
830 }
831
832 void AppendPipeline::reportAppsrcNeedDataReceived()
833 {
834     GST_TRACE("received need-data signal at appsrc, reposting to bus");
835     GstStructure* structure = gst_structure_new_empty("appsrc-need-data");
836     GstMessage* message = gst_message_new_application(GST_OBJECT(m_appsrc.get()), structure);
837     gst_bus_post(m_bus.get(), message);
838 }
839
840 GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
841 {
842     ASSERT(!WTF::isMainThread());
843
844     // Even if we're disabled, it's important to pull the sample out anyway to
845     // avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
846     GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
847     LockHolder locker(m_newSampleLock);
848
849     if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
850         GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
851         return GST_FLOW_ERROR;
852     }
853
854     GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
855     GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
856     gst_bus_post(m_bus.get(), message);
857     GST_TRACE("appsink-new-sample message posted to bus");
858
859     m_newSampleCondition.wait(m_newSampleLock);
860     // We've been awaken because the sample was processed or because of
861     // an exceptional condition (entered in Invalid state, destructor, etc.).
862     // We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
863
864     return m_flowReturn;
865 }
866
867 void AppendPipeline::connectDemuxerSrcPadToAppsinkFromAnyThread(GstPad* demuxerSrcPad)
868 {
869     if (!m_appsink)
870         return;
871
872     GST_DEBUG("connecting to appsink");
873
874     GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
875
876     // Only one stream per demuxer is supported.
877     ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
878
879     gint64 timeLength = 0;
880     if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
881         && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
882         m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
883     else
884         m_initialDuration = MediaTime::positiveInfiniteTime();
885
886     if (WTF::isMainThread())
887         connectDemuxerSrcPadToAppsink(demuxerSrcPad);
888     else {
889         // Call connectDemuxerSrcPadToAppsink() in the main thread and wait.
890         LockHolder locker(m_padAddRemoveLock);
891         if (!m_playerPrivate)
892             return;
893
894         GstStructure* structure = gst_structure_new("demuxer-connect-to-appsink", "demuxer-src-pad", G_TYPE_OBJECT, demuxerSrcPad, nullptr);
895         GstMessage* message = gst_message_new_application(GST_OBJECT(m_demux.get()), structure);
896         gst_bus_post(m_bus.get(), message);
897         GST_TRACE("demuxer-connect-to-appsink message posted to bus");
898
899         m_padAddRemoveCondition.wait(m_padAddRemoveLock);
900     }
901
902     // Must be done in the thread we were called from (usually streaming thread).
903     bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
904         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
905         || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
906
907     if (isData) {
908         // FIXME: Only add appsink one time. This method can be called several times.
909         GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
910         if (!parent)
911             gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
912
913 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
914         if (m_decryptor) {
915             gst_object_ref(m_decryptor.get());
916             gst_bin_add(GST_BIN(m_pipeline.get()), m_decryptor.get());
917
918             GRefPtr<GstPad> decryptorSinkPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "sink"));
919             gst_pad_link(demuxerSrcPad, decryptorSinkPad.get());
920
921             GRefPtr<GstPad> decryptorSrcPad = adoptGRef(gst_element_get_static_pad(m_decryptor.get(), "src"));
922             gst_pad_link(decryptorSrcPad.get(), appsinkSinkPad.get());
923
924             gst_element_sync_state_with_parent(m_appsink.get());
925             gst_element_sync_state_with_parent(m_decryptor.get());
926         } else {
927 #endif
928             gst_pad_link(demuxerSrcPad, appsinkSinkPad.get());
929             gst_element_sync_state_with_parent(m_appsink.get());
930 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
931         }
932 #endif
933         gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
934     }
935 }
936
937 void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
938 {
939     ASSERT(WTF::isMainThread());
940     GST_DEBUG("Connecting to appsink");
941
942     LockHolder locker(m_padAddRemoveLock);
943     GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
944
945     // Only one stream per demuxer is supported.
946     ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
947
948     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
949
950     if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
951         m_padAddRemoveCondition.notifyOne();
952         return;
953     }
954
955 #ifndef GST_DISABLE_GST_DEBUG
956     {
957         GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
958         GST_DEBUG("%s", strcaps.get());
959     }
960 #endif
961
962     if (m_initialDuration > m_mediaSourceClient->duration())
963         m_mediaSourceClient->durationChanged(m_initialDuration);
964
965     m_oldTrack = m_track;
966
967     parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
968
969     switch (m_streamType) {
970     case WebCore::MediaSourceStreamTypeGStreamer::Audio:
971         if (m_playerPrivate)
972             m_track = WebCore::AudioTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
973         break;
974     case WebCore::MediaSourceStreamTypeGStreamer::Video:
975         if (m_playerPrivate)
976             m_track = WebCore::VideoTrackPrivateGStreamer::create(m_playerPrivate->pipeline(), id(), sinkSinkPad.get());
977         break;
978     case WebCore::MediaSourceStreamTypeGStreamer::Text:
979         m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
980         break;
981     default:
982         // No useful data, but notify anyway to complete the append operation.
983         GST_DEBUG("Received all pending samples (no data)");
984         m_sourceBufferPrivate->didReceiveAllPendingSamples();
985         break;
986     }
987
988     m_padAddRemoveCondition.notifyOne();
989 }
990
991 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread()
992 {
993     GST_DEBUG("Disconnecting appsink");
994
995     // Must be done in the thread we were called from (usually streaming thread).
996 #if ENABLE(LEGACY_ENCRYPTED_MEDIA)
997     if (m_decryptor) {
998         gst_element_unlink(m_decryptor.get(), m_appsink.get());
999         gst_element_unlink(m_demux.get(), m_decryptor.get());
1000         gst_element_set_state(m_decryptor.get(), GST_STATE_NULL);
1001         gst_bin_remove(GST_BIN(m_pipeline.get()), m_decryptor.get());
1002     } else
1003 #endif
1004         gst_element_unlink(m_demux.get(), m_appsink.get());
1005 }
1006
1007 static void appendPipelineAppsinkCapsChanged(GObject* appsinkPad, GParamSpec*, AppendPipeline* appendPipeline)
1008 {
1009     GstStructure* structure = gst_structure_new_empty("appsink-caps-changed");
1010     GstMessage* message = gst_message_new_application(GST_OBJECT(appsinkPad), structure);
1011     gst_bus_post(appendPipeline->bus(), message);
1012     GST_TRACE("appsink-caps-changed message posted to bus");
1013 }
1014
1015 static GstPadProbeReturn appendPipelineAppsrcDataLeaving(GstPad*, GstPadProbeInfo* info, AppendPipeline* appendPipeline)
1016 {
1017     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1018
1019     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1020     gsize bufferSize = gst_buffer_get_size(buffer);
1021
1022     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " going thru", bufferSize);
1023
1024     appendPipeline->reportAppsrcAtLeastABufferLeft();
1025
1026     return GST_PAD_PROBE_OK;
1027 }
1028
1029 #if !LOG_DISABLED
1030 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
1031 {
1032     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1033     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1034     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1035     return GST_PAD_PROBE_OK;
1036 }
1037 #endif
1038
1039 static void appendPipelineAppsrcNeedData(GstAppSrc*, guint, AppendPipeline* appendPipeline)
1040 {
1041     appendPipeline->reportAppsrcNeedDataReceived();
1042 }
1043
1044 static void appendPipelineDemuxerPadAdded(GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline)
1045 {
1046     appendPipeline->connectDemuxerSrcPadToAppsinkFromAnyThread(demuxerSrcPad);
1047 }
1048
1049 static void appendPipelineDemuxerPadRemoved(GstElement*, GstPad*, AppendPipeline* appendPipeline)
1050 {
1051     appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread();
1052 }
1053
1054 static GstFlowReturn appendPipelineAppsinkNewSample(GstElement* appsink, AppendPipeline* appendPipeline)
1055 {
1056     return appendPipeline->handleNewAppsinkSample(appsink);
1057 }
1058
1059 static void appendPipelineAppsinkEOS(GstElement*, AppendPipeline* appendPipeline)
1060 {
1061     if (WTF::isMainThread())
1062         appendPipeline->appsinkEOS();
1063     else {
1064         GstStructure* structure = gst_structure_new_empty("appsink-eos");
1065         GstMessage* message = gst_message_new_application(GST_OBJECT(appendPipeline->appsink()), structure);
1066         gst_bus_post(appendPipeline->bus(), message);
1067         GST_TRACE("appsink-eos message posted to bus");
1068     }
1069
1070     GST_DEBUG("%s main thread", (WTF::isMainThread()) ? "Is" : "Not");
1071 }
1072
1073
1074
1075 } // namespace WebCore.
1076
1077 #endif // USE(GSTREAMER)