1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
base / task / post_job.cc [blame]
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/post_job.h"
#include "base/task/scoped_set_task_priority_for_current_thread.h"
#include "base/task/thread_pool/job_task_source.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
#include "base/task/thread_pool/thread_pool_impl.h"
#include "base/task/thread_pool/thread_pool_instance.h"
namespace base {
namespace {
scoped_refptr<internal::JobTaskSource> CreateJobTaskSource(
const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback) {
DCHECK(ThreadPoolInstance::Get())
<< "Hint: if this is in a unit test, you're likely merely missing a "
"base::test::TaskEnvironment member in your fixture.\n";
return base::MakeRefCounted<internal::JobTaskSource>(
from_here, traits, std::move(worker_task),
std::move(max_concurrency_callback),
static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()));
}
} // namespace
JobDelegate::JobDelegate(
internal::JobTaskSource* task_source,
internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate)
: task_source_(task_source),
pooled_task_runner_delegate_(pooled_task_runner_delegate) {
DCHECK(task_source_);
}
JobDelegate::~JobDelegate() {
if (task_id_ != kInvalidTaskId)
task_source_->ReleaseTaskId(task_id_);
}
bool JobDelegate::ShouldYield() {
#if DCHECK_IS_ON()
// ShouldYield() shouldn't be called again after returning true.
DCHECK(!last_should_yield_);
#endif // DCHECK_IS_ON()
const bool should_yield =
task_source_->ShouldYield() ||
(pooled_task_runner_delegate_ &&
pooled_task_runner_delegate_->ShouldYield(task_source_));
#if DCHECK_IS_ON()
last_should_yield_ = should_yield;
#endif // DCHECK_IS_ON()
return should_yield;
}
void JobDelegate::YieldIfNeeded() {
// TODO(crbug.com/40574605): Implement this.
}
void JobDelegate::NotifyConcurrencyIncrease() {
task_source_->NotifyConcurrencyIncrease();
}
uint8_t JobDelegate::GetTaskId() {
if (task_id_ == kInvalidTaskId)
task_id_ = task_source_->AcquireTaskId();
return task_id_;
}
JobHandle::JobHandle() = default;
JobHandle::JobHandle(scoped_refptr<internal::JobTaskSource> task_source)
: task_source_(std::move(task_source)) {}
JobHandle::~JobHandle() {
DCHECK(!task_source_)
<< "The Job must be cancelled, detached or joined before its "
"JobHandle is destroyed.";
}
JobHandle::JobHandle(JobHandle&&) = default;
JobHandle& JobHandle::operator=(JobHandle&& other) {
DCHECK(!task_source_)
<< "The Job must be cancelled, detached or joined before its "
"JobHandle is re-assigned.";
task_source_ = std::move(other.task_source_);
return *this;
}
bool JobHandle::IsActive() const {
return task_source_->IsActive();
}
void JobHandle::UpdatePriority(TaskPriority new_priority) {
if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
task_source_->delegate())) {
return;
}
task_source_->delegate()->UpdateJobPriority(task_source_, new_priority);
}
void JobHandle::NotifyConcurrencyIncrease() {
if (!internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
task_source_->delegate())) {
return;
}
task_source_->NotifyConcurrencyIncrease();
}
void JobHandle::Join() {
DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
task_source_->delegate()));
DCHECK_GE(internal::GetTaskPriorityForCurrentThread(),
task_source_->priority_racy())
<< "Join may not be called on Job with higher priority than the current "
"thread.";
UpdatePriority(internal::GetTaskPriorityForCurrentThread());
if (task_source_->GetRemainingConcurrency() != 0) {
// Make sure the task source is in the queue if not enough workers are
// contributing. This is necessary for CreateJob(...).Join(). This is a
// noop if the task source was already in the queue.
task_source_->delegate()->EnqueueJobTaskSource(task_source_);
}
bool must_run = task_source_->WillJoin();
while (must_run)
must_run = task_source_->RunJoinTask();
// Remove |task_source_| from the ThreadPool to prevent access to
// |max_concurrency_callback| after Join().
task_source_->delegate()->RemoveJobTaskSource(task_source_);
task_source_ = nullptr;
}
void JobHandle::Cancel() {
DCHECK(internal::PooledTaskRunnerDelegate::MatchesCurrentDelegate(
task_source_->delegate()));
task_source_->Cancel();
bool must_run = task_source_->WillJoin();
DCHECK(!must_run);
// Remove |task_source_| from the ThreadPool to prevent access to
// |max_concurrency_callback| after Join().
task_source_->delegate()->RemoveJobTaskSource(task_source_);
task_source_ = nullptr;
}
void JobHandle::CancelAndDetach() {
task_source_->Cancel();
Detach();
}
void JobHandle::Detach() {
DCHECK(task_source_);
task_source_ = nullptr;
}
JobHandle PostJob(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback) {
auto task_source =
CreateJobTaskSource(from_here, traits, std::move(worker_task),
std::move(max_concurrency_callback));
const bool queued =
static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get())
->EnqueueJobTaskSource(task_source);
if (queued) {
return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
}
return JobHandle();
}
JobHandle CreateJob(const Location& from_here,
const TaskTraits& traits,
RepeatingCallback<void(JobDelegate*)> worker_task,
MaxConcurrencyCallback max_concurrency_callback) {
auto task_source =
CreateJobTaskSource(from_here, traits, std::move(worker_task),
std::move(max_concurrency_callback));
return internal::JobTaskSource::CreateJobHandle(std::move(task_source));
}
} // namespace base