// base/task/thread_pool/thread_pool_impl.h
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_THREAD_POOL_THREAD_POOL_IMPL_H_
#define BASE_TASK_THREAD_POOL_THREAD_POOL_IMPL_H_
#include <stddef.h>

#include <memory>
#include <optional>
#include <string>
#include <string_view>

#include "base/base_export.h"
#include "base/dcheck_is_on.h"
#include "base/functional/callback.h"
#include "base/memory/ptr_util.h"
#include "base/sequence_checker.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/single_thread_task_runner_thread_mode.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/delayed_task_manager.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
#include "base/task/thread_pool/service_thread.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/thread_group.h"
#include "base/task/thread_pool/thread_pool_instance.h"
#include "base/task/updateable_sequenced_task_runner.h"
#include "base/thread_annotations.h"
#include "build/build_config.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/com_init_check_hook.h"
#endif
namespace base {
namespace internal {
// The default implementation of ThreadPoolInstance. Every method is
// thread-safe unless thread_pool_instance.h says otherwise for that method.
class BASE_EXPORT ThreadPoolImpl : public ThreadPoolInstance,
                                   public ThreadGroup::Delegate,
                                   public PooledTaskRunnerDelegate {
 public:
  using TaskTrackerImpl = TaskTracker;

  // Production constructor: builds a ThreadPoolImpl that uses a production
  // TaskTracker. Histograms are labeled with |histogram_label|; an empty label
  // means no histograms are recorded.
  explicit ThreadPoolImpl(std::string_view histogram_label);

  // Test-only constructor: the caller provides the TaskTracker to use. When
  // |use_background_threads| is false, background threads run with default
  // priority.
  ThreadPoolImpl(std::string_view histogram_label,
                 std::unique_ptr<TaskTrackerImpl> task_tracker,
                 bool use_background_threads = true);

  // Not copyable.
  ThreadPoolImpl(const ThreadPoolImpl&) = delete;
  ThreadPoolImpl& operator=(const ThreadPoolImpl&) = delete;

  ~ThreadPoolImpl() override;

  // ThreadPoolInstance implementation:
  void Start(const ThreadPoolInstance::InitParams& init_params,
             WorkerThreadObserver* worker_thread_observer) override;
  bool WasStarted() const final;
  bool WasStartedUnsafe() const final;
  size_t GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
      const TaskTraits& traits) const override;
  void Shutdown() override;
  void FlushForTesting() override;
  void FlushAsyncForTesting(OnceClosure flush_callback) override;
  void JoinForTesting() override;
  void BeginFence() override;
  void EndFence() override;
  void BeginBestEffortFence() override;
  void EndBestEffortFence() override;
  void BeginRestrictedTasks() override;
  void EndRestrictedTasks() override;
  void BeginFizzlingBlockShutdownTasks() override;
  void EndFizzlingBlockShutdownTasks() override;

  // PooledTaskRunnerDelegate implementation:
  bool EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
  void RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source) override;
  void UpdatePriority(scoped_refptr<TaskSource> task_source,
                      TaskPriority priority) override;
  void UpdateJobPriority(scoped_refptr<TaskSource> task_source,
                         TaskPriority priority) override;

  // Reports the TimeTicks at which the next task scheduled on the ThreadPool
  // is due: Now() if a task is immediately runnable, nullopt if nothing is
  // scheduled. Thread-safe: tasks may be posted concurrently with this call,
  // but whether this call observes such tasks is then inherently racy.
  std::optional<TimeTicks> NextScheduledRunTimeForTesting() const;

  // Forces delayed tasks that are already ripe to be posted. Useful e.g. when
  // time is mocked and advances faster than the real-time delay on the
  // ServiceThread.
  void ProcessRipeDelayedTasksForTesting();

  // When |enabled|, requests that every thread started by ThreadPoolImpls
  // created later in this process starts synchronously; otherwise cancels that
  // behavior. May only be called while no ThreadPoolImpl is alive in this
  // process. Lives on this internal API instead of being a ThreadPoolInstance
  // configuration param because only one internal test truly needs it.
  static void SetSynchronousThreadStartForTesting(bool enabled);

  // Posts the one-off |task| with the given |traits| after |delay| (which may
  // be zero), without requiring a TaskRunner. Returns false if the task
  // definitely won't run because of the current shutdown state.
  bool PostDelayedTask(const Location& from_here,
                       const TaskTraits& traits,
                       OnceClosure task,
                       TimeDelta delay);

  // Returns a TaskRunner that schedules the tasks posted to it using |traits|.
  // Those tasks may run in any order and in parallel.
  scoped_refptr<TaskRunner> CreateTaskRunner(const TaskTraits& traits);

