1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// media/fuchsia/common/vmo_buffer_writer_queue.h
// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MEDIA_FUCHSIA_COMMON_VMO_BUFFER_WRITER_QUEUE_H_
#define MEDIA_FUCHSIA_COMMON_VMO_BUFFER_WRITER_QUEUE_H_
#include <fuchsia/media/cpp/fidl.h>
#include <fuchsia/sysmem/cpp/fidl.h>
#include <stddef.h>
#include <stdint.h>

#include <deque>
#include <memory>
#include <optional>
#include <vector>

#include "base/memory/weak_ptr.h"
#include "base/threading/thread_checker.h"
#include "media/base/media_export.h"
#include "media/fuchsia/common/stream_processor_helper.h"
#include "media/fuchsia/common/vmo_buffer.h"
namespace media {
class DecoderBuffer;
// A helper that keeps a queue of pending DecodeBuffers, writes them to a set of
// VmoBuffers and generates StreamProcessor packets.
class MEDIA_EXPORT VmoBufferWriterQueue {
public:
// Callback passed to StartSender(). |buffer| corresponds to the original
// buffer from which the |packet| was generated.
using SendPacketCB =
base::RepeatingCallback<void(const DecoderBuffer* buffer,
StreamProcessorHelper::IoPacket packet)>;
// Called when processing DecoderBuffer that's marked as end-of-stream.
using EndOfStreamCB = base::RepeatingClosure;
VmoBufferWriterQueue();
~VmoBufferWriterQueue();
VmoBufferWriterQueue(VmoBufferWriterQueue&) = delete;
VmoBufferWriterQueue& operator=(VmoBufferWriterQueue&) = delete;
// Enqueues buffer to the queue.
void EnqueueBuffer(scoped_refptr<DecoderBuffer> buffer);
// Sets the buffers to use and starts sending outgoing packets using
// |send_packet_cb|. |end_of_stream_cb| will be called when processing each
// end-of-stream buffer.
void Start(std::vector<VmoBuffer> buffers,
SendPacketCB send_packet_cb,
EndOfStreamCB end_of_stream_cb);
// Resets all pending buffers. Keeps the underlying sysmem buffers.
void ResetQueue();
// Resets the buffers. Keeps the current pending buffers, so they will still
// be sent once the new collection is allocated and passed to Start().
void ResetBuffers();
// Resets pending queue position to the start of the queue and pauses the
// writer. All pending buffers will be resent when Unpause() is called.
// This method is used to handle OnStreamFailed event received from
// StreamProcessor, particularly to handle NoKey error in CDM. When that event
// is received the StreamProcessor client should assumes that all queued
// packets were not processed. Once the error condition is resolved (e.g. by
// adding a new decryption key), the client should start a new stream and
// resend all failed packets, which is achieved by calling Unpause()
void ResetPositionAndPause();
// Resumes sending packets on stream that was previously paused with
// ResetPositionAndPause(). Should be called after starting a new stream in
// the StreamProcessor (e.g. by calling StreamProcessorHelper::Reset()).
void Unpause();
// Number of buffers in the sysmem collection or 0 if sysmem buffers has not
// been allocated (i.e. before Start()).
size_t num_buffers() const;
// Returns true of the queue is currently blocked, i.e. buffers passed
// to EnqueueBuffer() will not be sent immediately.
bool IsBlocked() const;
private:
struct PendingBuffer {
PendingBuffer(scoped_refptr<DecoderBuffer> buffer);
~PendingBuffer();
PendingBuffer(PendingBuffer&& other);
PendingBuffer& operator=(PendingBuffer&& other) = default;
const uint8_t* data() const;
size_t bytes_left() const;
void AdvanceCurrentPos(size_t bytes);
scoped_refptr<DecoderBuffer> buffer;
size_t buffer_pos = 0;
// Set to true when the consumer has finished processing the buffer and it
// can be released.
bool is_complete = false;
// Index of the last buffer in the sysmem buffer collection that was used to
// send this input buffer. Should be set only when |bytes_left()==0|.
std::optional<size_t> tail_sysmem_buffer_index;
};
class SysmemBuffer;
// Pumps pending buffers to SendPacketCB.
void PumpPackets();
// Callback called when a packet is destroyed. It marks the buffer as unused
// and tries to reuse it for other buffers if any.
void ReleaseBuffer(size_t buffer_index);
// Buffers that are waiting to be sent. A buffer is removed from the queue
// when it and all previous buffers have finished decoding.
std::deque<PendingBuffer> pending_buffers_;
// Position of the current buffer in |pending_buffers_|.
size_t input_queue_position_ = 0;
// Indicates that the stream is paused and no packets should be sent until
// Unpause() is called.
bool is_paused_ = false;
// Buffers for sysmem buffer collection. Empty until Start() is called.
std::vector<VmoBuffer> buffers_;
// Usd to store indices of the buffers that are not being used currently.
std::vector<size_t> unused_buffers_;
SendPacketCB send_packet_cb_;
EndOfStreamCB end_of_stream_cb_;
// FIDL interfaces are thread-affine (see crbug.com/1012875).
THREAD_CHECKER(thread_checker_);
base::WeakPtrFactory<VmoBufferWriterQueue> weak_factory_{this};
};
} // namespace media
#endif // MEDIA_FUCHSIA_COMMON_VMO_BUFFER_WRITER_QUEUE_H_