Clean up the ParkingLot unparking API a bit
Source/WTF/wtf/ParkingLot.cpp
/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include "ParkingLot.h"

#include "DataLog.h"
#include "HashFunctions.h"
#include "StringPrintStream.h"
#include "ThreadSpecific.h"
#include "ThreadingPrimitives.h"
#include "Vector.h"
#include "WordLock.h"
#include <condition_variable>
#include <mutex>
#include <thread>

namespace WTF {

namespace {

const bool verbose = false;

struct ThreadData {
    WTF_MAKE_FAST_ALLOCATED;
public:
    ThreadData();
    ~ThreadData();

    ThreadIdentifier threadIdentifier;

    std::mutex parkingLock;
    std::condition_variable parkingCondition;

    const void* address { nullptr };

    ThreadData* nextInQueue { nullptr };
};

enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};

struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);

        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(currentThread(), ": dequeueing from bucket at ", RawPointer(this), "\n"));

        if (!queueHead) {
            if (verbose)
                dataLog(toString(currentThread(), ": empty.\n"));
            return;
        }

        // This loop is a very clever abomination. The induction variables are the pointer to the
        // pointer to the current node, and the pointer to the previous node. This gives us everything
        // we need to both proceed forward to the next node, and to remove nodes while maintaining the
        // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
        // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
        // we'd want queueTail to be set to nullptr. This works because:
        //
        //     currentPtr == &queueHead
        //     previous == nullptr
        //
        // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
        // that used to point to this node to instead point to this node's successor. Another example:
        // if we were at the second node in the queue, then we'd have:
        //
        //     currentPtr == &queueHead->nextInQueue
        //     previous == queueHead
        //
        // If this node is not equal to queueTail, then removing it simply means making
        // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
        // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
        // queueTail to previous, which in this case is queueHead - thus making the queue look like a
        // proper one-element queue with queueHead == queueTail.
        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;
        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(currentThread(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(currentThread(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndContinue:
            case DequeueResult::RemoveAndStop:
                if (verbose)
                    dataLog(toString(currentThread(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                if (current == queueTail)
                    queueTail = previous;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                if (result == DequeueResult::RemoveAndStop)
                    shouldContinue = false;
                break;
            }
        }

        ASSERT(!!queueHead == !!queueTail);
    }

    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
    // this lock.
    WordLock lock;

    // Put some distance between buckets in memory. This is one of several mitigations against false
    // sharing.
    char padding[64];
};

struct Hashtable;

// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
StaticWordLock hashtablesLock;

struct Hashtable {
    unsigned size;
    Atomic<Bucket*> data[1];

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);

        Hashtable* result = static_cast<Hashtable*>(
            fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
        result->size = size;

        {
            // This is not fast and it's not data-access parallel, but that's fine, because
            // hashtable resizing is guaranteed to be rare and it will never happen in steady
            // state.
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(result);
        }

        return result;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            // This is not fast, but that's OK. See comment in create().
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }

        fastFree(hashtable);
    }
};
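
// A worked instance of the flexible-array-style allocation above (a sketch, assuming 64-bit
// pointers): create(3) asks fastZeroedMalloc for
// sizeof(Hashtable) + 2 * sizeof(Atomic<Bucket*>) = 16 + 16 = 32 bytes, so the one-element
// data[1] array declared above ends up with room for all three bucket pointers.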

ThreadSpecific<ThreadData>* threadData;
Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;

// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
const unsigned maxLoadFactor = 3;

const unsigned growthFactor = 2;
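
// Rough arithmetic behind the "less than 1KB" claim above (a sketch, assuming 64-bit pointers,
// a one-word WordLock, and ignoring allocator rounding): a rehash sizes the table at
// growthFactor * maxLoadFactor = 6 bucket slots per thread. Each slot costs an 8-byte
// Atomic<Bucket*> in the spine plus, if populated, a Bucket of roughly
// 2 * 8 (queueHead/queueTail) + 8 (lock) + 64 (padding) = 88 bytes, i.e. about
// 6 * (8 + 88) = 576 bytes per thread.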

unsigned hashAddress(const void* address)
{
    return WTF::PtrHash<const void*>::hash(address);
}

Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();

        if (currentHashtable)
            return currentHashtable;

        if (!currentHashtable) {
            currentHashtable = Hashtable::create(maxLoadFactor);
            if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
                if (verbose)
                    dataLog(toString(currentThread(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
                return currentHashtable;
            }

            Hashtable::destroy(currentHashtable);
        }
    }
}

// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
        // we can lock all of the buckets, not just the ones that are materialized.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];

            for (;;) {
                Bucket* bucket = bucketPointer.load();

                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }

                buckets.append(bucket);
                break;
            }
        }

        // Now lock the buckets in the right order.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
        // now.
        if (hashtable.load() == currentHashtable)
            return buckets;

        // The hashtable rehashed. Unlock everything and try again.
        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}

void unlockHashtable(const Vector<Bucket*>& buckets)
{
    for (Bucket* bucket : buckets)
        bucket->lock.unlock();
}

// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
{
    // We try to ensure that the size of the hashtable used for thread queues is always large enough
    // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
    // hashtable. This does just that. Note that we never free the old spine, since we never lock
    // around spine accesses (i.e. the "hashtable" global variable).

    // First do a fast check to see if rehashing is needed.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Seems like we *might* have to rehash, so lock the hashtable and try again.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Check again, since the hashtable could have rehashed while we were locking it. Also,
    // lockHashtable() creates an initial hashtable for us.
    oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(currentThread(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
    // are placed into the vector in queue order.
    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);

    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(currentThread(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(currentThread(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }

        bucket->enqueue(threadData);
    }

    // At this point there may be some buckets left unreused. This could easily happen if the
    // number of enqueued threads right now is low but the high watermark of the number of threads
    // enqueued was high. We place these buckets into the hashtable basically at random, just to
    // make sure we don't leak them.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }

    // Since we increased the size of the hashtable, we should have exhausted our preallocated
    // buckets by now.
    ASSERT(reusableBuckets.isEmpty());

    // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
    // roll. After we install the new hashtable, we can release all bucket locks.

    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable);
    RELEASE_ASSERT(result);

    unlockHashtable(bucketsToUnlock);
}
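
// To put numbers on the sizing policy above (illustration only, not additional behavior): with
// numThreads = 4, a rehash allocates a spine of 4 * growthFactor * maxLoadFactor = 24 bucket
// slots, re-enqueues every parked thread by rehashing its address into the new table, and only
// then publishes the table with compareExchangeStrong.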

ThreadData::ThreadData()
    : threadIdentifier(currentThread())
{
    unsigned currentNumThreads;
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        currentNumThreads = oldNumThreads + 1;
        if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
            break;
    }

    ensureHashtableSize(currentNumThreads);
}

ThreadData::~ThreadData()
{
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
            break;
    }
}

ThreadData* myThreadData()
{
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<ThreadData>();
        });

    return *threadData;
}

template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(currentThread(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(currentThread(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}

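// BucketMode tells dequeue() what to do when no bucket has been materialized for the address yet.
// With IgnoreEmpty it just returns false, since an address without a bucket cannot have anyone
// parked on it. With EnsureNonEmpty it creates the bucket so that it can run the finish functor
// while holding the bucket lock even if the queue turns out to be empty; unparkOneImpl() relies
// on this so that its callback runs atomically with respect to any concurrent park on the same
// address.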
enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};

template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}

} // anonymous namespace

NEVER_INLINE bool ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    Clock::time_point timeout)
{
    if (verbose)
        dataLog(toString(currentThread(), ": parking.\n"));

    ThreadData* me = myThreadData();

    // Guard against someone calling parkConditionally() recursively from beforeSleep().
    RELEASE_ASSERT(!me->address);

    bool result = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!result)
        return false;

    beforeSleep();

    bool didGetDequeued;
    {
        std::unique_lock<std::mutex> locker(me->parkingLock);
        while (me->address && Clock::now() < timeout) {
            // Falling back to wait() works around a bug in the libstdc++ implementation of std::condition_variable. See:
            // - https://bugs.webkit.org/show_bug.cgi?id=148027
            // - https://gcc.gnu.org/bugzilla/show_bug.cgi?id=58931
            if (timeout == Clock::time_point::max())
                me->parkingCondition.wait(locker);
            else
                me->parkingCondition.wait_until(locker, timeout);

            // Because of the above, we do this thing, which is hilariously awful, but ensures that the worst case is
            // a CPU-eating spin but not a deadlock.
            locker.unlock();
            locker.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }

    if (didGetDequeued) {
        // Great! We actually got dequeued rather than the timeout expiring.
        return true;
    }

    // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.

    // It's possible that we get unparked right here, just before dequeue() grabs a lock. It's
    // probably worthwhile to detect when this happens, and return true in that case, to ensure
    // that when we return false it really means that no unpark could have been responsible for us
    // waking up, and that if an unpark call did happen, it woke someone else up.
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element) {
            if (element == me)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::Ignore;
        },
        [] (bool) { });

    ASSERT(!me->nextInQueue);

    // Make sure that no matter what, me->address is null after this point.
    {
        std::lock_guard<std::mutex> locker(me->parkingLock);
        me->address = nullptr;
    }

    // If we were not found in the search above, then we know that someone unparked us.
    return false;
}

NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one.\n"));

    UnparkResult result;

    ThreadData* threadData = nullptr;
    result.mayHaveMoreThreads = dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }

    ASSERT(threadData->address);

    {
        std::unique_lock<std::mutex> locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    threadData->parkingCondition.notify_one();

    return result;
}
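
// To illustrate how the parking and unparking entry points above are meant to be driven, here is
// a hedged sketch of a one-byte barging lock in the style of WTF::Lock. It assumes the
// parkConditionally() and callback-taking unparkOne() template wrappers declared in ParkingLot.h;
// the bit names and the surrounding state are made up for illustration, and the fast paths (a
// single compareExchangeWeak on the byte) are elided.
//
//     static const uint8_t isLockedBit = 1;
//     static const uint8_t hasParkedBit = 2;
//     Atomic<uint8_t> byte; // All of the lock's state.
//
//     void lockSlow()
//     {
//         for (;;) {
//             uint8_t currentValue = byte.load();
//             if (!(currentValue & isLockedBit)) {
//                 if (byte.compareExchangeWeak(currentValue, currentValue | isLockedBit))
//                     return; // We barged in and took the lock.
//                 continue;
//             }
//             // The lock is held; advertise that someone is parked, then park only if the byte
//             // still says "locked and parked" once the queue's bucket lock is held.
//             if (byte.compareExchangeWeak(currentValue, isLockedBit | hasParkedBit)) {
//                 ParkingLot::parkConditionally(
//                     &byte,
//                     [&] () -> bool { return byte.load() == (isLockedBit | hasParkedBit); },
//                     [] () { },
//                     ParkingLot::Clock::time_point::max());
//             }
//         }
//     }
//
//     void unlockSlow()
//     {
//         // The callback runs with the bucket lock held (see BucketMode::EnsureNonEmpty), so
//         // clearing hasParkedBit here cannot race with a thread that is about to park.
//         ParkingLot::unparkOne(
//             &byte,
//             [&] (ParkingLot::UnparkResult result) {
//                 byte.store(result.mayHaveMoreThreads ? hasParkedBit : 0);
//             });
//     }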

NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address,
    const ScopedLambda<void(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking one the hard way.\n"));

    ThreadData* threadData = nullptr;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            callback(result);
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);

    {
        std::unique_lock<std::mutex> locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    threadData->parkingCondition.notify_one();
}

NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    if (verbose)
        dataLog(toString(currentThread(), ": unparking all from ", RawPointer(address), ".\n"));

    Vector<ThreadData*, 8> threadDatas;
    dequeue(
        address,
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element) {
            if (verbose)
                dataLog(toString(currentThread(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(currentThread(), ": unparking ", RawPointer(threadData), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            std::unique_lock<std::mutex> locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.notify_one();
    }

    if (verbose)
        dataLog(toString(currentThread(), ": done unparking.\n"));
}

NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(ThreadIdentifier, const void*)>& callback)
{
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    Hashtable* currentHashtable = hashtable.load();
    for (unsigned i = currentHashtable->size; i--;) {
        Bucket* bucket = currentHashtable->data[i].load();
        if (!bucket)
            continue;
        for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
            callback(currentThreadData->threadIdentifier, currentThreadData->address);
    }

    unlockHashtable(bucketsToUnlock);
}

} // namespace WTF