[WTF] Move currentCPUTime and sleep(Seconds) to CPUTime.h and Seconds.h respectively
[WebKit-https.git] / Source / WTF / wtf / ParkingLot.cpp
1 /*
2  * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
14  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
17  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
18  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
21  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
23  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
24  */
25
26 #include "config.h"
27 #include "ParkingLot.h"
28
29 #include "DataLog.h"
30 #include "HashFunctions.h"
31 #include "StringPrintStream.h"
32 #include "ThreadSpecific.h"
33 #include "Threading.h"
34 #include "Vector.h"
35 #include "WeakRandom.h"
36 #include "WordLock.h"
37 #include <condition_variable>
38 #include <mutex>
39 #include <thread>
40
41 namespace WTF {
42
43 namespace {
44
// Gates the dataLog() tracing used throughout this file.
constexpr bool verbose = false;
46
47 struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
48     WTF_MAKE_FAST_ALLOCATED;
49 public:
50     
51     ThreadData();
52     ~ThreadData();
53
54     Ref<Thread> thread;
55     
56     Mutex parkingLock;
57     ThreadCondition parkingCondition;
58
59     const void* address { nullptr };
60     
61     ThreadData* nextInQueue { nullptr };
62     
63     intptr_t token { 0 };
64 };
65
// What the functor given to Bucket::genericDequeue() wants done with the element it was shown.
enum class DequeueResult {
    Ignore, // Leave the element queued and look at the next one.
    RemoveAndContinue, // Unlink the element and keep scanning.
    RemoveAndStop, // Unlink the element and end the scan.
};
71
72 struct Bucket {
73     WTF_MAKE_FAST_ALLOCATED;
74 public:
75     Bucket()
76         : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
77     {
78     }
79     
80     void enqueue(ThreadData* data)
81     {
82         if (verbose)
83             dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
84         ASSERT(data->address);
85         ASSERT(!data->nextInQueue);
86         
87         if (queueTail) {
88             queueTail->nextInQueue = data;
89             queueTail = data;
90             return;
91         }
92
93         queueHead = data;
94         queueTail = data;
95     }
96
97     template<typename Functor>
98     void genericDequeue(const Functor& functor)
99     {
100         if (verbose)
101             dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));
102         
103         if (!queueHead) {
104             if (verbose)
105                 dataLog(toString(Thread::current(), ": empty.\n"));
106             return;
107         }
108
109         // This loop is a very clever abomination. The induction variables are the pointer to the
110         // pointer to the current node, and the pointer to the previous node. This gives us everything
111         // we need to both proceed forward to the next node, and to remove nodes while maintaining the
112         // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
113         // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
114         // we'd want queueTail to be set to nullptr. This works because:
115         //
116         //     currentPtr == &queueHead
117         //     previous == nullptr
118         //
119         // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
120         // that used to point to this node to instead point to this node's successor. Another example:
121         // if we were at the second node in the queue, then we'd have:
122         //
123         //     currentPtr == &queueHead->nextInQueue
124         //     previous == queueHead
125         //
126         // If this node is not equal to queueTail, then removing it simply means making
127         // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
128         // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
129         // queueTail to previous, which in this case is queueHead - thus making the queue look like a
130         // proper one-element queue with queueHead == queueTail.
131         bool shouldContinue = true;
132         ThreadData** currentPtr = &queueHead;
133         ThreadData* previous = nullptr;
134
135         MonotonicTime time = MonotonicTime::now();
136         bool timeToBeFair = false;
137         if (time > nextFairTime)
138             timeToBeFair = true;
139         
140         bool didDequeue = false;
141         
142         while (shouldContinue) {
143             ThreadData* current = *currentPtr;
144             if (verbose)
145                 dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
146             if (!current)
147                 break;
148             DequeueResult result = functor(current, timeToBeFair);
149             switch (result) {
150             case DequeueResult::Ignore:
151                 if (verbose)
152                     dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
153                 previous = current;
154                 currentPtr = &(*currentPtr)->nextInQueue;
155                 break;
156             case DequeueResult::RemoveAndStop:
157                 shouldContinue = false;
158                 FALLTHROUGH;
159             case DequeueResult::RemoveAndContinue:
160                 if (verbose)
161                     dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
162                 if (current == queueTail)
163                     queueTail = previous;
164                 didDequeue = true;
165                 *currentPtr = current->nextInQueue;
166                 current->nextInQueue = nullptr;
167                 break;
168             }
169         }
170         
171         if (timeToBeFair && didDequeue)
172             nextFairTime = time + Seconds::fromMilliseconds(random.get());
173
174         ASSERT(!!queueHead == !!queueTail);
175     }
176     
177     ThreadData* dequeue()
178     {
179         ThreadData* result = nullptr;
180         genericDequeue(
181             [&] (ThreadData* element, bool) -> DequeueResult {
182                 result = element;
183                 return DequeueResult::RemoveAndStop;
184             });
185         return result;
186     }
187
188     ThreadData* queueHead { nullptr };
189     ThreadData* queueTail { nullptr };
190
191     // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
192     // this lock.
193     WordLock lock;
194     
195     MonotonicTime nextFairTime;
196     
197     WeakRandom random;
198
199     // Put some distane between buckets in memory. This is one of several mitigations against false
200     // sharing.
201     char padding[64];
202 };
203
204 struct Hashtable;
205
206 // We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
207 Vector<Hashtable*>* hashtables;
208 StaticWordLock hashtablesLock;
209
210 struct Hashtable {
211     unsigned size;
212     Atomic<Bucket*> data[1];
213
214     static Hashtable* create(unsigned size)
215     {
216         ASSERT(size >= 1);
217         
218         Hashtable* result = static_cast<Hashtable*>(
219             fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
220         result->size = size;
221
222         {
223             // This is not fast and it's not data-access parallel, but that's fine, because
224             // hashtable resizing is guaranteed to be rare and it will never happen in steady
225             // state.
226             WordLockHolder locker(hashtablesLock);
227             if (!hashtables)
228                 hashtables = new Vector<Hashtable*>();
229             hashtables->append(result);
230         }
231         
232         return result;
233     }
234
235     static void destroy(Hashtable* hashtable)
236     {
237         {
238             // This is not fast, but that's OK. See comment in create().
239             WordLockHolder locker(hashtablesLock);
240             hashtables->removeFirst(hashtable);
241         }
242         
243         fastFree(hashtable);
244     }
245 };
246
247 Atomic<Hashtable*> hashtable;
248 Atomic<unsigned> numThreads;
249
250 // With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
251 // memory usage per thread will still be less than 1KB.
252 const unsigned maxLoadFactor = 3;
253
254 const unsigned growthFactor = 2;
255
256 unsigned hashAddress(const void* address)
257 {
258     return WTF::PtrHash<const void*>::hash(address);
259 }
260
261 Hashtable* ensureHashtable()
262 {
263     for (;;) {
264         Hashtable* currentHashtable = hashtable.load();
265
266         if (currentHashtable)
267             return currentHashtable;
268
269         if (!currentHashtable) {
270             currentHashtable = Hashtable::create(maxLoadFactor);
271             if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
272                 if (verbose)
273                     dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
274                 return currentHashtable;
275             }
276
277             Hashtable::destroy(currentHashtable);
278         }
279     }
280 }
281
282 // Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
283 // after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
284 // slow and not scalable, so it's only used during thread creation and for debugging/testing.
285 Vector<Bucket*> lockHashtable()
286 {
287     for (;;) {
288         Hashtable* currentHashtable = ensureHashtable();
289
290         ASSERT(currentHashtable);
291
292         // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
293         // we can lock all of the buckets, not just the ones that are materialized.
294         Vector<Bucket*> buckets;
295         for (unsigned i = currentHashtable->size; i--;) {
296             Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];
297
298             for (;;) {
299                 Bucket* bucket = bucketPointer.load();
300
301                 if (!bucket) {
302                     bucket = new Bucket();
303                     if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
304                         delete bucket;
305                         continue;
306                     }
307                 }
308
309                 buckets.append(bucket);
310                 break;
311             }
312         }
313
314         // Now lock the buckets in the right order.
315         std::sort(buckets.begin(), buckets.end());
316         for (Bucket* bucket : buckets)
317             bucket->lock.lock();
318
319         // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
320         // now.
321         if (hashtable.load() == currentHashtable)
322             return buckets;
323
324         // The hashtable rehashed. Unlock everything and try again.
325         for (Bucket* bucket : buckets)
326             bucket->lock.unlock();
327     }
328 }
329
330 void unlockHashtable(const Vector<Bucket*>& buckets)
331 {
332     for (Bucket* bucket : buckets)
333         bucket->lock.unlock();
334 }
335
336 // Rehash the hashtable to handle numThreads threads.
337 void ensureHashtableSize(unsigned numThreads)
338 {
339     // We try to ensure that the size of the hashtable used for thread queues is always large enough
340     // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
341     // hashtable. This does just that. Note that we never free the old spine, since we never lock
342     // around spine accesses (i.e. the "hashtable" global variable).
343
344     // First do a fast check to see if rehashing is needed.
345     Hashtable* oldHashtable = hashtable.load();
346     if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
347         if (verbose)
348             dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
349         return;
350     }
351
352     // Seems like we *might* have to rehash, so lock the hashtable and try again.
353     Vector<Bucket*> bucketsToUnlock = lockHashtable();
354
355     // Check again, since the hashtable could have rehashed while we were locking it. Also,
356     // lockHashtable() creates an initial hashtable for us.
357     oldHashtable = hashtable.load();
358     if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
359         if (verbose)
360             dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
361         unlockHashtable(bucketsToUnlock);
362         return;
363     }
364
365     Vector<Bucket*> reusableBuckets = bucketsToUnlock;
366
367     // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
368     // are placed into the vector in queue order.
369     Vector<ThreadData*> threadDatas;
370     for (Bucket* bucket : reusableBuckets) {
371         while (ThreadData* threadData = bucket->dequeue())
372             threadDatas.append(threadData);
373     }
374
375     unsigned newSize = numThreads * growthFactor * maxLoadFactor;
376     RELEASE_ASSERT(newSize > oldHashtable->size);
377     
378     Hashtable* newHashtable = Hashtable::create(newSize);
379     if (verbose)
380         dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
381     for (ThreadData* threadData : threadDatas) {
382         if (verbose)
383             dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
384         unsigned hash = hashAddress(threadData->address);
385         unsigned index = hash % newHashtable->size;
386         if (verbose)
387             dataLog(toString(Thread::current(), ": index = ", index, "\n"));
388         Bucket* bucket = newHashtable->data[index].load();
389         if (!bucket) {
390             if (reusableBuckets.isEmpty())
391                 bucket = new Bucket();
392             else
393                 bucket = reusableBuckets.takeLast();
394             newHashtable->data[index].store(bucket);
395         }
396         
397         bucket->enqueue(threadData);
398     }
399     
400     // At this point there may be some buckets left unreused. This could easily happen if the
401     // number of enqueued threads right now is low but the high watermark of the number of threads
402     // enqueued was high. We place these buckets into the hashtable basically at random, just to
403     // make sure we don't leak them.
404     for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
405         Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
406         if (bucketPtr.load())
407             continue;
408         bucketPtr.store(reusableBuckets.takeLast());
409     }
410     
411     // Since we increased the size of the hashtable, we should have exhausted our preallocated
412     // buckets by now.
413     ASSERT(reusableBuckets.isEmpty());
414     
415     // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
416     // roll. After we install the new hashtable, we can release all bucket locks.
417     
418     bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
419     RELEASE_ASSERT(result);
420
421     unlockHashtable(bucketsToUnlock);
422 }
423
424 ThreadData::ThreadData()
425     : thread(Thread::current())
426 {
427     unsigned currentNumThreads;
428     for (;;) {
429         unsigned oldNumThreads = numThreads.load();
430         currentNumThreads = oldNumThreads + 1;
431         if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
432             break;
433     }
434
435     ensureHashtableSize(currentNumThreads);
436 }
437
438 ThreadData::~ThreadData()
439 {
440     for (;;) {
441         unsigned oldNumThreads = numThreads.load();
442         if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
443             break;
444     }
445 }
446
447 ThreadData* myThreadData()
448 {
449     static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
450     static std::once_flag initializeOnce;
451     std::call_once(
452         initializeOnce,
453         [] {
454             threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
455         });
456     
457     RefPtr<ThreadData>& result = **threadData;
458     
459     if (!result)
460         result = adoptRef(new ThreadData());
461     
462     return result.get();
463 }
464
465 template<typename Functor>
466 bool enqueue(const void* address, const Functor& functor)
467 {
468     unsigned hash = hashAddress(address);
469
470     for (;;) {
471         Hashtable* myHashtable = ensureHashtable();
472         unsigned index = hash % myHashtable->size;
473         Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
474         Bucket* bucket;
475         for (;;) {
476             bucket = bucketPointer.load();
477             if (!bucket) {
478                 bucket = new Bucket();
479                 if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
480                     delete bucket;
481                     continue;
482                 }
483             }
484             break;
485         }
486         if (verbose)
487             dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
488         bucket->lock.lock();
489
490         // At this point the hashtable could have rehashed under us.
491         if (hashtable.load() != myHashtable) {
492             bucket->lock.unlock();
493             continue;
494         }
495
496         ThreadData* threadData = functor();
497         bool result;
498         if (threadData) {
499             if (verbose)
500                 dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
501             bucket->enqueue(threadData);
502             result = true;
503         } else
504             result = false;
505         bucket->lock.unlock();
506         return result;
507     }
508 }
509
// Tells dequeue() what to do when the slot for the address has no bucket yet.
enum class BucketMode {
    EnsureNonEmpty, // Create the bucket, then run the functors under its lock as usual.
    IgnoreEmpty, // Return false right away without calling either functor.
};
514
515 template<typename DequeueFunctor, typename FinishFunctor>
516 bool dequeue(
517     const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
518     const FinishFunctor& finishFunctor)
519 {
520     unsigned hash = hashAddress(address);
521
522     for (;;) {
523         Hashtable* myHashtable = ensureHashtable();
524         unsigned index = hash % myHashtable->size;
525         Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
526         Bucket* bucket = bucketPointer.load();
527         if (!bucket) {
528             if (bucketMode == BucketMode::IgnoreEmpty)
529                 return false;
530
531             for (;;) {
532                 bucket = bucketPointer.load();
533                 if (!bucket) {
534                     bucket = new Bucket();
535                     if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
536                         delete bucket;
537                         continue;
538                     }
539                 }
540                 break;
541             }
542         }
543
544         bucket->lock.lock();
545
546         // At this point the hashtable could have rehashed under us.
547         if (hashtable.load() != myHashtable) {
548             bucket->lock.unlock();
549             continue;
550         }
551
552         bucket->genericDequeue(dequeueFunctor);
553         bool result = !!bucket->queueHead;
554         finishFunctor(result);
555         bucket->lock.unlock();
556         return result;
557     }
558 }
559
560 } // anonymous namespace
561
562 NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
563     const void* address,
564     const ScopedLambda<bool()>& validation,
565     const ScopedLambda<void()>& beforeSleep,
566     const TimeWithDynamicClockType& timeout)
567 {
568     if (verbose)
569         dataLog(toString(Thread::current(), ": parking.\n"));
570     
571     ThreadData* me = myThreadData();
572     me->token = 0;
573
574     // Guard against someone calling parkConditionally() recursively from beforeSleep().
575     RELEASE_ASSERT(!me->address);
576
577     bool enqueueResult = enqueue(
578         address,
579         [&] () -> ThreadData* {
580             if (!validation())
581                 return nullptr;
582
583             me->address = address;
584             return me;
585         });
586
587     if (!enqueueResult)
588         return ParkResult();
589
590     beforeSleep();
591     
592     bool didGetDequeued;
593     {
594         MutexLocker locker(me->parkingLock);
595         while (me->address && timeout.nowWithSameClock() < timeout) {
596             me->parkingCondition.timedWait(
597                 me->parkingLock, timeout.approximateWallTime());
598             
599             // It's possible for the OS to decide not to wait. If it does that then it will also
600             // decide not to release the lock. If there's a bug in the time math, then this could
601             // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
602             // spin.
603             me->parkingLock.unlock();
604             me->parkingLock.lock();
605         }
606         ASSERT(!me->address || me->address == address);
607         didGetDequeued = !me->address;
608     }
609     
610     if (didGetDequeued) {
611         // Great! We actually got dequeued rather than the timeout expiring.
612         ParkResult result;
613         result.wasUnparked = true;
614         result.token = me->token;
615         return result;
616     }
617
618     // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.
619
620     bool didDequeue = false;
621     dequeue(
622         address, BucketMode::IgnoreEmpty,
623         [&] (ThreadData* element, bool) {
624             if (element == me) {
625                 didDequeue = true;
626                 return DequeueResult::RemoveAndStop;
627             }
628             return DequeueResult::Ignore;
629         },
630         [] (bool) { });
631     
632     // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
633     // If didDequeue is false, then someone unparked us.
634     
635     RELEASE_ASSERT(!me->nextInQueue);
636
637     // Make sure that no matter what, me->address is null after this point.
638     {
639         MutexLocker locker(me->parkingLock);
640         if (!didDequeue) {
641             // If we did not dequeue ourselves, then someone else did. They will set our address to
642             // null. We don't want to proceed until they do this, because otherwise, they may set
643             // our address to null in some distant future when we're already trying to wait for
644             // other things.
645             while (me->address)
646                 me->parkingCondition.wait(me->parkingLock);
647         }
648         me->address = nullptr;
649     }
650
651     ParkResult result;
652     result.wasUnparked = !didDequeue;
653     if (!didDequeue) {
654         // If we were unparked then there should be a token.
655         result.token = me->token;
656     }
657     return result;
658 }
659
660 NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
661 {
662     if (verbose)
663         dataLog(toString(Thread::current(), ": unparking one.\n"));
664     
665     UnparkResult result;
666
667     RefPtr<ThreadData> threadData;
668     result.mayHaveMoreThreads = dequeue(
669         address,
670         // Why is this here?
671         // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
672         // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
673         // some operation while holding the bucket lock, which usually goes into the finish func.
674         // But if that operation is a no-op, then it's not clear why we need this.
675         BucketMode::EnsureNonEmpty,
676         [&] (ThreadData* element, bool) {
677             if (element->address != address)
678                 return DequeueResult::Ignore;
679             threadData = element;
680             result.didUnparkThread = true;
681             return DequeueResult::RemoveAndStop;
682         },
683         [] (bool) { });
684
685     if (!threadData) {
686         ASSERT(!result.didUnparkThread);
687         result.mayHaveMoreThreads = false;
688         return result;
689     }
690     
691     ASSERT(threadData->address);
692     
693     {
694         MutexLocker locker(threadData->parkingLock);
695         threadData->address = nullptr;
696         threadData->token = 0;
697     }
698     threadData->parkingCondition.signal();
699
700     return result;
701 }
702
703 NEVER_INLINE void ParkingLot::unparkOneImpl(
704     const void* address,
705     const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
706 {
707     if (verbose)
708         dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));
709     
710     RefPtr<ThreadData> threadData;
711     bool timeToBeFair = false;
712     dequeue(
713         address,
714         BucketMode::EnsureNonEmpty,
715         [&] (ThreadData* element, bool passedTimeToBeFair) {
716             if (element->address != address)
717                 return DequeueResult::Ignore;
718             threadData = element;
719             timeToBeFair = passedTimeToBeFair;
720             return DequeueResult::RemoveAndStop;
721         },
722         [&] (bool mayHaveMoreThreads) {
723             UnparkResult result;
724             result.didUnparkThread = !!threadData;
725             result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
726             if (timeToBeFair)
727                 RELEASE_ASSERT(threadData);
728             result.timeToBeFair = timeToBeFair;
729             intptr_t token = callback(result);
730             if (threadData)
731                 threadData->token = token;
732         });
733
734     if (!threadData)
735         return;
736
737     ASSERT(threadData->address);
738     
739     {
740         MutexLocker locker(threadData->parkingLock);
741         threadData->address = nullptr;
742     }
743     // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
744     threadData->parkingCondition.signal();
745 }
746
747 NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
748 {
749     if (!count)
750         return 0;
751     
752     if (verbose)
753         dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));
754     
755     Vector<RefPtr<ThreadData>, 8> threadDatas;
756     dequeue(
757         address,
758         // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
759         // but that seems wrong.
760         BucketMode::IgnoreEmpty,
761         [&] (ThreadData* element, bool) {
762             if (verbose)
763                 dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
764             if (element->address != address)
765                 return DequeueResult::Ignore;
766             threadDatas.append(element);
767             if (threadDatas.size() == count)
768                 return DequeueResult::RemoveAndStop;
769             return DequeueResult::RemoveAndContinue;
770         },
771         [] (bool) { });
772
773     for (RefPtr<ThreadData>& threadData : threadDatas) {
774         if (verbose)
775             dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
776         ASSERT(threadData->address);
777         {
778             MutexLocker locker(threadData->parkingLock);
779             threadData->address = nullptr;
780         }
781         threadData->parkingCondition.signal();
782     }
783
784     if (verbose)
785         dataLog(toString(Thread::current(), ": done unparking.\n"));
786     
787     return threadDatas.size();
788 }
789
790 NEVER_INLINE void ParkingLot::unparkAll(const void* address)
791 {
792     unparkCount(address, UINT_MAX);
793 }
794
795 NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
796 {
797     Vector<Bucket*> bucketsToUnlock = lockHashtable();
798
799     Hashtable* currentHashtable = hashtable.load();
800     for (unsigned i = currentHashtable->size; i--;) {
801         Bucket* bucket = currentHashtable->data[i].load();
802         if (!bucket)
803             continue;
804         for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
805             callback(currentThreadData->thread.get(), currentThreadData->address);
806     }
807     
808     unlockHashtable(bucketsToUnlock);
809 }
810
811 } // namespace WTF
812