/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "ParkingLot.h"

#include "CurrentTime.h"
#include "DataLog.h"
#include "HashFunctions.h"
#include "StringPrintStream.h"
#include "ThreadSpecific.h"
#include "ThreadingPrimitives.h"
#include "Vector.h"
#include "WeakRandom.h"
#include "WordLock.h"
#include <condition_variable>
#include <mutex>
#include <thread>

namespace WTF {

namespace {

const bool verbose = false;

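// One ThreadData exists per thread that has ever parked. It carries the per-thread
// mutex/condition used to block in parkConditionally(), the address the thread is
// currently parked on (null when not parked), the intrusive queue link used by Bucket,
// and the token handed over by the unparking thread.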
struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
    WTF_MAKE_FAST_ALLOCATED;
public:

    ThreadData();
    ~ThreadData();

    Ref<Thread> thread;

    Mutex parkingLock;
    ThreadCondition parkingCondition;

    const void* address { nullptr };

    ThreadData* nextInQueue { nullptr };

    intptr_t token { 0 };
};

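// Return value protocol for the functor passed to Bucket::genericDequeue():
// Ignore leaves the current element in the queue and keeps scanning,
// RemoveAndContinue unlinks it and keeps scanning, and RemoveAndStop unlinks it and
// terminates the scan.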
enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};

struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    Bucket()
        : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
    {
    }

    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);

        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": dequeueing from bucket at ", RawPointer(this), "\n"));

        if (!queueHead) {
            if (verbose)
                dataLog(toString(currentThread(), ": empty.\n"));
            return;
        }

        // This loop is a very clever abomination. The induction variables are the pointer to the
        // pointer to the current node, and the pointer to the previous node. This gives us everything
        // we need to both proceed forward to the next node, and to remove nodes while maintaining the
        // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
        // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
        // we'd want queueTail to be set to nullptr. This works because:
        //
        //     currentPtr == &queueHead
        //     previous == nullptr
        //
        // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
        // that used to point to this node to instead point to this node's successor. Another example:
        // if we were at the second node in the queue, then we'd have:
        //
        //     currentPtr == &queueHead->nextInQueue
        //     previous == queueHead
        //
        // If this node is not equal to queueTail, then removing it simply means making
        // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
        // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
        // queueTail to previous, which in this case is queueHead - thus making the queue look like a
        // proper one-element queue with queueHead == queueTail.
        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;

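        // Fairness: once nextFairTime has passed, this dequeue is flagged as "time to be
        // fair". The flag is handed to the functor (and surfaced to unparkOneImpl()'s
        // callback as UnparkResult::timeToBeFair), and if anything was actually dequeued,
        // nextFairTime is pushed out by a randomized increment below.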
        double time = monotonicallyIncreasingTimeMS();
        bool timeToBeFair = false;
        if (time > nextFairTime)
            timeToBeFair = true;

        bool didDequeue = false;

        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(currentThread(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current, timeToBeFair);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(currentThread(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndStop:
                shouldContinue = false;
                FALLTHROUGH;
            case DequeueResult::RemoveAndContinue:
                if (verbose)
                    dataLog(toString(currentThread(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                if (current == queueTail)
                    queueTail = previous;
                didDequeue = true;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                break;
            }
        }

        if (timeToBeFair && didDequeue)
            nextFairTime = time + random.get();

        ASSERT(!!queueHead == !!queueTail);
    }

    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element, bool) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // This lock protects the entire bucket. Thou shalt not make changes to Bucket without holding
    // this lock.
    WordLock lock;

    double nextFairTime { 0 };

    WeakRandom random;

    // Put some distance between buckets in memory. This is one of several mitigations against
    // false sharing.
    char padding[64];
};

struct Hashtable;

// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
StaticWordLock hashtablesLock;

struct Hashtable {
    unsigned size;
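    // The spine: a one-element array that is over-allocated in create() to hold "size"
    // atomic bucket pointers, so the spine and the header share one fastMalloc block.
    // Slots start out null; buckets are materialized lazily by enqueue()/dequeue().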
    Atomic<Bucket*> data[1];

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);

        Hashtable* result = static_cast<Hashtable*>(
            fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
        result->size = size;

        {
            // This is not fast and it's not data-access parallel, but that's fine, because
            // hashtable resizing is guaranteed to be rare and it will never happen in steady
            // state.
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(result);
        }

        return result;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            // This is not fast, but that's OK. See comment in create().
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }

        fastFree(hashtable);
    }
};

Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;

// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
const unsigned maxLoadFactor = 3;
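// Rough sanity check of the claim above (sizes are approximate and platform-dependent):
// a Bucket is two queue pointers, a WordLock, a double, a WeakRandom, and 64 bytes of
// padding, i.e. on the order of a hundred-plus bytes, and the table keeps between
// maxLoadFactor and growthFactor * maxLoadFactor (3 to 6) bucket slots per thread, so a
// fully populated table costs a few hundred bytes of buckets plus a handful of spine
// pointers per thread.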

const unsigned growthFactor = 2;

unsigned hashAddress(const void* address)
{
    return WTF::PtrHash<const void*>::hash(address);
}

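// Returns the global hashtable, lazily creating the initial one. Creation races are
// resolved with a CAS on the global "hashtable" pointer; losers destroy the table they
// allocated and retry, ending up with the winner's table.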
Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();

        if (currentHashtable)
            return currentHashtable;

        if (!currentHashtable) {
            currentHashtable = Hashtable::create(maxLoadFactor);
            if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
                if (verbose)
                    dataLog(toString(currentThread(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
                return currentHashtable;
            }

            Hashtable::destroy(currentHashtable);
        }
    }
}

// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
        // we can lock all of the buckets, not just the ones that are materialized.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];

            for (;;) {
                Bucket* bucket = bucketPointer.load();

                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }

                buckets.append(bucket);
                break;
            }
        }

        // Now lock the buckets in a canonical order (sorted by address) so that concurrent
        // callers acquiring multiple bucket locks cannot deadlock with each other.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
        // now.
        if (hashtable.load() == currentHashtable)
            return buckets;

        // The hashtable rehashed. Unlock everything and try again.
        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}

void unlockHashtable(const Vector<Bucket*>& buckets)
{
    for (Bucket* bucket : buckets)
        bucket->lock.unlock();
}

// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
{
    // We try to ensure that the size of the hashtable used for thread queues is always large enough
    // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
    // hashtable. This does just that. Note that we never free the old spine, since we never lock
    // around spine accesses (i.e. the "hashtable" global variable).

    // First do a fast check to see if rehashing is needed.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Seems like we *might* have to rehash, so lock the hashtable and try again.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Check again, since the hashtable could have rehashed while we were locking it. Also,
    // lockHashtable() creates an initial hashtable for us.
    oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
    // are placed into the vector in queue order.
    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);

    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(currentThread(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(currentThread(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }

        bucket->enqueue(threadData);
    }

    // At this point there may be some buckets left unreused. This could easily happen if the
    // number of enqueued threads right now is low but the high watermark of the number of threads
    // enqueued was high. We place these buckets into the hashtable basically at random, just to
    // make sure we don't leak them.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }

    // Since we increased the size of the hashtable, we should have exhausted our preallocated
    // buckets by now.
    ASSERT(reusableBuckets.isEmpty());

    // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
    // roll. After we install the new hashtable, we can release all bucket locks.

    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
    RELEASE_ASSERT(result);

    unlockHashtable(bucketsToUnlock);
}

ThreadData::ThreadData()
    : thread(Thread::current())
{
    unsigned currentNumThreads;
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        currentNumThreads = oldNumThreads + 1;
        if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
            break;
    }

    ensureHashtableSize(currentNumThreads);
}

ThreadData::~ThreadData()
{
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
            break;
    }
}

ThreadData* myThreadData()
{
    static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
        });

    RefPtr<ThreadData>& result = **threadData;

    if (!result)
        result = adoptRef(new ThreadData());

    return result.get();
}

template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
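        // Find the bucket for this address, lazily creating it if the slot is still null.
        // If another thread wins the CAS race, delete our speculative Bucket and use theirs.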
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(currentThread(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}

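// Controls what dequeue() does when no bucket has been materialized for the address yet:
// IgnoreEmpty returns false immediately, while EnsureNonEmpty creates the bucket anyway so
// that the finish functor always runs with the bucket lock held.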
enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};

template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}

} // anonymous namespace

NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    const TimeWithDynamicClockType& timeout)
{
    if (verbose)
        dataLog(toString(currentThread(), ": parking.\n"));

    ThreadData* me = myThreadData();
    me->token = 0;

    // Guard against someone calling parkConditionally() recursively from beforeSleep().
    RELEASE_ASSERT(!me->address);

    bool enqueueResult = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!enqueueResult)
        return ParkResult();

    beforeSleep();

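    // From here on, me->address is the handshake with unparking threads: an unparker
    // dequeues us while holding the bucket lock, then clears me->address under
    // me->parkingLock and signals me->parkingCondition. So "me->address became null"
    // means we were dequeued; otherwise the wait below timed out.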
    bool didGetDequeued;
    {
        MutexLocker locker(me->parkingLock);
        while (me->address && timeout.nowWithSameClock() < timeout) {
            me->parkingCondition.timedWait(
                me->parkingLock, timeout.approximateWallTime().secondsSinceEpoch().value());

            // It's possible for the OS to decide not to wait. If it does that then it will also
            // decide not to release the lock. If there's a bug in the time math, then this could
            // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
            // spin.
            me->parkingLock.unlock();
            me->parkingLock.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }

    if (didGetDequeued) {
        // Great! We actually got dequeued rather than the timeout expiring.
        ParkResult result;
        result.wasUnparked = true;
        result.token = me->token;
        return result;
    }

    // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.

    bool didDequeue = false;
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (element == me) {
                didDequeue = true;
                return DequeueResult::RemoveAndStop;
            }
            return DequeueResult::Ignore;
        },
        [] (bool) { });

    // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
    // If didDequeue is false, then someone unparked us.

    RELEASE_ASSERT(!me->nextInQueue);

    // Make sure that no matter what, me->address is null after this point.
    {
        MutexLocker locker(me->parkingLock);
        if (!didDequeue) {
            // If we did not dequeue ourselves, then someone else did. They will set our address to
            // null. We don't want to proceed until they do this, because otherwise, they may set
            // our address to null in some distant future when we're already trying to wait for
            // other things.
            while (me->address)
                me->parkingCondition.wait(me->parkingLock);
        }
        me->address = nullptr;
    }

    ParkResult result;
    result.wasUnparked = !didDequeue;
    if (!didDequeue) {
        // If we were unparked then there should be a token.
        result.token = me->token;
    }
    return result;
}

NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one.\n"));

    UnparkResult result;

    RefPtr<ThreadData> threadData;
    result.mayHaveMoreThreads = dequeue(
        address,
        // Why is this here?
        // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
        // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
        // some operation while holding the bucket lock, which usually goes into the finish func.
        // But if that operation is a no-op, then it's not clear why we need this.
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
        threadData->token = 0;
    }
    threadData->parkingCondition.signal();

    return result;
}

NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address,
    const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one the hard way.\n"));

    RefPtr<ThreadData> threadData;
    bool timeToBeFair = false;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool passedTimeToBeFair) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            timeToBeFair = passedTimeToBeFair;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            if (timeToBeFair)
                RELEASE_ASSERT(threadData);
            result.timeToBeFair = timeToBeFair;
            intptr_t token = callback(result);
            if (threadData)
                threadData->token = token;
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
    threadData->parkingCondition.signal();
}

NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
{
    if (!count)
        return 0;

    if (verbose)
        dataLog(toString(currentThread(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));

    Vector<RefPtr<ThreadData>, 8> threadDatas;
    dequeue(
        address,
        // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
        // but that seems wrong.
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (verbose)
                dataLog(toString(currentThread(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            if (threadDatas.size() == count)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

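    // Wake the dequeued threads outside the bucket lock (dequeue() released it above), in
    // the order they were queued. Clearing address under parkingLock before signaling is
    // what tells each parked thread that it was dequeued rather than timed out.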
    for (RefPtr<ThreadData>& threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            MutexLocker locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.signal();
    }

    if (verbose)
        dataLog(toString(currentThread(), ": done unparking.\n"));

    return threadDatas.size();
}

NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    unparkCount(address, UINT_MAX);
}

NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
{
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    Hashtable* currentHashtable = hashtable.load();
    for (unsigned i = currentHashtable->size; i--;) {
        Bucket* bucket = currentHashtable->data[i].load();
        if (!bucket)
            continue;
        for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
            callback(currentThreadData->thread.get(), currentThreadData->address);
    }

    unlockHashtable(bucketsToUnlock);
}

} // namespace WTF