1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180
  181
  182
  183
  184
  185
  186
  187
  188
  189
  190
  191
  192
  193
  194
  195
  196
  197
  198
  199
  200
  201
  202
  203
  204
  205
  206
  207
  208
  209
  210
  211
  212

base / task / thread_pool / sequence.h [blame]

// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
#define BASE_TASK_THREAD_POOL_SEQUENCE_H_

#include <stddef.h>

#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/containers/intrusive_heap.h"
#include "base/containers/queue.h"
#include "base/sequence_token.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/pooled_parallel_task_runner.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/thread_annotations.h"
#include "base/threading/sequence_local_storage_map.h"

namespace base {
namespace internal {

// A Sequence is intended to hold delayed tasks and immediate tasks.
// Delayed tasks are held in a priority queue until they are ripe and
// immediate tasks in a simple fifo queue.
// When Sequence::TakeTask is called, we select the next appropriate task
// from both queues and return it.
// Each queue holds slots each containing up to a single Task that must be
// executed in posting/runtime order.
//
// In comments below, an "empty Sequence" is a Sequence with no slot.
//
// Note: there is a known refcounted-ownership cycle in the Scheduler
// architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
// This is okay so long as the other owners of Sequence (PriorityQueue and
// WorkerThread in alternation and
// ThreadGroup::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result). A
// dangling reference cycle would only occur should they release their reference
// to it while it's not empty. In other words, it is only correct for them to
// release it after DidProcessTask() returns false to indicate it was made empty
// by that call (in which case the next WillPushImmediateTask() will return true
// to indicate to the caller that the Sequence should be re-enqueued for
// execution). This class is thread-safe.
class BASE_EXPORT Sequence : public TaskSource {
 public:
  // A Transaction can perform multiple operations atomically on a
  // Sequence. While a Transaction is alive, it is guaranteed that nothing
  // else will access the Sequence; the Sequence's lock is held for the
  // lifetime of the Transaction.
  class BASE_EXPORT Transaction : public TaskSource::Transaction {
   public:
    // Transactions are move-constructible but not copyable.
    Transaction(Transaction&& other);
    Transaction(const Transaction&) = delete;
    Transaction& operator=(const Transaction&) = delete;
    ~Transaction();

    // Returns true if the sequence must be added to the immediate queue after
    // receiving a new immediate Task in order to be scheduled. If the caller
    // doesn't want the sequence to be scheduled, it may not add the sequence to
    // the immediate queue even if this returns true.
    bool WillPushImmediateTask();

    // Adds immediate |task| to the end of this sequence. Unlike
    // PushDelayedTask(), this returns nothing; WillPushImmediateTask() is the
    // call that reports whether the sequence needs to be queued.
    void PushImmediateTask(Task task);

    // Adds a delayed |task| in this sequence, and returns true if the sequence
    // needs to be re-enqueued in the delayed queue as a result of this
    // sequence's delayed sort key changing.
    bool PushDelayedTask(Task task);

    // Returns the Sequence this Transaction operates on, i.e. the base class's
    // task_source() downcast to Sequence.
    Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }

   private:
    friend class Sequence;

    // Private: only Sequence (a friend) can construct a Transaction from a
    // Sequence pointer. Presumably reached through
    // Sequence::BeginTransaction() -- confirm in the .cc.
    explicit Transaction(Sequence* sequence);
  };

  // |traits| is metadata that applies to all Tasks in the Sequence.
  // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
  // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
  // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
  // execution mode of |task_runner|.
  Sequence(const TaskTraits& traits,
           SequencedTaskRunner* task_runner,
           TaskSourceExecutionMode execution_mode);
  // Sequences are neither copyable nor assignable.
  Sequence(const Sequence&) = delete;
  Sequence& operator=(const Sequence&) = delete;

  // Begins a Transaction. This method cannot be called on a thread which has an
  // active Sequence::Transaction. The result is [[nodiscard]]: per the
  // Transaction comment above, the Sequence's lock is held for exactly as long
  // as the returned object is alive.
  [[nodiscard]] Transaction BeginTransaction();

  // TaskSource:
  ExecutionEnvironment GetExecutionEnvironment() override;
  size_t GetRemainingConcurrency() const override;
  TaskSourceSortKey GetSortKey() const override;
  TimeTicks GetDelayedSortKey() const override;

  // Returns a token that uniquely identifies this Sequence. The returned
  // reference points at a member of this object (hence LIFETIME_BOUND) and
  // must not be used after the Sequence is destroyed.
  const SequenceToken& token() const LIFETIME_BOUND { return token_; }

