[Curl] WebSocket platform part is not implemented.
authorpeavo@outlook.com <peavo@outlook.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Fri, 15 May 2015 13:50:05 +0000 (13:50 +0000)
committerpeavo@outlook.com <peavo@outlook.com@268f45cc-cd09-0410-ab3c-d52691b4dbfc>
Fri, 15 May 2015 13:50:05 +0000 (13:50 +0000)
https://bugs.webkit.org/show_bug.cgi?id=144628

Reviewed by Darin Adler.

Add Curl platform code implementation for WebSockets.

* platform/network/curl/SocketStreamHandle.h:
(WebCore::SocketStreamHandle::create):
(WebCore::SocketStreamHandle::SocketData::SocketData):
* platform/network/curl/SocketStreamHandleCurl.cpp:
(WebCore::SocketStreamHandle::SocketStreamHandle):
(WebCore::SocketStreamHandle::~SocketStreamHandle):
(WebCore::SocketStreamHandle::platformSend):
(WebCore::SocketStreamHandle::platformClose):
(WebCore::SocketStreamHandle::readData):
(WebCore::SocketStreamHandle::sendData):
(WebCore::SocketStreamHandle::waitForAvailableData):
(WebCore::SocketStreamHandle::startThread):
(WebCore::SocketStreamHandle::stopThread):
(WebCore::SocketStreamHandle::didReceiveData):
(WebCore::SocketStreamHandle::didOpenSocket):
(WebCore::SocketStreamHandle::createCopy):

git-svn-id: https://svn.webkit.org/repository/webkit/trunk@184389 268f45cc-cd09-0410-ab3c-d52691b4dbfc

Source/WebCore/ChangeLog
Source/WebCore/platform/network/curl/SocketStreamHandle.h
Source/WebCore/platform/network/curl/SocketStreamHandleCurl.cpp

index 3653382..c4487b2 100644 (file)
@@ -1,3 +1,29 @@
+2015-05-15  Per Arne Vollan  <peavo@outlook.com>
+
+        [Curl] WebSocket platform part is not implemented.
+        https://bugs.webkit.org/show_bug.cgi?id=144628
+
+        Reviewed by Darin Adler.
+
+        Add Curl platform code implementation for WebSockets.
+
+        * platform/network/curl/SocketStreamHandle.h:
+        (WebCore::SocketStreamHandle::create):
+        (WebCore::SocketStreamHandle::SocketData::SocketData):
+        * platform/network/curl/SocketStreamHandleCurl.cpp:
+        (WebCore::SocketStreamHandle::SocketStreamHandle):
+        (WebCore::SocketStreamHandle::~SocketStreamHandle):
+        (WebCore::SocketStreamHandle::platformSend):
+        (WebCore::SocketStreamHandle::platformClose):
+        (WebCore::SocketStreamHandle::readData):
+        (WebCore::SocketStreamHandle::sendData):
+        (WebCore::SocketStreamHandle::waitForAvailableData):
+        (WebCore::SocketStreamHandle::startThread):
+        (WebCore::SocketStreamHandle::stopThread):
+        (WebCore::SocketStreamHandle::didReceiveData):
+        (WebCore::SocketStreamHandle::didOpenSocket):
+        (WebCore::SocketStreamHandle::createCopy):
+
 2015-05-14  Carlos Garcia Campos  <cgarcia@igalia.com>
 
         REGRESSION(r183861): [SOUP] Downloads are broken when using the Network Process
index 4e67af8..ff65559 100644 (file)
 
 #include "SocketStreamHandleBase.h"
 
-#include <wtf/PassRefPtr.h>
+#if PLATFORM(WIN)
+#include <winsock2.h>
+#endif
+
+#include <curl/curl.h>
+
+#include <mutex>
+
+#include <wtf/Deque.h>
 #include <wtf/RefCounted.h>
