| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "blimp/net/stream_packet_reader.h" | |
| 6 | |
| 7 #include <iostream> | |
| 8 | |
| 9 #include "base/callback_helpers.h" | |
| 10 #include "base/location.h" | |
| 11 #include "base/logging.h" | |
| 12 #include "base/memory/weak_ptr.h" | |
| 13 #include "base/single_thread_task_runner.h" | |
| 14 #include "base/sys_byteorder.h" | |
| 15 #include "base/threading/thread_task_runner_handle.h" | |
| 16 #include "blimp/net/blimp_stats.h" | |
| 17 #include "blimp/net/common.h" | |
| 18 #include "net/base/io_buffer.h" | |
| 19 #include "net/base/net_errors.h" | |
| 20 #include "net/socket/stream_socket.h" | |
| 21 | |
| 22 namespace blimp { | |
| 23 | |
| 24 std::ostream& operator<<(std::ostream& out, | |
| 25 const StreamPacketReader::ReadState state) { | |
| 26 switch (state) { | |
| 27 case StreamPacketReader::ReadState::HEADER: | |
| 28 out << "HEADER"; | |
| 29 break; | |
| 30 case StreamPacketReader::ReadState::PAYLOAD: | |
| 31 out << "PAYLOAD"; | |
| 32 break; | |
| 33 case StreamPacketReader::ReadState::IDLE: | |
| 34 out << "IDLE"; | |
| 35 break; | |
| 36 } | |
| 37 return out; | |
| 38 } | |
| 39 | |
| 40 StreamPacketReader::StreamPacketReader(net::StreamSocket* socket) | |
| 41 : read_state_(ReadState::IDLE), | |
| 42 socket_(socket), | |
| 43 payload_size_(0), | |
| 44 weak_factory_(this) { | |
| 45 DCHECK(socket_); | |
| 46 header_buffer_ = new net::GrowableIOBuffer; | |
| 47 header_buffer_->SetCapacity(kPacketHeaderSizeBytes); | |
| 48 } | |
| 49 | |
| 50 StreamPacketReader::~StreamPacketReader() {} | |
| 51 | |
| 52 void StreamPacketReader::ReadPacket( | |
| 53 const scoped_refptr<net::GrowableIOBuffer>& buf, | |
| 54 const net::CompletionCallback& callback) { | |
| 55 DCHECK_EQ(ReadState::IDLE, read_state_); | |
| 56 if (static_cast<size_t>(buf->capacity()) < kPacketHeaderSizeBytes) { | |
| 57 buf->SetCapacity(kPacketHeaderSizeBytes); | |
| 58 } | |
| 59 | |
| 60 header_buffer_->set_offset(0); | |
| 61 payload_buffer_ = buf; | |
| 62 payload_buffer_->set_offset(0); | |
| 63 read_state_ = ReadState::HEADER; | |
| 64 | |
| 65 int result = DoReadLoop(net::OK); | |
| 66 if (result != net::ERR_IO_PENDING) { | |
| 67 // Release the payload buffer, since the read operation has completed | |
| 68 // synchronously. | |
| 69 payload_buffer_ = nullptr; | |
| 70 | |
| 71 // Adapt synchronous completion to an asynchronous style. | |
| 72 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 73 FROM_HERE, | |
| 74 base::Bind(callback, result == net::OK ? payload_size_ : result)); | |
| 75 } else { | |
| 76 callback_ = callback; | |
| 77 } | |
| 78 } | |
| 79 | |
| 80 int StreamPacketReader::DoReadLoop(int result) { | |
| 81 DCHECK_NE(net::ERR_IO_PENDING, result); | |
| 82 DCHECK_GE(result, 0); | |
| 83 DCHECK_NE(ReadState::IDLE, read_state_); | |
| 84 | |
| 85 while (result >= 0 && read_state_ != ReadState::IDLE) { | |
| 86 VLOG(2) << "DoReadLoop (state=" << read_state_ << ", result=" << result | |
| 87 << ")"; | |
| 88 | |
| 89 switch (read_state_) { | |
| 90 case ReadState::HEADER: | |
| 91 result = DoReadHeader(result); | |
| 92 break; | |
| 93 case ReadState::PAYLOAD: | |
| 94 result = DoReadPayload(result); | |
| 95 break; | |
| 96 case ReadState::IDLE: | |
| 97 NOTREACHED(); | |
| 98 result = net::ERR_UNEXPECTED; | |
| 99 break; | |
| 100 } | |
| 101 } | |
| 102 | |
| 103 return result; | |
| 104 } | |
| 105 | |
| 106 int StreamPacketReader::DoReadHeader(int result) { | |
| 107 DCHECK_EQ(ReadState::HEADER, read_state_); | |
| 108 DCHECK_GT(kPacketHeaderSizeBytes, | |
| 109 static_cast<size_t>(header_buffer_->offset())); | |
| 110 DCHECK_GE(result, 0); | |
| 111 | |
| 112 header_buffer_->set_offset(header_buffer_->offset() + result); | |
| 113 if (static_cast<size_t>(header_buffer_->offset()) < kPacketHeaderSizeBytes) { | |
| 114 // There is more header to read. | |
| 115 return DoRead(header_buffer_.get(), | |
| 116 kPacketHeaderSizeBytes - header_buffer_->offset()); | |
| 117 } | |
| 118 | |
| 119 // Finished reading the header. Parse the size and prepare for payload read. | |
| 120 payload_size_ = base::NetToHost32( | |
| 121 *reinterpret_cast<uint32_t*>(header_buffer_->StartOfBuffer())); | |
| 122 if (payload_size_ == 0 || payload_size_ > kMaxPacketPayloadSizeBytes) { | |
| 123 DLOG(ERROR) << "Illegal payload size: " << payload_size_; | |
| 124 return net::ERR_INVALID_RESPONSE; | |
| 125 } | |
| 126 if (static_cast<size_t>(payload_buffer_->capacity()) < payload_size_) { | |
| 127 payload_buffer_->SetCapacity(payload_size_); | |
| 128 } | |
| 129 read_state_ = ReadState::PAYLOAD; | |
| 130 return net::OK; | |
| 131 } | |
| 132 | |
| 133 int StreamPacketReader::DoReadPayload(int result) { | |
| 134 DCHECK_EQ(ReadState::PAYLOAD, read_state_); | |
| 135 DCHECK_GE(result, 0); | |
| 136 | |
| 137 payload_buffer_->set_offset(payload_buffer_->offset() + result); | |
| 138 if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) { | |
| 139 return DoRead(payload_buffer_.get(), | |
| 140 payload_size_ - payload_buffer_->offset()); | |
| 141 } | |
| 142 BlimpStats::GetInstance()->Add(BlimpStats::BYTES_RECEIVED, payload_size_); | |
| 143 | |
| 144 // Finished reading the payload. | |
| 145 read_state_ = ReadState::IDLE; | |
| 146 payload_buffer_->set_offset(0); | |
| 147 return payload_size_; | |
| 148 } | |
| 149 | |
| 150 void StreamPacketReader::OnReadComplete(int result) { | |
| 151 DCHECK_NE(net::ERR_IO_PENDING, result); | |
| 152 | |
| 153 if (result == 0 /* EOF */) { | |
| 154 payload_buffer_ = nullptr; | |
| 155 base::ResetAndReturn(&callback_).Run(net::ERR_CONNECTION_CLOSED); | |
| 156 return; | |
| 157 } | |
| 158 | |
| 159 // If the read was successful, then process the result. | |
| 160 if (result > 0) { | |
| 161 result = DoReadLoop(result); | |
| 162 } | |
| 163 | |
| 164 // If all reading completed, either successfully or by error, inform the | |
| 165 // caller. | |
| 166 if (result != net::ERR_IO_PENDING) { | |
| 167 payload_buffer_ = nullptr; | |
| 168 base::ResetAndReturn(&callback_).Run(result); | |
| 169 } | |
| 170 } | |
| 171 | |
| 172 int StreamPacketReader::DoRead(net::IOBuffer* buf, int buf_len) { | |
| 173 int result = socket_->Read(buf, buf_len, | |
| 174 base::Bind(&StreamPacketReader::OnReadComplete, | |
| 175 weak_factory_.GetWeakPtr())); | |
| 176 return (result != 0 ? result : net::ERR_CONNECTION_CLOSED); | |
| 177 } | |
| 178 | |
| 179 } // namespace blimp | |
| OLD | NEW |