  // Returns a pointer to the map owned by this Sequence that holds data stored
  // through the SequenceLocalStorageSlot API. The pointer is never null.
  SequenceLocalStorageMap* sequence_local_storage() {
    return &sequence_local_storage_;
  }

  // TaskSource override. Per the |is_immediate_| comment below, this is the one
  // place where |is_immediate_| is accessed without |lock_| protection.
  bool OnBecomeReady() override;

  // Test-only accessors. NO_THREAD_SAFETY_ANALYSIS turns off the compile-time
  // lock checking for the annotated functions, so nothing here verifies that
  // |lock_| is held (IsEmpty() is annotated as requiring it).
  // NOTE(review): presumably tests must guarantee there is no concurrent
  // access when calling these -- confirm.
  bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
    return has_worker_;
  }
  bool is_immediate_for_testing() const { return is_immediate_; }
  bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }

  // A reference to TaskRunner is only retained between
  // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
  // false, guaranteeing it is safe to dereference this pointer. Otherwise, the
  // caller should guarantee such TaskRunner still exists before dereferencing.
  SequencedTaskRunner* task_runner() const { return task_runner_; }

 private:
  // Private destructor.
  // NOTE(review): presumably instances are destroyed only through the
  // ref-counting provided by TaskSource -- confirm against task_source.h.
  ~Sequence() override;

  // Comparator used to order |delayed_queue_|. The comparison itself is
  // defined out of line.
  struct DelayedTaskGreater {
    bool operator()(const Task& lhs, const Task& rhs) const;
  };

  // TaskSource:
  RunStatus WillRunTask() override;
  Task TakeTask(TaskSource::Transaction* transaction) override;
  std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
  bool DidProcessTask(TaskSource::Transaction* transaction) override;
  bool WillReEnqueue(TimeTicks now,
                     TaskSource::Transaction* transaction) override;

  // Returns true if the delayed task to be posted will cause the delayed sort
  // key to change.
  bool DelayedSortKeyWillChange(const Task& delayed_task) const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Selects the earliest task to run, either from the immediate or the
  // delayed queue, and returns it.
  // Expects this sequence to have at least one task that can run
  // immediately.
  Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Gets and returns the next task from the immediate queue.
  Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Updates the cached next earliest/latest ready times
  // (|earliest_ready_time_| and |latest_ready_time_|).
  void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns true if there are immediate tasks.
  bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns true if there are tasks ready to be executed. TaskSource override;
  // |now| is presumably the time against which readiness is evaluated.
  bool HasReadyTasks(TimeTicks now) const override;

  // Returns true if this Sequence is empty. Must be called with |lock_| held.
  bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Releases reference to TaskRunner.
  void ReleaseTaskRunner();

  // NOTE(review): the members below use std::atomic and raw_ptr, and Clear()
  // above uses std::optional, but this header does not directly include
  // <atomic>, <optional> or the raw_ptr header; they are presumably pulled in
  // transitively. Consider including them explicitly.

  const SequenceToken token_ = SequenceToken::Create();

  // A pointer to the TaskRunner that posts to this TaskSource, if any. The
  // derived class is responsible for calling AddRef() when a TaskSource from
  // which no Task is executing becomes non-empty and Release() when
  // it becomes empty again (e.g. when DidProcessTask() returns false).
  //
  // In practice, this pointer is going to become dangling. See task_runner()
  // comment.
  raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_;

  // Queues of tasks to execute: |queue_| holds immediate tasks in FIFO order
  // and |delayed_queue_| holds delayed tasks ordered by DelayedTaskGreater.
  base::queue<Task> queue_ GUARDED_BY(lock_);
  base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
      GUARDED_BY(lock_);

  // Caches the latest/earliest ready time for atomic access. Writes are
  // protected by |lock_|, but allows atomic reads outside of |lock_|. If this
  // sequence is empty, these are in an unknown state and shouldn't be read.

  // Minimum of latest_delayed_run_time() of next delayed task if any, and
  // |queue_time| of next immediate task if any.
  std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
  // is_null() if there is an immediate task, or earliest_delayed_run_time() of
  // next delayed task otherwise.
  std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};

  // True if a worker is currently associated with a Task from this Sequence.
  // NOTE(review): unlike the queues above, this member carries no
  // GUARDED_BY(lock_) annotation -- confirm what synchronizes access to it.
  bool has_worker_ = false;

  // True if the sequence has ready tasks and requested to be queued as such
  // through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
  // ready tasks are done being processed and either DidProcessTask() or
  // WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
  // |lock_|, except in OnBecomeReady() hence the use of atomics.
  std::atomic_bool is_immediate_{false};

  // Holds data stored through the SequenceLocalStorageSlot API.
  SequenceLocalStorageMap sequence_local_storage_;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_SEQUENCE_H_