+#include <wtf/Threading.h>
 
 namespace WebCore {
 
-    class AuthenticationChallenge;
-    class Credential;
-    class SocketStreamHandleClient;
+class AuthenticationChallenge;
+class Credential;
+class SocketStreamHandleClient;
+
+class SocketStreamHandle : public ThreadSafeRefCounted<SocketStreamHandle>, public SocketStreamHandleBase {
+public:
+    static Ref<SocketStreamHandle> create(const URL& url, SocketStreamHandleClient* client) { return adoptRef(*new SocketStreamHandle(url, client)); }
+
+    virtual ~SocketStreamHandle();
+
+private:
+    SocketStreamHandle(const URL&, SocketStreamHandleClient*);
 
-    class SocketStreamHandle : public RefCounted<SocketStreamHandle>, public SocketStreamHandleBase {
-    public:
-        static PassRefPtr<SocketStreamHandle> create(const URL& url, SocketStreamHandleClient* client) { return adoptRef(new SocketStreamHandle(url, client)); }
+    int platformSend(const char* data, int length) override;
+    void platformClose() override;
 
-        virtual ~SocketStreamHandle();
+    bool readData(CURL*);
+    bool sendData(CURL*);
+    bool waitForAvailableData(CURL*, std::chrono::milliseconds selectTimeout);
 
-    protected:
-        virtual int platformSend(const char* data, int length);
-        virtual void platformClose();
+    void startThread();
+    void stopThread();
 
-    private:
-        SocketStreamHandle(const URL&, SocketStreamHandleClient*);
+    void didReceiveData();
+    void didOpenSocket();
 
-        // No authentication for streams per se, but proxy may ask for credentials.
-        void didReceiveAuthenticationChallenge(const AuthenticationChallenge&);
-        void receivedCredential(const AuthenticationChallenge&, const Credential&);
-        void receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&);
-        void receivedCancellation(const AuthenticationChallenge&);
-        void receivedRequestToPerformDefaultHandling(const AuthenticationChallenge&);
-        void receivedChallengeRejection(const AuthenticationChallenge&);
+    static std::unique_ptr<char[]> createCopy(const char* data, int length);
+
+    // No authentication for streams per se, but proxy may ask for credentials.
+    void didReceiveAuthenticationChallenge(const AuthenticationChallenge&);
+    void receivedCredential(const AuthenticationChallenge&, const Credential&);
+    void receivedRequestToContinueWithoutCredential(const AuthenticationChallenge&);
+    void receivedCancellation(const AuthenticationChallenge&);
+    void receivedRequestToPerformDefaultHandling(const AuthenticationChallenge&);
+    void receivedChallengeRejection(const AuthenticationChallenge&);
+
+    struct SocketData {
+        SocketData(std::unique_ptr<char[]>&& source, int length)
+        {
+            data = WTF::move(source);
+            size = length;
+        }
+
+        SocketData(SocketData&& other)
+        {
+            data = WTF::move(other.data);
+            size = other.size;
+            other.size = 0;
+        }
+
+        std::unique_ptr<char[]> data;
+        int size { 0 };
     };
 
-}  // namespace WebCore
+    ThreadIdentifier m_workerThread { 0 };
+    std::atomic<bool> m_stopThread { false };
+    std::mutex m_mutexSend;
+    std::mutex m_mutexReceive;
+    Deque<SocketData> m_sendData;
+    Deque<SocketData> m_receiveData;
+};
+
+} // namespace WebCore
 
-#endif  // SocketStreamHandle_h
+#endif // SocketStreamHandle_h
index e8db879..2072543 100644 (file)
 
 #if USE(CURL)
 
-#include "URL.h"
 #include "Logging.h"
 #include "NotImplemented.h"
 #include "SocketStreamHandleClient.h"
