1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
base / task / post_job.h [blame]
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_POST_JOB_H_
#define BASE_TASK_POST_JOB_H_
#include <limits>
#include "base/base_export.h"
#include "base/dcheck_is_on.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/memory/stack_allocated.h"
namespace base {
namespace internal {
class JobTaskSource;
class PooledTaskRunnerDelegate;
}
class TaskTraits;
enum class TaskPriority : uint8_t;
// Delegate that's passed to Job's worker task, providing an entry point to
// communicate with the scheduler. To prevent deadlocks, JobDelegate methods
// should never be called while holding a user lock.
class BASE_EXPORT JobDelegate {
STACK_ALLOCATED();
public:
// A JobDelegate is instantiated for each worker task that is run.
// |task_source| is the task source whose worker task is running with this
// delegate and |pooled_task_runner_delegate| is used by ShouldYield() to
// check whether the pool wants this worker task to yield (null if this worker
// should never yield -- e.g. when the main thread is a worker).
JobDelegate(internal::JobTaskSource* task_source,
internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate);
JobDelegate(const JobDelegate&) = delete;
JobDelegate& operator=(const JobDelegate&) = delete;
~JobDelegate();
// Returns true if this thread *must* return from the worker task on the
// current thread ASAP. Workers should periodically invoke ShouldYield (or
// YieldIfNeeded()) as often as is reasonable.
bool ShouldYield();
// If ShouldYield(), this will pause the current thread (allowing it to be
// replaced in the pool); no-ops otherwise. If it pauses, it will resume and
// return from this call whenever higher priority work completes.
// Prefer ShouldYield() over this (only use YieldIfNeeded() when unwinding
// the stack is not possible).
void YieldIfNeeded();
// Notifies the scheduler that max concurrency was increased, and the number
// of worker should be adjusted accordingly. See PostJob() for more details.
void NotifyConcurrencyIncrease();
// Returns a task_id unique among threads currently running this job, such
// that GetTaskId() < worker count. To achieve this, the same task_id may be
// reused by a different thread after a worker_task returns.
uint8_t GetTaskId();
// Returns true if the current task is called from the thread currently
// running JobHandle::Join().
bool IsJoiningThread() const {
return pooled_task_runner_delegate_ == nullptr;
}
private:
static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max();
internal::JobTaskSource* task_source_ = nullptr;
internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate_ = nullptr;
uint8_t task_id_ = kInvalidTaskId;
#if DCHECK_IS_ON()
// Value returned by the last call to ShouldYield().
bool last_should_yield_ = false;
#endif
};
// Handle returned when posting a Job. Provides methods to control execution of
// the posted Job. To prevent deadlocks, JobHandle methods should never be
// called while holding a user lock.
class BASE_EXPORT JobHandle {
public:
JobHandle();
JobHandle(const JobHandle&) = delete;
JobHandle& operator=(const JobHandle&) = delete;
// A job must either be joined, canceled or detached before the JobHandle is
// destroyed.
~JobHandle();
JobHandle(JobHandle&&);
JobHandle& operator=(JobHandle&&);
// Returns true if associated with a Job.
explicit operator bool() const { return task_source_ != nullptr; }
// Returns true if there's any work pending or any worker running.
bool IsActive() const;
// Update this Job's priority.
void UpdatePriority(TaskPriority new_priority);
// Notifies the scheduler that max concurrency was increased, and the number
// of workers should be adjusted accordingly. See PostJob() for more details.
void NotifyConcurrencyIncrease();
// Contributes to the job on this thread. Doesn't return until all tasks have
// completed and max concurrency becomes 0. This also promotes this Job's
// priority to be at least as high as the calling thread's priority. When
// called immediately, prefer CreateJob(...).Join() over PostJob(...).Join()
// to avoid having too many workers scheduled for executing the workload.
void Join();
// Forces all existing workers to yield ASAP. Waits until they have all
// returned from the Job's callback before returning.
void Cancel();
// Forces all existing workers to yield ASAP but doesn’t wait for them.
// Warning, this is dangerous if the Job's callback is bound to or has access
// to state which may be deleted after this call.
void CancelAndDetach();
// Can be invoked before ~JobHandle() to avoid waiting on the job completing.
void Detach();
private:
friend class internal::JobTaskSource;
explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source);
scoped_refptr<internal::JobTaskSource> task_source_;
};
// Callback used in PostJob() to control the maximum number of threads calling
// the worker task concurrently.
// Returns the maximum number of threads which may call a job's worker task
// concurrently. |worker_count| is the number of threads currently assigned to
// this job which some callers may need to determine their return value.
using MaxConcurrencyCallback =
RepeatingCallback<size_t(size_t /*worker_count*/)>;
// Posts a repeating |worker_task| with specific |traits| to run in parallel on
// base::ThreadPool.
// Returns a JobHandle associated with the Job, which can be joined, canceled or
// detached.
// ThreadPool APIs, including PostJob() and methods of the returned JobHandle,
// must never be called while holding a lock that could be acquired by
// |worker_task| or |max_concurrency_callback| -- that could result in a
// deadlock. This is because [1] |max_concurrency_callback| may be invoked while
// holding internal ThreadPool lock (A), hence |max_concurrency_callback| can
// only use a lock (B) if that lock is *never* held while calling back into a
// ThreadPool entry point from any thread (A=>B/B=>A deadlock) and [2]
// |worker_task| or |max_concurrency_callback| is invoked synchronously from
// JobHandle::Join() (A=>JobHandle::Join()=>A deadlock).
// To avoid scheduling overhead, |worker_task| should do as much work as
// possible in a loop when invoked, and JobDelegate::ShouldYield() should be
// periodically invoked to conditionally exit and let the scheduler prioritize
// work.
//
// A canonical implementation of |worker_task| looks like:
// void WorkerTask(JobDelegate* job_delegate) {
// while (!job_delegate->ShouldYield()) {
// auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work.
// if (!work_item)
// return:
// ProcessWork(work_item);
// }
// }
//
// |max_concurrency_callback| controls the maximum number of threads calling
// |worker_task| concurrently. |worker_task| is only invoked if the number of
// threads previously running |worker_task| was less than the value returned by
// |max_concurrency_callback|. In general, |max_concurrency_callback| should
// return the latest number of incomplete work items (smallest unit of work)
// left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must*
// be invoked shortly after |max_concurrency_callback| starts returning a value
// larger than previously returned values. This usually happens when new work
// items are added and the API user wants additional threads to invoke
// |worker_task| concurrently. The callbacks may be called concurrently on any
// thread until the job is complete. If the job handle is detached, the
// callbacks may still be called, so they must not access global state that
// could be destroyed.
//
// |traits| requirements:
// - base::ThreadPolicy must be specified if the priority of the task runner
// will ever be increased from BEST_EFFORT.
JobHandle BASE_EXPORT PostJob(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback);
// Creates and returns a JobHandle associated with a Job. Unlike PostJob(), this
// doesn't immediately schedules |worker_task| to run on base::ThreadPool
// workers; the Job is then scheduled by calling either
// NotifyConcurrencyIncrease() or Join().
JobHandle BASE_EXPORT
CreateJob(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback);
} // namespace base
#endif // BASE_TASK_POST_JOB_H_