Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: extensions/browser/api/cast_channel/cast_transport.cc

Issue 2926313002: Revert of [cast_channel] Move cast_channel related files from //extensions to //components (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <string>
11 #include <utility>
12
13 #include "base/bind.h"
14 #include "base/format_macros.h"
15 #include "base/location.h"
16 #include "base/numerics/safe_conversions.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/strings/stringprintf.h"
19 #include "base/threading/thread_task_runner_handle.h"
20 #include "extensions/browser/api/cast_channel/cast_framer.h"
21 #include "extensions/browser/api/cast_channel/cast_message_util.h"
22 #include "extensions/browser/api/cast_channel/logger.h"
23 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
24 #include "net/base/net_errors.h"
25 #include "net/socket/socket.h"
26
27 #define VLOG_WITH_CONNECTION(level) \
28 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" \
29 << ::cast_channel::ChannelAuthTypeToString(channel_auth_) \
30 << "] "
31
32 namespace extensions {
33 namespace api {
34 namespace cast_channel {
35
36 CastTransportImpl::CastTransportImpl(net::Socket* socket,
37 int channel_id,
38 const net::IPEndPoint& ip_endpoint,
39 ChannelAuthType channel_auth,
40 scoped_refptr<Logger> logger)
41 : started_(false),
42 socket_(socket),
43 write_state_(WRITE_STATE_IDLE),
44 read_state_(READ_STATE_READ),
45 error_state_(ChannelError::NONE),
46 channel_id_(channel_id),
47 ip_endpoint_(ip_endpoint),
48 channel_auth_(channel_auth),
49 logger_(logger) {
50 DCHECK(socket);
51
52 // Buffer is reused across messages to minimize unnecessary buffer
53 // [re]allocations.
54 read_buffer_ = new net::GrowableIOBuffer();
55 read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
56 framer_.reset(new MessageFramer(read_buffer_));
57 }
58
59 CastTransportImpl::~CastTransportImpl() {
60 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
61 FlushWriteQueue();
62 }
63
64 bool CastTransportImpl::IsTerminalWriteState(
65 CastTransportImpl::WriteState write_state) {
66 return write_state == WRITE_STATE_ERROR || write_state == WRITE_STATE_IDLE;
67 }
68
69 bool CastTransportImpl::IsTerminalReadState(
70 CastTransportImpl::ReadState read_state) {
71 return read_state == READ_STATE_ERROR;
72 }
73
74 // static
75 proto::ReadState CastTransportImpl::ReadStateToProto(
76 CastTransportImpl::ReadState state) {
77 switch (state) {
78 case CastTransportImpl::READ_STATE_UNKNOWN:
79 return proto::READ_STATE_UNKNOWN;
80 case CastTransportImpl::READ_STATE_READ:
81 return proto::READ_STATE_READ;
82 case CastTransportImpl::READ_STATE_READ_COMPLETE:
83 return proto::READ_STATE_READ_COMPLETE;
84 case CastTransportImpl::READ_STATE_DO_CALLBACK:
85 return proto::READ_STATE_DO_CALLBACK;
86 case CastTransportImpl::READ_STATE_HANDLE_ERROR:
87 return proto::READ_STATE_HANDLE_ERROR;
88 case CastTransportImpl::READ_STATE_ERROR:
89 return proto::READ_STATE_ERROR;
90 default:
91 NOTREACHED();
92 return proto::READ_STATE_UNKNOWN;
93 }
94 }
95
96 // static
97 proto::WriteState CastTransportImpl::WriteStateToProto(
98 CastTransportImpl::WriteState state) {
99 switch (state) {
100 case CastTransportImpl::WRITE_STATE_IDLE:
101 return proto::WRITE_STATE_IDLE;
102 case CastTransportImpl::WRITE_STATE_UNKNOWN:
103 return proto::WRITE_STATE_UNKNOWN;
104 case CastTransportImpl::WRITE_STATE_WRITE:
105 return proto::WRITE_STATE_WRITE;
106 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
107 return proto::WRITE_STATE_WRITE_COMPLETE;
108 case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
109 return proto::WRITE_STATE_DO_CALLBACK;
110 case CastTransportImpl::WRITE_STATE_HANDLE_ERROR:
111 return proto::WRITE_STATE_HANDLE_ERROR;
112 case CastTransportImpl::WRITE_STATE_ERROR:
113 return proto::WRITE_STATE_ERROR;
114 default:
115 NOTREACHED();
116 return proto::WRITE_STATE_UNKNOWN;
117 }
118 }
119
120 // static
121 proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
122 switch (state) {
123 case ChannelError::NONE:
124 return proto::CHANNEL_ERROR_NONE;
125 case ChannelError::CHANNEL_NOT_OPEN:
126 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
127 case ChannelError::AUTHENTICATION_ERROR:
128 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
129 case ChannelError::CONNECT_ERROR:
130 return proto::CHANNEL_ERROR_CONNECT_ERROR;
131 case ChannelError::CAST_SOCKET_ERROR:
132 return proto::CHANNEL_ERROR_SOCKET_ERROR;
133 case ChannelError::TRANSPORT_ERROR:
134 return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
135 case ChannelError::INVALID_MESSAGE:
136 return proto::CHANNEL_ERROR_INVALID_MESSAGE;
137 case ChannelError::INVALID_CHANNEL_ID:
138 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
139 case ChannelError::CONNECT_TIMEOUT:
140 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
141 case ChannelError::UNKNOWN:
142 return proto::CHANNEL_ERROR_UNKNOWN;
143 default:
144 NOTREACHED();
145 return proto::CHANNEL_ERROR_NONE;
146 }
147 }
148
149 void CastTransportImpl::SetReadDelegate(std::unique_ptr<Delegate> delegate) {
150 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
151 DCHECK(delegate);
152 delegate_ = std::move(delegate);
153 if (started_) {
154 delegate_->Start();
155 }
156 }
157
158 void CastTransportImpl::FlushWriteQueue() {
159 for (; !write_queue_.empty(); write_queue_.pop()) {
160 net::CompletionCallback& callback = write_queue_.front().callback;
161 base::ThreadTaskRunnerHandle::Get()->PostTask(
162 FROM_HERE, base::Bind(callback, net::ERR_FAILED));
163 callback.Reset();
164 }
165 }
166
167 void CastTransportImpl::SendMessage(const CastMessage& message,
168 const net::CompletionCallback& callback) {
169 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
170 std::string serialized_message;
171 if (!MessageFramer::Serialize(message, &serialized_message)) {
172 base::ThreadTaskRunnerHandle::Get()->PostTask(
173 FROM_HERE, base::Bind(callback, net::ERR_FAILED));
174 return;
175 }
176 WriteRequest write_request(
177 message.namespace_(), serialized_message, callback);
178
179 write_queue_.push(write_request);
180 if (write_state_ == WRITE_STATE_IDLE) {
181 SetWriteState(WRITE_STATE_WRITE);
182 OnWriteResult(net::OK);
183 }
184 }
185
186 CastTransportImpl::WriteRequest::WriteRequest(
187 const std::string& namespace_,
188 const std::string& payload,
189 const net::CompletionCallback& callback)
190 : message_namespace(namespace_), callback(callback) {
191 VLOG(2) << "WriteRequest size: " << payload.size();
192 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
193 payload.size());
194 }
195
196 CastTransportImpl::WriteRequest::WriteRequest(const WriteRequest& other) =
197 default;
198
199 CastTransportImpl::WriteRequest::~WriteRequest() {
200 }
201
202 void CastTransportImpl::SetReadState(ReadState read_state) {
203 if (read_state_ != read_state)
204 read_state_ = read_state;
205 }
206
207 void CastTransportImpl::SetWriteState(WriteState write_state) {
208 if (write_state_ != write_state)
209 write_state_ = write_state;
210 }
211
212 void CastTransportImpl::SetErrorState(ChannelError error_state) {
213 VLOG_WITH_CONNECTION(2) << "SetErrorState: "
214 << ::cast_channel::ChannelErrorToString(error_state);
215 error_state_ = error_state;
216 }
217
218 void CastTransportImpl::OnWriteResult(int result) {
219 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
220 DCHECK_NE(WRITE_STATE_IDLE, write_state_);
221 if (write_queue_.empty()) {
222 SetWriteState(WRITE_STATE_IDLE);
223 return;
224 }
225
226 // Network operations can either finish synchronously or asynchronously.
227 // This method executes the state machine transitions in a loop so that
228 // write state transitions happen even when network operations finish
229 // synchronously.
230 int rv = result;
231 do {
232 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
233 << "result=" << rv << ", "
234 << "queue size=" << write_queue_.size() << ")";
235
236 WriteState state = write_state_;
237 write_state_ = WRITE_STATE_UNKNOWN;
238 switch (state) {
239 case WRITE_STATE_WRITE:
240 rv = DoWrite();
241 break;
242 case WRITE_STATE_WRITE_COMPLETE:
243 rv = DoWriteComplete(rv);
244 break;
245 case WRITE_STATE_DO_CALLBACK:
246 rv = DoWriteCallback();
247 break;
248 case WRITE_STATE_HANDLE_ERROR:
249 rv = DoWriteHandleError(rv);
250 DCHECK_EQ(WRITE_STATE_ERROR, write_state_);
251 break;
252 default:
253 NOTREACHED() << "Unknown state in write state machine: " << state;
254 SetWriteState(WRITE_STATE_ERROR);
255 SetErrorState(ChannelError::UNKNOWN);
256 rv = net::ERR_FAILED;
257 break;
258 }
259 } while (rv != net::ERR_IO_PENDING && !IsTerminalWriteState(write_state_));
260
261 if (write_state_ == WRITE_STATE_ERROR) {
262 FlushWriteQueue();
263 DCHECK_NE(ChannelError::NONE, error_state_);
264 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
265 delegate_->OnError(error_state_);
266 }
267 }
268
269 int CastTransportImpl::DoWrite() {
270 DCHECK(!write_queue_.empty());
271 WriteRequest& request = write_queue_.front();
272
273 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
274 << request.io_buffer->size() << " bytes_written "
275 << request.io_buffer->BytesConsumed();
276
277 SetWriteState(WRITE_STATE_WRITE_COMPLETE);
278
279 int rv = socket_->Write(
280 request.io_buffer.get(), request.io_buffer->BytesRemaining(),
281 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
282 return rv;
283 }
284
285 int CastTransportImpl::DoWriteComplete(int result) {
286 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
287 DCHECK(!write_queue_.empty());
288 if (result <= 0) { // NOTE that 0 also indicates an error
289 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result);
290 SetErrorState(ChannelError::CAST_SOCKET_ERROR);
291 SetWriteState(WRITE_STATE_HANDLE_ERROR);
292 return result == 0 ? net::ERR_FAILED : result;
293 }
294
295 // Some bytes were successfully written
296 WriteRequest& request = write_queue_.front();
297 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
298 io_buffer->DidConsume(result);
299 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
300 SetWriteState(WRITE_STATE_DO_CALLBACK);
301 } else {
302 SetWriteState(WRITE_STATE_WRITE);
303 }
304
305 return net::OK;
306 }
307
308 int CastTransportImpl::DoWriteCallback() {
309 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
310 DCHECK(!write_queue_.empty());
311
312 WriteRequest& request = write_queue_.front();
313 base::ThreadTaskRunnerHandle::Get()->PostTask(
314 FROM_HERE, base::Bind(request.callback, net::OK));
315
316 write_queue_.pop();
317 if (write_queue_.empty()) {
318 SetWriteState(WRITE_STATE_IDLE);
319 } else {
320 SetWriteState(WRITE_STATE_WRITE);
321 }
322
323 return net::OK;
324 }
325
326 int CastTransportImpl::DoWriteHandleError(int result) {
327 VLOG_WITH_CONNECTION(2) << "DoWriteHandleError result=" << result;
328 DCHECK_NE(ChannelError::NONE, error_state_);
329 DCHECK_LT(result, 0);
330 SetWriteState(WRITE_STATE_ERROR);
331 return net::ERR_FAILED;
332 }
333
334 void CastTransportImpl::Start() {
335 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
336 DCHECK(!started_);
337 DCHECK_EQ(READ_STATE_READ, read_state_);
338 DCHECK(delegate_) << "Read delegate must be set prior to calling Start()";
339 started_ = true;
340 delegate_->Start();
341 SetReadState(READ_STATE_READ);
342
343 // Start the read state machine.
344 OnReadResult(net::OK);
345 }
346
347 void CastTransportImpl::OnReadResult(int result) {
348 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
349 // Network operations can either finish synchronously or asynchronously.
350 // This method executes the state machine transitions in a loop so that
351 // write state transitions happen even when network operations finish
352 // synchronously.
353 int rv = result;
354 do {
355 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
356 << ", result=" << rv << ")";
357 ReadState state = read_state_;
358 read_state_ = READ_STATE_UNKNOWN;
359
360 switch (state) {
361 case READ_STATE_READ:
362 rv = DoRead();
363 break;
364 case READ_STATE_READ_COMPLETE:
365 rv = DoReadComplete(rv);
366 break;
367 case READ_STATE_DO_CALLBACK:
368 rv = DoReadCallback();
369 break;
370 case READ_STATE_HANDLE_ERROR:
371 rv = DoReadHandleError(rv);
372 DCHECK_EQ(read_state_, READ_STATE_ERROR);
373 break;
374 default:
375 NOTREACHED() << "Unknown state in read state machine: " << state;
376 SetReadState(READ_STATE_ERROR);
377 SetErrorState(ChannelError::UNKNOWN);
378 rv = net::ERR_FAILED;
379 break;
380 }
381 } while (rv != net::ERR_IO_PENDING && !IsTerminalReadState(read_state_));
382
383 if (IsTerminalReadState(read_state_)) {
384 DCHECK_EQ(READ_STATE_ERROR, read_state_);
385 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
386 delegate_->OnError(error_state_);
387 }
388 }
389
390 int CastTransportImpl::DoRead() {
391 VLOG_WITH_CONNECTION(2) << "DoRead";
392 SetReadState(READ_STATE_READ_COMPLETE);
393
394 // Determine how many bytes need to be read.
395 size_t num_bytes_to_read = framer_->BytesRequested();
396 DCHECK_GT(num_bytes_to_read, 0u);
397
398 // Read up to num_bytes_to_read into |current_read_buffer_|.
399 return socket_->Read(
400 read_buffer_.get(), base::checked_cast<uint32_t>(num_bytes_to_read),
401 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
402 }
403
404 int CastTransportImpl::DoReadComplete(int result) {
405 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
406 if (result <= 0) {
407 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result);
408 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
409 SetErrorState(ChannelError::CAST_SOCKET_ERROR);
410 SetReadState(READ_STATE_HANDLE_ERROR);
411 return result == 0 ? net::ERR_FAILED : result;
412 }
413
414 size_t message_size;
415 DCHECK(!current_message_);
416 ChannelError framing_error;
417 current_message_ = framer_->Ingest(result, &message_size, &framing_error);
418 if (current_message_.get() && (framing_error == ChannelError::NONE)) {
419 DCHECK_GT(message_size, static_cast<size_t>(0));
420 SetReadState(READ_STATE_DO_CALLBACK);
421 } else if (framing_error != ChannelError::NONE) {
422 DCHECK(!current_message_);
423 SetErrorState(ChannelError::INVALID_MESSAGE);
424 SetReadState(READ_STATE_HANDLE_ERROR);
425 } else {
426 DCHECK(!current_message_);
427 SetReadState(READ_STATE_READ);
428 }
429 return net::OK;
430 }
431
432 int CastTransportImpl::DoReadCallback() {
433 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
434 if (!IsCastMessageValid(*current_message_)) {
435 SetReadState(READ_STATE_HANDLE_ERROR);
436 SetErrorState(ChannelError::INVALID_MESSAGE);
437 return net::ERR_INVALID_RESPONSE;
438 }
439 SetReadState(READ_STATE_READ);
440 delegate_->OnMessage(*current_message_);
441 current_message_.reset();
442 return net::OK;
443 }
444
445 int CastTransportImpl::DoReadHandleError(int result) {
446 VLOG_WITH_CONNECTION(2) << "DoReadHandleError";
447 DCHECK_NE(ChannelError::NONE, error_state_);
448 DCHECK_LE(result, 0);
449 SetReadState(READ_STATE_ERROR);
450 return net::ERR_FAILED;
451 }
452
453 } // namespace cast_channel
454 } // namespace api
455 } // namespace extensions
OLDNEW
« no previous file with comments | « extensions/browser/api/cast_channel/cast_transport.h ('k') | extensions/browser/api/cast_channel/cast_transport_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698