1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
base / synchronization / waitable_event_watcher_posix.cc [blame]
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/synchronization/waitable_event_watcher.h"
#include <utility>
#include "base/check.h"
#include "base/functional/bind.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
namespace base {
// -----------------------------------------------------------------------------
// WaitableEventWatcher (async waits).
//
// The basic design is that we add an AsyncWaiter to the wait-list of the event.
// That AsyncWaiter has a pointer to SequencedTaskRunner, and a Task to be
// posted to it. The task ends up calling the callback when it runs on the
// sequence.
//
// Since the wait can be canceled, we have a thread-safe Flag object which is
// set when the wait has been canceled. At each stage in the above, we check the
// flag before going onto the next stage. Since the wait may only be canceled in
// the sequence which runs the Task, we are assured that the callback cannot be
// called after canceling...
// -----------------------------------------------------------------------------
// A thread-safe, reference-counted, write-once flag.
// -----------------------------------------------------------------------------
class Flag : public RefCountedThreadSafe<Flag> {
public:
Flag() { flag_ = false; }
Flag(const Flag&) = delete;
Flag& operator=(const Flag&) = delete;
void Set() {
AutoLock locked(lock_);
flag_ = true;
}
bool value() const {
AutoLock locked(lock_);
return flag_;
}
private:
friend class RefCountedThreadSafe<Flag>;
~Flag() = default;
mutable Lock lock_;
bool flag_;
};
// -----------------------------------------------------------------------------
// This is an asynchronous waiter which posts a task to a SequencedTaskRunner
// when fired. An AsyncWaiter may only be in a single wait-list.
// -----------------------------------------------------------------------------
class AsyncWaiter : public WaitableEvent::Waiter {
public:
AsyncWaiter(scoped_refptr<SequencedTaskRunner> task_runner,
base::OnceClosure callback,
Flag* flag)
: task_runner_(std::move(task_runner)),
callback_(std::move(callback)),
flag_(flag) {}
bool Fire(WaitableEvent* event) override {
// Post the callback if we haven't been cancelled.
if (!flag_->value())
task_runner_->PostTask(FROM_HERE, std::move(callback_));
// We are removed from the wait-list by the WaitableEvent itself. It only
// remains to delete ourselves.
delete this;
// We can always return true because an AsyncWaiter is never in two
// different wait-lists at the same time.
return true;
}
// See StopWatching for discussion
bool Compare(void* tag) override { return tag == flag_.get(); }
private:
const scoped_refptr<SequencedTaskRunner> task_runner_;
base::OnceClosure callback_;
const scoped_refptr<Flag> flag_;
};
// -----------------------------------------------------------------------------
// For async waits we need to run a callback on a sequence. We do this by
// posting an AsyncCallbackHelper task, which calls the callback and keeps track
// of when the event is canceled.
// -----------------------------------------------------------------------------
void AsyncCallbackHelper(Flag* flag,
WaitableEventWatcher::EventCallback callback,
WaitableEvent* event) {
// Runs on the sequence that called StartWatching().
if (!flag->value()) {
// This is to let the WaitableEventWatcher know that the event has occured.
flag->Set();
std::move(callback).Run(event);
}
}
WaitableEventWatcher::WaitableEventWatcher() {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
WaitableEventWatcher::~WaitableEventWatcher() {
// The destructor may be called from a different sequence than StartWatching()
// when there is no active watch. To avoid triggering a DCHECK in
// StopWatching(), do not call it when there is no active watch.
if (cancel_flag_ && !cancel_flag_->value())
StopWatching();
}
// -----------------------------------------------------------------------------
// The Handle is how the user cancels a wait. After deleting the Handle we
// insure that the delegate cannot be called.
// -----------------------------------------------------------------------------
bool WaitableEventWatcher::StartWatching(
WaitableEvent* event,
EventCallback callback,
scoped_refptr<SequencedTaskRunner> task_runner) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// A user may call StartWatching from within the callback function. In this
// case, we won't know that we have finished watching, expect that the Flag
// will have been set in AsyncCallbackHelper().
if (cancel_flag_.get() && cancel_flag_->value())
cancel_flag_ = nullptr;
DCHECK(!cancel_flag_) << "StartWatching called while still watching";
cancel_flag_ = new Flag;
// UnsafeDanglingUntriaged triggered by test:
// WaitableEventWatcherDeletionTest.SignalAndDelete
// TODO(crbug.com/40061562): Remove `UnsafeDanglingUntriaged`
OnceClosure internal_callback =
base::BindOnce(&AsyncCallbackHelper, base::RetainedRef(cancel_flag_),
std::move(callback), base::UnsafeDanglingUntriaged(event));
WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get();
AutoLock locked(kernel->lock_);
if (kernel->signaled_) {
if (!kernel->manual_reset_)
kernel->signaled_ = false;
// No hairpinning - we can't call the delegate directly here. We have to
// post a task to |task_runner| as usual.
task_runner->PostTask(FROM_HERE, std::move(internal_callback));
return true;
}
kernel_ = kernel;
waiter_ = new AsyncWaiter(std::move(task_runner),
std::move(internal_callback), cancel_flag_.get());
event->Enqueue(waiter_);
return true;
}
void WaitableEventWatcher::StopWatching() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (!cancel_flag_.get()) // if not currently watching...
return;
if (cancel_flag_->value()) {
// In this case, the event has fired, but we haven't figured that out yet.
// The WaitableEvent may have been deleted too.
cancel_flag_ = nullptr;
return;
}
if (!kernel_.get()) {
// We have no kernel. This means that we never enqueued a Waiter on an
// event because the event was already signaled when StartWatching was
// called.
//
// In this case, a task was enqueued on the MessageLoop and will run.
// We set the flag in case the task hasn't yet run. The flag will stop the
// delegate getting called. If the task has run then we have the last
// reference to the flag and it will be deleted immediately after.
cancel_flag_->Set();
cancel_flag_ = nullptr;
return;
}
AutoLock locked(kernel_->lock_);
// We have a lock on the kernel. No one else can signal the event while we
// have it.
// We have a possible ABA issue here. If Dequeue was to compare only the
// pointer values then it's possible that the AsyncWaiter could have been
// fired, freed and the memory reused for a different Waiter which was
// enqueued in the same wait-list. We would think that that waiter was our
// AsyncWaiter and remove it.
//
// To stop this, Dequeue also takes a tag argument which is passed to the
// virtual Compare function before the two are considered a match. So we need
// a tag which is good for the lifetime of this handle: the Flag. Since we
// have a reference to the Flag, its memory cannot be reused while this object
// still exists. So if we find a waiter with the correct pointer value, and
// which shares a Flag pointer, we have a real match.
if (kernel_->Dequeue(waiter_, cancel_flag_.get())) {
// Case 2: the waiter hasn't been signaled yet; it was still on the wait
// list. We've removed it, thus we can delete it and the task (which cannot
// have been enqueued with the MessageLoop because the waiter was never
// signaled)
delete waiter_;
cancel_flag_ = nullptr;
return;
}
// Case 3: the waiter isn't on the wait-list, thus it was signaled. It may not
// have run yet, so we set the flag to tell it not to bother enqueuing the
// task on the SequencedTaskRunner, but to delete it instead. The Waiter
// deletes itself once run.
cancel_flag_->Set();
cancel_flag_ = nullptr;
// If the waiter has already run then the task has been enqueued. If the Task
// hasn't yet run, the flag will stop the delegate from getting called. (This
// is thread safe because one may only delete a Handle from the sequence that
// called StartWatching()).
//
// If the delegate has already been called then we have nothing to do. The
// task has been deleted by the MessageLoop.
}
} // namespace base