Make it possible for apps that use both WK1 and WK2 to use MessagePorts.
[WebKit-https.git] / Source / WebCore / dom / messageports / MessagePortChannel.cpp
1 /*
2  * Copyright (C) 2018 Apple Inc. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  *    notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  *    notice, this list of conditions and the following disclaimer in the
11  *    documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23  * THE POSSIBILITY OF SUCH DAMAGE.
24  */
25
26 #include "config.h"
27 #include "MessagePortChannel.h"
28
29 #include "Logging.h"
30 #include "MessagePortChannelRegistry.h"
31 #include <wtf/CompletionHandler.h>
32 #include <wtf/MainThread.h>
33
34 namespace WebCore {
35
36 Ref<MessagePortChannel> MessagePortChannel::create(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
37 {
38     return adoptRef(*new MessagePortChannel(registry, port1, port2));
39 }
40
41 MessagePortChannel::MessagePortChannel(MessagePortChannelRegistry& registry, const MessagePortIdentifier& port1, const MessagePortIdentifier& port2)
42     : m_registry(registry)
43 {
44     ASSERT(isMainThread());
45
46     relaxAdoptionRequirement();
47
48     m_ports[0] = port1;
49     m_processes[0] = port1.processIdentifier;
50     m_entangledToProcessProtectors[0] = this;
51     m_ports[1] = port2;
52     m_processes[1] = port2.processIdentifier;
53     m_entangledToProcessProtectors[1] = this;
54
55     m_registry.messagePortChannelCreated(*this);
56 }
57
58 MessagePortChannel::~MessagePortChannel()
59 {
60     m_registry.messagePortChannelDestroyed(*this);
61 }
62
63 std::optional<ProcessIdentifier> MessagePortChannel::processForPort(const MessagePortIdentifier& port)
64 {
65     ASSERT(isMainThread());
66     ASSERT(port == m_ports[0] || port == m_ports[1]);
67     size_t i = port == m_ports[0] ? 0 : 1;
68     return m_processes[i];
69 }
70
71 bool MessagePortChannel::includesPort(const MessagePortIdentifier& port)
72 {
73     ASSERT(isMainThread());
74
75     return m_ports[0] == port || m_ports[1] == port;
76 }
77
78 void MessagePortChannel::entanglePortWithProcess(const MessagePortIdentifier& port, ProcessIdentifier process)
79 {
80     ASSERT(isMainThread());
81
82     ASSERT(port == m_ports[0] || port == m_ports[1]);
83     size_t i = port == m_ports[0] ? 0 : 1;
84
85     LOG(MessagePorts, "MessagePortChannel %s (%p) entangling port %s (that port has %zu messages available)", logString().utf8().data(), this, port.logString().utf8().data(), m_pendingMessages[i].size());
86
87     ASSERT(!m_processes[i] || *m_processes[i] == process);
88     m_processes[i] = process;
89     m_entangledToProcessProtectors[i] = this;
90 }
91
92 void MessagePortChannel::disentanglePort(const MessagePortIdentifier& port)
93 {
94     ASSERT(isMainThread());
95
96     LOG(MessagePorts, "MessagePortChannel %s (%p) disentangling port %s", logString().utf8().data(), this, port.logString().utf8().data());
97
98     ASSERT(port == m_ports[0] || port == m_ports[1]);
99     size_t i = port == m_ports[0] ? 0 : 1;
100
101     ASSERT(m_processes[i] || m_isClosed[i]);
102     m_processes[i] = std::nullopt;
103
104     // This set of steps is to guarantee that the lock is unlocked before the
105     // last ref to this object is released.
106     auto protectedThis = WTFMove(m_entangledToProcessProtectors[i]);
107 }
108
109 void MessagePortChannel::closePort(const MessagePortIdentifier& port)
110 {
111     ASSERT(isMainThread());
112
113     ASSERT(port == m_ports[0] || port == m_ports[1]);
114     size_t i = port == m_ports[0] ? 0 : 1;
115
116     m_processes[i] = std::nullopt;
117     m_isClosed[i] = true;
118
119     // This set of steps is to guarantee that the lock is unlocked before the
120     // last ref to this object is released.
121     auto protectedThis = makeRef(*this);
122
123     m_pendingMessages[i].clear();
124     m_pendingMessagePortTransfers[i].clear();
125     m_pendingMessageProtectors[i] = nullptr;
126     m_entangledToProcessProtectors[i] = nullptr;
127 }
128
129 bool MessagePortChannel::postMessageToRemote(MessageWithMessagePorts&& message, const MessagePortIdentifier& remoteTarget)
130 {
131     ASSERT(isMainThread());
132
133     ASSERT(remoteTarget == m_ports[0] || remoteTarget == m_ports[1]);
134     size_t i = remoteTarget == m_ports[0] ? 0 : 1;
135
136     for (auto& channelPair : message.transferredPorts) {
137         auto* channel = m_registry.existingChannelContainingPort(channelPair.first);
138         // One of the ports in the channel might have been closed, therefore removing record of the channel.
139         // That's okay; such ports can still be transferred. We just don't have to protect the channel.
140         if (!channel)
141             continue;
142
143         ASSERT(channel->includesPort(channelPair.second));
144
145 #ifndef NDEBUG
146         if (auto* otherChannel = m_registry.existingChannelContainingPort(channelPair.second))
147             ASSERT(channel == otherChannel);
148 #endif
149         // Having a pending message should keep a port alive with a ref.
150         // The ref will be cleared after the batch of pending messages has been delivered.
151         m_pendingMessagePortTransfers[i].add(channel);
152     }
153
154     m_pendingMessages[i].append(WTFMove(message));
155     LOG(MessagePorts, "MessagePortChannel %s (%p) now has %zu messages pending on port %s", logString().utf8().data(), this, m_pendingMessages[i].size(), remoteTarget.logString().utf8().data());
156
157     if (m_pendingMessages[i].size() == 1) {
158         m_pendingMessageProtectors[i] = this;
159         return true;
160     }
161
162     ASSERT(m_pendingMessageProtectors[i] == this);
163     return false;
164 }
165
166 void MessagePortChannel::takeAllMessagesForPort(const MessagePortIdentifier& port, Function<void(Vector<MessageWithMessagePorts>&&, Function<void()>&&)>&& callback)
167 {
168     ASSERT(isMainThread());
169
170     LOG(MessagePorts, "MessagePortChannel %p taking all messages for port %s", this, port.logString().utf8().data());
171
172     ASSERT(port == m_ports[0] || port == m_ports[1]);
173     size_t i = port == m_ports[0] ? 0 : 1;
174
175     if (m_pendingMessages[i].isEmpty()) {
176         callback({ }, [] { });
177         return;
178     }
179
180     ASSERT(m_pendingMessageProtectors[i]);
181
182     Vector<MessageWithMessagePorts> result;
183     result.swap(m_pendingMessages[i]);
184
185     ++m_messageBatchesInFlight;
186
187     LOG(MessagePorts, "There are %zu messages to take for port %s. Taking them now, messages in flight is now %" PRIu64, result.size(), port.logString().utf8().data(), m_messageBatchesInFlight);
188
189     auto size = result.size();
190     HashSet<RefPtr<MessagePortChannel>> transferredPortProtectors;
191     transferredPortProtectors.swap(m_pendingMessagePortTransfers[i]);
192
193     callback(WTFMove(result), [size, this, port, protectedThis = WTFMove(m_pendingMessageProtectors[i]), transferredPortProtectors = WTFMove(transferredPortProtectors)] {
194         UNUSED_PARAM(port);
195 #if LOG_DISABLED
196         UNUSED_PARAM(size);
197 #endif
198         --m_messageBatchesInFlight;
199         LOG(MessagePorts, "Message port channel %s was notified that a batch of %zu message port messages targeted for port %s just completed dispatch, in flight is now %" PRIu64, logString().utf8().data(), size, port.logString().utf8().data(), m_messageBatchesInFlight);
200
201     });
202 }
203
204 void MessagePortChannel::checkRemotePortForActivity(const MessagePortIdentifier& remotePort, CompletionHandler<void(MessagePortChannelProvider::HasActivity)>&& callback)
205 {
206     ASSERT(isMainThread());
207     ASSERT(remotePort == m_ports[0] || remotePort == m_ports[1]);
208
209     // If the remote port is closed there is no pending activity.
210     size_t i = remotePort == m_ports[0] ? 0 : 1;
211     if (m_isClosed[i]) {
212         callback(MessagePortChannelProvider::HasActivity::No);
213         return;
214     }
215
216     // If there are any messages in flight between the ports, there is pending activity.
217     if (hasAnyMessagesPendingOrInFlight()) {
218         callback(MessagePortChannelProvider::HasActivity::Yes);
219         return;
220     }
221
222     // If the port is not currently in a process then it's being transferred as part of a postMessage.
223     // We treat these ports as if they do have activity since they will be revived when the message is delivered.
224     if (!m_processes[i]) {
225         callback(MessagePortChannelProvider::HasActivity::Yes);
226         return;
227     }
228
229     auto outerCallback = CompletionHandler<void(MessagePortChannelProvider::HasActivity)> { [this, protectedThis = makeRef(*this), callback = WTFMove(callback)] (MessagePortChannelProvider::HasActivity hasActivity) {
230         if (hasActivity == MessagePortChannelProvider::HasActivity::Yes) {
231             callback(hasActivity);
232             return;
233         }
234
235         // If the remote port said it had no activity, check again for any messages that might be in flight.
236         // This is because it might have asynchronously sent a message just before it was asked about local activity.
237         if (hasAnyMessagesPendingOrInFlight())
238             hasActivity = MessagePortChannelProvider::HasActivity::Yes;
239
240         callback(hasActivity);
241     } };
242
243     m_registry.provider().checkProcessLocalPortForActivity(remotePort, *m_processes[i], WTFMove(outerCallback));
244 }
245
246 bool MessagePortChannel::hasAnyMessagesPendingOrInFlight() const
247 {
248     ASSERT(isMainThread());
249     return m_messageBatchesInFlight || !m_pendingMessages[0].isEmpty() || !m_pendingMessages[1].isEmpty();
250 }
251
252 } // namespace WebCore