[MSE][GStreamer] Introduce AbortableTaskQueue
[WebKit-https.git] / Source / WebCore / platform / graphics / gstreamer / mse / AppendPipeline.cpp
1 /*
2  * Copyright (C) 2016, 2017 Metrological Group B.V.
3  * Copyright (C) 2016, 2017 Igalia S.L
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public License
16  * along with this library; see the file COPYING.LIB.  If not, write to
17  * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 #include "config.h"
22 #include "AppendPipeline.h"
23
24 #if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26 #include "AudioTrackPrivateGStreamer.h"
27 #include "GStreamerCommon.h"
28 #include "GStreamerEMEUtilities.h"
29 #include "GStreamerMediaDescription.h"
30 #include "MediaSampleGStreamer.h"
31 #include "InbandTextTrackPrivateGStreamer.h"
32 #include "MediaDescription.h"
33 #include "SourceBufferPrivateGStreamer.h"
34 #include "VideoTrackPrivateGStreamer.h"
35 #include <functional>
36 #include <gst/app/gstappsink.h>
37 #include <gst/app/gstappsrc.h>
38 #include <gst/gst.h>
39 #include <gst/pbutils/pbutils.h>
40 #include <gst/video/video.h>
41 #include <wtf/Condition.h>
42 #include <wtf/glib/GLibUtilities.h>
43 #include <wtf/glib/RunLoopSourcePriority.h>
44
45 GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
46 #define GST_CAT_DEFAULT webkit_mse_debug
47
48 namespace WebCore {
49
50 GType AppendPipeline::s_endOfAppendMetaType = 0;
51 const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
52 std::once_flag AppendPipeline::s_staticInitializationFlag;
53
// Zero-payload custom GstMeta used to tag the empty buffer that pushNewBuffer()
// appends after each data buffer. appsrcEndOfAppendCheckerProbe() detects this
// meta to know the append has been fully pushed through the appsrc.
struct EndOfAppendMeta {
    GstMeta base;
    // No per-buffer state to initialize.
    static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
    // The marker must never be copied/transformed onto another buffer.
    static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
    // Nothing to release.
    static void free(GstMeta*, GstBuffer*) { }
};
60
61 void AppendPipeline::staticInitialization()
62 {
63     ASSERT(isMainThread());
64
65     const char* tags[] = { nullptr };
66     s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
67     s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
68 }
69
70 const char* AppendPipeline::dumpAppendState(AppendPipeline::AppendState appendState)
71 {
72     switch (appendState) {
73     case AppendPipeline::AppendState::Invalid:
74         return "Invalid";
75     case AppendPipeline::AppendState::NotStarted:
76         return "NotStarted";
77     case AppendPipeline::AppendState::Ongoing:
78         return "Ongoing";
79     case AppendPipeline::AppendState::DataStarve:
80         return "DataStarve";
81     case AppendPipeline::AppendState::Sampling:
82         return "Sampling";
83     case AppendPipeline::AppendState::LastSample:
84         return "LastSample";
85     case AppendPipeline::AppendState::Aborting:
86         return "Aborting";
87     default:
88         return "(unknown)";
89     }
90 }
91
92 #if !LOG_DISABLED
93 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
94 #endif
95
96 #if ENABLE(ENCRYPTED_MEDIA)
97 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
98 #endif
99
100 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);
101
102 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
103
104 AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
105     : m_mediaSourceClient(mediaSourceClient.get())
106     , m_sourceBufferPrivate(sourceBufferPrivate.get())
107     , m_playerPrivate(&playerPrivate)
108     , m_id(0)
109     , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
110     , m_appendState(AppendState::NotStarted)
111     , m_abortPending(false)
112     , m_streamType(Unknown)
113 {
114     ASSERT(isMainThread());
115     std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);
116
117     GST_TRACE("Creating AppendPipeline (%p)", this);
118
119     // FIXME: give a name to the pipeline, maybe related with the track it's managing.
120     // The track name is still unknown at this time, though.
121     static size_t appendPipelineCount = 0;
122     String pipelineName = String::format("append-pipeline-%s-%zu",
123         m_sourceBufferPrivate->type().containerType().replace("/", "-").utf8().data(), appendPipelineCount++);
124     m_pipeline = gst_pipeline_new(pipelineName.utf8().data());
125
126     m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
127     gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
128     gst_bus_enable_sync_message_emission(m_bus.get());
129
130     g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
131         appendPipeline->handleNeedContextSyncMessage(message);
132     }), this);
133     g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
134         appendPipeline->handleStateChangeMessage(message);
135     }), this);
136
137     // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
138     // below will already take the initial reference and we need an additional one for us.
139     m_appsrc = gst_element_factory_make("appsrc", nullptr);
140
141     GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
142     gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
143         return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
144     }, this, nullptr);
145
146     const String& type = m_sourceBufferPrivate->type().containerType();
147     if (type.endsWith("mp4"))
148         m_demux = gst_element_factory_make("qtdemux", nullptr);
149     else if (type.endsWith("webm"))
150         m_demux = gst_element_factory_make("matroskademux", nullptr);
151     else
152         ASSERT_NOT_REACHED();
153
154     m_appsink = gst_element_factory_make("appsink", nullptr);
155
156     gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
157     gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
158     gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);
159
160     GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
161     g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
162         appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
163             appendPipeline->appsinkCapsChanged();
164         });
165     }), this);
166
167 #if !LOG_DISABLED
168     GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
169     m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
170     m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
171     m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
172     m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
173     m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
174     m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
175 #endif
176
177 #if ENABLE(ENCRYPTED_MEDIA)
178     m_appsinkPadEventProbeInformation.appendPipeline = this;
179     m_appsinkPadEventProbeInformation.description = "appsink event probe";
180     m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
181 #endif
182
183     // These signals won't be connected outside of the lifetime of "this".
184     g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
185         appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
186     }), this);
187     g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
188         appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
189     }), this);
190     g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
191         ASSERT(!isMainThread());
192         GST_DEBUG("Posting no-more-pads task to main thread");
193         appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
194             appendPipeline->demuxerNoMorePads();
195         });
196     }), this);
197     g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) {
198         appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
199     }), this);
200     g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
201         ASSERT(!isMainThread());
202         GST_DEBUG("Posting appsink-eos task to main thread");
203         appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
204             appendPipeline->appsinkEOS();
205         });
206     }), this);
207
208     // Add_many will take ownership of a reference. That's why we used an assignment before.
209     gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
210     gst_element_link(m_appsrc.get(), m_demux.get());
211
212     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
213 };
214
215 AppendPipeline::~AppendPipeline()
216 {
217     ASSERT(isMainThread());
218
219     setAppendState(AppendState::Invalid);
220     // Forget all pending tasks and unblock the streaming thread if it was blocked.
221     m_taskQueue.startAborting();
222
223     GST_TRACE("Destroying AppendPipeline (%p)", this);
224
225     // FIXME: Maybe notify appendComplete here?
226
227     if (m_pipeline) {
228         ASSERT(m_bus);
229         g_signal_handlers_disconnect_by_data(m_bus.get(), this);
230         gst_bus_disable_sync_message_emission(m_bus.get());
231         gst_bus_remove_signal_watch(m_bus.get());
232         gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
233         m_pipeline = nullptr;
234     }
235
236     if (m_appsrc) {
237         g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
238         m_appsrc = nullptr;
239     }
240
241     if (m_demux) {
242 #if !LOG_DISABLED
243         GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
244         gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
245 #endif
246
247         g_signal_handlers_disconnect_by_data(m_demux.get(), this);
248         m_demux = nullptr;
249     }
250
251     if (m_appsink) {
252         GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
253         g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
254         g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
255
256 #if !LOG_DISABLED
257         gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
258 #endif
259
260 #if ENABLE(ENCRYPTED_MEDIA)
261         gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
262 #endif
263         m_appsink = nullptr;
264     }
265
266     m_appsinkCaps = nullptr;
267     m_demuxerSrcPadCaps = nullptr;
268 };
269
270 GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
271 {
272     ASSERT(!isMainThread());
273     m_streamingThread = &WTF::Thread::current();
274
275     GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
276     ASSERT(GST_IS_BUFFER(buffer));
277
278     EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
279     if (!endOfAppendMeta) {
280         // Normal buffer, nothing to do.
281         return GST_PAD_PROBE_OK;
282     }
283
284     GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
285     m_taskQueue.enqueueTask([this]() {
286         handleEndOfAppend();
287     });
288     return GST_PAD_PROBE_DROP;
289 }
290
// Detaches this AppendPipeline from further processing when the player goes
// away: invalidates the state machine, aborts the task queue and stops the
// pipeline. Order matters: the task queue must be aborted before stopping the
// pipeline so the streaming thread cannot be left blocked on the main thread.
void AppendPipeline::clearPlayerPrivate()
{
    ASSERT(isMainThread());
    GST_DEBUG("cleaning private player");

    // Make sure that AppendPipeline won't process more data from now on and
    // instruct handleNewSample to abort itself from now on as well.
    setAppendState(AppendState::Invalid);
    m_taskQueue.startAborting();

    // And now that no handleNewSample operations will remain stalled waiting
    // for the main thread, stop the pipeline.
    if (m_pipeline)
        gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
}
306
307 void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
308 {
309     const gchar* contextType = nullptr;
310     gst_message_parse_context_type(message, &contextType);
311     GST_TRACE("context type: %s", contextType);
312
313     // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
314     if (m_playerPrivate)
315         m_playerPrivate->handleSyncMessage(message);
316 }
317
// Runs on the main thread (posted from the demuxer's no-more-pads signal):
// notifies the SourceBuffer of the initialization segment, then moves the
// pipeline to PLAYING so media samples start flowing to the appsink.
void AppendPipeline::demuxerNoMorePads()
{
    GST_TRACE("calling didReceiveInitializationSegment");
    didReceiveInitializationSegment();
    GST_TRACE("set pipeline to playing");
    gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
}
325
326 void AppendPipeline::handleStateChangeMessage(GstMessage* message)
327 {
328     ASSERT(isMainThread());
329
330     if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
331         GstState currentState, newState;
332         gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
333         CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
334             .replace("/", "_").replace(" ", "_")
335             .replace("\"", "").replace("\'", "").utf8();
336         CString dotFileName = String::format("webkit-append-%s-%s_%s",
337             sourceBufferType.data(),
338             gst_element_state_get_name(currentState),
339             gst_element_state_get_name(newState)).utf8();
340         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
341     }
342 }
343
344 gint AppendPipeline::id()
345 {
346     ASSERT(isMainThread());
347
348     if (m_id)
349         return m_id;
350
351     static gint s_totalAudio = 0;
352     static gint s_totalVideo = 0;
353     static gint s_totalText = 0;
354
355     switch (m_streamType) {
356     case Audio:
357         m_id = ++s_totalAudio;
358         break;
359     case Video:
360         m_id = ++s_totalVideo;
361         break;
362     case Text:
363         m_id = ++s_totalText;
364         break;
365     case Unknown:
366     case Invalid:
367         GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
368         ASSERT_NOT_REACHED();
369         break;
370     }
371
372     GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
373
374     return m_id;
375 }
376
// State machine driver for the append operation. Validates the requested
// transition, performs its side effects (starting the pipeline, resuming a
// pending append, resetting after an abort, notifying the SourceBuffer) and
// may chain into a follow-up transition (nextAppendState) recursively.
// Invalid transitions are logged and asserted but otherwise ignored.
void AppendPipeline::setAppendState(AppendState newAppendState)
{
    ASSERT(isMainThread());
    // Valid transitions:
    // NotStarted-->Ongoing-->DataStarve-->NotStarted
    //           |         |            `->Aborting-->NotStarted
    //           |         `->Sampling-ยทยทยท->Sampling-->LastSample-->NotStarted
    //           |                                               `->Aborting-->NotStarted
    //           `->Aborting-->NotStarted
    AppendState oldAppendState = m_appendState;
    AppendState nextAppendState = AppendState::Invalid;

    if (oldAppendState != newAppendState)
        GST_TRACE("%s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));

    bool ok = false;

    switch (oldAppendState) {
    case AppendState::NotStarted:
        switch (newAppendState) {
        case AppendState::Ongoing:
            ok = true;
            gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
            break;
        case AppendState::NotStarted:
            ok = true;
            // A buffer parked by pushNewBuffer() during an abort is resumed now.
            if (m_pendingBuffer) {
                GST_TRACE("pushing pending buffer %" GST_PTR_FORMAT, m_pendingBuffer.get());
                gst_app_src_push_buffer(GST_APP_SRC(appsrc()), m_pendingBuffer.leakRef());
                nextAppendState = AppendState::Ongoing;
            }
            break;
        case AppendState::Aborting:
            ok = true;
            nextAppendState = AppendState::NotStarted;
            break;
        case AppendState::Invalid:
            ok = true;
            break;
        default:
            break;
        }
        break;
    case AppendState::Ongoing:
        switch (newAppendState) {
        case AppendState::Sampling:
        case AppendState::Invalid:
            ok = true;
            break;
        case AppendState::DataStarve:
            // The append finished without producing any sample.
            ok = true;
            GST_DEBUG("received all pending samples");
            m_sourceBufferPrivate->didReceiveAllPendingSamples();
            if (m_abortPending)
                nextAppendState = AppendState::Aborting;
            else
                nextAppendState = AppendState::NotStarted;
            break;
        default:
            break;
        }
        break;
    case AppendState::DataStarve:
        switch (newAppendState) {
        case AppendState::NotStarted:
        case AppendState::Invalid:
            ok = true;
            break;
        case AppendState::Aborting:
            ok = true;
            nextAppendState = AppendState::NotStarted;
            break;
        default:
            break;
        }
        break;
    case AppendState::Sampling:
        switch (newAppendState) {
        case AppendState::Sampling:
        case AppendState::Invalid:
            ok = true;
            break;
        case AppendState::LastSample:
            // The append finished after producing samples.
            ok = true;
            GST_DEBUG("received all pending samples");
            m_sourceBufferPrivate->didReceiveAllPendingSamples();
            if (m_abortPending)
                nextAppendState = AppendState::Aborting;
            else
                nextAppendState = AppendState::NotStarted;
            break;
        default:
            break;
        }
        break;
    case AppendState::LastSample:
        switch (newAppendState) {
        case AppendState::NotStarted:
        case AppendState::Invalid:
            ok = true;
            break;
        case AppendState::Aborting:
            ok = true;
            nextAppendState = AppendState::NotStarted;
            break;
        default:
            break;
        }
        break;
    case AppendState::Aborting:
        switch (newAppendState) {
        case AppendState::NotStarted:
            // The abort completes here: the pipeline is reset for the next append.
            ok = true;
            resetPipeline();
            m_abortPending = false;
            nextAppendState = AppendState::NotStarted;
            break;
        case AppendState::Invalid:
            ok = true;
            break;
        default:
            break;
        }
        break;
    case AppendState::Invalid:
        // Terminal state: everything is accepted but nothing happens anymore.
        ok = true;
        break;
    }

    if (ok)
        m_appendState = newAppendState;
    else
        GST_ERROR("Invalid append state transition %s --> %s", dumpAppendState(oldAppendState), dumpAppendState(newAppendState));

    ASSERT(ok);

    // Chain into the follow-up transition, if any.
    if (nextAppendState != AppendState::Invalid)
        setAppendState(nextAppendState);
}
516
517 void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
518 {
519     ASSERT(isMainThread());
520
521     m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
522     m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
523
524     const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
525     if (!MediaPlayerPrivateGStreamerMSE::supportsCodec(originalMediaType)) {
526             m_presentationSize = WebCore::FloatSize();
527             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
528     } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
529         std::optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
530         if (size.has_value())
531             m_presentationSize = size.value();
532         else
533             m_presentationSize = WebCore::FloatSize();
534
535         m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
536     } else {
537         m_presentationSize = WebCore::FloatSize();
538         if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
539             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
540         else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
541             m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
542     }
543 }
544
545 void AppendPipeline::appsinkCapsChanged()
546 {
547     ASSERT(isMainThread());
548
549     if (!m_appsink)
550         return;
551
552     GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
553     GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
554
555     if (!caps)
556         return;
557
558     // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
559     bool previousCapsWereNull = !m_appsinkCaps;
560
561     if (m_appsinkCaps != caps) {
562         m_appsinkCaps = WTFMove(caps);
563         if (m_playerPrivate)
564             m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
565         gst_element_set_state(m_pipeline.get(), GST_STATE_PLAYING);
566     }
567 }
568
569 void AppendPipeline::handleEndOfAppend()
570 {
571     ASSERT(isMainThread());
572     GST_TRACE_OBJECT(m_pipeline.get(), "received end-of-append");
573
574     // Regardless of the state transition, the result is the same: didReceiveAllPendingSamples() is called.
575     switch (m_appendState) {
576     case AppendState::Ongoing:
577         GST_TRACE("DataStarve");
578         setAppendState(AppendState::DataStarve);
579         break;
580     case AppendState::Sampling:
581         GST_TRACE("LastSample");
582         setAppendState(AppendState::LastSample);
583         break;
584     default:
585         ASSERT_NOT_REACHED();
586         break;
587     }
588 }
589
// Wraps a sample pulled from the appsink into a MediaSampleGStreamer and hands
// it to the SourceBuffer, moving the state machine to Sampling. Samples past
// the MediaSource duration are dropped and end the append early.
void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
{
    ASSERT(isMainThread());

    if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
        GST_WARNING("Received sample without buffer from appsink.");
        return;
    }

    RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());

    GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
        mediaSample->trackID().string().utf8().data(),
        mediaSample->presentationTime().toString().utf8().data(),
        mediaSample->decodeTime().toString().utf8().data(),
        mediaSample->duration().toString().utf8().data(),
        mediaSample->presentationSize().width(), mediaSample->presentationSize().height());

    // If we're beyond the duration, ignore this sample and the remaining ones.
    // NOTE(review): `!duration.indefiniteTime()` applies `!` to the value
    // returned by what looks like a static factory, not to this duration —
    // likely `!duration.isIndefinite()` was intended. Confirm against the
    // MediaTime API before changing.
    MediaTime duration = m_mediaSourceClient->duration();
    if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
        GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
        setAppendState(AppendState::LastSample);
        return;
    }

    // Add a gap sample if a gap is detected before the first sample.
    // (applyPtsOffset() is called with zeroTime; presumably it snaps the PTS
    // towards zero — verify against MediaSampleGStreamer.)
    if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
        GST_DEBUG("Adding gap offset");
        mediaSample->applyPtsOffset(MediaTime::zeroTime());
    }

    m_sourceBufferPrivate->didReceiveSample(*mediaSample);
    setAppendState(AppendState::Sampling);
}
625
626 void AppendPipeline::appsinkEOS()
627 {
628     ASSERT(isMainThread());
629
630     switch (m_appendState) {
631     case AppendState::Aborting:
632         // Ignored. Operation completion will be managed by the Aborting->NotStarted transition.
633         return;
634     case AppendState::Ongoing:
635         // Finish Ongoing and Sampling states.
636         setAppendState(AppendState::DataStarve);
637         break;
638     case AppendState::Sampling:
639         setAppendState(AppendState::LastSample);
640         break;
641     default:
642         GST_DEBUG("Unexpected EOS");
643         break;
644     }
645 }
646
647 void AppendPipeline::didReceiveInitializationSegment()
648 {
649     ASSERT(isMainThread());
650
651     WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
652
653     GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
654     initializationSegment.duration = m_mediaSourceClient->duration();
655
656     switch (m_streamType) {
657     case Audio: {
658         WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
659         info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
660         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
661         initializationSegment.audioTracks.append(info);
662         break;
663     }
664     case Video: {
665         WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
666         info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
667         info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
668         initializationSegment.videoTracks.append(info);
669         break;
670     }
671     default:
672         GST_ERROR("Unsupported stream type or codec");
673         break;
674     }
675
676     m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
677 }
678
679 AtomicString AppendPipeline::trackId()
680 {
681     ASSERT(isMainThread());
682
683     if (!m_track)
684         return AtomicString();
685
686     return m_track->id();
687 }
688
689 void AppendPipeline::consumeAppsinkAvailableSamples()
690 {
691     ASSERT(isMainThread());
692
693     GRefPtr<GstSample> sample;
694     int batchedSampleCount = 0;
695     while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
696         appsinkNewSample(WTFMove(sample));
697         batchedSampleCount++;
698     }
699
700     GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
701 }
702
703 void AppendPipeline::resetPipeline()
704 {
705     ASSERT(isMainThread());
706     GST_DEBUG("resetting pipeline");
707
708     gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
709     gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
710
711 #if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
712     {
713         static unsigned i = 0;
714         // This is here for debugging purposes. It does not make sense to have it as class member.
715         WTF::String  dotFileName = String::format("reset-pipeline-%d", ++i);
716         gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
717     }
718 #endif
719
720 }
721
722 void AppendPipeline::abort()
723 {
724     ASSERT(isMainThread());
725     GST_DEBUG("aborting");
726
727     m_pendingBuffer = nullptr;
728
729     // Abort already ongoing.
730     if (m_abortPending)
731         return;
732
733     m_abortPending = true;
734     if (m_appendState == AppendState::NotStarted)
735         setAppendState(AppendState::Aborting);
736     // Else, the automatic state transitions will take care when the ongoing append finishes.
737 }
738
GstFlowReturn AppendPipeline::pushNewBuffer(GstBuffer* buffer)
{
    // Feeds one append's worth of data into the pipeline, followed by an empty marker buffer used
    // to detect when the append has been fully demuxed. Takes ownership of `buffer` in all paths.
    // Returns GST_FLOW_OK on success, GST_FLOW_ERROR if appsrc unexpectedly rejects a buffer.

    // If an abort has been requested, don't feed the pipeline: park the buffer (adoptGRef takes
    // over the caller's reference) so it can be handled once the abort completes.
    if (m_abortPending) {
        m_pendingBuffer = adoptGRef(buffer);
        return GST_FLOW_OK;
    }

    setAppendState(AppendPipeline::AppendState::Ongoing);

    GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer);
    // gst_app_src_push_buffer() takes ownership of the buffer reference.
    GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer);
    // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
    // be true at this point.
    g_return_val_if_fail(pushDataBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);

    // Push an additional empty buffer that marks the end of the append.
    // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
    // completion of the append.
    //
    // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
    // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
    // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
    // buffer has completed.

    GstBuffer* endOfAppendBuffer = gst_buffer_new();
    gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);

    GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
    GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
    g_return_val_if_fail(pushEndOfAppendBufferRet == GST_FLOW_OK, GST_FLOW_ERROR);

    return GST_FLOW_OK;
}
772
GstFlowReturn AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
{
    // Runs on the streaming thread every time the appsink has a new sample available.
    // Batches the notification: at most one appsink-new-sample task is posted to the main thread
    // at a time, and the main thread drains all available samples when it runs.
    ASSERT(!isMainThread());
    if (&WTF::Thread::current() != m_streamingThread) {
        // m_streamingThread has been initialized in appsrcEndOfAppendCheckerProbe().
        // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
        // This error will only be raised if someone modifies the pipeline to include more than one streaming thread or
        // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
        // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an append has
        // been demuxed completely.
        g_critical("Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
        ASSERT_NOT_REACHED();
    }

    if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
        GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
        return GST_FLOW_ERROR;
    }

    // test_and_set() returns the previous value: only the first caller after the flag was cleared
    // posts a task, avoiding a flood of redundant main-thread tasks.
    if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
        GST_TRACE("Posting appsink-new-sample task to the main thread");
        m_taskQueue.enqueueTask([this]() {
            // Clear the flag before consuming, so samples arriving while we consume re-arm a task.
            m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
            consumeAppsinkAvailableSamples();
        });
    }

    return GST_FLOW_OK;
}
802
803 static GRefPtr<GstElement>
804 createOptionalParserForFormat(GstPad* demuxerSrcPad)
805 {
806     GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
807     GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
808     const char* mediaType = gst_structure_get_name(structure);
809
810     GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
811     GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
812
813     if (!g_strcmp0(mediaType, "audio/x-opus")) {
814         GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
815         ASSERT(opusparse);
816         g_return_val_if_fail(opusparse, nullptr);
817         return GRefPtr<GstElement>(opusparse);
818     }
819     if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
820         GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
821         ASSERT(vorbisparse);
822         g_return_val_if_fail(vorbisparse, nullptr);
823         return GRefPtr<GstElement>(vorbisparse);
824     }
825     if (!g_strcmp0(mediaType, "video/x-h264")) {
826         GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get());
827         ASSERT(h264parse);
828         g_return_val_if_fail(h264parse, nullptr);
829         return GRefPtr<GstElement>(h264parse);
830     }
831
832     return nullptr;
833 }
834
void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
{
    // Runs on the streaming thread when the demuxer exposes a new src pad. Queries the stream
    // duration, hands track creation off to the main thread (blocking until it finishes or the
    // task queue is aborted) and then links the pad -- optionally through a parser -- to appsink.
    ASSERT(!isMainThread());
    if (!m_appsink)
        return;

    GST_DEBUG("connecting to appsink");

    // Only the first demuxer src pad is honored; extra streams are silently discarded.
    if (m_demux->numsrcpads > 1) {
        GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
        gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
        g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
        return;
    }

    GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));

    // Only one stream per demuxer is supported.
    ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));

    // Remember the duration reported by the demuxer (read later by the main-thread task); infinite
    // when the container does not declare one.
    gint64 timeLength = 0;
    if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
        && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
        m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
    else
        m_initialDuration = MediaTime::positiveInfiniteTime();

    // Track objects must be created on the main thread; block here until that is done.
    GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
    auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
        connectDemuxerSrcPadToAppsink(demuxerSrcPad);
        return AbortableTaskQueue::Void();
    });
    if (!response) {
        // The AppendPipeline has been destroyed or aborted before we received a response.
        return;
    }

    // Must be done in the thread we were called from (usually streaming thread).
    bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
        || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);

    if (isData) {
        // FIXME: Only add appsink one time. This method can be called several times.
        GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
        if (!parent)
            gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());

        // Current head of the pipeline being built.
        GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;

        // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
        // the contained audio streams in order to know the duration of the frames.
        // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
        m_parser = createOptionalParserForFormat(currentSrcPad.get());
        if (m_parser) {
            gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
            gst_element_sync_state_with_parent(m_parser.get());

            GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
            GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));

            gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
            currentSrcPad = parserSrcPad;
        }

        gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());

        gst_element_sync_state_with_parent(m_appsink.get());

        gst_element_set_state(m_pipeline.get(), GST_STATE_PAUSED);
        gst_element_sync_state_with_parent(m_appsink.get());

        GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
    }
}
911
void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
{
    // Main-thread half of the demuxer-pad connection: parses the pad caps, propagates the initial
    // duration and creates the platform track object matching the detected stream type.
    ASSERT(isMainThread());
    GST_DEBUG("Connecting to appsink");

    // matroskademux initializes GstSegment.start to the PTS of the first frame, which breaks
    // out-of-order MSE appends; force it back to zero for WebM content (see
    // matroskademuxForceSegmentStartToEqualZero()).
    const String& type = m_sourceBufferPrivate->type().containerType();
    if (type.endsWith("webm"))
        gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);

    GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));

    // Only one stream per demuxer is supported.
    ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));

    GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));

    // Nothing to connect without negotiated caps, or if the pipeline was disabled meanwhile.
    if (!caps || m_appendState == AppendState::Invalid || !m_playerPrivate) {
        return;
    }

#ifndef GST_DISABLE_GST_DEBUG
    {
        GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
        GST_DEBUG("%s", strcaps.get());
    }
#endif

    // Propagate the duration learned from the initialization segment when the media source does
    // not have a valid one yet.
    if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime())
        m_mediaSourceClient->durationChanged(m_initialDuration);

    // NOTE(review): the extra gst_caps_ref() suggests parseDemuxerSrcPadCaps() takes ownership of
    // the passed reference -- confirm against its definition.
    parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));

    switch (m_streamType) {
    case WebCore::MediaSourceStreamTypeGStreamer::Audio:
        if (m_playerPrivate)
            m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Video:
        if (m_playerPrivate)
            m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Text:
        m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
        break;
    case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
        {
            GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
            GST_DEBUG("Unsupported track codec: %s", strcaps.get());
        }
        // This is going to cause an error which will detach the SourceBuffer and tear down this
        // AppendPipeline, so we need the padAddRemove lock released before continuing.
        m_track = nullptr;
        didReceiveInitializationSegment();
        return;
    default:
        // No useful data.
        break;
    }

    m_appsinkCaps = WTFMove(caps);
    if (m_playerPrivate)
        m_playerPrivate->trackDetected(this, m_track, true);
}
975
976 void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
977 {
978     // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
979     // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
980     // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
981     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
982
983     GST_DEBUG("Disconnecting appsink");
984
985     if (m_parser) {
986         gst_element_set_state(m_parser.get(), GST_STATE_NULL);
987         gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
988         m_parser = nullptr;
989     }
990
991     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
992 }
993
994 #if !LOG_DISABLED
995 static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
996 {
997     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
998     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
999     GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
1000     return GST_PAD_PROBE_OK;
1001 }
1002 #endif
1003
1004 #if ENABLE(ENCRYPTED_MEDIA)
1005 static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
1006 {
1007     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1008     GstEvent* event = gst_pad_probe_info_get_event(info);
1009     GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
1010     WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
1011
1012     switch (GST_EVENT_TYPE(event)) {
1013     case GST_EVENT_PROTECTION:
1014         if (appendPipeline && appendPipeline->playerPrivate())
1015             appendPipeline->playerPrivate()->handleProtectionEvent(event);
1016         return GST_PAD_PROBE_DROP;
1017     default:
1018         break;
1019     }
1020
1021     return GST_PAD_PROBE_OK;
1022 }
1023 #endif
1024
1025 static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
1026 {
1027     ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
1028     GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
1029     GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
1030     return GST_PAD_PROBE_DROP;
1031 }
1032
1033 static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
1034 {
1035     // matroskademux sets GstSegment.start to the PTS of the first frame.
1036     //
1037     // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
1038     // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
1039     // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
1040     //
1041     // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
1042     // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
1043     // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
1044     // audiobasefilter, such as opusparse.
1045     //
1046     // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
1047     // frame is zero.
1048
1049     ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
1050     GstEvent* event = static_cast<GstEvent*>(info->data);
1051     if (event->type == GST_EVENT_SEGMENT) {
1052         GstSegment segment;
1053         gst_event_copy_segment(event, &segment);
1054
1055         segment.start = 0;
1056
1057         GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
1058         gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
1059     }
1060     return GST_PAD_PROBE_OK;
1061 }
1062
1063 } // namespace WebCore.
1064
#endif // ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)