1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
base / task / thread_pool / sequence.h [blame]
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
#define BASE_TASK_THREAD_POOL_SEQUENCE_H_
#include <stddef.h>
#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/containers/intrusive_heap.h"
#include "base/containers/queue.h"
#include "base/sequence_token.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/pooled_parallel_task_runner.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/thread_annotations.h"
#include "base/threading/sequence_local_storage_map.h"
namespace base {
namespace internal {
// A Sequence is intended to hold delayed tasks and immediate tasks.
// Delayed tasks are held in a prority_queue until they are ripe and
// immediate tasks in a simple fifo queue.
// When Sequence::TakeTask is called, we select the next appropriate task
// from both queues and return it.
// Each queue holds slots each containing up to a single Task that must be
// executed in posting/runtime order.
//
// In comments below, an "empty Sequence" is a Sequence with no slot.
//
// Note: there is a known refcounted-ownership cycle in the Scheduler
// architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
// This is okay so long as the other owners of Sequence (PriorityQueue and
// WorkerThread in alternation and
// ThreadGroup::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result). A
// dangling reference cycle would only occur should they release their reference
// to it while it's not empty. In other words, it is only correct for them to
// release it after PopTask() returns false to indicate it was made empty by
// that call (in which case the next PushImmediateTask() will return true to
// indicate to the caller that the Sequence should be re-enqueued for
// execution). This class is thread-safe.
class BASE_EXPORT Sequence : public TaskSource {
public:
// A Transaction can perform multiple operations atomically on a
// Sequence. While a Transaction is alive, it is guaranteed that nothing
// else will access the Sequence; the Sequence's lock is held for the
// lifetime of the Transaction.
class BASE_EXPORT Transaction : public TaskSource::Transaction {
public:
Transaction(Transaction&& other);
Transaction(const Transaction&) = delete;
Transaction& operator=(const Transaction&) = delete;
~Transaction();
// Returns true if the sequence must be added to the immediate queue after
// receiving a new immediate Task in order to be scheduled. If the caller
// doesn't want the sequence to be scheduled, it may not add the sequence to
// the immediate queue even if this returns true.
bool WillPushImmediateTask();
// Adds immediate |task| to the end of this sequence.
void PushImmediateTask(Task task);
// Adds a delayed |task| in this sequence, and returns true if the sequence
// needs to be re-enqueued in the delayed queue as a result of this
// sequence's delayed sort key changing.
bool PushDelayedTask(Task task);
Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }
private:
friend class Sequence;
explicit Transaction(Sequence* sequence);
};
// |traits| is metadata that applies to all Tasks in the Sequence.
// |task_runner| is a reference to the TaskRunner feeding this TaskSource.
// |task_runner| can be nullptr only for tasks with no TaskRunner, in which
// case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
// execution mode of |task_runner|.
Sequence(const TaskTraits& traits,
SequencedTaskRunner* task_runner,
TaskSourceExecutionMode execution_mode);
Sequence(const Sequence&) = delete;
Sequence& operator=(const Sequence&) = delete;
// Begins a Transaction. This method cannot be called on a thread which has an
// active Sequence::Transaction.
[[nodiscard]] Transaction BeginTransaction();
// TaskSource:
ExecutionEnvironment GetExecutionEnvironment() override;
size_t GetRemainingConcurrency() const override;
TaskSourceSortKey GetSortKey() const override;
TimeTicks GetDelayedSortKey() const override;
// Returns a token that uniquely identifies this Sequence.
const SequenceToken& token() const LIFETIME_BOUND { return token_; }
SequenceLocalStorageMap* sequence_local_storage() {
return &sequence_local_storage_;
}
bool OnBecomeReady() override;
bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
return has_worker_;
}
bool is_immediate_for_testing() const { return is_immediate_; }
bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }
// A reference to TaskRunner is only retained between
// PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
// false, guaranteeing it is safe to dereference this pointer. Otherwise, the
// caller should guarantee such TaskRunner still exists before dereferencing.
SequencedTaskRunner* task_runner() const { return task_runner_; }
private:
~Sequence() override;
struct DelayedTaskGreater {
bool operator()(const Task& lhs, const Task& rhs) const;
};
// TaskSource:
RunStatus WillRunTask() override;
Task TakeTask(TaskSource::Transaction* transaction) override;
std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
bool DidProcessTask(TaskSource::Transaction* transaction) override;
bool WillReEnqueue(TimeTicks now,
TaskSource::Transaction* transaction) override;
// Returns true if the delayed task to be posted will cause the delayed sort
// key to change.
bool DelayedSortKeyWillChange(const Task& delayed_task) const
EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Selects the earliest task to run, either from immediate or
// delayed queue and return it.
// Expects this sequence to have at least one task that can run
// immediately.
Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Get and return next task from immediate queue
Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Update the next earliest/latest ready time.
void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if there are immediate tasks
bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Returns true if tasks ready to be executed
bool HasReadyTasks(TimeTicks now) const override;
bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Releases reference to TaskRunner.
void ReleaseTaskRunner();
const SequenceToken token_ = SequenceToken::Create();
// A pointer to the TaskRunner that posts to this TaskSource, if any. The
// derived class is responsible for calling AddRef() when a TaskSource from
// which no Task is executing becomes non-empty and Release() when
// it becomes empty again (e.g. when DidProcessTask() returns false).
//
// In practise, this pointer is going to become dangling. See task_runner()
// comment.
raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_;
// Queues of tasks to execute.
base::queue<Task> queue_ GUARDED_BY(lock_);
base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
GUARDED_BY(lock_);
// Caches the latest/earliest ready time for atomic access. Writes are
// protected by |lock_|, but allows atomic reads outside of |lock_|. If this
// sequence is empty, these are in an unknown state and shouldn't be read.
// Minimum of latest_delayed_run_time() of next delayed task if any, and
// |queue_time| of next immediate task if any.
std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// is_null() if there is an immediate task, or earliest_delayed_run_time() of
// next delayed task otherwise.
std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
// True if a worker is currently associated with a Task from this Sequence.
bool has_worker_ = false;
// True if the sequence has ready tasks and requested to be queued as such
// through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
// ready tasks are done being processed and either DidProcessTask() or
// WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
// |lock_|, except in OnBecomeReady() hence the use of atomics.
std::atomic_bool is_immediate_{false};
// Holds data stored through the SequenceLocalStorageSlot API.
SequenceLocalStorageMap sequence_local_storage_;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_SEQUENCE_H_