// mojo/core/ports/node.h
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_NODE_H_
#define MOJO_CORE_PORTS_NODE_H_
#include <stddef.h>
#include <stdint.h>
#include <queue>
#include <unordered_map>
#include "base/component_export.h"
#include "base/containers/flat_map.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/port.h"
#include "mojo/core/ports/port_ref.h"
#include "mojo/core/ports/user_data.h"
namespace mojo {
namespace core {
namespace ports {
// Result codes returned (as plain |int|) by Node's public and private methods.
// OK is zero; every error code is negative. Port-related errors occupy the
// -10..-15 range, with ERROR_NOT_IMPLEMENTED set apart at -100.
//
// ERROR_PORT_PEER_CLOSED is the code Node::GetMessage() is documented to
// return once the port's peer has closed. The precise conditions under which
// the other codes are produced are defined by the implementation file.
enum : int {
OK = 0,
ERROR_PORT_UNKNOWN = -10,
ERROR_PORT_EXISTS = -11,
ERROR_PORT_STATE_UNEXPECTED = -12,
ERROR_PORT_CANNOT_SEND_SELF = -13,
ERROR_PORT_PEER_CLOSED = -14,
ERROR_PORT_CANNOT_SEND_PEER = -15,
ERROR_NOT_IMPLEMENTED = -100,
};
// Snapshot of a port's state, reported through the |port_status|
// out-parameter of Node::GetStatus().
//
// Every field carries a default member initializer so that a
// default-constructed PortStatus is fully defined (all flags false, all
// counters zero) even if it is inspected after a GetStatus() call that failed
// or whose result was ignored. The struct remains an aggregate, so brace
// initialization in declaration order keeps working.
struct PortStatus {
  // NOTE(review): presumably true when at least one message can be read from
  // the port right now -- confirm against the implementation.
  bool has_messages = false;
  // NOTE(review): presumably true while the port is in a state where it
  // accepts incoming messages -- confirm against the implementation.
  bool receiving_messages = false;
  // NOTE(review): presumably true once the port's peer has been closed.
  bool peer_closed = false;
  // NOTE(review): presumably true when the peer lives on a different node.
  bool peer_remote = false;
  // NOTE(review): presumably the number of messages currently queued on the
  // port, and their combined size in bytes.
  size_t queued_message_count = 0;
  size_t queued_num_bytes = 0;
  // Number of messages sent by this port that its conjugate has not yet
  // acknowledged; see Node::SetAcknowledgeRequestInterval().
  size_t unacknowledged_message_count = 0;
};
// Plain-data record describing a pending "update previous peer" notification.
// Node::ConvertToProxy() takes a pointer to one of these as its
// |pending_update| argument.
//
// NOTE(review): presumably ConvertToProxy() fills this in and the caller later
// turns it into an UpdatePreviousPeerEvent once the relevant locks have been
// released -- confirm against the implementation file.
struct PendingUpdatePreviousPeer {
// NOTE(review): presumably the node and port that the notification is
// addressed to.
NodeName receiver;
PortName port;
// NOTE(review): presumably the local port on whose behalf the notification
// is sent.
PortName from_port;
// NOTE(review): presumably the control sequence number to stamp on the event.
uint64_t sequence_num;
// NOTE(review): presumably the address the receiver should adopt as its new
// previous peer.
NodeName new_prev_node;
PortName new_prev_port;
};
class MessageFilter;
class NodeDelegate;
// A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit
// addresses (names), performing routing and processing of events among the
// Ports within the Node and to or from other Nodes in the system. Typically
// (and practically, in all uses today) there is a single Node per system
// process. Thus a Node boundary effectively models a process boundary.
//
// New Ports can be created uninitialized using CreateUninitializedPort (and
// later initialized using InitializePort), or created in a fully initialized
// state using CreatePortPair(). Initialized ports have exactly one conjugate
// port which is the ultimate receiver of any user messages sent by that port.
// See SendUserMessage().
//
// In addition to routing user message events, various control events are used
// by Nodes to coordinate Port behavior and lifetime within and across Nodes.
// See Event documentation for description of different types of events used by
// a Node to coordinate behavior.
class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
public:
// Selects how strict CanShutdownCleanly() is about ports that are still
// alive on this node.
enum class ShutdownPolicy {
// Remaining local ports prevent a clean shutdown.
DONT_ALLOW_LOCAL_PORTS,
// Remaining ports are tolerated as long as none of them are proxies to
// another node.
ALLOW_LOCAL_PORTS,
};
// Creates a node identified by |name|. Does not take ownership of the
// delegate.
Node(const NodeName& name, NodeDelegate* delegate);
// Nodes are neither copyable nor assignable.
Node(const Node&) = delete;
Node& operator=(const Node&) = delete;
~Node();
// Returns true iff there are no open ports referring to another node or ports
// in the process of being transferred from this node to another. If this
// returns false, then to ensure clean shutdown, it is necessary to keep the
// node alive and continue routing messages to it via AcceptMessage. This
// method may be called again after AcceptMessage to check if the Node is now
// ready to be destroyed.
//
// If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
// |true| even if some ports remain alive, as long as none of them are proxies
// to another node.
bool CanShutdownCleanly(
ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
// Looks up the port named |port_name|; the result is delivered through
// |port_ref|. Returns one of the int result codes declared above.
int GetPort(const PortName& port_name, PortRef* port_ref);
// Creates a port on this node. Before the port can be used, it must be
// initialized using InitializePort. This method is useful for bootstrapping
// a connection between two nodes. Generally, ports are created using
// CreatePortPair instead.
int CreateUninitializedPort(PortRef* port_ref);
// Initializes a newly created port (see CreateUninitializedPort) with the
// node/port names of its peer and of its previous peer.
int InitializePort(const PortRef& port_ref,
const NodeName& peer_node_name,
const PortName& peer_port_name,
const NodeName& prev_node_name,
const PortName& prev_port_name);
// Generates a new connected pair of ports bound to this node. These ports
// are initialized and ready to go.
int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref);
// Sets / gets the ref-counted user data associated with the port.
int SetUserData(const PortRef& port_ref, scoped_refptr<UserData> user_data);
int GetUserData(const PortRef& port_ref, scoped_refptr<UserData>* user_data);
// Prevents further messages from being sent from this port or delivered to
// this port. The port is removed, and the port's peer is notified of the
// closure after it has consumed all pending messages.
int ClosePort(const PortRef& port_ref);
// Returns the current status of the port through |port_status|.
int GetStatus(const PortRef& port_ref, PortStatus* port_status);
// Returns the next available message on the specified port or returns a null
// message if there are none available. Returns ERROR_PORT_PEER_CLOSED to
// indicate that this port's peer has closed. In such cases GetMessage may
// be called until it yields a null message, indicating that no more messages
// may be read from the port.
//
// If |filter| is non-null, the next available message is returned only if it
// is matched by the filter. If the provided filter does not match the next
// available message, GetMessage() behaves as if there is no message
// available. Ownership of |filter| is not taken, and it must outlive the
// extent of this call.
int GetMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Sends a message from the specified port to its peer. Note that the message
// notification may arrive synchronously (via PortStatusChanged() on the
// delegate) if the peer is local to this Node.
int SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message);
// Makes the port send acknowledge requests to its conjugate to acknowledge
// at least every |sequence_number_acknowledge_interval| messages as they're
// read from the conjugate. The number of unacknowledged messages is exposed
// in the |unacknowledged_message_count| field of PortStatus. This allows
// bounding the number of unread and/or in-transit messages from this port
// to its conjugate between zero and |unacknowledged_message_count|.
int SetAcknowledgeRequestInterval(
const PortRef& port_ref,
uint64_t sequence_number_acknowledge_interval);
// Corresponding to NodeDelegate::ForwardEvent: accepts |event|, which
// arrived from the node named |from_node|, for processing on this node.
int AcceptEvent(const NodeName& from_node, ScopedEvent event);
// Called to merge two ports with each other. If you have two independent
// port pairs A <=> B and C <=> D, the net result of merging B and C is a
// single connected port pair A <=> D.
//
// Note that the behavior of this operation is undefined if either port to be
// merged (B or C above) has ever been read from or written to directly, and
// this must ONLY be called on one side of the merge, though it doesn't matter
// which side.
//
// It is safe for the non-merged peers (A and D above) to be transferred,
// closed, and/or written to before, during, or after the merge.
int MergePorts(const PortRef& port_ref,
const NodeName& destination_node_name,
const PortName& destination_port_name);
// Like above but merges two ports local to this node. Because both ports are
// local this can also verify that neither port has been written to before the
// merge. If this fails for any reason, both ports are closed. Otherwise OK
// is returned and the ports' receiving peers are connected to each other.
int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
private:
// Helper to ensure that a Node always calls into its delegate safely, i.e.
// without holding any internal locks.
class DelegateHolder {
public:
DelegateHolder(Node* node, NodeDelegate* delegate);
DelegateHolder(const DelegateHolder&) = delete;
DelegateHolder& operator=(const DelegateHolder&) = delete;
~DelegateHolder();
// Every delegate call goes through here, so the safety check runs before
// the delegate pointer is handed out.
NodeDelegate* operator->() const {
EnsureSafeDelegateAccess();
return delegate_;
}
private:
// The check is only declared as a real function when DCHECKs are enabled;
// otherwise it is an empty inline no-op.
#if DCHECK_IS_ON()
void EnsureSafeDelegateAccess() const;
#else
void EnsureSafeDelegateAccess() const {}
#endif
// Neither pointer is owned by the holder.
const raw_ptr<Node, DanglingUntriaged> node_;
const raw_ptr<NodeDelegate, DanglingUntriaged> delegate_;
};
// Handlers for the individual event types, each operating on the port
// identified by |port_ref|.
// NOTE(review): presumably dispatched from AcceptEvent() /
// AcceptEventInternal(); the dispatch itself is in the implementation file.
int OnUserMessage(const PortRef& port_ref,
const NodeName& from_node,
std::unique_ptr<UserMessageEvent> message);
int OnPortAccepted(const PortRef& port_ref,
std::unique_ptr<PortAcceptedEvent> event);
int OnObserveProxy(const PortRef& port_ref,
std::unique_ptr<ObserveProxyEvent> event);
int OnObserveProxyAck(const PortRef& port_ref,
std::unique_ptr<ObserveProxyAckEvent> event);
int OnObserveClosure(const PortRef& port_ref,
std::unique_ptr<ObserveClosureEvent> event);
int OnMergePort(const PortRef& port_ref,
std::unique_ptr<MergePortEvent> event);
int OnUserMessageReadAckRequest(
const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckRequestEvent> event);
int OnUserMessageReadAck(const PortRef& port_ref,
std::unique_ptr<UserMessageReadAckEvent> event);
int OnUpdatePreviousPeer(const PortRef& port_ref,
std::unique_ptr<UpdatePreviousPeerEvent> event);
// Registers |port| under |port_name| / removes the port named |port_name|.
// NOTE(review): presumably these operate on |ports_| (and keep
// |peer_port_maps_| in sync) -- confirm against the implementation file.
int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port);
void ErasePort(const PortName& port_name);
// Check if the event is sent by the previous peer of the port to decide if
// we can check the sequence number.
// This is not the case for example for PortAccepted or broadcasted events.
bool IsEventFromPreviousPeer(const Event& event);
// Internal counterpart of AcceptEvent() that operates on an already
// resolved |port_ref|.
int AcceptEventInternal(const PortRef& port_ref,
const NodeName& from_node,
ScopedEvent event);
// Internal counterpart of SendUserMessage(). |message| is passed by pointer
// rather than by value.
// NOTE(review): presumably so the caller keeps ownership when sending fails
// -- confirm against the implementation file.
int SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message);
// Shared implementation of port merging for |port0_ref| and |port1_ref|.
// NOTE(review): |allow_close_on_bad_state| presumably controls whether the
// ports are closed when they are found in an unexpected state.
int MergePortsInternal(const PortRef& port0_ref,
const PortRef& port1_ref,
bool allow_close_on_bad_state);
// Turns |port| into a proxy towards |to_node_name|.
// NOTE(review): |port_name|, |port_descriptor| and |pending_update| are
// pointers and look like in/out parameters describing the transferred port;
// confirm against the implementation file.
void ConvertToProxy(Port* port,
const NodeName& to_node_name,
PortName* port_name,
Event::PortDescriptor* port_descriptor,
PendingUpdatePreviousPeer* pending_update);
// Accepts a port named |port_name| described by |port_descriptor| onto this
// node.
int AcceptPort(const PortName& port_name,
const Event::PortDescriptor& port_descriptor);
// Prepares |message| to be forwarded from |forwarding_port_ref|; the node to
// forward to is reported through |forward_to_node|.
// NOTE(review): |expected_port_state| and |ignore_closed_peer| presumably
// gate the operation on the forwarding port's state -- confirm.
int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
Port::State expected_port_state,
bool ignore_closed_peer,
UserMessageEvent* message,
NodeName* forward_to_node);
// Proxy lifecycle helpers for the port identified by |port_ref|.
int BeginProxying(const PortRef& port_ref);
int ForwardUserMessagesFromProxy(const PortRef& port_ref);
void InitiateProxyRemoval(const PortRef& port_ref);
void TryRemoveProxy(const PortRef& port_ref);
// Destroys every local port whose peer is |port_name| on |node_name|.
// NOTE(review): treatment of special/invalid port names is defined by the
// implementation file.
void DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name);
// Changes the peer node and port name referenced by |local_port|. Note that
// |ports_lock_| MUST be held through the extent of this method.
// |local_port|'s lock must be held if and only if a reference to |local_port|
// exists in |ports_|.
void UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port);
// Removes an entry from |peer_port_maps_| corresponding to |local_port|'s
// peer address, if valid.
void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port);
// Swaps the peer information for two local ports. Used during port merges.
// Note that |ports_lock_| must be held along with each of the two port's own
// locks, through the extent of this method.
void SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1);
// Sends an acknowledge request to the peer if the port has a non-zero
// |sequence_num_acknowledge_interval|. This needs to be done when the port's
// peer changes, as the previous peer proxy may not have forwarded any prior
// acknowledge request before deleting itself.
void MaybeResendAckRequest(const PortRef& port_ref);
// Forwards a stored acknowledge request to the peer if the proxy has a
// non-zero |sequence_num_acknowledge_interval|.
void MaybeForwardAckRequest(const PortRef& port_ref);
// Sends an acknowledge of the most recently read sequence number to the peer
// if any messages have been read, and the port has a non-zero
// |sequence_num_to_acknowledge|.
void MaybeResendAck(const PortRef& port_ref);
// This node's name, fixed at construction.
const NodeName name_;
// Wrapper through which all calls into the (non-owned) delegate are made.
const DelegateHolder delegate_;
// Just to clarify readability of the types below.
using LocalPortName = PortName;
using PeerPortName = PortName;
// Guards access to |ports_| and |peer_port_maps_| below.
//
// This must never be acquired while an individual port's lock is held on the
// same thread. Conversely, individual port locks may be acquired while this
// one is held.
//
// Because UserMessage events may execute arbitrary user code during
// destruction, it is also important to ensure that such events are never
// destroyed while this (or any individual Port) lock is held.
base::Lock ports_lock_;
// All ports bound to this node, keyed by their local name.
std::unordered_map<LocalPortName, scoped_refptr<Port>> ports_;
// Maps a peer port name to a list of PortRefs for all local ports which have
// the port name key designated as their peer port. The set of local ports
// which have the same peer port is expected to always be relatively small and
// usually 1. Hence we just use a flat_map of local PortRefs keyed on each
// local port's name.
using PeerPortMap =
std::unordered_map<PeerPortName, base::flat_map<LocalPortName, PortRef>>;
// A reverse mapping which can be used to find all local ports that reference
// a given peer node or a local port that references a specific given peer
// port on a peer node. The key to this map is the corresponding peer node
// name.
std::unordered_map<NodeName, PeerPortMap> peer_port_maps_;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_NODE_H_