+#include "URL.h"
+#include <wtf/MainThread.h>
+#include <wtf/text/CString.h>
 
 namespace WebCore {
 
@@ -45,27 +47,239 @@ SocketStreamHandle::SocketStreamHandle(const URL& url, SocketStreamHandleClient*
     : SocketStreamHandleBase(url, client)
 {
     LOG(Network, "SocketStreamHandle %p new client %p", this, m_client);
-    notImplemented();
+    ASSERT(isMainThread());
+    startThread();
 }
 
 SocketStreamHandle::~SocketStreamHandle()
 {
     LOG(Network, "SocketStreamHandle %p delete", this);
-    setClient(0);
-    notImplemented();
+    ASSERT(!m_workerThread);
 }
 
-int SocketStreamHandle::platformSend(const char*, int)
+int SocketStreamHandle::platformSend(const char* data, int length)
 {
     LOG(Network, "SocketStreamHandle %p platformSend", this);
-    notImplemented();
-    return 0;
+
+    ASSERT(isMainThread());
+
+    startThread();
+
+    auto copy = createCopy(data, length);
+
+    std::lock_guard<std::mutex> lock(m_mutexSend);
+    m_sendData.append(SocketData { WTF::move(copy), length });
+
+    return length;
 }
 
 void SocketStreamHandle::platformClose()
 {
     LOG(Network, "SocketStreamHandle %p platformClose", this);
-    notImplemented();
+
+    ASSERT(isMainThread());
+
+    stopThread();
+
+    if (m_client)
+        m_client->didCloseSocketStream(this);
+}
+
+bool SocketStreamHandle::readData(CURL* curlHandle)
+{
+    ASSERT(!isMainThread());
+
+    const int bufferSize = 1024;
+    std::unique_ptr<char[]> data(new char[bufferSize]);
+    size_t bytesRead = 0;
+
+    CURLcode ret = curl_easy_recv(curlHandle, data.get(), bufferSize, &bytesRead);
+
+    if (ret == CURLE_OK && bytesRead >= 0) {
+        m_mutexReceive.lock();
+        m_receiveData.append(SocketData { WTF::move(data), static_cast<int>(bytesRead) });
+        m_mutexReceive.unlock();
+
+        ref();
+
+        callOnMainThread([this] {
+            didReceiveData();
+            deref();
+        });
+
+        return true;
+    }
+
+    if (ret == CURLE_AGAIN)
+        return true;
+
+    return false;
+}
+
+bool SocketStreamHandle::sendData(CURL* curlHandle)
+{
+    ASSERT(!isMainThread());
+
+    while (true) {
+
+        m_mutexSend.lock();
+        if (!m_sendData.size()) {
+            m_mutexSend.unlock();
+            break;
+        }
+        auto sendData = m_sendData.takeFirst();
+        m_mutexSend.unlock();
+
+        int totalBytesSent = 0;
+        while (totalBytesSent < sendData.size) {
+            size_t bytesSent = 0;
+            CURLcode ret = curl_easy_send(curlHandle, sendData.data.get() + totalBytesSent, sendData.size - totalBytesSent, &bytesSent);
+            if (ret == CURLE_OK)
+                totalBytesSent += bytesSent;
+            else
+                break;
+        }
+
+        // Insert remaining data into send queue.
+
+        if (totalBytesSent < sendData.size) {
+            const int restLength = sendData.size - totalBytesSent;
+            auto copy = createCopy(sendData.data.get() + totalBytesSent, restLength);
+
+            std::lock_guard<std::mutex> lock(m_mutexSend);
+            m_sendData.prepend(SocketData { WTF::move(copy), restLength });
+
+            return false;
+        }
+    }
+
+    return true;
+}
+
+bool SocketStreamHandle::waitForAvailableData(CURL* curlHandle, std::chrono::milliseconds selectTimeout)
+{
+    ASSERT(!isMainThread());
+
+    std::chrono::microseconds usec = std::chrono::duration_cast<std::chrono::microseconds>(selectTimeout);
+
+    struct timeval timeout;
+    if (usec <= std::chrono::microseconds(0)) {
+        timeout.tv_sec = 0;
+        timeout.tv_usec = 0;
+    } else {
+        timeout.tv_sec = usec.count() / 1000000;
+        timeout.tv_usec = usec.count() % 1000000;
+    }
+
+    long socket;
+    if (curl_easy_getinfo(curlHandle, CURLINFO_LASTSOCKET, &socket) != CURLE_OK)
+        return false;
+
+    fd_set fdread;
+    FD_ZERO(&fdread);
+    FD_SET(socket, &fdread);
+    int rc = ::select(0, &fdread, nullptr, nullptr, &timeout);
+    return rc == 1;
+}
+
+void SocketStreamHandle::startThread()
+{
+    ASSERT(isMainThread());
+
+    if (m_workerThread)
+        return;
+
+    ref(); // stopThread() will call deref().
+
+    m_workerThread = createThread("WebSocket thread", [this] {
+
+        ASSERT(!isMainThread());
+
+        CURL* curlHandle = curl_easy_init();
+
+        if (!curlHandle)
+            return;
+
+        curl_easy_setopt(curlHandle, CURLOPT_URL, m_url.host().utf8().data());
+        curl_easy_setopt(curlHandle, CURLOPT_PORT, m_url.port());
+        curl_easy_setopt(curlHandle, CURLOPT_CONNECT_ONLY);
+
+        // Connect to host
+        if (curl_easy_perform(curlHandle) != CURLE_OK)
+            return;
+
+        ref();
+
+        callOnMainThread([this] {
+            // Check reference count to fix a crash.
+            // When the call is invoked on the main thread after all other references are released, the SocketStreamClient
+            // is already deleted. Accessing the SocketStreamClient in didOpenSocket() will then cause a crash.
+            if (refCount() > 1)
+                didOpenSocket();
+            deref();
+        });
+
+        while (!m_stopThread) {
+            // Send queued data
+            sendData(curlHandle);
+
+            // Wait until socket has available data
+            if (waitForAvailableData(curlHandle, std::chrono::milliseconds(20)))
+                readData(curlHandle);
+        }
+
+        curl_easy_cleanup(curlHandle);
+    });
+}
+
+void SocketStreamHandle::stopThread()
+{
+    ASSERT(isMainThread());
+
+    if (!m_workerThread)
+        return;
+
+    m_stopThread = true;
+    waitForThreadCompletion(m_workerThread);
+    m_workerThread = 0;
+    deref();
+}
+
+void SocketStreamHandle::didReceiveData()
+{
+    ASSERT(isMainThread());
+
+    m_mutexReceive.lock();
+
+    auto receiveData = WTF::move(m_receiveData);
+
+    m_mutexReceive.unlock();
+
+    for (auto& socketData : receiveData) {
+        if (socketData.size > 0) {
+            if (m_client && state() == Open)
+                m_client->didReceiveSocketStreamData(this, socketData.data.get(), socketData.size);
+        } else
+            platformClose();
+    }
+}
+
+void SocketStreamHandle::didOpenSocket()
+{
+    ASSERT(isMainThread());
+
+    m_state = Open;
+
+    if (m_client)
+        m_client->didOpenSocketStream(this);
+}
+
+std::unique_ptr<char[]> SocketStreamHandle::createCopy(const char* data, int length)
+{
+    std::unique_ptr<char[]> copy(new char[length]);
+    memcpy(copy.get(), data, length);
+
+    return WTF::move(copy);
 }
 
 void SocketStreamHandle::didReceiveAuthenticationChallenge(const AuthenticationChallenge&)
@@ -98,6 +312,6 @@ void SocketStreamHandle::receivedChallengeRejection(const AuthenticationChalleng
     notImplemented();
 }
 
-}  // namespace WebCore
+} // namespace WebCore
 
 #endif