1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
   57
   58
   59
   60
   61
   62
   63
   64
   65
   66
   67
   68
   69
   70
   71
   72
   73
   74
   75
   76
   77
   78
   79
   80
   81
   82
   83
   84
   85
   86
   87
   88
   89
   90
   91
   92
   93
   94
   95
   96
   97
   98
   99
  100
  101
  102
  103
  104
  105
  106
  107
  108
  109
  110
  111
  112
  113
  114
  115
  116
  117
  118
  119
  120
  121
  122
  123
  124
  125
  126
  127
  128
  129
  130
  131
  132
  133
  134
  135
  136
  137
  138
  139
  140
  141
  142
  143
  144
  145
  146
  147
  148
  149
  150
  151
  152
  153
  154
  155
  156
  157
  158
  159
  160
  161
  162
  163
  164
  165
  166
  167
  168
  169
  170
  171
  172
  173
  174
  175
  176
  177
  178
  179
  180
  181
  182
  183
  184
  185
  186
  187
  188
  189
  190
  191
  192
  193
  194
  195
  196
  197
  198
  199
  200
  201
  202
  203
  204
  205
  206
  207
  208

content / browser / byte_stream.h [blame]

// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CONTENT_BROWSER_BYTE_STREAM_H_
#define CONTENT_BROWSER_BYTE_STREAM_H_

#include <stddef.h>

#include <memory>

#include "base/functional/callback.h"
#include "base/memory/ref_counted.h"
#include "content/common/content_export.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
#include "net/base/io_buffer.h"

namespace base {
class SequencedTaskRunner;
}

namespace content {

// A byte stream is a pipe to transfer bytes between a source and a
// sink, which may be on different threads.  It is intended to be the
// only connection between source and sink; they need have no
// direct awareness of each other aside from the byte stream.  The source and
// the sink have different interfaces to a byte stream, |ByteStreamWriter|
// and |ByteStreamReader|.  A pair of connected interfaces is generated by
// calling |CreateByteStream|.
//
// The source adds bytes to the bytestream via |ByteStreamWriter::Write|
// and the sink retrieves bytes already written via |ByteStreamReader::Read|.
//
// When the source has no more data to add, it will call
// |ByteStreamWriter::Close| to indicate that.  Operation status at the source
// is indicated to the sink via an int passed to the Close() method and returned
// from the GetStatus() method. Source and sink must agree on the interpretation
// of this int.
//
// Normally the source is not managed after the relationship is set up;
// it is expected to provide data and then close itself.  If an error
// occurs on the sink, it is not signalled to the source via this
// mechanism; instead, the source will write data until it exhausts the
// available space.  If the source needs to be aware of errors occurring
// on the sink, this must be signalled in some other fashion (usually
// through whatever controller set up the relationship).
//
// Callback lifetime management: No lifetime management is done in this
// class to prevent registered callbacks from being called after any
// objects to which they may refer have been destroyed.  It is the
// responsibility of the callers to avoid use-after-free references.
// This may be done by any of several mechanisms, including weak
// pointers, scoped_refptr references, or calling the registration
// function with a null callback from a destructor.  To enable the null
// callback strategy, callbacks will not be stored between retrieval and
// evaluation, so setting a null callback will guarantee that the
// previous callback will not be executed after setting.
//
// Class methods are virtual to allow mocking for tests; these classes
// aren't intended to be base classes for other classes.
//
// Sample usage (note that this does not show callback usage):
//
//    void OriginatingClass::Initialize() {
//      // Create a stream for sending bytes from IO->FILE threads.
//      std::unique_ptr<ByteStreamWriter> writer;
//      std::unique_ptr<ByteStreamReader> reader;
//      CreateByteStream(
//          GetIOThreadTaskRunner({}),
//          base::ThreadPool::CreateSequencedTaskRunner({base::MayBlock, ...}),
//          kStreamBufferSize /* e.g. 10240.  */,
//          &writer,
//          &reader);         // Presumed passed to FILE thread for reading.
//
//      // Setup callback for writing.
//      writer->RegisterCallback(base::BindRepeating(&SpaceAvailable, this));
//
//      // Do initial round of writing.
//      SpaceAvailable();
//    }
//
//    // May only be run on first argument task runner, in this case the IO
//    // thread.
//    void OriginatingClass::SpaceAvailable() {
//      while (<data available>) {
//        scoped_refptr<net::IOBuffer> buffer;
//        size_t buffer_length;
//        // Create IOBuffer, fill in with data, and set buffer_length.
//        if (!writer->Write(buffer, buffer_length)) {
//          // No more space; return and we'll be called again
//          // when there is space.
//          return;
//        }
//      }
//      writer->Close(<operation status>);
//      writer.reset();
//    }
//
//    // On File thread; containing class setup not shown.
//
//    void ReceivingClass::Initialize() {
//      // Initialization
//      reader->RegisterCallback(base::BindRepeating(&DataAvailable, obj));
//    }
//
//    // Called whenever there's something to read.
//    void ReceivingClass::DataAvailable() {
//      scoped_refptr<net::IOBuffer> data;
//      size_t length = 0;
//      ByteStreamReader::StreamState state = ByteStreamReader::STREAM_EMPTY;
//
//      while (ByteStreamReader::STREAM_HAS_DATA ==
//             (state = reader->Read(&data, &length))) {
//        // Process |data|.
//      }
//
//      if (ByteStreamReader::STREAM_COMPLETE == state) {
//        int status = reader->GetStatus();
//        // Process error or successful completion in |status|.
//      }
//
//      // if |state| is STREAM_EMPTY, we're done for now; we'll be called
//      // again when there's more data.
//    }
class CONTENT_EXPORT ByteStreamWriter {
 public:
  // Inverse of the fraction of the stream buffer that must be full before
  // a notification is sent to paired Reader that there's more data.
  // Only declared here; the value is defined out of line.
  static const int kFractionBufferBeforeSending;

