OLD | NEW |
| (Empty) |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "media/remoting/remote_demuxer_stream_adapter.h" | |
6 | |
7 #include <memory> | |
8 #include <vector> | |
9 | |
10 #include "base/callback_helpers.h" | |
11 #include "base/run_loop.h" | |
12 #include "media/base/decoder_buffer.h" | |
13 #include "media/base/demuxer_stream.h" | |
14 #include "media/remoting/fake_remoting_controller.h" | |
15 #include "media/remoting/fake_remoting_demuxer_stream_provider.h" | |
16 #include "media/remoting/rpc/proto_utils.h" | |
17 #include "testing/gmock/include/gmock/gmock.h" | |
18 #include "testing/gtest/include/gtest/gtest.h" | |
19 | |
20 using testing::_; | |
21 using testing::Invoke; | |
22 using testing::Return; | |
23 | |
24 namespace media { | |
25 namespace remoting { | |
26 | |
27 class MockRemoteDemuxerStreamAdapter { | |
28 public: | |
29 MockRemoteDemuxerStreamAdapter( | |
30 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner, | |
31 scoped_refptr<base::SingleThreadTaskRunner> media_task_runner, | |
32 const std::string& name, | |
33 ::media::DemuxerStream* demuxer_stream, | |
34 mojom::RemotingDataStreamSenderPtrInfo stream_sender_info, | |
35 mojo::ScopedDataPipeProducerHandle producer_handle) | |
36 : weak_factory_(this) { | |
37 rpc_broker_.reset(new RpcBroker( | |
38 base::Bind(&MockRemoteDemuxerStreamAdapter::OnSendMessageToSink, | |
39 weak_factory_.GetWeakPtr()))); | |
40 demuxer_stream_adapter_.reset(new RemoteDemuxerStreamAdapter( | |
41 std::move(main_task_runner), std::move(media_task_runner), name, | |
42 demuxer_stream, rpc_broker_->GetWeakPtr(), | |
43 rpc_broker_->GetUniqueHandle(), std::move(stream_sender_info), | |
44 std::move(producer_handle), | |
45 base::Bind(&MockRemoteDemuxerStreamAdapter::OnError, | |
46 weak_factory_.GetWeakPtr()))); | |
47 | |
48 // Faking initialization with random callback handle to start mojo watcher. | |
49 demuxer_stream_adapter_->Initialize(3); | |
50 } | |
51 | |
52 ~MockRemoteDemuxerStreamAdapter() { | |
53 // Make sure unit tests that did not expect errors did not cause any errors. | |
54 EXPECT_TRUE(errors_.empty()); | |
55 } | |
56 | |
57 int rpc_handle() const { return demuxer_stream_adapter_->rpc_handle(); } | |
58 | |
59 base::WeakPtr<MockRemoteDemuxerStreamAdapter> GetWeakPtr() { | |
60 return weak_factory_.GetWeakPtr(); | |
61 } | |
62 | |
63 void DoDuplicateInitialize() { demuxer_stream_adapter_->Initialize(999); } | |
64 | |
65 void TakeErrors(std::vector<StopTrigger>* errors) { | |
66 errors->swap(errors_); | |
67 errors_.clear(); | |
68 } | |
69 | |
70 // Fake to signal that it's in reading state. | |
71 void FakeReadUntil(int read_until_count, int callback_handle) { | |
72 std::unique_ptr<pb::RpcMessage> rpc(new pb::RpcMessage()); | |
73 rpc->set_handle(rpc_handle()); | |
74 rpc->set_proc(pb::RpcMessage::RPC_DS_READUNTIL); | |
75 auto* read_message = rpc->mutable_demuxerstream_readuntil_rpc(); | |
76 read_message->set_callback_handle( | |
77 callback_handle); // Given an unique callback handle. | |
78 read_message->set_count(read_until_count); // Request 1 frame | |
79 | |
80 demuxer_stream_adapter_->OnReceivedRpc(std::move(rpc)); | |
81 } | |
82 void OnNewBuffer(const scoped_refptr<::media::DecoderBuffer>& frame) { | |
83 demuxer_stream_adapter_->OnNewBuffer(DemuxerStream::kOk, frame); | |
84 } | |
85 | |
86 void SignalFlush(bool flush) { demuxer_stream_adapter_->SignalFlush(flush); } | |
87 | |
88 pb::RpcMessage* last_received_rpc() const { return last_received_rpc_.get(); } | |
89 | |
90 private: | |
91 void OnSendMessageToSink(std::unique_ptr<std::vector<uint8_t>> message) { | |
92 last_received_rpc_.reset(new remoting::pb::RpcMessage()); | |
93 CHECK(last_received_rpc_->ParseFromArray(message->data(), message->size())); | |
94 } | |
95 | |
96 void OnError(StopTrigger stop_trigger) { errors_.push_back(stop_trigger); } | |
97 | |
98 std::unique_ptr<RpcBroker> rpc_broker_; | |
99 std::unique_ptr<RemoteDemuxerStreamAdapter> demuxer_stream_adapter_; | |
100 std::unique_ptr<remoting::pb::RpcMessage> last_received_rpc_; | |
101 | |
102 std::vector<StopTrigger> errors_; | |
103 | |
104 base::WeakPtrFactory<MockRemoteDemuxerStreamAdapter> weak_factory_; | |
105 | |
106 DISALLOW_COPY_AND_ASSIGN(MockRemoteDemuxerStreamAdapter); | |
107 }; | |
108 | |
109 class RemoteDemuxerStreamAdapterTest : public ::testing::Test { | |
110 public: | |
111 RemoteDemuxerStreamAdapterTest() {} | |
112 ~RemoteDemuxerStreamAdapterTest() override = default; | |
113 | |
114 void SetUpDataPipe() { | |
115 constexpr size_t kDataPipeCapacity = 256; | |
116 demuxer_stream_.reset(new DummyDemuxerStream(true)); // audio. | |
117 const MojoCreateDataPipeOptions data_pipe_options{ | |
118 sizeof(MojoCreateDataPipeOptions), | |
119 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, kDataPipeCapacity}; | |
120 mojom::RemotingDataStreamSenderPtr stream_sender; | |
121 mojo::ScopedDataPipeProducerHandle producer_end; | |
122 mojo::ScopedDataPipeConsumerHandle consumer_end; | |
123 CHECK_EQ( | |
124 MOJO_RESULT_OK, | |
125 mojo::CreateDataPipe(&data_pipe_options, &producer_end, &consumer_end)); | |
126 | |
127 data_stream_sender_.reset(new FakeRemotingDataStreamSender( | |
128 MakeRequest(&stream_sender), std::move(consumer_end))); | |
129 demuxer_stream_adapter_.reset(new MockRemoteDemuxerStreamAdapter( | |
130 message_loop_.task_runner(), message_loop_.task_runner(), "test", | |
131 demuxer_stream_.get(), stream_sender.PassInterface(), | |
132 std::move(producer_end))); | |
133 // RemoteDemuxerStreamAdapter constructor posts task to main thread to | |
134 // register MessageReceiverCallback. Therefore it should call | |
135 // RunPendingTasks() to make sure task is executed. | |
136 RunPendingTasks(); | |
137 } | |
138 | |
139 void TearDown() override { base::RunLoop().RunUntilIdle(); } | |
140 | |
141 void RunPendingTasks() { base::RunLoop().RunUntilIdle(); } | |
142 | |
143 protected: | |
144 void SetUp() override { SetUpDataPipe(); } | |
145 | |
146 // TODO(miu): Add separate media thread, to test threading also. | |
147 base::MessageLoop message_loop_; | |
148 std::unique_ptr<DummyDemuxerStream> demuxer_stream_; | |
149 std::unique_ptr<FakeRemotingDataStreamSender> data_stream_sender_; | |
150 std::unique_ptr<MockRemoteDemuxerStreamAdapter> demuxer_stream_adapter_; | |
151 | |
152 private: | |
153 DISALLOW_COPY_AND_ASSIGN(RemoteDemuxerStreamAdapterTest); | |
154 }; | |
155 | |
156 TEST_F(RemoteDemuxerStreamAdapterTest, SingleReadUntil) { | |
157 // Read will be called once since it doesn't return frame buffer in the dummy | |
158 // implementation. | |
159 EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1); | |
160 | |
161 demuxer_stream_adapter_->FakeReadUntil(3, 999); | |
162 RunPendingTasks(); | |
163 } | |
164 | |
165 TEST_F(RemoteDemuxerStreamAdapterTest, MultiReadUntil) { | |
166 // Read will be called once since it doesn't return frame buffer in the dummy | |
167 // implementation, and 2nd one will not proceed when there is ongoing read. | |
168 EXPECT_CALL(*demuxer_stream_, Read(_)).Times(1); | |
169 | |
170 demuxer_stream_adapter_->FakeReadUntil(1, 100); | |
171 RunPendingTasks(); | |
172 | |
173 demuxer_stream_adapter_->FakeReadUntil(2, 101); | |
174 RunPendingTasks(); | |
175 } | |
176 | |
177 TEST_F(RemoteDemuxerStreamAdapterTest, WriteOneFrameSmallerThanCapacity) { | |
178 // Sends a frame with size 50 bytes, pts = 1 and key frame. | |
179 demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */); | |
180 demuxer_stream_adapter_->FakeReadUntil(1, 999); | |
181 RunPendingTasks(); | |
182 | |
183 // Checks if it's sent to consumer side and data is correct | |
184 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U); | |
185 ASSERT_EQ(data_stream_sender_->consume_data_chunk_count(), 1U); | |
186 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 50, true, 1)); | |
187 pb::RpcMessage* last_rpc = demuxer_stream_adapter_->last_received_rpc(); | |
188 ASSERT_TRUE(last_rpc); | |
189 ASSERT_EQ(last_rpc->proc(), pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
190 ASSERT_EQ(last_rpc->handle(), 999); | |
191 data_stream_sender_->ResetHistory(); | |
192 } | |
193 | |
194 TEST_F(RemoteDemuxerStreamAdapterTest, WriteOneFrameLargerThanCapacity) { | |
195 // Sends a frame with size 800 bytes, pts = 1 and key frame. | |
196 demuxer_stream_->CreateFakeFrame(800, true, 1 /* pts */); | |
197 demuxer_stream_adapter_->FakeReadUntil(1, 999); | |
198 RunPendingTasks(); | |
199 | |
200 // Checks if it's sent to consumer side and data is correct | |
201 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U); | |
202 ASSERT_EQ(data_stream_sender_->consume_data_chunk_count(), 4U); | |
203 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 800, true, 1)); | |
204 pb::RpcMessage* last_rpc = demuxer_stream_adapter_->last_received_rpc(); | |
205 ASSERT_TRUE(last_rpc); | |
206 ASSERT_EQ(last_rpc->proc(), pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
207 ASSERT_EQ(last_rpc->handle(), 999); | |
208 data_stream_sender_->ResetHistory(); | |
209 } | |
210 | |
211 TEST_F(RemoteDemuxerStreamAdapterTest, SendFrameAndSignalFlushMix) { | |
212 // Sends a frame with size 50 bytes, pts = 1 and key frame. | |
213 demuxer_stream_->CreateFakeFrame(50, true, 1 /* pts */); | |
214 // Issues ReadUntil request with frame count up to 1 (fetch #0). | |
215 demuxer_stream_adapter_->FakeReadUntil(1, 100); | |
216 RunPendingTasks(); | |
217 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U); | |
218 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 50, true, 1)); | |
219 pb::RpcMessage* last_rpc = demuxer_stream_adapter_->last_received_rpc(); | |
220 ASSERT_TRUE(last_rpc); | |
221 ASSERT_EQ(last_rpc->proc(), pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
222 ASSERT_EQ(last_rpc->handle(), 100); | |
223 data_stream_sender_->ResetHistory(); | |
224 | |
225 // Sends two frames with size 100 + 150 bytes | |
226 demuxer_stream_->CreateFakeFrame(100, false, 2 /* pts */); | |
227 demuxer_stream_->CreateFakeFrame(150, false, 3 /* pts */); | |
228 // Issues ReadUntil request with frame count up to 3 (fetch #1 and #2). | |
229 demuxer_stream_adapter_->FakeReadUntil(3, 101); | |
230 RunPendingTasks(); | |
231 ASSERT_EQ(data_stream_sender_->send_frame_count(), 2U); | |
232 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 100, false, 2)); | |
233 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(1, 150, false, 3)); | |
234 last_rpc = demuxer_stream_adapter_->last_received_rpc(); | |
235 ASSERT_TRUE(last_rpc); | |
236 ASSERT_EQ(last_rpc->proc(), pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
237 ASSERT_EQ(last_rpc->handle(), 101); | |
238 data_stream_sender_->ResetHistory(); | |
239 | |
240 // Signal flush | |
241 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 0U); | |
242 demuxer_stream_adapter_->SignalFlush(true); | |
243 RunPendingTasks(); | |
244 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 1U); | |
245 | |
246 // ReadUntil request after flush signaling should be ignored. | |
247 demuxer_stream_->CreateFakeFrame(100, false, 4 /* pts */); | |
248 demuxer_stream_->CreateFakeFrame(100, false, 5 /* pts */); | |
249 // Issues ReadUntil request with frame count up to 5 (fetch #3 and #4). | |
250 demuxer_stream_adapter_->FakeReadUntil(5, 102); | |
251 RunPendingTasks(); | |
252 ASSERT_EQ(data_stream_sender_->send_frame_count(), 0U); | |
253 | |
254 // Signal flush done | |
255 demuxer_stream_adapter_->SignalFlush(false); | |
256 RunPendingTasks(); | |
257 ASSERT_EQ(data_stream_sender_->cancel_in_flight_count(), 1U); | |
258 data_stream_sender_->ResetHistory(); | |
259 | |
260 // Re-issues ReadUntil request with frame count up to 4 (fetch #3). | |
261 demuxer_stream_adapter_->FakeReadUntil(4, 103); | |
262 RunPendingTasks(); | |
263 ASSERT_EQ(data_stream_sender_->send_frame_count(), 1U); | |
264 ASSERT_TRUE(data_stream_sender_->ValidateFrameBuffer(0, 100, false, 4)); | |
265 last_rpc = demuxer_stream_adapter_->last_received_rpc(); | |
266 ASSERT_TRUE(last_rpc); | |
267 ASSERT_EQ(last_rpc->proc(), pb::RpcMessage::RPC_DS_READUNTIL_CALLBACK); | |
268 ASSERT_EQ(last_rpc->handle(), 103); | |
269 data_stream_sender_->ResetHistory(); | |
270 } | |
271 | |
272 TEST_F(RemoteDemuxerStreamAdapterTest, DuplicateInitializeCausesFatalError) { | |
273 std::vector<StopTrigger> errors; | |
274 demuxer_stream_adapter_->TakeErrors(&errors); | |
275 ASSERT_TRUE(errors.empty()); | |
276 | |
277 demuxer_stream_adapter_->DoDuplicateInitialize(); | |
278 demuxer_stream_adapter_->TakeErrors(&errors); | |
279 ASSERT_EQ(1u, errors.size()); | |
280 EXPECT_EQ(PEERS_OUT_OF_SYNC, errors[0]); | |
281 } | |
282 | |
283 TEST_F(RemoteDemuxerStreamAdapterTest, ClosingPipeCausesFatalError) { | |
284 std::vector<StopTrigger> errors; | |
285 demuxer_stream_adapter_->TakeErrors(&errors); | |
286 ASSERT_TRUE(errors.empty()); | |
287 | |
288 // Closes one end of mojo message and data pipes. | |
289 data_stream_sender_.reset(); | |
290 RunPendingTasks(); // Allow notification from mojo to propagate. | |
291 | |
292 demuxer_stream_adapter_->TakeErrors(&errors); | |
293 ASSERT_EQ(1u, errors.size()); | |
294 EXPECT_EQ(MOJO_PIPE_ERROR, errors[0]); | |
295 } | |
296 | |
297 } // namesapce remoting | |
298 } // namespace media | |
OLD | NEW |