/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "ParkingLot.h"

#include "DataLog.h"
#include "HashFunctions.h"
#include "StringPrintStream.h"
#include "ThreadSpecific.h"
#include "ThreadingPrimitives.h"
#include "Vector.h"
#include "WordLock.h"
#include <condition_variable>
#include <mutex>
const bool verbose = false;
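// Per-thread parking state. Each thread that parks gets one of these: "address" is the address
// the thread is currently parked on, "nextInQueue" links it into a bucket's queue, and the
// parkingLock/parkingCondition pair is what the thread actually blocks on while parked.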
WTF_MAKE_FAST_ALLOCATED;
ThreadIdentifier threadIdentifier;
std::mutex parkingLock;
std::condition_variable parkingCondition;
const void* address { nullptr };
ThreadData* nextInQueue { nullptr };
enum class DequeueResult {
WTF_MAKE_FAST_ALLOCATED;
void enqueue(ThreadData* data)
dataLog(toString(currentThread(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
ASSERT(data->address);
ASSERT(!data->nextInQueue);
queueTail->nextInQueue = data;
template<typename Functor>
void genericDequeue(const Functor& functor)
dataLog(toString(currentThread(), ": dequeueing from bucket at ", RawPointer(this), "\n"));
dataLog(toString(currentThread(), ": empty.\n"));
// This loop is a very clever abomination. The induction variables are the pointer to the
// pointer to the current node, and the pointer to the previous node. This gives us everything
// we need to both proceed forward to the next node, and to remove nodes while maintaining the
// queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
// element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
// we'd want queueTail to be set to nullptr. This works because:
//
// currentPtr == &queueHead
// previous == nullptr
//
// We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
// that used to point to this node to instead point to this node's successor. Another example:
// if we were at the second node in the queue, then we'd have:
//
// currentPtr == &queueHead->nextInQueue
// previous == queueHead
//
// If this node is not equal to queueTail, then removing it simply means making
// queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
// achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
// queueTail to previous, which in this case is queueHead - thus making the queue look like a
// proper one-element queue with queueHead == queueTail.
bool shouldContinue = true;
ThreadData** currentPtr = &queueHead;
ThreadData* previous = nullptr;
while (shouldContinue) {
ThreadData* current = *currentPtr;
dataLog(toString(currentThread(), ": got thread ", RawPointer(current), "\n"));
DequeueResult result = functor(current);
case DequeueResult::Ignore:
dataLog(toString(currentThread(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
currentPtr = &(*currentPtr)->nextInQueue;
case DequeueResult::RemoveAndContinue:
case DequeueResult::RemoveAndStop:
dataLog(toString(currentThread(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
if (current == queueTail)
queueTail = previous;
*currentPtr = current->nextInQueue;
current->nextInQueue = nullptr;
if (result == DequeueResult::RemoveAndStop)
shouldContinue = false;
ASSERT(!!queueHead == !!queueTail);
ThreadData* dequeue()
ThreadData* result = nullptr;
[&] (ThreadData* element) -> DequeueResult {
return DequeueResult::RemoveAndStop;
ThreadData* queueHead { nullptr };
ThreadData* queueTail { nullptr };
// This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
// this lock.
// Put some distance between buckets in memory. This is one of several mitigations against false
// sharing.
// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
StaticWordLock hashtablesLock;
Atomic<Bucket*> data[1];
static Hashtable* create(unsigned size)
Hashtable* result = static_cast<Hashtable*>(
fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
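// The table is allocated in one shot: "data[1]" declares the first slot inline, and the extra
// sizeof(Atomic<Bucket*>) * (size - 1) bytes extend it to "size" zero-initialized slots.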
// This is not fast and it's not data-access parallel, but that's fine, because
// hashtable resizing is guaranteed to be rare and it will never happen in steady
// state.
WordLockHolder locker(hashtablesLock);
hashtables = new Vector<Hashtable*>();
hashtables->append(result);
static void destroy(Hashtable* hashtable)
// This is not fast, but that's OK. See comment in create().
WordLockHolder locker(hashtablesLock);
hashtables->removeFirst(hashtable);
ThreadSpecific<ThreadData>* threadData;
Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;
// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
const unsigned maxLoadFactor = 3;
const unsigned growthFactor = 2;
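// Taken together: the table is kept at no fewer than maxLoadFactor buckets per thread, and when
// it has to grow it is rebuilt with numThreads * growthFactor * maxLoadFactor buckets (see
// ensureHashtableSize() below).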
unsigned hashAddress(const void* address)
return WTF::PtrHash<const void*>::hash(address);
Hashtable* ensureHashtable()
Hashtable* currentHashtable = hashtable.load();
if (currentHashtable)
return currentHashtable;
if (!currentHashtable) {
currentHashtable = Hashtable::create(maxLoadFactor);
if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
dataLog(toString(currentThread(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
return currentHashtable;
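// If the compareExchangeWeak() above failed, some other thread installed a table first; throw
// away the one we just created and go with the table that won.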
Hashtable::destroy(currentHashtable);
// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
Hashtable* currentHashtable = ensureHashtable();
ASSERT(currentHashtable);
// Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
// we can lock all of the buckets, not just the ones that are materialized.
Vector<Bucket*> buckets;
for (unsigned i = currentHashtable->size; i--;) {
Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];
Bucket* bucket = bucketPointer.load();
bucket = new Bucket();
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
buckets.append(bucket);
// Now lock the buckets in the right order.
std::sort(buckets.begin(), buckets.end());
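// Sorting by bucket address gives every caller the same global lock acquisition order, which is
// what keeps concurrent lockHashtable() calls from deadlocking against each other.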
for (Bucket* bucket : buckets)
// If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
// now.
if (hashtable.load() == currentHashtable)
// The hashtable rehashed. Unlock everything and try again.
for (Bucket* bucket : buckets)
bucket->lock.unlock();
void unlockHashtable(const Vector<Bucket*>& buckets)
for (Bucket* bucket : buckets)
bucket->lock.unlock();
// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
// We try to ensure that the size of the hashtable used for thread queues is always large enough
// to avoid collisions. So, since we started a new thread, we may need to increase the size of the
// hashtable. This does just that. Note that we never free the old spine, since we never lock
// around spine accesses (i.e. the "hashtable" global variable).
// First do a fast check to see if rehashing is needed.
Hashtable* oldHashtable = hashtable.load();
if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
dataLog(toString(currentThread(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
// Seems like we *might* have to rehash, so lock the hashtable and try again.
Vector<Bucket*> bucketsToUnlock = lockHashtable();
// Check again, since the hashtable could have rehashed while we were locking it. Also,
// lockHashtable() creates an initial hashtable for us.
oldHashtable = hashtable.load();
if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
dataLog(toString(currentThread(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
unlockHashtable(bucketsToUnlock);
Vector<Bucket*> reusableBuckets = bucketsToUnlock;
// OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
// are placed into the vector in queue order.
Vector<ThreadData*> threadDatas;
for (Bucket* bucket : reusableBuckets) {
while (ThreadData* threadData = bucket->dequeue())
threadDatas.append(threadData);
unsigned newSize = numThreads * growthFactor * maxLoadFactor;
RELEASE_ASSERT(newSize > oldHashtable->size);
Hashtable* newHashtable = Hashtable::create(newSize);
dataLog(toString(currentThread(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
for (ThreadData* threadData : threadDatas) {
dataLog(toString(currentThread(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
unsigned hash = hashAddress(threadData->address);
unsigned index = hash % newHashtable->size;
dataLog(toString(currentThread(), ": index = ", index, "\n"));
Bucket* bucket = newHashtable->data[index].load();
if (reusableBuckets.isEmpty())
bucket = new Bucket();
bucket = reusableBuckets.takeLast();
newHashtable->data[index].store(bucket);
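// Plain load()/store() on the new table's slots is fine here: the new table has not been
// published yet (that happens in the compareExchangeStrong() below), so no other thread can see it.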
bucket->enqueue(threadData);
// At this point there may be some buckets left unreused. This could easily happen if the
// number of enqueued threads right now is low but the high watermark of the number of threads
// enqueued was high. We place these buckets into the hashtable basically at random, just to
// make sure we don't leak them.
for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
if (bucketPtr.load())
bucketPtr.store(reusableBuckets.takeLast());
// Since we increased the size of the hashtable, we should have exhausted our preallocated
// buckets.
ASSERT(reusableBuckets.isEmpty());
// OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
// roll. After we install the new hashtable, we can release all bucket locks.
bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable);
RELEASE_ASSERT(result);
unlockHashtable(bucketsToUnlock);
ThreadData::ThreadData()
: threadIdentifier(currentThread())
unsigned currentNumThreads;
unsigned oldNumThreads = numThreads.load();
currentNumThreads = oldNumThreads + 1;
if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
ensureHashtableSize(currentNumThreads);
ThreadData::~ThreadData()
unsigned oldNumThreads = numThreads.load();
if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
ThreadData* myThreadData()
static std::once_flag initializeOnce;
threadData = new ThreadSpecific<ThreadData>();
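// The ThreadSpecific is created exactly once (guarded by initializeOnce above) and, as far as
// this file is concerned, never torn down; each thread then gets its own lazily constructed
// ThreadData out of it.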
template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
unsigned hash = hashAddress(address);
Hashtable* myHashtable = ensureHashtable();
unsigned index = hash % myHashtable->size;
Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
bucket = bucketPointer.load();
bucket = new Bucket();
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
dataLog(toString(currentThread(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
// At this point the hashtable could have rehashed under us.
if (hashtable.load() != myHashtable) {
bucket->lock.unlock();
ThreadData* threadData = functor();
dataLog(toString(currentThread(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
bucket->enqueue(threadData);
bucket->lock.unlock();
enum class BucketMode {
template<typename DequeueFunctor, typename FinishFunctor>
const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
const FinishFunctor& finishFunctor)
unsigned hash = hashAddress(address);
Hashtable* myHashtable = ensureHashtable();
unsigned index = hash % myHashtable->size;
Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
Bucket* bucket = bucketPointer.load();
if (bucketMode == BucketMode::IgnoreEmpty)
bucket = bucketPointer.load();
bucket = new Bucket();
if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
// At this point the hashtable could have rehashed under us.
if (hashtable.load() != myHashtable) {
bucket->lock.unlock();
bucket->genericDequeue(dequeueFunctor);
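// Report whether anything is left in the bucket. Entries in this queue may be parked on other
// addresses that merely hash to the same bucket, so a non-empty queue only means there *may* be
// more threads waiting on this address.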
bool result = !!bucket->queueHead;
finishFunctor(result);
bucket->lock.unlock();
} // anonymous namespace
NEVER_INLINE bool ParkingLot::parkConditionallyImpl(
const ScopedLambda<bool()>& validation,
const ScopedLambda<void()>& beforeSleep,
Clock::time_point timeout)
dataLog(toString(currentThread(), ": parking.\n"));
ThreadData* me = myThreadData();
// Guard against someone calling parkConditionally() recursively from beforeSleep().
RELEASE_ASSERT(!me->address);
bool result = enqueue(
[&] () -> ThreadData* {
me->address = address;
std::unique_lock<std::mutex> locker(me->parkingLock);
while (me->address && Clock::now() < timeout) {
// Falling back to wait() works around a bug in the libstdc++ implementation of std::condition_variable. See:
// - https://bugs.webkit.org/show_bug.cgi?id=148027
// - https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58931
if (timeout == Clock::time_point::max())
me->parkingCondition.wait(locker);
me->parkingCondition.wait_until(locker, timeout);
// Because of the above, we do this thing, which is hilariously awful, but ensures that the worst case is
// a CPU-eating spin but not a deadlock.
ASSERT(!me->address || me->address == address);
didGetDequeued = !me->address;
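// Whoever dequeues us clears me->address while holding me->parkingLock, so reading it under the
// same lock reliably distinguishes "we were unparked" from "the timeout fired".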
if (didGetDequeued) {
// Great! We actually got dequeued rather than the timeout expiring.
// Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.
// It's possible that we get unparked right here, just before dequeue() grabs a lock. It's
// probably worthwhile to detect when this happens, and return true in that case, to ensure
// that when we return false it really means that no unpark could have been responsible for us
// waking up, and that if an unpark call did happen, it woke someone else up.
address, BucketMode::IgnoreEmpty,
[&] (ThreadData* element) {
return DequeueResult::RemoveAndStop;
return DequeueResult::Ignore;
ASSERT(!me->nextInQueue);
// Make sure that no matter what, me->address is null after this point.
std::lock_guard<std::mutex> locker(me->parkingLock);
me->address = nullptr;
// If we were not found in the search above, then we know that someone unparked us.
NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
dataLog(toString(currentThread(), ": unparking one.\n"));
ThreadData* threadData = nullptr;
result.mayHaveMoreThreads = dequeue(
BucketMode::EnsureNonEmpty,
[&] (ThreadData* element) {
if (element->address != address)
return DequeueResult::Ignore;
threadData = element;
result.didUnparkThread = true;
return DequeueResult::RemoveAndStop;
ASSERT(!result.didUnparkThread);
result.mayHaveMoreThreads = false;
ASSERT(threadData->address);
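// Wake the thread we dequeued: clear its address under its parkingLock (the same lock its wait
// loop uses), then signal its condition variable.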
std::unique_lock<std::mutex> locker(threadData->parkingLock);
threadData->address = nullptr;
threadData->parkingCondition.notify_one();
NEVER_INLINE void ParkingLot::unparkOneImpl(
const ScopedLambda<void(ParkingLot::UnparkResult)>& callback)
dataLog(toString(currentThread(), ": unparking one the hard way.\n"));
ThreadData* threadData = nullptr;
BucketMode::EnsureNonEmpty,
[&] (ThreadData* element) {
if (element->address != address)
return DequeueResult::Ignore;
threadData = element;
return DequeueResult::RemoveAndStop;
[&] (bool mayHaveMoreThreads) {
result.didUnparkThread = !!threadData;
result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
ASSERT(threadData->address);
std::unique_lock<std::mutex> locker(threadData->parkingLock);
threadData->address = nullptr;
threadData->parkingCondition.notify_one();
NEVER_INLINE void ParkingLot::unparkAll(const void* address)
dataLog(toString(currentThread(), ": unparking all from ", RawPointer(address), ".\n"));
Vector<ThreadData*, 8> threadDatas;
BucketMode::IgnoreEmpty,
[&] (ThreadData* element) {
dataLog(toString(currentThread(), ": Observing element with address = ", RawPointer(element->address), "\n"));
if (element->address != address)
return DequeueResult::Ignore;
threadDatas.append(element);
return DequeueResult::RemoveAndContinue;
for (ThreadData* threadData : threadDatas) {
dataLog(toString(currentThread(), ": unparking ", RawPointer(threadData), " with address ", RawPointer(threadData->address), "\n"));
ASSERT(threadData->address);
std::unique_lock<std::mutex> locker(threadData->parkingLock);
threadData->address = nullptr;
threadData->parkingCondition.notify_one();
dataLog(toString(currentThread(), ": done unparking.\n"));
NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(ThreadIdentifier, const void*)>& callback)
Vector<Bucket*> bucketsToUnlock = lockHashtable();
Hashtable* currentHashtable = hashtable.load();
for (unsigned i = currentHashtable->size; i--;) {
Bucket* bucket = currentHashtable->data[i].load();
for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
callback(currentThreadData->threadIdentifier, currentThreadData->address);
unlockHashtable(bucketsToUnlock);