OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "blimp/net/stream_packet_reader.h" | |
6 | |
7 #include <iostream> | |
8 | |
9 #include "base/callback_helpers.h" | |
10 #include "base/location.h" | |
11 #include "base/logging.h" | |
12 #include "base/memory/weak_ptr.h" | |
13 #include "base/single_thread_task_runner.h" | |
14 #include "base/sys_byteorder.h" | |
15 #include "base/threading/thread_task_runner_handle.h" | |
16 #include "blimp/net/blimp_stats.h" | |
17 #include "blimp/net/common.h" | |
18 #include "net/base/io_buffer.h" | |
19 #include "net/base/net_errors.h" | |
20 #include "net/socket/stream_socket.h" | |
21 | |
22 namespace blimp { | |
23 | |
24 std::ostream& operator<<(std::ostream& out, | |
25 const StreamPacketReader::ReadState state) { | |
26 switch (state) { | |
27 case StreamPacketReader::ReadState::HEADER: | |
28 out << "HEADER"; | |
29 break; | |
30 case StreamPacketReader::ReadState::PAYLOAD: | |
31 out << "PAYLOAD"; | |
32 break; | |
33 case StreamPacketReader::ReadState::IDLE: | |
34 out << "IDLE"; | |
35 break; | |
36 } | |
37 return out; | |
38 } | |
39 | |
40 StreamPacketReader::StreamPacketReader(net::StreamSocket* socket) | |
41 : read_state_(ReadState::IDLE), | |
42 socket_(socket), | |
43 payload_size_(0), | |
44 weak_factory_(this) { | |
45 DCHECK(socket_); | |
46 header_buffer_ = new net::GrowableIOBuffer; | |
47 header_buffer_->SetCapacity(kPacketHeaderSizeBytes); | |
48 } | |
49 | |
50 StreamPacketReader::~StreamPacketReader() {} | |
51 | |
52 void StreamPacketReader::ReadPacket( | |
53 const scoped_refptr<net::GrowableIOBuffer>& buf, | |
54 const net::CompletionCallback& callback) { | |
55 DCHECK_EQ(ReadState::IDLE, read_state_); | |
56 if (static_cast<size_t>(buf->capacity()) < kPacketHeaderSizeBytes) { | |
57 buf->SetCapacity(kPacketHeaderSizeBytes); | |
58 } | |
59 | |
60 header_buffer_->set_offset(0); | |
61 payload_buffer_ = buf; | |
62 payload_buffer_->set_offset(0); | |
63 read_state_ = ReadState::HEADER; | |
64 | |
65 int result = DoReadLoop(net::OK); | |
66 if (result != net::ERR_IO_PENDING) { | |
67 // Release the payload buffer, since the read operation has completed | |
68 // synchronously. | |
69 payload_buffer_ = nullptr; | |
70 | |
71 // Adapt synchronous completion to an asynchronous style. | |
72 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
73 FROM_HERE, | |
74 base::Bind(callback, result == net::OK ? payload_size_ : result)); | |
75 } else { | |
76 callback_ = callback; | |
77 } | |
78 } | |
79 | |
80 int StreamPacketReader::DoReadLoop(int result) { | |
81 DCHECK_NE(net::ERR_IO_PENDING, result); | |
82 DCHECK_GE(result, 0); | |
83 DCHECK_NE(ReadState::IDLE, read_state_); | |
84 | |
85 while (result >= 0 && read_state_ != ReadState::IDLE) { | |
86 VLOG(2) << "DoReadLoop (state=" << read_state_ << ", result=" << result | |
87 << ")"; | |
88 | |
89 switch (read_state_) { | |
90 case ReadState::HEADER: | |
91 result = DoReadHeader(result); | |
92 break; | |
93 case ReadState::PAYLOAD: | |
94 result = DoReadPayload(result); | |
95 break; | |
96 case ReadState::IDLE: | |
97 NOTREACHED(); | |
98 result = net::ERR_UNEXPECTED; | |
99 break; | |
100 } | |
101 } | |
102 | |
103 return result; | |
104 } | |
105 | |
106 int StreamPacketReader::DoReadHeader(int result) { | |
107 DCHECK_EQ(ReadState::HEADER, read_state_); | |
108 DCHECK_GT(kPacketHeaderSizeBytes, | |
109 static_cast<size_t>(header_buffer_->offset())); | |
110 DCHECK_GE(result, 0); | |
111 | |
112 header_buffer_->set_offset(header_buffer_->offset() + result); | |
113 if (static_cast<size_t>(header_buffer_->offset()) < kPacketHeaderSizeBytes) { | |
114 // There is more header to read. | |
115 return DoRead(header_buffer_.get(), | |
116 kPacketHeaderSizeBytes - header_buffer_->offset()); | |
117 } | |
118 | |
119 // Finished reading the header. Parse the size and prepare for payload read. | |
120 payload_size_ = base::NetToHost32( | |
121 *reinterpret_cast<uint32_t*>(header_buffer_->StartOfBuffer())); | |
122 if (payload_size_ == 0 || payload_size_ > kMaxPacketPayloadSizeBytes) { | |
123 DLOG(ERROR) << "Illegal payload size: " << payload_size_; | |
124 return net::ERR_INVALID_RESPONSE; | |
125 } | |
126 if (static_cast<size_t>(payload_buffer_->capacity()) < payload_size_) { | |
127 payload_buffer_->SetCapacity(payload_size_); | |
128 } | |
129 read_state_ = ReadState::PAYLOAD; | |
130 return net::OK; | |
131 } | |
132 | |
133 int StreamPacketReader::DoReadPayload(int result) { | |
134 DCHECK_EQ(ReadState::PAYLOAD, read_state_); | |
135 DCHECK_GE(result, 0); | |
136 | |
137 payload_buffer_->set_offset(payload_buffer_->offset() + result); | |
138 if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) { | |
139 return DoRead(payload_buffer_.get(), | |
140 payload_size_ - payload_buffer_->offset()); | |
141 } | |
142 BlimpStats::GetInstance()->Add(BlimpStats::BYTES_RECEIVED, payload_size_); | |
143 | |
144 // Finished reading the payload. | |
145 read_state_ = ReadState::IDLE; | |
146 payload_buffer_->set_offset(0); | |
147 return payload_size_; | |
148 } | |
149 | |
150 void StreamPacketReader::OnReadComplete(int result) { | |
151 DCHECK_NE(net::ERR_IO_PENDING, result); | |
152 | |
153 if (result == 0 /* EOF */) { | |
154 payload_buffer_ = nullptr; | |
155 base::ResetAndReturn(&callback_).Run(net::ERR_CONNECTION_CLOSED); | |
156 return; | |
157 } | |
158 | |
159 // If the read was successful, then process the result. | |
160 if (result > 0) { | |
161 result = DoReadLoop(result); | |
162 } | |
163 | |
164 // If all reading completed, either successfully or by error, inform the | |
165 // caller. | |
166 if (result != net::ERR_IO_PENDING) { | |
167 payload_buffer_ = nullptr; | |
168 base::ResetAndReturn(&callback_).Run(result); | |
169 } | |
170 } | |
171 | |
172 int StreamPacketReader::DoRead(net::IOBuffer* buf, int buf_len) { | |
173 int result = socket_->Read(buf, buf_len, | |
174 base::Bind(&StreamPacketReader::OnReadComplete, | |
175 weak_factory_.GetWeakPtr())); | |
176 return (result != 0 ? result : net::ERR_CONNECTION_CLOSED); | |
177 } | |
178 | |
179 } // namespace blimp | |
OLD | NEW |