2 * Copyright (C) 2013 Samsung Electronics. All rights reserved.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
#include "DispatchQueueEfl.h"

#include <DispatchQueueWorkItemEfl.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <wtf/Assertions.h>
#include <wtf/CurrentTime.h>
#include <wtf/Threading.h>
// Conversion factors used when turning timer deadlines into a select() timeout.
static const int microSecondsPerSecond = 1000 * 1000;
static const int nanoSecondsPerSecond = 1000 * 1000 * 1000;

// Value held by m_socketDescriptor while no socket event handler is registered.
static const int invalidSocketDescriptor = -1;

// Byte written to the wake-up pipe by wakeUpThread() and expected back by the reader.
static const char wakeUpThreadMessage = 'W';
41 class DispatchQueue::ThreadContext {
43 static void start(const char* name, PassRefPtr<DispatchQueue> dispatchQueue)
45 // The DispatchQueueThreadContext instance will be passed to the thread function and deleted in it.
46 detachThread(createThread(reinterpret_cast<WTF::ThreadFunction>(&ThreadContext::function), new ThreadContext(dispatchQueue), name));
50 ThreadContext(PassRefPtr<DispatchQueue> dispatchQueue)
51 : m_dispatchQueue(dispatchQueue)
55 static void* function(ThreadContext* threadContext)
57 std::unique_ptr<ThreadContext>(threadContext)->m_dispatchQueue->dispatchQueueThread();
61 RefPtr<DispatchQueue> m_dispatchQueue;
64 PassRefPtr<DispatchQueue> DispatchQueue::create(const char* name)
66 RefPtr<DispatchQueue> dispatchQueue = adoptRef<DispatchQueue>(new DispatchQueue());
68 ThreadContext::start(name, dispatchQueue);
70 return dispatchQueue.release();
73 DispatchQueue::DispatchQueue()
74 : m_isThreadRunning(true)
80 m_readFromPipeDescriptor = fds[0];
81 m_writeToPipeDescriptor = fds[1];
82 FD_ZERO(&m_fileDescriptorSet);
83 FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
84 m_maxFileDescriptor = m_readFromPipeDescriptor;
86 m_socketDescriptor = invalidSocketDescriptor;
89 DispatchQueue::~DispatchQueue()
91 close(m_readFromPipeDescriptor);
92 close(m_writeToPipeDescriptor);
95 void DispatchQueue::dispatch(std::unique_ptr<WorkItem> item)
98 MutexLocker locker(m_workItemsLock);
99 m_workItems.append(WTF::move(item));
105 void DispatchQueue::dispatch(std::unique_ptr<TimerWorkItem> item)
107 insertTimerWorkItem(WTF::move(item));
111 void DispatchQueue::stopThread()
113 ASSERT(m_socketDescriptor == invalidSocketDescriptor);
115 m_isThreadRunning = false;
119 void DispatchQueue::setSocketEventHandler(int fileDescriptor, std::function<void ()> function)
121 ASSERT(m_socketDescriptor == invalidSocketDescriptor);
123 m_socketDescriptor = fileDescriptor;
124 m_socketEventHandler = WTF::move(function);
126 if (fileDescriptor > m_maxFileDescriptor)
127 m_maxFileDescriptor = fileDescriptor;
128 FD_SET(fileDescriptor, &m_fileDescriptorSet);
131 void DispatchQueue::clearSocketEventHandler()
133 ASSERT(m_socketDescriptor != invalidSocketDescriptor);
135 if (m_socketDescriptor == m_maxFileDescriptor)
136 m_maxFileDescriptor = m_readFromPipeDescriptor;
138 FD_CLR(m_socketDescriptor, &m_fileDescriptorSet);
140 m_socketDescriptor = invalidSocketDescriptor;
143 void DispatchQueue::performWork()
146 Vector<std::unique_ptr<WorkItem>> workItems;
149 MutexLocker locker(m_workItemsLock);
150 if (m_workItems.isEmpty())
153 m_workItems.swap(workItems);
156 for (size_t i = 0; i < workItems.size(); ++i)
157 workItems[i]->dispatch();
161 void DispatchQueue::performTimerWork()
163 Vector<std::unique_ptr<TimerWorkItem>> timerWorkItems;
166 // Protects m_timerWorkItems.
167 MutexLocker locker(m_timerWorkItemsLock);
168 if (m_timerWorkItems.isEmpty())
171 // Copies all the timer work items in m_timerWorkItems to local vector.
172 m_timerWorkItems.swap(timerWorkItems);
175 double currentTimeNanoSeconds = monotonicallyIncreasingTime() * nanoSecondsPerSecond;
177 for (size_t i = 0; i < timerWorkItems.size(); ++i) {
178 if (!timerWorkItems[i]->hasExpired(currentTimeNanoSeconds)) {
179 insertTimerWorkItem(WTF::move(timerWorkItems[i]));
183 // If a timer work item has expired, dispatch the function of the work item.
184 timerWorkItems[i]->dispatch();
188 void DispatchQueue::performFileDescriptorWork()
190 fd_set readFileDescriptorSet = m_fileDescriptorSet;
192 if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) {
193 if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
195 if (read(m_readFromPipeDescriptor, &message, 1) == -1)
196 LOG_ERROR("Failed to read from DispatchQueue Thread pipe");
198 ASSERT(message == wakeUpThreadMessage);
201 if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
202 m_socketEventHandler();
206 void DispatchQueue::insertTimerWorkItem(std::unique_ptr<TimerWorkItem> item)
212 MutexLocker locker(m_timerWorkItemsLock);
213 // The items should be ordered by expire time.
214 for (; position < m_timerWorkItems.size(); ++position)
215 if (item->expirationTimeNanoSeconds() < m_timerWorkItems[position]->expirationTimeNanoSeconds())
218 m_timerWorkItems.insert(position, WTF::move(item));
221 void DispatchQueue::dispatchQueueThread()
223 while (m_isThreadRunning) {
226 performFileDescriptorWork();
230 void DispatchQueue::wakeUpThread()
232 MutexLocker locker(m_writeToPipeDescriptorLock);
233 if (write(m_writeToPipeDescriptor, &wakeUpThreadMessage, sizeof(char)) == -1)
234 LOG_ERROR("Failed to wake up DispatchQueue Thread");
// Computes how long remains until the first entry of m_timerWorkItems expires,
// expressed as a timeval. Its result is passed as the timeout argument of
// select() in performFileDescriptorWork().
// NOTE(review): the end of this function (including its return statements) is
// not visible here; presumably it returns a pointer to timeValue — confirm.
237 timeval* DispatchQueue::getNextTimeOut() const
// Guards m_timerWorkItems for the rest of the function.
239 MutexLocker locker(m_timerWorkItemsLock);
// NOTE(review): the statement guarded by this check is not visible; presumably
// an early return meaning "no timeout" when no timer is pending — confirm.
240 if (m_timerWorkItems.isEmpty())
// NOTE(review): function-local static, so every caller shares this one object.
// If several DispatchQueue threads call this concurrently they would race on
// it — confirm, and consider a per-queue member or thread_local storage.
243 static timeval timeValue;
// Start from a zero timeout; it is only raised below when time remains.
244 timeValue.tv_sec = 0;
245 timeValue.tv_usec = 0;
// Remaining time in seconds: (expiration in ns - current monotonic time in ns) / ns per second.
// Index 0 is used as the earliest deadline; insertTimerWorkItem() keeps the vector ordered by expiration.
246 double timeOutSeconds = (m_timerWorkItems[0]->expirationTimeNanoSeconds() - monotonicallyIncreasingTime() * nanoSecondsPerSecond) / nanoSecondsPerSecond;
// A deadline that has already passed leaves the timeout at zero.
247 if (timeOutSeconds > 0) {
// Whole seconds go to tv_sec (the cast truncates toward zero) ...
248 timeValue.tv_sec = static_cast<long>(timeOutSeconds);
// ... and the fractional remainder, converted to microseconds, goes to tv_usec.
249 timeValue.tv_usec = static_cast<long>((timeOutSeconds - timeValue.tv_sec) * microSecondsPerSecond);