1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
mojo / core / ports / port.h [blame]
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_PORT_H_
#define MOJO_CORE_PORTS_PORT_H_
#include <map>
#include <memory>
#include <queue>
#include <utility>
#include <vector>
#include "base/containers/queue.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/message_queue.h"
#include "mojo/core/ports/user_data.h"
namespace mojo {
namespace core {
namespace ports {
// Forward declaration. Port declares PortLocker as a friend, and per the Port
// class comment |lock_| may only be acquired through a PortLocker.
class PortLocker;
// A Port is essentially a node in a circular list of addresses. For the sake of
// this documentation such a list will henceforth be referred to as a "route."
// Routes are the fundamental medium upon which all Node event circulation takes
// place and are thus the backbone of all Mojo message passing.
//
// Each Port is identified by a 128-bit address within a Node (see node.h). A
// Port doesn't really *do* anything per se: it's a named collection of state,
// and its owning Node manages all event production, transmission, routing, and
// processing logic. See Node for more details on how Ports may be used to
// transmit arbitrary user messages as well as other Ports.
//
// Ports may be in any of a handful of states (see State below) which dictate
// how they react to system events targeting them. In the simplest and most
// common case, Ports are initially created as an entangled pair (i.e. a simple
// cycle consisting of two Ports) both in the |kReceiving| State. Consider Ports
// we'll label |A| and |B| here, which may be created using
// Node::CreatePortPair():
//
//    +-----+          +-----+
//    |     |--------->|     |
//    |  A  |          |  B  |
//    |     |<---------|     |
//    +-----+          +-----+
//
// |A| references |B| via |peer_node_name| and |peer_port_name|, while |B| in
// turn references |A|. Note that a Node is NEVER aware of who is sending events
// to a given Port; it is only aware of where it must route events FROM a given
// Port.
//
// For the sake of documentation, we refer to one receiving port in a route as
// the "conjugate" of the other. A receiving port's conjugate is also its peer
// upon initial creation, but because of proxying this may not be the case at a
// later time.
//
// ALL access to this data structure must be guarded by |lock_| acquisition,
// which is only possible using a PortLocker. PortLocker ensures that
// overlapping Port lock acquisitions on a single thread are always acquired in
// a globally consistent order.
class Port : public base::RefCountedThreadSafe<Port> {
public:
// The state of a given Port. A Port may only exist in one of these states at
// any given time.
enum State {
// The Port is not yet paired with a peer and is therefore unusable. See
// Node::CreateUninitializedPort and Node::InitializePort for motivation.
kUninitialized,
// The Port is publicly visible outside of its Node and may be used to send
// and receive user messages. There are always AT MOST two |kReceiving|
// Ports along any given route. A user message event sent from a receiving
// port is always circulated along the Port's route until it reaches either
// a dead-end -- in which case the route is broken -- or it reaches the
// other receiving Port in the route -- in which case it lands in that
// Port's incoming message queue which can be read by user code.
kReceiving,
// The Port has been taken out of the |kReceiving| state in preparation for
// proxying to a new destination. A Port enters this state immediately when
// it's attached to a user message and may only leave this state when
// transitioning to |kProxying|. See Node for more details.
kBuffering,
// The Port is forwarding all user messages (and most other events) to its
// peer without discretion. Ports in the |kProxying| state may never leave
// this state and only exist temporarily until their owning Node has
// established that no more events will target them. See Node for more
// details.
kProxying,
// The Port has been closed and is now permanently unusable. Only
// |kReceiving| ports can be closed.
kClosed
};
// The current State of the Port.
State state;
// The Node and Port address to which events should be routed FROM this Port.
// Note that this is NOT necessarily the address of the Port currently sending
// events TO this Port.
NodeName peer_node_name;
PortName peer_port_name;
// We keep track of the port that is currently sending messages to this port.
// This allows us to verify that the sender node is allowed to send messages
// to this port as a mitigation against info leak vulnerabilities.
// Tracking the previous port has the nice side effect of keeping received
// messages in order.
NodeName prev_node_name;
PortName prev_port_name;
// Marks this port as one that is to be merged.
// NOTE(review): presumably set while a port merge involving this port's peer
// is still outstanding -- confirm against Node.
bool pending_merge_peer;
// Sequence numbers for control events. |next_control_sequence_num_to_send| is
// the next sequence number to send for all event messages.
// |next_control_sequence_num_to_receive| is presumably the next control
// sequence number this port expects to receive (see IsNextEvent()) -- TODO
// confirm against the implementation.
uint64_t next_control_sequence_num_to_send;
uint64_t next_control_sequence_num_to_receive;
// The next available sequence number to use for outgoing user message events
// originating from this port.
uint64_t next_sequence_num_to_send;
// The largest acknowledged user message event sequence number.
uint64_t last_sequence_num_acknowledged;
// The interval for which acknowledge requests will be sent. A value of N will
// cause an acknowledge request for |last_sequence_num_acknowledged| + N when
// initially set and on received acknowledge. This means that the lower bound
// for unread or in-transit messages is |next_sequence_num_to_send| -
// |last_sequence_num_acknowledged| + |sequence_num_acknowledge_interval|.
// If zero, no acknowledge requests are sent.
uint64_t sequence_num_acknowledge_interval;
// The sequence number of the last message this Port should ever expect to
// receive in its lifetime. May be used to determine that a proxying port is
// ready to be destroyed or that a receiving port's conjugate has been closed
// and we know the sequence number of the last message it sent.
uint64_t last_sequence_num_to_receive;
// The sequence number of the message for which this Port should send an
// acknowledge message. In the buffering state, holds the acknowledge request
// value that is forwarded to the peer on transition to proxying.
// This is zero in any port that's never received an acknowledge request, and
// in proxies that have forwarded a stored acknowledge.
uint64_t sequence_num_to_acknowledge;
// The queue of incoming user messages received by this Port. Only non-empty
// for buffering or receiving Ports. When a buffering port enters the proxying
// state, it flushes its queue and the proxy then bypasses the queue
// indefinitely.
//
// A receiving port's queue only has elements removed by user code reading
// messages from the port.
//
// Note that this is a priority queue which only exposes messages to consumers
// in strict sequential order.
MessageQueue message_queue;
// Buffers outgoing control messages while this port is in the |kBuffering|
// state. Each entry pairs a NodeName with the event to send (presumably the
// destination node -- confirm against Node).
base::queue<std::pair<NodeName, ScopedEvent>> control_message_queue;
// In some edge cases, a Node may need to remember to route a single special
// event upon destruction of this (proxying) Port. That event is stashed here
// in the interim.
std::unique_ptr<std::pair<NodeName, ScopedEvent>> send_on_proxy_removal;
// Arbitrary user data attached to the Port. In practice, Mojo uses this to
// stash an observer interface which can be notified about various Port state
// changes.
scoped_refptr<UserData> user_data;
// Indicates that this (proxying) Port has received acknowledgement that no
// new user messages will be routed to it. If |true|, the proxy will be
// removed once it has received and forwarded all sequenced messages up to and
// including the one numbered |last_sequence_num_to_receive|.
bool remove_proxy_on_last_message;
// Indicates that this Port is aware that its nearest (in terms of forward,
// non-zero cyclic routing distance) receiving Port has been closed.
bool peer_closed;
// Indicates that this Port lost its peer unexpectedly (e.g. via process death
// rather than receiving an ObserveClosure event). In this case
// |peer_closed| will be true but |last_sequence_num_to_receive| cannot be
// known. Such ports will continue to make messages available until their
// message queue is empty.
bool peer_lost_unexpectedly;
// Constructs a Port. The definition is not in this header.
// NOTE(review): |next_sequence_num_to_send| presumably seeds the member of the
// same name and |next_sequence_num_to_receive| presumably seeds
// |message_queue| -- confirm in the implementation file.
Port(uint64_t next_sequence_num_to_send,
uint64_t next_sequence_num_to_receive);
// Ports are neither copy-constructible nor copy-assignable.
Port(const Port&) = delete;
Port& operator=(const Port&) = delete;
// Asserts that |lock_| has been acquired. The check is only compiled in when
// DCHECK_IS_ON(); otherwise this function has an empty body.
void AssertLockAcquired() {
#if DCHECK_IS_ON()
lock_.AssertAcquired();
#endif
}
// Check if the given event should be handled next based on the sequence
// number and sender peer.
bool IsNextEvent(const NodeName& from_node, const Event& event);
// Get the next buffered event to be processed. If none is available, |event|
// will not be modified.
void NextEvent(NodeName* from_node, ScopedEvent* event);
// Buffer the event for later processing.
void BufferEvent(const NodeName& from_node, ScopedEvent event);
// Flushes the queue of events pending peer verification and hands all user
// message events back to the caller through |messages|.
void TakePendingMessages(
std::vector<std::unique_ptr<UserMessageEvent>>& messages);
private:
// Identifies a sender by its (node name, port name) pair.
using NodePortPair = std::pair<NodeName, PortName>;
// A list of owned events.
using EventQueue = std::vector<std::unique_ptr<Event>>;
// Event lists keyed by NodePortPair.
// NOTE(review): presumably this is the storage behind BufferEvent(),
// NextEvent() and TakePendingMessages() -- confirm in the implementation
// file.
std::map<NodePortPair, EventQueue> control_event_queues_;
// The ref-counting base is a friend so that it can reach the private
// destructor declared below.
friend class base::RefCountedThreadSafe<Port>;
// PortLocker is a friend so that it can reach the private |lock_|.
friend class PortLocker;
// Private: a Port cannot be destroyed directly by outside code.
~Port();
// The lock referred to by AssertLockAcquired(). Being private, it is reachable
// only from Port itself and from the friends declared above.
base::Lock lock_;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_PORT_H_