1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180
  181
  182
  183
  184
  185
  186
  187
  188
  189
  190
  191
  192
  193
  194
  195
  196
  197
  198
  199
  200
  201
  202
  203
  204
  205

mojo / core / ipcz_driver / data_pipe.h [blame]

// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_CORE_IPCZ_DRIVER_DATA_PIPE_H_
#define MOJO_CORE_IPCZ_DRIVER_DATA_PIPE_H_

#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>

#include "base/containers/span.h"
#include "base/memory/scoped_refptr.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ipcz_driver/object.h"
#include "mojo/core/ipcz_driver/ring_buffer.h"
#include "mojo/core/ipcz_driver/shared_buffer.h"
#include "mojo/core/scoped_ipcz_handle.h"
#include "mojo/public/c/system/data_pipe.h"
#include "mojo/public/c/system/types.h"
#include "third_party/ipcz/include/ipcz/ipcz.h"

namespace mojo::core::ipcz_driver {

class Transport;

// DataPipe implements a Mojo data pipe producer or consumer endpoint by
// wrapping a shared memory ring buffer and using ipcz portals to communicate
// read and write quantities end-to-end.
//
// TODO(crbug.com/40058840): Once everything is transitioned to mojo-ipcz
// this object (and builtin data pipe bindings support in general) can be
// deprecated in favor of a mojom-based library implementation of data pipes,
// built directly on ipcz portals. For now they're implemented as ipcz driver
// objects so they can continue to be represented on the wire as a single Mojo
// handle.
class DataPipe : public Object<DataPipe> {
 public:
  enum class EndpointType : uint32_t {
    kProducer,
    kConsumer,
  };

  struct Config {
    // The size of each "element" in bytes. Relevant for Mojo data pipe APIs
    // which read in write in terms of element counts.
    size_t element_size;

    // The total byte capacity of the data pipe. This is a best-effort limit on
    // the number of unread bytes allowed to accumulate at the consumer before
    // the producer waits to produce more data.
    size_t byte_capacity;

    // Indicates whether the peer is known to be closed.
    bool is_peer_closed;
  };

  // A wrapper for the DataPipe's underlying portal, used for thread-safe portal
  // ownership and access.
  class PortalWrapper : public base::RefCountedThreadSafe<PortalWrapper> {
   public:
    explicit PortalWrapper(ScopedIpczHandle portal);
    PortalWrapper(const PortalWrapper&) = delete;
    void operator=(const PortalWrapper&) = delete;

    IpczHandle handle() const { return handle_.get(); }
    void set_handle(ScopedIpczHandle handle) { handle_ = std::move(handle); }

    ScopedIpczHandle TakeHandle() { return std::move(handle_); }

   private:
    friend class base::RefCountedThreadSafe<PortalWrapper>;

    ~PortalWrapper();

    ScopedIpczHandle handle_;
  };

  // Constructs a partial DataPipe endpoint of type `endpoint_type`, configured
  // according to `config`, and using `buffer` for the underlying transfer
  // buffer. `mapping` must be a valid mapping of all the memory referenced by
  // `buffer`.
  //
  // This DataPipe is not usable until it's given a portal via AdoptPortal().
  DataPipe(EndpointType endpoint_type,
           const Config& config,
           scoped_refptr<SharedBuffer> buffer,
           scoped_refptr<SharedBufferMapping> mapping);

  static Type object_type() { return kDataPipe; }

  // Constructs a new pair of DataPipe endpoints, one for reading and one for
  // writing. May fail and return null if the data pipe's shared memory backing
  // could not be allocated.
  struct Pair {
    Pair();

    // Move-only type to avoid ref-chrun on unintentional copy.
    Pair(const Pair&) = delete;
    Pair(Pair&&);
    Pair& operator=(const Pair&) = delete;
    Pair& operator=(Pair&&);
    ~Pair();

    scoped_refptr<DataPipe> consumer;
    scoped_refptr<DataPipe> producer;
  };
  static std::optional<Pair> CreatePair(const Config& config);

