1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180
  181
  182
  183
  184
  185
  186
  187
  188
  189
  190
  191
  192
  193
  194
  195
  196
  197
  198
  199
  200
  201
  202
  203
  204
  205
  206
  207
  208

base / tracing / perfetto_task_runner_unittest.cc [blame]

// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/tracing/perfetto_task_runner.h"

#include <memory>
#include <utility>
#include <vector>

#include "base/files/scoped_file.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/simple_thread.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"

#if BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)
#include <sys/socket.h>
#include <sys/types.h>
#include "base/posix/eintr_wrapper.h"
#endif

namespace base {
namespace tracing {
namespace {

class TaskDestination {
 public:
  TaskDestination(size_t number_of_sequences,
                  size_t expected_tasks,
                  base::OnceClosure on_complete)
      : expected_tasks_(expected_tasks),
        on_complete_(std::move(on_complete)),
        last_task_id_(number_of_sequences) {}

  size_t tasks_run() const { return tasks_run_; }

  void TestTask(int n, size_t sequence_number = 0) {
    EXPECT_LT(sequence_number, last_task_id_.size());
    EXPECT_GT(expected_tasks_, tasks_run_);
    EXPECT_GE(n, last_task_id_[sequence_number]);
    last_task_id_[sequence_number] = n;

    if (++tasks_run_ == expected_tasks_) {
      std::move(on_complete_).Run();
    }
  }

  base::WeakPtr<TaskDestination> GetWeakPtr() {
    return weak_ptr_factory_.GetWeakPtr();
  }

 private:
  const size_t expected_tasks_;
  base::OnceClosure on_complete_;
  std::vector<int> last_task_id_;
  size_t tasks_run_ = 0;

  base::WeakPtrFactory<TaskDestination> weak_ptr_factory_{this};
};

class PosterThread : public base::SimpleThread {
 public:
  PosterThread(PerfettoTaskRunner* task_runner,
               base::WeakPtr<TaskDestination> weak_ptr,
               int n,
               size_t sequence_number)
      : SimpleThread("TaskPostThread"),
        task_runner_(task_runner),
        weak_ptr_(weak_ptr),
        n_(n),
        sequence_number_(sequence_number) {}
  ~PosterThread() override = default;

  // base::SimpleThread overrides.
  void BeforeStart() override {}
  void BeforeJoin() override {}

  void Run() override {
    for (int i = 0; i < n_; ++i) {
      auto weak_ptr = weak_ptr_;
      auto sequence_number = sequence_number_;
      task_runner_->PostTask([weak_ptr, i, sequence_number] {
        weak_ptr->TestTask(i, sequence_number);
      });
    }
  }

 private:
  raw_ptr<PerfettoTaskRunner> task_runner_;
  base::WeakPtr<TaskDestination> weak_ptr_;
  const int n_;
  const size_t sequence_number_;
};

class PerfettoTaskRunnerTest : public testing::Test {
 public:
  void SetUp() override {
    sequenced_task_runner_ = CreateNewTaskrunner();
    task_runner_ = std::make_unique<PerfettoTaskRunner>(sequenced_task_runner_);
  }

  scoped_refptr<base::SequencedTaskRunner> CreateNewTaskrunner() {
    return base::ThreadPool::CreateSingleThreadTaskRunner(
        {base::MayBlock()}, base::SingleThreadTaskRunnerThreadMode::DEDICATED);
  }
  void SetTaskExpectations(base::OnceClosure on_complete,
                           size_t expected_tasks,
                           size_t number_of_sequences = 1) {
    task_destination_ = std::make_unique<TaskDestination>(
        number_of_sequences, expected_tasks, std::move(on_complete));
  }

  void TearDown() override {
    sequenced_task_runner_->DeleteSoon(FROM_HERE, std::move(task_runner_));
  }

  PerfettoTaskRunner* task_runner() { return task_runner_.get(); }
  TaskDestination* destination() { return task_destination_.get(); }
  base::test::TaskEnvironment& task_environment() { return task_environment_; }

 private:
  base::test::TaskEnvironment task_environment_;
  scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner_;
  std::unique_ptr<PerfettoTaskRunner> task_runner_;
  std::unique_ptr<TaskDestination> task_destination_;
};

TEST_F(PerfettoTaskRunnerTest, SequentialTasks) {
  base::RunLoop wait_for_tasks;
  SetTaskExpectations(wait_for_tasks.QuitClosure(), 3);

  auto weak_ptr = destination()->GetWeakPtr();
  for (int i = 1; i <= 3; ++i) {
    task_runner()->PostTask([=]() mutable {
      auto* dest = weak_ptr.get();
      // The weak pointer must be reset before TestTask() is called, otherwise
      // there will be a race where the factory could be destructed on main
      // thread while still bound to the task runner sequence.
      weak_ptr.reset();
      dest->TestTask(i);
    });
  }

  wait_for_tasks.Run();
}

#if BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)
// Tests file descriptor reuse that causes crashes.
TEST_F(PerfettoTaskRunnerTest, FileDescriptorReuse) {
  int sockets[2];
  // Use sockets because we need a FD that supports epoll().
  ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, sockets));
  base::ScopedFD fd(sockets[0]), write_fd(sockets[1]);
  ASSERT_TRUE(fd.is_valid());

  constexpr int data_value = 0x12ab34cd;
  bool run_callback_1 = false, run_callback_2 = false;
  int data = data_value;
  constexpr ssize_t data_size = static_cast<ssize_t>(sizeof(data));

  // Trigger the file descriptor watcher callback.
  ASSERT_EQ(data_size, HANDLE_EINTR(write(write_fd.get(), &data, data_size)));
  data = 0;

  base::RunLoop run_loop;

  task_runner()->PostTask(
      [&] {
        // The 1st add operation posts a task.
        task_runner()->AddFileDescriptorWatch(fd.get(), [&] {
          run_callback_1 = true;
          ASSERT_EQ(data_size, HANDLE_EINTR(read(fd.get(), &data, data_size)));
          run_loop.Quit();
        });
        // Remove so the 2nd add operation can succeed.
        task_runner()->RemoveFileDescriptorWatch(fd.get());

        // Simulate FD reuse. The 2nd add operation also posts a task.
        task_runner()->AddFileDescriptorWatch(fd.get(), [&] {
          run_callback_2 = true;
          ASSERT_EQ(data_size, HANDLE_EINTR(read(fd.get(), &data, data_size)));
          run_loop.Quit();
        });
      });

  // Make all posted tasks run.
  run_loop.Run();

  ASSERT_FALSE(run_callback_1);
  ASSERT_TRUE(run_callback_2);
  ASSERT_EQ(data, data_value);

  task_runner()->PostTask([&] {
    // Cleanup the FD watcher.
    task_runner()->RemoveFileDescriptorWatch(fd.get());
  });
  task_environment().RunUntilIdle();
}
#endif
}  // namespace
}  // namespace tracing
}  // namespace base