  // Returns a SequencedTaskRunner that schedules the tasks posted to it using
  // |traits|. Those tasks run one at a time, in posting order.
  scoped_refptr<SequencedTaskRunner> CreateSequencedTaskRunner(
      const TaskTraits& traits);

  // Returns a SingleThreadTaskRunner that schedules the tasks posted to it
  // using |traits|. Those tasks run on a single thread, in posting order.
  // SingleThreadTaskRunnerThreadMode::SHARED must be used if |traits|
  // identifies an existing thread.
  scoped_refptr<SingleThreadTaskRunner> CreateSingleThreadTaskRunner(
      const TaskTraits& traits,
      SingleThreadTaskRunnerThreadMode thread_mode);

#if BUILDFLAG(IS_WIN)
  // Returns a SingleThreadTaskRunner that schedules the tasks posted to it
  // using |traits| in a COM Single-Threaded Apartment. Tasks posted to the
  // returned runner run in the same Single-Threaded Apartment, in posting
  // order. SingleThreadTaskRunnerThreadMode::SHARED must be used if |traits|
  // identifies an existing thread.
  scoped_refptr<SingleThreadTaskRunner> CreateCOMSTATaskRunner(
      const TaskTraits& traits,
      SingleThreadTaskRunnerThreadMode thread_mode);
#endif  // BUILDFLAG(IS_WIN)

  // Returns a task runner that schedules the tasks posted to it using
  // |traits|, and whose priority can be changed at any time through
  // UpdateableSequencedTaskRunner::UpdatePriority(). A priority update applies
  // to every task posted to the runner that is not running yet. Tasks run one
  // at a time, in posting order.
  //
  // Requirements on |traits|:
  //   - base::ThreadPolicy must be specified if the priority of the task
  //     runner will ever be increased from BEST_EFFORT.
  scoped_refptr<UpdateableSequencedTaskRunner>
  CreateUpdateableSequencedTaskRunner(const TaskTraits& traits);

 private:
  // Called after |num_fences_| or |num_best_effort_fences_| changes. Sets the
  // CanRunPolicy in the TaskTracker and wakes up workers as appropriate.
  void UpdateCanRunPolicy();

  // Const overload of the ThreadGroup::Delegate lookup declared below.
  const ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) const;

  // Posts |task| for execution by the appropriate thread group as part of
  // |sequence|. Callers must only invoke this once |task| has gone through
  // TaskTracker::WillPostTask() and its delayed run time has been reached.
  bool PostTaskWithSequenceNow(Task task, scoped_refptr<Sequence> sequence);

  // ThreadGroup::Delegate implementation:
  ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override;

  // PooledTaskRunnerDelegate implementation:
  bool PostTaskWithSequence(Task task,
                            scoped_refptr<Sequence> sequence) override;
  bool ShouldYield(const TaskSource* task_source) override;

  const std::string histogram_label_;
  const std::unique_ptr<TaskTrackerImpl> task_tracker_;
  ServiceThread service_thread_;
  DelayedTaskManager delayed_task_manager_;
  PooledSingleThreadTaskRunnerManager single_thread_task_runner_manager_;

  std::unique_ptr<ThreadGroup> foreground_thread_group_;
  std::unique_ptr<ThreadGroup> utility_thread_group_;
  std::unique_ptr<ThreadGroup> background_thread_group_;

  // Whether this ThreadPoolImpl was started.
  bool started_ GUARDED_BY_CONTEXT(sequence_checker_) = false;

  // True when the --disable-best-effort-tasks switch prevents BEST_EFFORT
  // tasks from executing until shutdown.
  const bool has_disable_best_effort_switch_;

  // Counts of active fences: |num_fences_| blocks execution of tasks of any
  // priority, |num_best_effort_fences_| blocks BEST_EFFORT tasks.
  int num_fences_ GUARDED_BY_CONTEXT(sequence_checker_) = 0;
  int num_best_effort_fences_ GUARDED_BY_CONTEXT(sequence_checker_) = 0;

#if DCHECK_IS_ON()
  // Set once JoinForTesting() has returned.
  AtomicFlag join_for_testing_returned_;
#endif

#if BUILDFLAG(IS_WIN) && defined(COM_INIT_CHECK_HOOK_ENABLED)
  // Provides COM initialization verification on builds that support it.
  base::win::ComInitCheckHook com_init_check_hook_;
#endif

  // Used to assert that operations happen in sequence with Start().
  SEQUENCE_CHECKER(sequence_checker_);

  TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_THREAD_POOL_THREAD_POOL_IMPL_H_