1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
base / task / thread_pool / sequence.cc [blame]
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/sequence.h"
#include <utility>
#include "base/check.h"
#include "base/critical_closure.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/memory/ptr_util.h"
#include "base/memory/stack_allocated.h"
#include "base/task/task_features.h"
#include "base/time/time.h"
namespace base {
namespace internal {
namespace {
// Asserts that a lock is acquired and annotates the scope such that
// base/thread_annotations.h can recognize that the lock is acquired.
class SCOPED_LOCKABLE AnnotateLockAcquired {
STACK_ALLOCATED();
public:
explicit AnnotateLockAcquired(const CheckedLock& lock)
EXCLUSIVE_LOCK_FUNCTION(lock)
: acquired_lock_(lock) {
acquired_lock_.AssertAcquired();
}
~AnnotateLockAcquired() UNLOCK_FUNCTION() { acquired_lock_.AssertAcquired(); }
private:
const CheckedLock& acquired_lock_;
};
void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,
Task& task) {
switch (shutdown_behavior) {
case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
// Nothing to do.
break;
case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
// MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly
// in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is
// the default and hence the theoretical benefits don't warrant the
// performance implications.
break;
case TaskShutdownBehavior::BLOCK_SHUTDOWN:
task.task =
MakeCriticalClosure(task.posted_from, std::move(task.task),
/*is_immediate=*/task.delayed_run_time.is_null());
break;
}
}
} // namespace
Sequence::Transaction::Transaction(Sequence* sequence)
: TaskSource::Transaction(sequence) {}
Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;
Sequence::Transaction::~Transaction() = default;
bool Sequence::Transaction::WillPushImmediateTask() {
// In a Transaction.
AnnotateLockAcquired annotate(sequence()->lock_);
bool was_immediate =
sequence()->is_immediate_.exchange(true, std::memory_order_relaxed);
return !was_immediate;
}
void Sequence::Transaction::PushImmediateTask(Task task) {
// In a Transaction.
AnnotateLockAcquired annotate(sequence()->lock_);
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task.task);
DCHECK(!task.queue_time.is_null());
DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed));
bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_;
bool queue_was_empty = sequence()->queue_.empty();
MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
sequence()->queue_.push(std::move(task));
if (queue_was_empty)
sequence()->UpdateReadyTimes();
// AddRef() matched by manual Release() when the sequence has no more tasks
// to run (in DidProcessTask() or Clear()).
if (was_unretained && sequence()->task_runner())
sequence()->task_runner()->AddRef();
}
bool Sequence::Transaction::PushDelayedTask(Task task) {
// In a Transaction.
AnnotateLockAcquired annotate(sequence()->lock_);
// Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
// for details.
CHECK(task.task);
DCHECK(!task.queue_time.is_null());
DCHECK(!task.delayed_run_time.is_null());
bool top_will_change = sequence()->DelayedSortKeyWillChange(task);
bool was_empty = sequence()->IsEmpty();
MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
sequence()->delayed_queue_.insert(std::move(task));
if (sequence()->queue_.empty())
sequence()->UpdateReadyTimes();
// AddRef() matched by manual Release() when the sequence has no more tasks
// to run (in DidProcessTask() or Clear()).
if (was_empty && !sequence()->has_worker_ && sequence()->task_runner())
sequence()->task_runner()->AddRef();
return top_will_change;
}
// Delayed tasks are ordered by latest_delayed_run_time(). The top task may
// not be the first task eligible to run, but tasks will always become ripe
// before their latest_delayed_run_time().
bool Sequence::DelayedTaskGreater::operator()(const Task& lhs,
const Task& rhs) const {
TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) >
std::tie(rhs_latest_delayed_run_time, rhs.sequence_num);
}
TaskSource::RunStatus Sequence::WillRunTask() {
// There should never be a second call to WillRunTask() before DidProcessTask
// since the RunStatus is always marked a saturated.
DCHECK(!has_worker_);
// It's ok to access |has_worker_| outside of a Transaction since
// WillRunTask() is externally synchronized, always called in sequence with
// TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which
// means it won't race with Push[Immediate/Delayed]Task().
has_worker_ = true;
return RunStatus::kAllowedSaturated;
}
bool Sequence::OnBecomeReady() {
DCHECK(!has_worker_);
// std::memory_order_relaxed is sufficient because no other state is
// synchronized with |is_immediate_| outside of |lock_|.
return !is_immediate_.exchange(true, std::memory_order_relaxed);
}
size_t Sequence::GetRemainingConcurrency() const {
return 1;
}
Task Sequence::TakeNextImmediateTask() {
Task next_task = std::move(queue_.front());
queue_.pop();
return next_task;
}
Task Sequence::TakeEarliestTask() {
if (queue_.empty())
return delayed_queue_.take_top();
if (delayed_queue_.empty())
return TakeNextImmediateTask();
// Both queues contain at least a task. Decide from which one the task should
// be taken.
if (queue_.front().queue_time <=
delayed_queue_.top().latest_delayed_run_time())
return TakeNextImmediateTask();
return delayed_queue_.take_top();
}
void Sequence::UpdateReadyTimes() {
DCHECK(!IsEmpty());
if (queue_.empty()) {
latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(),
std::memory_order_relaxed);
earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(),
std::memory_order_relaxed);
return;
}
if (delayed_queue_.empty()) {
latest_ready_time_.store(queue_.front().queue_time,
std::memory_order_relaxed);
} else {
latest_ready_time_.store(
std::min(queue_.front().queue_time,
delayed_queue_.top().latest_delayed_run_time()),
std::memory_order_relaxed);
}
earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed);
}
Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
AnnotateLockAcquired annotate(lock_);
DCHECK(has_worker_);
DCHECK(is_immediate_.load(std::memory_order_relaxed));
DCHECK(!queue_.empty() || !delayed_queue_.empty());
auto next_task = TakeEarliestTask();
if (!IsEmpty())
UpdateReadyTimes();
return next_task;
}
bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
AnnotateLockAcquired annotate(lock_);
// There should never be a call to DidProcessTask without an associated
// WillRunTask().
DCHECK(has_worker_);
has_worker_ = false;
// See comment on TaskSource::task_runner_ for lifetime management details.
if (IsEmpty()) {
is_immediate_.store(false, std::memory_order_relaxed);
ReleaseTaskRunner();
return false;
}
// Let the caller re-enqueue this non-empty Sequence regardless of
// |run_result| so it can continue churning through this Sequence's tasks and
// skip/delete them in the proper scope.
return true;
}
bool Sequence::WillReEnqueue(TimeTicks now,
TaskSource::Transaction* transaction) {
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
AnnotateLockAcquired annotate(lock_);
// This should always be called from a worker thread and it will be
// called after DidProcessTask().
DCHECK(is_immediate_.load(std::memory_order_relaxed));
bool has_ready_tasks = HasReadyTasks(now);
if (!has_ready_tasks)
is_immediate_.store(false, std::memory_order_relaxed);
return has_ready_tasks;
}
bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const {
// If sequence has already been picked up by a worker or moved, no need to
// proceed further here.
if (is_immediate_.load(std::memory_order_relaxed)) {
return false;
}
if (IsEmpty()) {
return true;
}
return delayed_task.latest_delayed_run_time() <
delayed_queue_.top().latest_delayed_run_time();
}
bool Sequence::HasReadyTasks(TimeTicks now) const {
return now >= TS_UNCHECKED_READ(earliest_ready_time_)
.load(std::memory_order_relaxed);
}
bool Sequence::HasImmediateTasks() const {
return !queue_.empty();
}
TaskSourceSortKey Sequence::GetSortKey() const {
return TaskSourceSortKey(
priority_racy(),
TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed));
}
TimeTicks Sequence::GetDelayedSortKey() const {
return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed);
}
std::optional<Task> Sequence::Clear(TaskSource::Transaction* transaction) {
CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
AnnotateLockAcquired annotate(lock_);
// See comment on TaskSource::task_runner_ for lifetime management details.
if (!IsEmpty() && !has_worker_) {
ReleaseTaskRunner();
}
return Task(
FROM_HERE,
base::BindOnce(
[](base::queue<Task> queue,
base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) {
while (!queue.empty())
queue.pop();
while (!delayed_queue.empty())
delayed_queue.pop();
},
std::move(queue_), std::move(delayed_queue_)),
TimeTicks(), TimeDelta(), TimeDelta(),
static_cast<int>(reinterpret_cast<intptr_t>(this)));
}
void Sequence::ReleaseTaskRunner() {
if (!task_runner())
return;
// No member access after this point, releasing |task_runner()| might delete
// |this|.
task_runner()->Release();
}
Sequence::Sequence(const TaskTraits& traits,
SequencedTaskRunner* task_runner,
TaskSourceExecutionMode execution_mode)
: TaskSource(traits, execution_mode), task_runner_(task_runner) {}
Sequence::~Sequence() = default;
Sequence::Transaction Sequence::BeginTransaction() {
return Transaction(this);
}
ExecutionEnvironment Sequence::GetExecutionEnvironment() {
if (execution_mode() == TaskSourceExecutionMode::kSingleThread) {
return {token_, &sequence_local_storage_,
static_cast<SingleThreadTaskRunner*>(task_runner())};
}
return {token_, &sequence_local_storage_, task_runner()};
}
bool Sequence::IsEmpty() const {
return queue_.empty() && delayed_queue_.empty();
}
} // namespace internal
} // namespace base