| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "extensions/browser/api/cast_channel/cast_transport.h" | 5 #include "extensions/browser/api/cast_channel/cast_transport.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 #include <stdint.h> | 8 #include <stdint.h> |
| 9 | 9 |
| 10 #include <string> | 10 #include <string> |
| 11 #include <utility> | 11 #include <utility> |
| 12 | 12 |
| 13 #include "base/bind.h" | 13 #include "base/bind.h" |
| 14 #include "base/format_macros.h" | 14 #include "base/format_macros.h" |
| 15 #include "base/location.h" | 15 #include "base/location.h" |
| 16 #include "base/numerics/safe_conversions.h" | 16 #include "base/numerics/safe_conversions.h" |
| 17 #include "base/single_thread_task_runner.h" | 17 #include "base/single_thread_task_runner.h" |
| 18 #include "base/strings/stringprintf.h" | 18 #include "base/strings/stringprintf.h" |
| 19 #include "base/threading/thread_task_runner_handle.h" | 19 #include "base/threading/thread_task_runner_handle.h" |
| 20 #include "extensions/browser/api/cast_channel/cast_framer.h" | 20 #include "extensions/browser/api/cast_channel/cast_framer.h" |
| 21 #include "extensions/browser/api/cast_channel/cast_message_util.h" | 21 #include "extensions/browser/api/cast_channel/cast_message_util.h" |
| 22 #include "extensions/browser/api/cast_channel/logger.h" | 22 #include "extensions/browser/api/cast_channel/logger.h" |
| 23 #include "extensions/browser/api/cast_channel/logger_util.h" | |
| 24 #include "extensions/common/api/cast_channel/cast_channel.pb.h" | 23 #include "extensions/common/api/cast_channel/cast_channel.pb.h" |
| 25 #include "net/base/net_errors.h" | 24 #include "net/base/net_errors.h" |
| 26 #include "net/socket/socket.h" | 25 #include "net/socket/socket.h" |
| 27 | 26 |
| 28 #define VLOG_WITH_CONNECTION(level) \ | 27 #define VLOG_WITH_CONNECTION(level) \ |
| 29 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" << channel_auth_ \ | 28 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" << channel_auth_ \ |
| 30 << "] " | 29 << "] " |
| 31 | 30 |
| 32 namespace extensions { | 31 namespace extensions { |
| 33 namespace api { | 32 namespace api { |
| (...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 162 FROM_HERE, base::Bind(callback, net::ERR_FAILED)); | 161 FROM_HERE, base::Bind(callback, net::ERR_FAILED)); |
| 163 callback.Reset(); | 162 callback.Reset(); |
| 164 } | 163 } |
| 165 } | 164 } |
| 166 | 165 |
| 167 void CastTransportImpl::SendMessage(const CastMessage& message, | 166 void CastTransportImpl::SendMessage(const CastMessage& message, |
| 168 const net::CompletionCallback& callback) { | 167 const net::CompletionCallback& callback) { |
| 169 DCHECK(CalledOnValidThread()); | 168 DCHECK(CalledOnValidThread()); |
| 170 std::string serialized_message; | 169 std::string serialized_message; |
| 171 if (!MessageFramer::Serialize(message, &serialized_message)) { | 170 if (!MessageFramer::Serialize(message, &serialized_message)) { |
| 172 logger_->LogSocketEventForMessage(channel_id_, proto::SEND_MESSAGE_FAILED, | |
| 173 message.namespace_(), | |
| 174 "Error when serializing message."); | |
| 175 base::ThreadTaskRunnerHandle::Get()->PostTask( | 171 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 176 FROM_HERE, base::Bind(callback, net::ERR_FAILED)); | 172 FROM_HERE, base::Bind(callback, net::ERR_FAILED)); |
| 177 return; | 173 return; |
| 178 } | 174 } |
| 179 WriteRequest write_request( | 175 WriteRequest write_request( |
| 180 message.namespace_(), serialized_message, callback); | 176 message.namespace_(), serialized_message, callback); |
| 181 | 177 |
| 182 write_queue_.push(write_request); | 178 write_queue_.push(write_request); |
| 183 logger_->LogSocketEventForMessage( | |
| 184 channel_id_, proto::MESSAGE_ENQUEUED, message.namespace_(), | |
| 185 base::StringPrintf("Queue size: %" PRIuS, write_queue_.size())); | |
| 186 if (write_state_ == WRITE_STATE_IDLE) { | 179 if (write_state_ == WRITE_STATE_IDLE) { |
| 187 SetWriteState(WRITE_STATE_WRITE); | 180 SetWriteState(WRITE_STATE_WRITE); |
| 188 OnWriteResult(net::OK); | 181 OnWriteResult(net::OK); |
| 189 } | 182 } |
| 190 } | 183 } |
| 191 | 184 |
| 192 CastTransportImpl::WriteRequest::WriteRequest( | 185 CastTransportImpl::WriteRequest::WriteRequest( |
| 193 const std::string& namespace_, | 186 const std::string& namespace_, |
| 194 const std::string& payload, | 187 const std::string& payload, |
| 195 const net::CompletionCallback& callback) | 188 const net::CompletionCallback& callback) |
| 196 : message_namespace(namespace_), callback(callback) { | 189 : message_namespace(namespace_), callback(callback) { |
| 197 VLOG(2) << "WriteRequest size: " << payload.size(); | 190 VLOG(2) << "WriteRequest size: " << payload.size(); |
| 198 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload), | 191 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload), |
| 199 payload.size()); | 192 payload.size()); |
| 200 } | 193 } |
| 201 | 194 |
| 202 CastTransportImpl::WriteRequest::WriteRequest(const WriteRequest& other) = | 195 CastTransportImpl::WriteRequest::WriteRequest(const WriteRequest& other) = |
| 203 default; | 196 default; |
| 204 | 197 |
| 205 CastTransportImpl::WriteRequest::~WriteRequest() { | 198 CastTransportImpl::WriteRequest::~WriteRequest() { |
| 206 } | 199 } |
| 207 | 200 |
| 208 void CastTransportImpl::SetReadState(ReadState read_state) { | 201 void CastTransportImpl::SetReadState(ReadState read_state) { |
| 209 if (read_state_ != read_state) { | 202 if (read_state_ != read_state) |
| 210 read_state_ = read_state; | 203 read_state_ = read_state; |
| 211 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_)); | |
| 212 } | |
| 213 } | 204 } |
| 214 | 205 |
| 215 void CastTransportImpl::SetWriteState(WriteState write_state) { | 206 void CastTransportImpl::SetWriteState(WriteState write_state) { |
| 216 if (write_state_ != write_state) { | 207 if (write_state_ != write_state) |
| 217 write_state_ = write_state; | 208 write_state_ = write_state; |
| 218 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); | |
| 219 } | |
| 220 } | 209 } |
| 221 | 210 |
| 222 void CastTransportImpl::SetErrorState(ChannelError error_state) { | 211 void CastTransportImpl::SetErrorState(ChannelError error_state) { |
| 223 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state; | 212 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state; |
| 224 error_state_ = error_state; | 213 error_state_ = error_state; |
| 225 } | 214 } |
| 226 | 215 |
| 227 void CastTransportImpl::OnWriteResult(int result) { | 216 void CastTransportImpl::OnWriteResult(int result) { |
| 228 DCHECK(CalledOnValidThread()); | 217 DCHECK(CalledOnValidThread()); |
| 229 DCHECK_NE(WRITE_STATE_IDLE, write_state_); | 218 DCHECK_NE(WRITE_STATE_IDLE, write_state_); |
| (...skipping 30 matching lines...) Expand all Loading... |
| 260 break; | 249 break; |
| 261 default: | 250 default: |
| 262 NOTREACHED() << "Unknown state in write state machine: " << state; | 251 NOTREACHED() << "Unknown state in write state machine: " << state; |
| 263 SetWriteState(WRITE_STATE_ERROR); | 252 SetWriteState(WRITE_STATE_ERROR); |
| 264 SetErrorState(CHANNEL_ERROR_UNKNOWN); | 253 SetErrorState(CHANNEL_ERROR_UNKNOWN); |
| 265 rv = net::ERR_FAILED; | 254 rv = net::ERR_FAILED; |
| 266 break; | 255 break; |
| 267 } | 256 } |
| 268 } while (rv != net::ERR_IO_PENDING && !IsTerminalWriteState(write_state_)); | 257 } while (rv != net::ERR_IO_PENDING && !IsTerminalWriteState(write_state_)); |
| 269 | 258 |
| 270 if (IsTerminalWriteState(write_state_)) { | 259 if (write_state_ == WRITE_STATE_ERROR) { |
| 271 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_)); | 260 FlushWriteQueue(); |
| 272 | 261 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_); |
| 273 if (write_state_ == WRITE_STATE_ERROR) { | 262 VLOG_WITH_CONNECTION(2) << "Sending OnError()."; |
| 274 FlushWriteQueue(); | 263 delegate_->OnError(error_state_); |
| 275 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_); | |
| 276 VLOG_WITH_CONNECTION(2) << "Sending OnError()."; | |
| 277 delegate_->OnError(error_state_); | |
| 278 } | |
| 279 } | 264 } |
| 280 } | 265 } |
| 281 | 266 |
| 282 int CastTransportImpl::DoWrite() { | 267 int CastTransportImpl::DoWrite() { |
| 283 DCHECK(!write_queue_.empty()); | 268 DCHECK(!write_queue_.empty()); |
| 284 WriteRequest& request = write_queue_.front(); | 269 WriteRequest& request = write_queue_.front(); |
| 285 | 270 |
| 286 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " | 271 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = " |
| 287 << request.io_buffer->size() << " bytes_written " | 272 << request.io_buffer->size() << " bytes_written " |
| 288 << request.io_buffer->BytesConsumed(); | 273 << request.io_buffer->BytesConsumed(); |
| 289 | 274 |
| 290 SetWriteState(WRITE_STATE_WRITE_COMPLETE); | 275 SetWriteState(WRITE_STATE_WRITE_COMPLETE); |
| 291 | 276 |
| 292 int rv = socket_->Write( | 277 int rv = socket_->Write( |
| 293 request.io_buffer.get(), request.io_buffer->BytesRemaining(), | 278 request.io_buffer.get(), request.io_buffer->BytesRemaining(), |
| 294 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this))); | 279 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this))); |
| 295 return rv; | 280 return rv; |
| 296 } | 281 } |
| 297 | 282 |
| 298 int CastTransportImpl::DoWriteComplete(int result) { | 283 int CastTransportImpl::DoWriteComplete(int result) { |
| 299 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result; | 284 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result; |
| 300 DCHECK(!write_queue_.empty()); | 285 DCHECK(!write_queue_.empty()); |
| 301 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result); | |
| 302 if (result <= 0) { // NOTE that 0 also indicates an error | 286 if (result <= 0) { // NOTE that 0 also indicates an error |
| 287 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result); |
| 303 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); | 288 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); |
| 304 SetWriteState(WRITE_STATE_HANDLE_ERROR); | 289 SetWriteState(WRITE_STATE_HANDLE_ERROR); |
| 305 return result == 0 ? net::ERR_FAILED : result; | 290 return result == 0 ? net::ERR_FAILED : result; |
| 306 } | 291 } |
| 307 | 292 |
| 308 // Some bytes were successfully written | 293 // Some bytes were successfully written |
| 309 WriteRequest& request = write_queue_.front(); | 294 WriteRequest& request = write_queue_.front(); |
| 310 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; | 295 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer; |
| 311 io_buffer->DidConsume(result); | 296 io_buffer->DidConsume(result); |
| 312 if (io_buffer->BytesRemaining() == 0) { // Message fully sent | 297 if (io_buffer->BytesRemaining() == 0) { // Message fully sent |
| 313 SetWriteState(WRITE_STATE_DO_CALLBACK); | 298 SetWriteState(WRITE_STATE_DO_CALLBACK); |
| 314 } else { | 299 } else { |
| 315 SetWriteState(WRITE_STATE_WRITE); | 300 SetWriteState(WRITE_STATE_WRITE); |
| 316 } | 301 } |
| 317 | 302 |
| 318 return net::OK; | 303 return net::OK; |
| 319 } | 304 } |
| 320 | 305 |
| 321 int CastTransportImpl::DoWriteCallback() { | 306 int CastTransportImpl::DoWriteCallback() { |
| 322 VLOG_WITH_CONNECTION(2) << "DoWriteCallback"; | 307 VLOG_WITH_CONNECTION(2) << "DoWriteCallback"; |
| 323 DCHECK(!write_queue_.empty()); | 308 DCHECK(!write_queue_.empty()); |
| 324 | 309 |
| 325 WriteRequest& request = write_queue_.front(); | 310 WriteRequest& request = write_queue_.front(); |
| 326 int bytes_consumed = request.io_buffer->BytesConsumed(); | |
| 327 logger_->LogSocketEventForMessage( | |
| 328 channel_id_, proto::MESSAGE_WRITTEN, request.message_namespace, | |
| 329 base::StringPrintf("Bytes: %d", bytes_consumed)); | |
| 330 base::ThreadTaskRunnerHandle::Get()->PostTask( | 311 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 331 FROM_HERE, base::Bind(request.callback, net::OK)); | 312 FROM_HERE, base::Bind(request.callback, net::OK)); |
| 332 | 313 |
| 333 write_queue_.pop(); | 314 write_queue_.pop(); |
| 334 if (write_queue_.empty()) { | 315 if (write_queue_.empty()) { |
| 335 SetWriteState(WRITE_STATE_IDLE); | 316 SetWriteState(WRITE_STATE_IDLE); |
| 336 } else { | 317 } else { |
| 337 SetWriteState(WRITE_STATE_WRITE); | 318 SetWriteState(WRITE_STATE_WRITE); |
| 338 } | 319 } |
| 339 | 320 |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 392 NOTREACHED() << "Unknown state in read state machine: " << state; | 373 NOTREACHED() << "Unknown state in read state machine: " << state; |
| 393 SetReadState(READ_STATE_ERROR); | 374 SetReadState(READ_STATE_ERROR); |
| 394 SetErrorState(CHANNEL_ERROR_UNKNOWN); | 375 SetErrorState(CHANNEL_ERROR_UNKNOWN); |
| 395 rv = net::ERR_FAILED; | 376 rv = net::ERR_FAILED; |
| 396 break; | 377 break; |
| 397 } | 378 } |
| 398 } while (rv != net::ERR_IO_PENDING && !IsTerminalReadState(read_state_)); | 379 } while (rv != net::ERR_IO_PENDING && !IsTerminalReadState(read_state_)); |
| 399 | 380 |
| 400 if (IsTerminalReadState(read_state_)) { | 381 if (IsTerminalReadState(read_state_)) { |
| 401 DCHECK_EQ(READ_STATE_ERROR, read_state_); | 382 DCHECK_EQ(READ_STATE_ERROR, read_state_); |
| 402 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_)); | |
| 403 VLOG_WITH_CONNECTION(2) << "Sending OnError()."; | 383 VLOG_WITH_CONNECTION(2) << "Sending OnError()."; |
| 404 delegate_->OnError(error_state_); | 384 delegate_->OnError(error_state_); |
| 405 } | 385 } |
| 406 } | 386 } |
| 407 | 387 |
| 408 int CastTransportImpl::DoRead() { | 388 int CastTransportImpl::DoRead() { |
| 409 VLOG_WITH_CONNECTION(2) << "DoRead"; | 389 VLOG_WITH_CONNECTION(2) << "DoRead"; |
| 410 SetReadState(READ_STATE_READ_COMPLETE); | 390 SetReadState(READ_STATE_READ_COMPLETE); |
| 411 | 391 |
| 412 // Determine how many bytes need to be read. | 392 // Determine how many bytes need to be read. |
| 413 size_t num_bytes_to_read = framer_->BytesRequested(); | 393 size_t num_bytes_to_read = framer_->BytesRequested(); |
| 414 DCHECK_GT(num_bytes_to_read, 0u); | 394 DCHECK_GT(num_bytes_to_read, 0u); |
| 415 | 395 |
| 416 // Read up to num_bytes_to_read into |current_read_buffer_|. | 396 // Read up to num_bytes_to_read into |current_read_buffer_|. |
| 417 return socket_->Read( | 397 return socket_->Read( |
| 418 read_buffer_.get(), base::checked_cast<uint32_t>(num_bytes_to_read), | 398 read_buffer_.get(), base::checked_cast<uint32_t>(num_bytes_to_read), |
| 419 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this))); | 399 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this))); |
| 420 } | 400 } |
| 421 | 401 |
| 422 int CastTransportImpl::DoReadComplete(int result) { | 402 int CastTransportImpl::DoReadComplete(int result) { |
| 423 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result; | 403 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result; |
| 424 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result); | |
| 425 if (result <= 0) { | 404 if (result <= 0) { |
| 405 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result); |
| 426 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket."; | 406 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket."; |
| 427 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); | 407 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR); |
| 428 SetReadState(READ_STATE_HANDLE_ERROR); | 408 SetReadState(READ_STATE_HANDLE_ERROR); |
| 429 return result == 0 ? net::ERR_FAILED : result; | 409 return result == 0 ? net::ERR_FAILED : result; |
| 430 } | 410 } |
| 431 | 411 |
| 432 size_t message_size; | 412 size_t message_size; |
| 433 DCHECK(!current_message_); | 413 DCHECK(!current_message_); |
| 434 ChannelError framing_error; | 414 ChannelError framing_error; |
| 435 current_message_ = framer_->Ingest(result, &message_size, &framing_error); | 415 current_message_ = framer_->Ingest(result, &message_size, &framing_error); |
| 436 if (current_message_.get() && (framing_error == CHANNEL_ERROR_NONE)) { | 416 if (current_message_.get() && (framing_error == CHANNEL_ERROR_NONE)) { |
| 437 DCHECK_GT(message_size, static_cast<size_t>(0)); | 417 DCHECK_GT(message_size, static_cast<size_t>(0)); |
| 438 logger_->LogSocketEventForMessage( | |
| 439 channel_id_, proto::MESSAGE_READ, current_message_->namespace_(), | |
| 440 base::StringPrintf("Message size: %u", | |
| 441 static_cast<uint32_t>(message_size))); | |
| 442 SetReadState(READ_STATE_DO_CALLBACK); | 418 SetReadState(READ_STATE_DO_CALLBACK); |
| 443 } else if (framing_error != CHANNEL_ERROR_NONE) { | 419 } else if (framing_error != CHANNEL_ERROR_NONE) { |
| 444 DCHECK(!current_message_); | 420 DCHECK(!current_message_); |
| 445 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); | 421 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE); |
| 446 SetReadState(READ_STATE_HANDLE_ERROR); | 422 SetReadState(READ_STATE_HANDLE_ERROR); |
| 447 } else { | 423 } else { |
| 448 DCHECK(!current_message_); | 424 DCHECK(!current_message_); |
| 449 SetReadState(READ_STATE_READ); | 425 SetReadState(READ_STATE_READ); |
| 450 } | 426 } |
| 451 return net::OK; | 427 return net::OK; |
| (...skipping 16 matching lines...) Expand all Loading... |
| 468 VLOG_WITH_CONNECTION(2) << "DoReadHandleError"; | 444 VLOG_WITH_CONNECTION(2) << "DoReadHandleError"; |
| 469 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_); | 445 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_); |
| 470 DCHECK_LE(result, 0); | 446 DCHECK_LE(result, 0); |
| 471 SetReadState(READ_STATE_ERROR); | 447 SetReadState(READ_STATE_ERROR); |
| 472 return net::ERR_FAILED; | 448 return net::ERR_FAILED; |
| 473 } | 449 } |
| 474 | 450 |
| 475 } // namespace cast_channel | 451 } // namespace cast_channel |
| 476 } // namespace api | 452 } // namespace api |
| 477 } // namespace extensions | 453 } // namespace extensions |
| OLD | NEW |