#include <gst/pbutils/missing-plugins.h>
#include <wtf/MainThread.h>
#include <wtf/Noncopyable.h>
-#include <wtf/glib/GMutexLocker.h>
#include <wtf/glib/GRefPtr.h>
#include <wtf/glib/GUniquePtr.h>
#include <wtf/text/CString.h>
using namespace WebCore;
-class StreamingClient {
-public:
- StreamingClient(WebKitWebSrc*, ResourceRequest&&);
- virtual ~StreamingClient();
-
-protected:
- char* createReadBuffer(size_t requestedSize, size_t& actualSize);
- void handleResponseReceived(const ResourceResponse&);
- void handleDataReceived(const char*, int);
- void handleNotifyFinished();
-
- GRefPtr<GstElement> m_src;
- ResourceRequest m_request;
-};
-
-class CachedResourceStreamingClient final : public PlatformMediaResourceClient, public StreamingClient {
+class CachedResourceStreamingClient final : public PlatformMediaResourceClient {
WTF_MAKE_NONCOPYABLE(CachedResourceStreamingClient);
public:
CachedResourceStreamingClient(WebKitWebSrc*, ResourceRequest&&);
virtual ~CachedResourceStreamingClient();
-
private:
- // PlatformMediaResourceClient virtual methods.
#if USE(SOUP)
- char* getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize) override;
+ char* getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize);
#endif
+ // PlatformMediaResourceClient virtual methods.
void responseReceived(PlatformMediaResource&, const ResourceResponse&) override;
void dataReceived(PlatformMediaResource&, const char*, int) override;
void accessControlCheckFailed(PlatformMediaResource&, const ResourceError&) override;
void loadFailed(PlatformMediaResource&, const ResourceError&) override;
void loadFinished(PlatformMediaResource&) override;
-};
-class ResourceHandleStreamingClient : public ThreadSafeRefCounted<ResourceHandleStreamingClient>, public ResourceHandleClient, public StreamingClient {
-public:
- static Ref<ResourceHandleStreamingClient> create(WebKitWebSrc* src, ResourceRequest&& request)
- {
- return adoptRef(*new ResourceHandleStreamingClient(src, WTFMove(request)));
- }
- virtual ~ResourceHandleStreamingClient();
-
- void invalidate();
-
- // StreamingClient virtual methods.
- bool loadFailed() const;
- void setDefersLoading(bool);
-
-private:
- ResourceHandleStreamingClient(WebKitWebSrc*, ResourceRequest&&);
- void cleanupAndStopRunLoop();
-
- // ResourceHandleClient virtual methods.
-#if USE(SOUP)
- char* getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize) override;
-#endif
- ResourceRequest willSendRequest(ResourceHandle*, ResourceRequest&&, ResourceResponse&&) override;
- void didReceiveResponse(ResourceHandle*, ResourceResponse&&) override;
- void didReceiveData(ResourceHandle*, const char*, unsigned, int) override;
- void didReceiveBuffer(ResourceHandle*, Ref<SharedBuffer>&&, int encodedLength) override;
- void didFinishLoading(ResourceHandle*) override;
- void didFail(ResourceHandle*, const ResourceError&) override;
- void wasBlocked(ResourceHandle*) override;
- void cannotShowURL(ResourceHandle*) override;
-
- RefPtr<Thread> m_thread;
- Lock m_initializeRunLoopConditionMutex;
- Condition m_initializeRunLoopCondition;
- RunLoop* m_runLoop { nullptr };
- Lock m_terminateRunLoopConditionMutex;
- Condition m_terminateRunLoopCondition;
- RefPtr<ResourceHandle> m_resource;
-#if USE(SOUP)
- std::unique_ptr<SoupNetworkSession> m_session;
-#endif
+ GRefPtr<GstElement> m_src;
+ ResourceRequest m_request;
};
enum MainThreadSourceNotification {
RefPtr<PlatformMediaResourceLoader> loader;
RefPtr<PlatformMediaResource> resource;
- RefPtr<ResourceHandleStreamingClient> client;
bool didPassAccessControlCheck;
guint64 requestedOffset;
- bool createdInMainThread;
RefPtr<MainThreadNotifier<MainThreadSourceNotification>> notifier;
GRefPtr<GstBuffer> buffer;
};
src->priv = priv;
new (priv) WebKitWebSrcPrivate();
- priv->createdInMainThread = isMainThread();
priv->notifier = MainThreadNotifier<MainThreadSourceNotification>::create();
priv->appsrc = GST_APP_SRC(gst_element_factory_make("appsrc", nullptr));
gst_bin_add(GST_BIN(src), GST_ELEMENT(priv->appsrc));
-
GRefPtr<GstPad> targetPad = adoptGRef(gst_element_get_static_pad(GST_ELEMENT(priv->appsrc), "src"));
priv->srcpad = webkitGstGhostPadFromStaticTemplate(&srcTemplate, "src", targetPad.get());
priv->notifier = nullptr;
}
- priv->player = nullptr;
-
GST_CALL_PARENT(G_OBJECT_CLASS, dispose, (object));
}
WebKitWebSrc* src = WEBKIT_WEB_SRC(object);
WebKitWebSrcPrivate* priv = src->priv;
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
switch (propID) {
case PROP_LOCATION:
g_value_set_string(value, priv->originalURI.data());
priv->notifier->notify(MainThreadSourceNotification::Stop, [protector, keepAlive = priv->keepAlive] {
WebKitWebSrcPrivate* priv = protector->priv;
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(protector.get()));
if (priv->resource) {
priv->resource->stop();
priv->resource->setClient(nullptr);
});
}
- if (priv->client) {
- priv->client->invalidate();
- priv->client = nullptr;
- }
-
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
-
bool wasSeeking = std::exchange(priv->isSeeking, false);
if (priv->buffer) {
priv->player = nullptr;
}
- locker.unlock();
-
if (priv->appsrc) {
gst_app_src_set_caps(priv->appsrc, nullptr);
if (!wasSeeking)
static void webKitWebSrcStart(WebKitWebSrc* src)
{
WebKitWebSrcPrivate* priv = src->priv;
-
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
+ ASSERT(priv->player);
priv->didPassAccessControlCheck = false;
if (priv->originalURI.isNull()) {
GST_ERROR_OBJECT(src, "No URI provided");
- locker.unlock();
webKitWebSrcStop(src);
return;
}
- ASSERT(!priv->client);
-
GST_DEBUG_OBJECT(src, "Fetching %s", priv->originalURI.data());
URL url = URL(URL(), priv->originalURI.data());
priv->size = 0;
- if (priv->player)
- request.setHTTPReferrer(priv->player->referrer());
+ request.setHTTPReferrer(priv->player->referrer());
if (priv->httpMethod.get())
request.setHTTPMethod(priv->httpMethod.get());
// We always request Icecast/Shoutcast metadata, just in case ...
request.setHTTPHeaderField(HTTPHeaderName::IcyMetadata, "1");
- if (!priv->player || !priv->createdInMainThread) {
- priv->client = ResourceHandleStreamingClient::create(src, WTFMove(request));
- if (priv->client->loadFailed()) {
- GST_ERROR_OBJECT(src, "Failed to setup streaming client");
- locker.unlock();
- webKitWebSrcStop(src);
- } else
- GST_DEBUG_OBJECT(src, "Started request");
- return;
- }
-
- locker.unlock();
GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
priv->notifier->notify(MainThreadSourceNotification::Start, [protector, request = WTFMove(request)] {
WebKitWebSrcPrivate* priv = protector->priv;
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(protector.get()));
if (!priv->loader)
priv->loader = priv->player->createResourceLoader();
} else {
GST_ERROR_OBJECT(protector.get(), "Failed to setup streaming client");
priv->loader = nullptr;
- locker.unlock();
webKitWebSrcStop(protector.get());
}
});
static gboolean webKitWebSrcQueryWithParent(GstPad* pad, GstObject* parent, GstQuery* query)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(GST_ELEMENT(parent));
+ WebKitWebSrcPrivate* priv = src->priv;
gboolean result = FALSE;
switch (GST_QUERY_TYPE(query)) {
gst_query_parse_duration(query, &format, nullptr);
GST_DEBUG_OBJECT(src, "duration query in format %s", gst_format_get_name(format));
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- if (format == GST_FORMAT_BYTES && src->priv->size > 0) {
- gst_query_set_duration(query, format, src->priv->size);
+ if (format == GST_FORMAT_BYTES && priv->size > 0) {
+ gst_query_set_duration(query, format, priv->size);
result = TRUE;
}
break;
}
case GST_QUERY_URI: {
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- gst_query_set_uri(query, src->priv->originalURI.data());
- if (!src->priv->redirectedURI.isNull())
- gst_query_set_uri_redirection(query, src->priv->redirectedURI.data());
+ gst_query_set_uri(query, priv->originalURI.data());
+ if (!priv->redirectedURI.isNull())
+ gst_query_set_uri_redirection(query, priv->redirectedURI.data());
result = TRUE;
break;
}
static gchar* webKitWebSrcGetUri(GstURIHandler* handler)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(handler);
- gchar* ret;
-
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- ret = g_strdup(src->priv->originalURI.data());
+ gchar* ret = g_strdup(src->priv->originalURI.data());
return ret;
}
return FALSE;
}
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
-
priv->redirectedURI = CString();
priv->originalURI = CString();
if (!uri)
GST_DEBUG_OBJECT(src, "Need more data");
- {
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- if (!priv->paused)
- return;
- priv->paused = false;
- if (priv->client) {
- priv->client->setDefersLoading(false);
- return;
- }
- }
+ if (!priv->paused)
+ return;
+ priv->paused = false;
GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
priv->notifier->notify(MainThreadSourceNotification::NeedData, [protector] {
GST_DEBUG_OBJECT(src, "Have enough data");
- {
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- if (priv->paused)
- return;
- priv->paused = true;
- if (priv->client) {
- priv->client->setDefersLoading(true);
- return;
- }
- }
+ if (priv->paused)
+ return;
+ priv->paused = true;
GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
priv->notifier->notify(MainThreadSourceNotification::EnoughData, [protector] {
{
WebKitWebSrcPrivate* priv = src->priv;
- {
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- if (offset == priv->offset && priv->requestedOffset == priv->offset)
- return TRUE;
+ if (offset == priv->offset && priv->requestedOffset == priv->offset)
+ return TRUE;
- if (!priv->seekable)
- return FALSE;
+ if (!priv->seekable)
+ return FALSE;
- priv->isSeeking = true;
- priv->requestedOffset = offset;
- }
+ priv->isSeeking = true;
+ priv->requestedOffset = offset;
GST_DEBUG_OBJECT(src, "Seeking to offset: %" G_GUINT64_FORMAT, src->priv->requestedOffset);
- if (priv->client) {
- webKitWebSrcStop(src);
- webKitWebSrcStart(src);
- return TRUE;
- }
GRefPtr<WebKitWebSrc> protector = WTF::ensureGRef(src);
priv->notifier->notify(MainThreadSourceNotification::Seek, [protector] {
void webKitWebSrcSetMediaPlayer(WebKitWebSrc* src, WebCore::MediaPlayer* player)
{
ASSERT(player);
- ASSERT(src->priv->createdInMainThread);
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
src->priv->player = player;
}
return src->priv->didPassAccessControlCheck;
}
-StreamingClient::StreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
+CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
: m_src(GST_ELEMENT(src))
, m_request(WTFMove(request))
{
}
-StreamingClient::~StreamingClient()
+CachedResourceStreamingClient::~CachedResourceStreamingClient()
{
}
-char* StreamingClient::createReadBuffer(size_t requestedSize, size_t& actualSize)
+#if USE(SOUP)
+char* CachedResourceStreamingClient::getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
WebKitWebSrcPrivate* priv = src->priv;
mapGstBuffer(buffer, GST_MAP_WRITE);
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
priv->buffer = adoptGRef(buffer);
- locker.unlock();
actualSize = gst_buffer_get_size(buffer);
return getGstBufferDataPointer(buffer);
}
+#endif
-void StreamingClient::handleResponseReceived(const ResourceResponse& response)
+void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
WebKitWebSrcPrivate* priv = src->priv;
+ priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
GST_DEBUG_OBJECT(src, "Received response: %d", response.httpStatusCode());
return;
}
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
-
if (priv->isSeeking) {
GST_DEBUG_OBJECT(src, "Seek in progress, ignoring response");
return;
priv->offset = 0;
} else if (response.httpStatusCode() != 206) {
// Range request completely failed.
- locker.unlock();
GST_ELEMENT_ERROR(src, RESOURCE, READ, ("Received unexpected %d HTTP status code", response.httpStatusCode()), (nullptr));
gst_app_src_end_of_stream(priv->appsrc);
webKitWebSrcStop(src);
priv->size = length >= 0 ? length : 0;
priv->seekable = length > 0 && g_ascii_strcasecmp("none", response.httpHeaderField(HTTPHeaderName::AcceptRanges).utf8().data());
- locker.unlock();
-
// notify size/duration
if (length > 0) {
gst_app_src_set_size(priv->appsrc, length);
gst_pad_push_event(GST_BASE_SRC_PAD(priv->appsrc), gst_event_new_custom(GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, httpHeaders));
}
-void StreamingClient::handleDataReceived(const char* data, int length)
+void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
WebKitWebSrcPrivate* priv = src->priv;
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
-
GST_LOG_OBJECT(src, "Have %lld bytes of data", priv->buffer ? static_cast<long long>(gst_buffer_get_size(priv->buffer.get())) : length);
ASSERT(!priv->buffer || data == getGstBufferDataPointer(priv->buffer.get()));
}
GST_BUFFER_OFFSET_END(priv->buffer.get()) = priv->offset;
- locker.unlock();
-
GstFlowReturn ret = gst_app_src_push_buffer(priv->appsrc, priv->buffer.leakRef());
if (ret != GST_FLOW_OK && ret != GST_FLOW_EOS)
GST_ELEMENT_ERROR(src, CORE, FAILED, (nullptr), (nullptr));
}
-void StreamingClient::handleNotifyFinished()
-{
- WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
- WebKitWebSrcPrivate* priv = src->priv;
-
- GST_DEBUG_OBJECT(src, "Have EOS");
-
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- if (!priv->isSeeking) {
- locker.unlock();
- gst_app_src_end_of_stream(priv->appsrc);
- }
-}
-
-CachedResourceStreamingClient::CachedResourceStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
- : StreamingClient(src, WTFMove(request))
-{
-}
-
-CachedResourceStreamingClient::~CachedResourceStreamingClient()
-{
-}
-
-#if USE(SOUP)
-char* CachedResourceStreamingClient::getOrCreateReadBuffer(PlatformMediaResource&, size_t requestedSize, size_t& actualSize)
-{
- return createReadBuffer(requestedSize, actualSize);
-}
-#endif
-
-void CachedResourceStreamingClient::responseReceived(PlatformMediaResource&, const ResourceResponse& response)
-{
- WebKitWebSrcPrivate* priv = WEBKIT_WEB_SRC(m_src.get())->priv;
- priv->didPassAccessControlCheck = priv->resource->didPassAccessControlCheck();
- handleResponseReceived(response);
-}
-
-void CachedResourceStreamingClient::dataReceived(PlatformMediaResource&, const char* data, int length)
-{
- handleDataReceived(data, length);
-}
-
void CachedResourceStreamingClient::accessControlCheckFailed(PlatformMediaResource&, const ResourceError& error)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
}
void CachedResourceStreamingClient::loadFinished(PlatformMediaResource&)
-{
- handleNotifyFinished();
-}
-
-ResourceHandleStreamingClient::ResourceHandleStreamingClient(WebKitWebSrc* src, ResourceRequest&& request)
- : StreamingClient(src, WTFMove(request))
-{
- LockHolder locker(m_initializeRunLoopConditionMutex);
- m_thread = Thread::create("ResourceHandleStreamingClient", [this] {
- {
- LockHolder locker(m_initializeRunLoopConditionMutex);
- m_runLoop = &RunLoop::current();
-#if USE(SOUP)
- m_session = std::make_unique<SoupNetworkSession>();
- m_resource = ResourceHandle::create(*m_session, m_request, this, true, false);
-#else
- // FIXME: This create will hit an assert in debug builds. See https://bugs.webkit.org/show_bug.cgi?id=167003.
- m_resource = ResourceHandle::create(nullptr, m_request, this, true, false);
-#endif
- m_initializeRunLoopCondition.notifyOne();
- }
- if (!m_resource)
- return;
-
- m_runLoop->dispatch([this] { m_resource->setDefersLoading(false); });
- m_runLoop->run();
- });
- m_initializeRunLoopCondition.wait(m_initializeRunLoopConditionMutex);
-}
-
-ResourceHandleStreamingClient::~ResourceHandleStreamingClient()
-{
- if (m_thread) {
- m_thread->detach();
- m_thread = nullptr;
- }
-}
-
-void ResourceHandleStreamingClient::cleanupAndStopRunLoop()
-{
- m_resource->clearClient();
- m_resource->cancel();
- m_resource = nullptr;
-#if USE(SOUP)
- m_session = nullptr;
-#endif
- m_runLoop->stop();
-}
-
-void ResourceHandleStreamingClient::invalidate()
-{
- if (m_runLoop == &RunLoop::current()) {
- cleanupAndStopRunLoop();
- return;
- }
-
- LockHolder locker(m_terminateRunLoopConditionMutex);
- m_runLoop->dispatch([this, protectedThis = makeRef(*this)] {
- cleanupAndStopRunLoop();
- LockHolder locker(m_terminateRunLoopConditionMutex);
- m_terminateRunLoopCondition.notifyOne();
- });
- m_terminateRunLoopCondition.wait(m_terminateRunLoopConditionMutex);
-}
-
-bool ResourceHandleStreamingClient::loadFailed() const
-{
- return !m_resource;
-}
-
-void ResourceHandleStreamingClient::setDefersLoading(bool defers)
-{
- m_runLoop->dispatch([this, protectedThis = makeRef(*this), defers] {
- if (m_resource)
- m_resource->setDefersLoading(defers);
- });
-}
-
-#if USE(SOUP)
-char* ResourceHandleStreamingClient::getOrCreateReadBuffer(size_t requestedSize, size_t& actualSize)
-{
- return createReadBuffer(requestedSize, actualSize);
-}
-#endif
-
-ResourceRequest ResourceHandleStreamingClient::willSendRequest(ResourceHandle*, ResourceRequest&& request, ResourceResponse&&)
-{
- return WTFMove(request);
-}
-
-void ResourceHandleStreamingClient::didReceiveResponse(ResourceHandle*, ResourceResponse&& response)
-{
- if (m_resource)
- handleResponseReceived(response);
-}
-
-void ResourceHandleStreamingClient::didReceiveData(ResourceHandle*, const char* /* data */, unsigned /* length */, int)
-{
- ASSERT_NOT_REACHED();
-}
-
-void ResourceHandleStreamingClient::didReceiveBuffer(ResourceHandle*, Ref<SharedBuffer>&& buffer, int /* encodedLength */)
-{
- if (!m_resource)
- return;
-
- for (const auto& element : buffer.get())
- handleDataReceived(element.segment->data(), element.segment->size());
-}
-
-void ResourceHandleStreamingClient::didFinishLoading(ResourceHandle*)
-{
- if (m_resource)
- handleNotifyFinished();
-}
-
-void ResourceHandleStreamingClient::didFail(ResourceHandle*, const ResourceError& error)
-{
- WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
-
- GST_ERROR_OBJECT(src, "Have failure: %s", error.localizedDescription().utf8().data());
- GST_ELEMENT_ERROR(src, RESOURCE, FAILED, ("%s", error.localizedDescription().utf8().data()), (nullptr));
- gst_app_src_end_of_stream(src->priv->appsrc);
-}
-
-void ResourceHandleStreamingClient::wasBlocked(ResourceHandle*)
{
WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
- GUniquePtr<gchar> uri;
-
- GST_ERROR_OBJECT(src, "Request was blocked");
-
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- uri.reset(g_strdup(src->priv->originalURI.data()));
- locker.unlock();
-
- GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Access to \"%s\" was blocked", uri.get()), (nullptr));
-}
-
-void ResourceHandleStreamingClient::cannotShowURL(ResourceHandle*)
-{
- WebKitWebSrc* src = WEBKIT_WEB_SRC(m_src.get());
- GUniquePtr<gchar> uri;
-
- GST_ERROR_OBJECT(src, "Cannot show URL");
+ WebKitWebSrcPrivate* priv = src->priv;
- WTF::GMutexLocker<GMutex> locker(*GST_OBJECT_GET_LOCK(src));
- uri.reset(g_strdup(src->priv->originalURI.data()));
- locker.unlock();
+ GST_DEBUG_OBJECT(src, "Have EOS");
- GST_ELEMENT_ERROR(src, RESOURCE, OPEN_READ, ("Can't show \"%s\"", uri.get()), (nullptr));
+ if (!priv->isSeeking)
+ gst_app_src_end_of_stream(priv->appsrc);
}
#endif // USE(GSTREAMER)