Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(899)

Side by Side Diff: components/cast_channel/cast_transport.cc

Issue 2926313002: Revert of [cast_channel] Move cast_channel related files from //extensions to //components (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/cast_channel/cast_transport.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <string>
11 #include <utility>
12
13 #include "base/bind.h"
14 #include "base/format_macros.h"
15 #include "base/location.h"
16 #include "base/numerics/safe_conversions.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/strings/stringprintf.h"
19 #include "base/threading/thread_task_runner_handle.h"
20 #include "components/cast_channel/cast_framer.h"
21 #include "components/cast_channel/cast_message_util.h"
22 #include "components/cast_channel/logger.h"
23 #include "components/cast_channel/proto/cast_channel.pb.h"
24 #include "net/base/net_errors.h"
25 #include "net/socket/socket.h"
26
27 #define VLOG_WITH_CONNECTION(level) \
28 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" \
29 << ::cast_channel::ChannelAuthTypeToString(channel_auth_) \
30 << "] "
31
32 namespace cast_channel {
33
34 CastTransportImpl::CastTransportImpl(net::Socket* socket,
35 int channel_id,
36 const net::IPEndPoint& ip_endpoint,
37 ChannelAuthType channel_auth,
38 scoped_refptr<Logger> logger)
39 : started_(false),
40 socket_(socket),
41 write_state_(WRITE_STATE_IDLE),
42 read_state_(READ_STATE_READ),
43 error_state_(ChannelError::NONE),
44 channel_id_(channel_id),
45 ip_endpoint_(ip_endpoint),
46 channel_auth_(channel_auth),
47 logger_(logger) {
48 DCHECK(socket);
49
50 // Buffer is reused across messages to minimize unnecessary buffer
51 // [re]allocations.
52 read_buffer_ = new net::GrowableIOBuffer();
53 read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
54 framer_.reset(new MessageFramer(read_buffer_));
55 }
56
57 CastTransportImpl::~CastTransportImpl() {
58 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
59 FlushWriteQueue();
60 }
61
62 bool CastTransportImpl::IsTerminalWriteState(
63 CastTransportImpl::WriteState write_state) {
64 return write_state == WRITE_STATE_ERROR || write_state == WRITE_STATE_IDLE;
65 }
66
67 bool CastTransportImpl::IsTerminalReadState(
68 CastTransportImpl::ReadState read_state) {
69 return read_state == READ_STATE_ERROR;
70 }
71
72 // static
73 proto::ReadState CastTransportImpl::ReadStateToProto(
74 CastTransportImpl::ReadState state) {
75 switch (state) {
76 case CastTransportImpl::READ_STATE_UNKNOWN:
77 return proto::READ_STATE_UNKNOWN;
78 case CastTransportImpl::READ_STATE_READ:
79 return proto::READ_STATE_READ;
80 case CastTransportImpl::READ_STATE_READ_COMPLETE:
81 return proto::READ_STATE_READ_COMPLETE;
82 case CastTransportImpl::READ_STATE_DO_CALLBACK:
83 return proto::READ_STATE_DO_CALLBACK;
84 case CastTransportImpl::READ_STATE_HANDLE_ERROR:
85 return proto::READ_STATE_HANDLE_ERROR;
86 case CastTransportImpl::READ_STATE_ERROR:
87 return proto::READ_STATE_ERROR;
88 default:
89 NOTREACHED();
90 return proto::READ_STATE_UNKNOWN;
91 }
92 }
93
94 // static
95 proto::WriteState CastTransportImpl::WriteStateToProto(
96 CastTransportImpl::WriteState state) {
97 switch (state) {
98 case CastTransportImpl::WRITE_STATE_IDLE:
99 return proto::WRITE_STATE_IDLE;
100 case CastTransportImpl::WRITE_STATE_UNKNOWN:
101 return proto::WRITE_STATE_UNKNOWN;
102 case CastTransportImpl::WRITE_STATE_WRITE:
103 return proto::WRITE_STATE_WRITE;
104 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
105 return proto::WRITE_STATE_WRITE_COMPLETE;
106 case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
107 return proto::WRITE_STATE_DO_CALLBACK;
108 case CastTransportImpl::WRITE_STATE_HANDLE_ERROR:
109 return proto::WRITE_STATE_HANDLE_ERROR;
110 case CastTransportImpl::WRITE_STATE_ERROR:
111 return proto::WRITE_STATE_ERROR;
112 default:
113 NOTREACHED();
114 return proto::WRITE_STATE_UNKNOWN;
115 }
116 }
117
118 // static
119 proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
120 switch (state) {
121 case ChannelError::NONE:
122 return proto::CHANNEL_ERROR_NONE;
123 case ChannelError::CHANNEL_NOT_OPEN:
124 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
125 case ChannelError::AUTHENTICATION_ERROR:
126 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
127 case ChannelError::CONNECT_ERROR:
128 return proto::CHANNEL_ERROR_CONNECT_ERROR;
129 case ChannelError::CAST_SOCKET_ERROR:
130 return proto::CHANNEL_ERROR_SOCKET_ERROR;
131 case ChannelError::TRANSPORT_ERROR:
132 return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
133 case ChannelError::INVALID_MESSAGE:
134 return proto::CHANNEL_ERROR_INVALID_MESSAGE;
135 case ChannelError::INVALID_CHANNEL_ID:
136 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
137 case ChannelError::CONNECT_TIMEOUT:
138 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
139 case ChannelError::UNKNOWN:
140 return proto::CHANNEL_ERROR_UNKNOWN;
141 default:
142 NOTREACHED();
143 return proto::CHANNEL_ERROR_NONE;
144 }
145 }
146
147 void CastTransportImpl::SetReadDelegate(std::unique_ptr<Delegate> delegate) {
148 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
149 DCHECK(delegate);
150 delegate_ = std::move(delegate);
151 if (started_) {
152 delegate_->Start();
153 }
154 }
155
156 void CastTransportImpl::FlushWriteQueue() {
157 for (; !write_queue_.empty(); write_queue_.pop()) {
158 net::CompletionCallback& callback = write_queue_.front().callback;
159 base::ThreadTaskRunnerHandle::Get()->PostTask(
160 FROM_HERE, base::Bind(callback, net::ERR_FAILED));
161 callback.Reset();
162 }
163 }
164
165 void CastTransportImpl::SendMessage(const CastMessage& message,
166 const net::CompletionCallback& callback) {
167 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
168 std::string serialized_message;
169 if (!MessageFramer::Serialize(message, &serialized_message)) {
170 base::ThreadTaskRunnerHandle::Get()->PostTask(
171 FROM_HERE, base::Bind(callback, net::ERR_FAILED));
172 return;
173 }
174 WriteRequest write_request(message.namespace_(), serialized_message,
175 callback);
176
177 write_queue_.push(write_request);
178 if (write_state_ == WRITE_STATE_IDLE) {
179 SetWriteState(WRITE_STATE_WRITE);
180 OnWriteResult(net::OK);
181 }
182 }
183
184 CastTransportImpl::WriteRequest::WriteRequest(
185 const std::string& namespace_,
186 const std::string& payload,
187 const net::CompletionCallback& callback)
188 : message_namespace(namespace_), callback(callback) {
189 VLOG(2) << "WriteRequest size: " << payload.size();
190 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
191 payload.size());
192 }
193
194 CastTransportImpl::WriteRequest::WriteRequest(const WriteRequest& other) =
195 default;
196
197 CastTransportImpl::WriteRequest::~WriteRequest() {}
198
199 void CastTransportImpl::SetReadState(ReadState read_state) {
200 if (read_state_ != read_state)
201 read_state_ = read_state;
202 }
203
204 void CastTransportImpl::SetWriteState(WriteState write_state) {
205 if (write_state_ != write_state)
206 write_state_ = write_state;
207 }
208
209 void CastTransportImpl::SetErrorState(ChannelError error_state) {
210 VLOG_WITH_CONNECTION(2) << "SetErrorState: "
211 << ::cast_channel::ChannelErrorToString(error_state);
212 error_state_ = error_state;
213 }
214
215 void CastTransportImpl::OnWriteResult(int result) {
216 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
217 DCHECK_NE(WRITE_STATE_IDLE, write_state_);
218 if (write_queue_.empty()) {
219 SetWriteState(WRITE_STATE_IDLE);
220 return;
221 }
222
223 // Network operations can either finish synchronously or asynchronously.
224 // This method executes the state machine transitions in a loop so that
225 // write state transitions happen even when network operations finish
226 // synchronously.
227 int rv = result;
228 do {
229 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
230 << "result=" << rv << ", "
231 << "queue size=" << write_queue_.size() << ")";
232
233 WriteState state = write_state_;
234 write_state_ = WRITE_STATE_UNKNOWN;
235 switch (state) {
236 case WRITE_STATE_WRITE:
237 rv = DoWrite();
238 break;
239 case WRITE_STATE_WRITE_COMPLETE:
240 rv = DoWriteComplete(rv);
241 break;
242 case WRITE_STATE_DO_CALLBACK:
243 rv = DoWriteCallback();
244 break;
245 case WRITE_STATE_HANDLE_ERROR:
246 rv = DoWriteHandleError(rv);
247 DCHECK_EQ(WRITE_STATE_ERROR, write_state_);
248 break;
249 default:
250 NOTREACHED() << "Unknown state in write state machine: " << state;
251 SetWriteState(WRITE_STATE_ERROR);
252 SetErrorState(ChannelError::UNKNOWN);
253 rv = net::ERR_FAILED;
254 break;
255 }
256 } while (rv != net::ERR_IO_PENDING && !IsTerminalWriteState(write_state_));
257
258 if (write_state_ == WRITE_STATE_ERROR) {
259 FlushWriteQueue();
260 DCHECK_NE(ChannelError::NONE, error_state_);
261 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
262 delegate_->OnError(error_state_);
263 }
264 }
265
266 int CastTransportImpl::DoWrite() {
267 DCHECK(!write_queue_.empty());
268 WriteRequest& request = write_queue_.front();
269
270 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
271 << request.io_buffer->size() << " bytes_written "
272 << request.io_buffer->BytesConsumed();
273
274 SetWriteState(WRITE_STATE_WRITE_COMPLETE);
275
276 int rv = socket_->Write(
277 request.io_buffer.get(), request.io_buffer->BytesRemaining(),
278 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
279 return rv;
280 }
281
282 int CastTransportImpl::DoWriteComplete(int result) {
283 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
284 DCHECK(!write_queue_.empty());
285 if (result <= 0) { // NOTE that 0 also indicates an error
286 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result);
287 SetErrorState(ChannelError::CAST_SOCKET_ERROR);
288 SetWriteState(WRITE_STATE_HANDLE_ERROR);
289 return result == 0 ? net::ERR_FAILED : result;
290 }
291
292 // Some bytes were successfully written
293 WriteRequest& request = write_queue_.front();
294 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
295 io_buffer->DidConsume(result);
296 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
297 SetWriteState(WRITE_STATE_DO_CALLBACK);
298 } else {
299 SetWriteState(WRITE_STATE_WRITE);
300 }
301
302 return net::OK;
303 }
304
305 int CastTransportImpl::DoWriteCallback() {
306 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
307 DCHECK(!write_queue_.empty());
308
309 WriteRequest& request = write_queue_.front();
310 base::ThreadTaskRunnerHandle::Get()->PostTask(
311 FROM_HERE, base::Bind(request.callback, net::OK));
312
313 write_queue_.pop();
314 if (write_queue_.empty()) {
315 SetWriteState(WRITE_STATE_IDLE);
316 } else {
317 SetWriteState(WRITE_STATE_WRITE);
318 }
319
320 return net::OK;
321 }
322
323 int CastTransportImpl::DoWriteHandleError(int result) {
324 VLOG_WITH_CONNECTION(2) << "DoWriteHandleError result=" << result;
325 DCHECK_NE(ChannelError::NONE, error_state_);
326 DCHECK_LT(result, 0);
327 SetWriteState(WRITE_STATE_ERROR);
328 return net::ERR_FAILED;
329 }
330
331 void CastTransportImpl::Start() {
332 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
333 DCHECK(!started_);
334 DCHECK_EQ(READ_STATE_READ, read_state_);
335 DCHECK(delegate_) << "Read delegate must be set prior to calling Start()";
336 started_ = true;
337 delegate_->Start();
338 SetReadState(READ_STATE_READ);
339
340 // Start the read state machine.
341 OnReadResult(net::OK);
342 }
343
344 void CastTransportImpl::OnReadResult(int result) {
345 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
346 // Network operations can either finish synchronously or asynchronously.
347 // This method executes the state machine transitions in a loop so that
348 // write state transitions happen even when network operations finish
349 // synchronously.
350 int rv = result;
351 do {
352 VLOG_WITH_CONNECTION(2)
353 << "OnReadResult(state=" << read_state_ << ", result=" << rv << ")";
354 ReadState state = read_state_;
355 read_state_ = READ_STATE_UNKNOWN;
356
357 switch (state) {
358 case READ_STATE_READ:
359 rv = DoRead();
360 break;
361 case READ_STATE_READ_COMPLETE:
362 rv = DoReadComplete(rv);
363 break;
364 case READ_STATE_DO_CALLBACK:
365 rv = DoReadCallback();
366 break;
367 case READ_STATE_HANDLE_ERROR:
368 rv = DoReadHandleError(rv);
369 DCHECK_EQ(read_state_, READ_STATE_ERROR);
370 break;
371 default:
372 NOTREACHED() << "Unknown state in read state machine: " << state;
373 SetReadState(READ_STATE_ERROR);
374 SetErrorState(ChannelError::UNKNOWN);
375 rv = net::ERR_FAILED;
376 break;
377 }
378 } while (rv != net::ERR_IO_PENDING && !IsTerminalReadState(read_state_));
379
380 if (IsTerminalReadState(read_state_)) {
381 DCHECK_EQ(READ_STATE_ERROR, read_state_);
382 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
383 delegate_->OnError(error_state_);
384 }
385 }
386
387 int CastTransportImpl::DoRead() {
388 VLOG_WITH_CONNECTION(2) << "DoRead";
389 SetReadState(READ_STATE_READ_COMPLETE);
390
391 // Determine how many bytes need to be read.
392 size_t num_bytes_to_read = framer_->BytesRequested();
393 DCHECK_GT(num_bytes_to_read, 0u);
394
395 // Read up to num_bytes_to_read into |current_read_buffer_|.
396 return socket_->Read(
397 read_buffer_.get(), base::checked_cast<uint32_t>(num_bytes_to_read),
398 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
399 }
400
401 int CastTransportImpl::DoReadComplete(int result) {
402 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
403 if (result <= 0) {
404 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result);
405 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
406 SetErrorState(ChannelError::CAST_SOCKET_ERROR);
407 SetReadState(READ_STATE_HANDLE_ERROR);
408 return result == 0 ? net::ERR_FAILED : result;
409 }
410
411 size_t message_size;
412 DCHECK(!current_message_);
413 ChannelError framing_error;
414 current_message_ = framer_->Ingest(result, &message_size, &framing_error);
415 if (current_message_.get() && (framing_error == ChannelError::NONE)) {
416 DCHECK_GT(message_size, static_cast<size_t>(0));
417 SetReadState(READ_STATE_DO_CALLBACK);
418 } else if (framing_error != ChannelError::NONE) {
419 DCHECK(!current_message_);
420 SetErrorState(ChannelError::INVALID_MESSAGE);
421 SetReadState(READ_STATE_HANDLE_ERROR);
422 } else {
423 DCHECK(!current_message_);
424 SetReadState(READ_STATE_READ);
425 }
426 return net::OK;
427 }
428
429 int CastTransportImpl::DoReadCallback() {
430 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
431 if (!IsCastMessageValid(*current_message_)) {
432 SetReadState(READ_STATE_HANDLE_ERROR);
433 SetErrorState(ChannelError::INVALID_MESSAGE);
434 return net::ERR_INVALID_RESPONSE;
435 }
436 SetReadState(READ_STATE_READ);
437 delegate_->OnMessage(*current_message_);
438 current_message_.reset();
439 return net::OK;
440 }
441
442 int CastTransportImpl::DoReadHandleError(int result) {
443 VLOG_WITH_CONNECTION(2) << "DoReadHandleError";
444 DCHECK_NE(ChannelError::NONE, error_state_);
445 DCHECK_LE(result, 0);
446 SetReadState(READ_STATE_ERROR);
447 return net::ERR_FAILED;
448 }
449
450 } // namespace cast_channel
OLDNEW
« no previous file with comments | « components/cast_channel/cast_transport.h ('k') | components/cast_channel/cast_transport_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698