  // Pure virtual, which keeps this interface class abstract.
  virtual ~ByteStreamWriter() = 0;

  // Always adds the data passed into the ByteStream.  Returns true
  // if more data may be added without exceeding the class limit
  // on data.  A false return does not mean the write failed: the data has
  // still been added, and the source should stop writing until the callback
  // registered via RegisterCallback() signals that space is available.
  // Takes ownership of |buffer|.
  // NOTE(review): |byte_count| is presumably the number of valid bytes in
  // |buffer| -- confirm against the implementation.
  virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
                     size_t byte_count) = 0;

  // Flushes contents buffered in this writer to the corresponding reader
  // regardless of whether the buffer filling rate is greater than
  // kFractionBufferBeforeSending or not.  Does nothing if no contents are
  // buffered.
  virtual void Flush() = 0;

  // Signal that all data that is going to be sent, has been sent,
  // and provide a status.  |status| is what the sink later retrieves via
  // ByteStreamReader::GetStatus(); source and sink must agree on its
  // interpretation.
  virtual void Close(int status) = 0;

  // Register a callback to be called when the stream transitions from
  // full to having space available.  The callback will always be
  // called on the task runner associated with the ByteStreamWriter.
  // This callback will only be called if a call to Write has previously
  // returned false (i.e. the ByteStream has been filled).
  // Multiple calls to this function are supported, though note that it
  // is the caller's responsibility to handle races with space becoming
  // available (i.e. in the case of that race either of the before
  // or after callbacks may be called).
  // The callback will not be called after ByteStreamWriter destruction.
  virtual void RegisterCallback(base::RepeatingClosure source_callback) = 0;

  // Returns the number of bytes sent to the reader but not yet reported by
  // the reader as read.
  virtual size_t GetTotalBufferedBytes() const = 0;
};

// ByteStreamReader is the sink-side interface to a byte stream: the sink
// retrieves data with Read() and, once the stream is complete, obtains the
// source's final status with GetStatus().
class CONTENT_EXPORT ByteStreamReader {
 public:
  // Inverse of the fraction of the stream buffer that must be empty before
  // a notification is sent to paired Writer that there's more room.
  // Only declared here; the value is defined out of line.
  static const int kFractionReadBeforeWindowUpdate;

  // Result of a Read() call:
  //   STREAM_EMPTY    - there is no data on the ByteStream and Close() has
  //                     not been called.
  //   STREAM_HAS_DATA - there was data on the ByteStream; it has been
  //                     returned through Read()'s out-parameters.
  //   STREAM_COMPLETE - there is no data on the ByteStream and Close() has
  //                     been called.
  enum StreamState { STREAM_EMPTY, STREAM_HAS_DATA, STREAM_COMPLETE };

  // Pure virtual, which keeps this interface class abstract.
  virtual ~ByteStreamReader() = 0;

  // Returns STREAM_EMPTY if there is no data on the ByteStream and
  // Close() has not been called, and STREAM_COMPLETE if there
  // is no data on the ByteStream and Close() has been called.
  // If there is data on the ByteStream, returns STREAM_HAS_DATA
  // and fills in |*data| with a pointer to the data, and |*length|
  // with its length.
  virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
                           size_t* length) = 0;

  // Returns the status int that the source passed to
  // ByteStreamWriter::Close(); source and sink must agree on its
  // interpretation.
  // Only valid to call if Read() has returned STREAM_COMPLETE.
  virtual int GetStatus() const = 0;

  // Register a callback to be called when data is added or the source
  // completes.  The callback will always be called on the owning
  // task runner.  Multiple calls to this function are supported,
  // though note that it is the caller's responsibility to handle races
  // with data becoming available (i.e. in the case of that race
  // either of the before or after callbacks may be called).
  // The callback will not be called after ByteStreamReader destruction.
  virtual void RegisterCallback(base::RepeatingClosure sink_callback) = 0;
};

// Generates a pair of connected byte stream interfaces, returning the
// source-side writer through |*input| and the sink-side reader through
// |*output|.
//
// Going by the sample usage in the comment at the top of this file,
// |input_task_runner| is the task runner on which the writer is used (and on
// which its callback runs), and |output_task_runner| is presumably the
// corresponding task runner for the reader.
//
// NOTE(review): |buffer_size| appears to be the limit on buffered data that
// ByteStreamWriter::Write() reports against (the sample passes e.g. 10240)
// -- confirm against the implementation.
CONTENT_EXPORT void CreateByteStream(
    scoped_refptr<base::SequencedTaskRunner> input_task_runner,
    scoped_refptr<base::SequencedTaskRunner> output_task_runner,
    size_t buffer_size,
    std::unique_ptr<ByteStreamWriter>* input,
    std::unique_ptr<ByteStreamReader>* output);

}  // namespace content

#endif  // CONTENT_BROWSER_BYTE_STREAM_H_