  bool is_producer() const { return endpoint_type_ == EndpointType::kProducer; }
  bool is_consumer() const { return endpoint_type_ == EndpointType::kConsumer; }

  // Provides this DataPipe instance with a portal to own and use for I/O. Must
  // only be called on a DataPipe that does not already have a portal. Returns
  // true if successful or false if `portal` is not a valid portal handle.
  bool AdoptPortal(ScopedIpczHandle portal);

  // Returns a reference to the underlying portal which can be safely used from
  // any thread. May return null if no portal has been adopted by this DataPipe
  // yet.
  scoped_refptr<PortalWrapper> GetPortal();

  // Takes ownership of the DataPipe's portal (for serialization) and returns
  // the handle to it.
  ScopedIpczHandle TakePortal();

  // Implements Mojo's WriteData API.
  MojoResult WriteData(const void* elements,
                       uint32_t& num_bytes,
                       MojoWriteDataFlags flags);
  MojoResult BeginWriteData(void*& data,
                            uint32_t& num_bytes,
                            MojoBeginWriteDataFlags flags);
  MojoResult EndWriteData(size_t num_bytes_produced);
  MojoResult ReadData(void* elements,
                      uint32_t& num_bytes,
                      MojoReadDataFlags flags);
  MojoResult BeginReadData(const void*& buffer, uint32_t& buffer_num_bytes);
  MojoResult EndReadData(size_t num_bytes_consumed);

  // ObjectBase:
  void Close() override;
  bool IsSerializable() const override;
  bool GetSerializedDimensions(Transport& transmitter,
                               size_t& num_bytes,
                               size_t& num_handles) override;
  bool Serialize(Transport& transmitter,
                 base::span<uint8_t> data,
                 base::span<PlatformHandle> handles) override;

  static scoped_refptr<DataPipe> Deserialize(
      base::span<const uint8_t> data,
      base::span<PlatformHandle> handles);

  // Returns Mojo signals to reflect the effective state of this DataPipe and
  // its control portal within `signals_state`. Returns true on success or false
  // if the DataPipe's signal state is unspecified due to impending closure. In
  // the latter case `signals_state` is zeroed out.
  bool GetSignals(MojoHandleSignalsState& signals_state);

  // Flushes any incoming status updates from the peer. Note that this may
  // trigger trap events before returning, since it can modify the state of the
  // control portal.
  void FlushUpdatesFromPeer() LOCKS_EXCLUDED(lock_);

 private:
  ~DataPipe() override;

  bool DeserializeRingBuffer(const RingBuffer::SerializedState& state);

  const EndpointType endpoint_type_;
  const size_t element_size_;

  mutable base::Lock lock_;

  // A portal used to transfer control messages between producer and consumer.
  // Ref-counted separately since this object needs to maintain thread-safe
  // access and ensure that Close() doesn't race with other operations on the
  // underlying portal.
  scoped_refptr<PortalWrapper> portal_ GUARDED_BY(lock_);

  // Owns a reference to the underlying shared memory region, and manages this
  // data pipe endpoint's local cache of the buffer state.
  scoped_refptr<SharedBuffer> buffer_ GUARDED_BY(lock_);
  RingBuffer data_ GUARDED_BY(lock_);
  std::optional<RingBuffer::DirectWriter> two_phase_writer_;
  std::optional<RingBuffer::DirectReader> two_phase_reader_;

  // Indicates whether this endpoint is in the process of being serialized and
  // transmitted elsewhere.
  bool in_transit_ GUARDED_BY(lock_) = false;

  // Indicates whether the peer endpoint is known to be closed.
  bool is_peer_closed_ GUARDED_BY(lock_) = false;

  // This loosely tracks whether new data has arrived since the last ReadData or
  // BeginReadData attempt.
  bool has_new_data_ GUARDED_BY(lock_) = false;
};

}  // namespace mojo::core::ipcz_driver

#endif  // MOJO_CORE_IPCZ_DRIVER_DATA_PIPE_H_