Unreviewed, roll out http://trac.webkit.org/changeset/187972.
[WebKit-https.git] / Source / WTF / wtf / efl / DispatchQueueEfl.cpp
1 /*
2  * Copyright (C) 2013 Samsung Electronics. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "DispatchQueueEfl.h"
28
29 #include <DispatchQueueWorkItemEfl.h>
30 #include <sys/timerfd.h>
31 #include <unistd.h>
32 #include <wtf/Assertions.h>
33 #include <wtf/CurrentTime.h>
34 #include <wtf/Threading.h>
35
36 static const int microSecondsPerSecond = 1000000;
37 static const int nanoSecondsPerSecond = 1000000000;
38 static const int invalidSocketDescriptor = -1;
39 static const char wakeUpThreadMessage = 'W';
40
41 class DispatchQueue::ThreadContext {
42 public:
43     static void start(const char* name, PassRefPtr<DispatchQueue> dispatchQueue)
44     {
45         // The DispatchQueueThreadContext instance will be passed to the thread function and deleted in it.
46         detachThread(createThread(reinterpret_cast<WTF::ThreadFunction>(&ThreadContext::function), new ThreadContext(dispatchQueue), name));
47     }
48
49 private:
50     ThreadContext(PassRefPtr<DispatchQueue> dispatchQueue)
51         : m_dispatchQueue(dispatchQueue)
52     {
53     }
54
55     static void* function(ThreadContext* threadContext)
56     {
57         std::unique_ptr<ThreadContext>(threadContext)->m_dispatchQueue->dispatchQueueThread();
58         return 0;
59     }
60
61     RefPtr<DispatchQueue> m_dispatchQueue;
62 };
63
64 PassRefPtr<DispatchQueue> DispatchQueue::create(const char* name)
65 {
66     RefPtr<DispatchQueue> dispatchQueue = adoptRef<DispatchQueue>(new DispatchQueue());
67
68     ThreadContext::start(name, dispatchQueue);
69
70     return dispatchQueue.release();
71 }
72
73 DispatchQueue::DispatchQueue()
74     : m_isThreadRunning(true)
75 {
76     int fds[2];
77     if (pipe(fds))
78         ASSERT_NOT_REACHED();
79
80     m_readFromPipeDescriptor = fds[0];
81     m_writeToPipeDescriptor = fds[1];
82     FD_ZERO(&m_fileDescriptorSet);
83     FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
84     m_maxFileDescriptor = m_readFromPipeDescriptor;
85
86     m_socketDescriptor = invalidSocketDescriptor;
87 }
88
89 DispatchQueue::~DispatchQueue()
90 {
91     close(m_readFromPipeDescriptor);
92     close(m_writeToPipeDescriptor);
93 }
94
95 void DispatchQueue::dispatch(std::unique_ptr<WorkItem> item)
96 {
97     {
98         MutexLocker locker(m_workItemsLock);
99         m_workItems.append(WTF::move(item));
100     }
101
102     wakeUpThread();
103 }
104
105 void DispatchQueue::dispatch(std::unique_ptr<TimerWorkItem> item)
106 {
107     insertTimerWorkItem(WTF::move(item));
108     wakeUpThread();
109 }
110
111 void DispatchQueue::stopThread()
112 {
113     ASSERT(m_socketDescriptor == invalidSocketDescriptor);
114
115     m_isThreadRunning = false;
116     wakeUpThread();
117 }
118
119 void DispatchQueue::setSocketEventHandler(int fileDescriptor, std::function<void ()> function)
120 {
121     ASSERT(m_socketDescriptor == invalidSocketDescriptor);
122
123     m_socketDescriptor = fileDescriptor;
124     m_socketEventHandler = WTF::move(function);
125
126     if (fileDescriptor > m_maxFileDescriptor)
127         m_maxFileDescriptor = fileDescriptor;
128     FD_SET(fileDescriptor, &m_fileDescriptorSet);
129 }
130
131 void DispatchQueue::clearSocketEventHandler()
132 {
133     ASSERT(m_socketDescriptor != invalidSocketDescriptor);
134
135     if (m_socketDescriptor == m_maxFileDescriptor)
136         m_maxFileDescriptor = m_readFromPipeDescriptor;
137
138     FD_CLR(m_socketDescriptor, &m_fileDescriptorSet);
139
140     m_socketDescriptor = invalidSocketDescriptor;
141 }
142
143 void DispatchQueue::performWork()
144 {
145     while (true) {
146         Vector<std::unique_ptr<WorkItem>> workItems;
147
148         {
149             MutexLocker locker(m_workItemsLock);
150             if (m_workItems.isEmpty())
151                 return;
152
153             m_workItems.swap(workItems);
154         }
155
156         for (size_t i = 0; i < workItems.size(); ++i)
157             workItems[i]->dispatch();
158     }
159 }
160
161 void DispatchQueue::performTimerWork()
162 {
163     Vector<std::unique_ptr<TimerWorkItem>> timerWorkItems;
164
165     {
166         // Protects m_timerWorkItems.
167         MutexLocker locker(m_timerWorkItemsLock);
168         if (m_timerWorkItems.isEmpty())
169             return;
170
171         // Copies all the timer work items in m_timerWorkItems to local vector.
172         m_timerWorkItems.swap(timerWorkItems);
173     }
174
175     double currentTimeNanoSeconds = monotonicallyIncreasingTime() * nanoSecondsPerSecond;
176
177     for (size_t i = 0; i < timerWorkItems.size(); ++i) {
178         if (!timerWorkItems[i]->hasExpired(currentTimeNanoSeconds)) {
179             insertTimerWorkItem(WTF::move(timerWorkItems[i]));
180             continue;
181         }
182
183         // If a timer work item has expired, dispatch the function of the work item.
184         timerWorkItems[i]->dispatch();
185     }
186 }
187
188 void DispatchQueue::performFileDescriptorWork()
189 {
190     fd_set readFileDescriptorSet = m_fileDescriptorSet;
191
192     if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) {
193         if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
194             char message;
195             if (read(m_readFromPipeDescriptor, &message, 1) == -1)
196                 LOG_ERROR("Failed to read from DispatchQueue Thread pipe");
197
198             ASSERT(message == wakeUpThreadMessage);
199         }
200
201         if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
202             m_socketEventHandler();
203     }
204 }
205
206 void DispatchQueue::insertTimerWorkItem(std::unique_ptr<TimerWorkItem> item)
207 {
208     ASSERT(item);
209
210     size_t position = 0;
211
212     MutexLocker locker(m_timerWorkItemsLock);
213     // The items should be ordered by expire time.
214     for (; position < m_timerWorkItems.size(); ++position)
215         if (item->expirationTimeNanoSeconds() < m_timerWorkItems[position]->expirationTimeNanoSeconds())
216             break;
217
218     m_timerWorkItems.insert(position, WTF::move(item));
219 }
220
221 void DispatchQueue::dispatchQueueThread()
222 {
223     while (m_isThreadRunning) {
224         performWork();
225         performTimerWork();
226         performFileDescriptorWork();
227     }
228 }
229
230 void DispatchQueue::wakeUpThread()
231 {
232     MutexLocker locker(m_writeToPipeDescriptorLock);
233     if (write(m_writeToPipeDescriptor, &wakeUpThreadMessage, sizeof(char)) == -1)
234         LOG_ERROR("Failed to wake up DispatchQueue Thread");
235 }
236
237 timeval* DispatchQueue::getNextTimeOut() const
238 {
239     MutexLocker locker(m_timerWorkItemsLock);
240     if (m_timerWorkItems.isEmpty())
241         return 0;
242
243     static timeval timeValue;
244     timeValue.tv_sec = 0;
245     timeValue.tv_usec = 0;
246     double timeOutSeconds = (m_timerWorkItems[0]->expirationTimeNanoSeconds() - monotonicallyIncreasingTime() * nanoSecondsPerSecond) / nanoSecondsPerSecond;
247     if (timeOutSeconds > 0) {
248         timeValue.tv_sec = static_cast<long>(timeOutSeconds);
249         timeValue.tv_usec = static_cast<long>((timeOutSeconds - timeValue.tv_sec) * microSecondsPerSecond);
250     }
251
252     return &timeValue;
253 }