Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: blimp/net/stream_packet_reader.cc

Issue 2632803002: Remove all blimp network code. (Closed)
Patch Set: merge from origin/master for good measure Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/stream_packet_reader.h ('k') | blimp/net/stream_packet_reader_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/stream_packet_reader.h"
6
7 #include <iostream>
8
9 #include "base/callback_helpers.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/sys_byteorder.h"
15 #include "base/threading/thread_task_runner_handle.h"
16 #include "blimp/net/blimp_stats.h"
17 #include "blimp/net/common.h"
18 #include "net/base/io_buffer.h"
19 #include "net/base/net_errors.h"
20 #include "net/socket/stream_socket.h"
21
22 namespace blimp {
23
24 std::ostream& operator<<(std::ostream& out,
25 const StreamPacketReader::ReadState state) {
26 switch (state) {
27 case StreamPacketReader::ReadState::HEADER:
28 out << "HEADER";
29 break;
30 case StreamPacketReader::ReadState::PAYLOAD:
31 out << "PAYLOAD";
32 break;
33 case StreamPacketReader::ReadState::IDLE:
34 out << "IDLE";
35 break;
36 }
37 return out;
38 }
39
40 StreamPacketReader::StreamPacketReader(net::StreamSocket* socket)
41 : read_state_(ReadState::IDLE),
42 socket_(socket),
43 payload_size_(0),
44 weak_factory_(this) {
45 DCHECK(socket_);
46 header_buffer_ = new net::GrowableIOBuffer;
47 header_buffer_->SetCapacity(kPacketHeaderSizeBytes);
48 }
49
50 StreamPacketReader::~StreamPacketReader() {}
51
52 void StreamPacketReader::ReadPacket(
53 const scoped_refptr<net::GrowableIOBuffer>& buf,
54 const net::CompletionCallback& callback) {
55 DCHECK_EQ(ReadState::IDLE, read_state_);
56 if (static_cast<size_t>(buf->capacity()) < kPacketHeaderSizeBytes) {
57 buf->SetCapacity(kPacketHeaderSizeBytes);
58 }
59
60 header_buffer_->set_offset(0);
61 payload_buffer_ = buf;
62 payload_buffer_->set_offset(0);
63 read_state_ = ReadState::HEADER;
64
65 int result = DoReadLoop(net::OK);
66 if (result != net::ERR_IO_PENDING) {
67 // Release the payload buffer, since the read operation has completed
68 // synchronously.
69 payload_buffer_ = nullptr;
70
71 // Adapt synchronous completion to an asynchronous style.
72 base::ThreadTaskRunnerHandle::Get()->PostTask(
73 FROM_HERE,
74 base::Bind(callback, result == net::OK ? payload_size_ : result));
75 } else {
76 callback_ = callback;
77 }
78 }
79
80 int StreamPacketReader::DoReadLoop(int result) {
81 DCHECK_NE(net::ERR_IO_PENDING, result);
82 DCHECK_GE(result, 0);
83 DCHECK_NE(ReadState::IDLE, read_state_);
84
85 while (result >= 0 && read_state_ != ReadState::IDLE) {
86 VLOG(2) << "DoReadLoop (state=" << read_state_ << ", result=" << result
87 << ")";
88
89 switch (read_state_) {
90 case ReadState::HEADER:
91 result = DoReadHeader(result);
92 break;
93 case ReadState::PAYLOAD:
94 result = DoReadPayload(result);
95 break;
96 case ReadState::IDLE:
97 NOTREACHED();
98 result = net::ERR_UNEXPECTED;
99 break;
100 }
101 }
102
103 return result;
104 }
105
106 int StreamPacketReader::DoReadHeader(int result) {
107 DCHECK_EQ(ReadState::HEADER, read_state_);
108 DCHECK_GT(kPacketHeaderSizeBytes,
109 static_cast<size_t>(header_buffer_->offset()));
110 DCHECK_GE(result, 0);
111
112 header_buffer_->set_offset(header_buffer_->offset() + result);
113 if (static_cast<size_t>(header_buffer_->offset()) < kPacketHeaderSizeBytes) {
114 // There is more header to read.
115 return DoRead(header_buffer_.get(),
116 kPacketHeaderSizeBytes - header_buffer_->offset());
117 }
118
119 // Finished reading the header. Parse the size and prepare for payload read.
120 payload_size_ = base::NetToHost32(
121 *reinterpret_cast<uint32_t*>(header_buffer_->StartOfBuffer()));
122 if (payload_size_ == 0 || payload_size_ > kMaxPacketPayloadSizeBytes) {
123 DLOG(ERROR) << "Illegal payload size: " << payload_size_;
124 return net::ERR_INVALID_RESPONSE;
125 }
126 if (static_cast<size_t>(payload_buffer_->capacity()) < payload_size_) {
127 payload_buffer_->SetCapacity(payload_size_);
128 }
129 read_state_ = ReadState::PAYLOAD;
130 return net::OK;
131 }
132
133 int StreamPacketReader::DoReadPayload(int result) {
134 DCHECK_EQ(ReadState::PAYLOAD, read_state_);
135 DCHECK_GE(result, 0);
136
137 payload_buffer_->set_offset(payload_buffer_->offset() + result);
138 if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) {
139 return DoRead(payload_buffer_.get(),
140 payload_size_ - payload_buffer_->offset());
141 }
142 BlimpStats::GetInstance()->Add(BlimpStats::BYTES_RECEIVED, payload_size_);
143
144 // Finished reading the payload.
145 read_state_ = ReadState::IDLE;
146 payload_buffer_->set_offset(0);
147 return payload_size_;
148 }
149
150 void StreamPacketReader::OnReadComplete(int result) {
151 DCHECK_NE(net::ERR_IO_PENDING, result);
152
153 if (result == 0 /* EOF */) {
154 payload_buffer_ = nullptr;
155 base::ResetAndReturn(&callback_).Run(net::ERR_CONNECTION_CLOSED);
156 return;
157 }
158
159 // If the read was successful, then process the result.
160 if (result > 0) {
161 result = DoReadLoop(result);
162 }
163
164 // If all reading completed, either successfully or by error, inform the
165 // caller.
166 if (result != net::ERR_IO_PENDING) {
167 payload_buffer_ = nullptr;
168 base::ResetAndReturn(&callback_).Run(result);
169 }
170 }
171
172 int StreamPacketReader::DoRead(net::IOBuffer* buf, int buf_len) {
173 int result = socket_->Read(buf, buf_len,
174 base::Bind(&StreamPacketReader::OnReadComplete,
175 weak_factory_.GetWeakPtr()));
176 return (result != 0 ? result : net::ERR_CONNECTION_CLOSED);
177 }
178
179 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/stream_packet_reader.h ('k') | blimp/net/stream_packet_reader_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698