Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(516)

Side by Side Diff: blimp/net/stream_packet_reader.cc

Issue 1452823011: Make PacketReader/PacketWriter interfaces async-only. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Address wez feedback Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/stream_packet_reader.h" 5 #include "blimp/net/stream_packet_reader.h"
6 6
7 #include <iostream> 7 #include <iostream>
8 8
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/memory/weak_ptr.h" 11 #include "base/memory/weak_ptr.h"
12 #include "base/message_loop/message_loop.h"
12 #include "base/sys_byteorder.h" 13 #include "base/sys_byteorder.h"
13 #include "blimp/net/common.h" 14 #include "blimp/net/common.h"
14 #include "net/base/io_buffer.h" 15 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h" 16 #include "net/base/net_errors.h"
16 #include "net/socket/stream_socket.h" 17 #include "net/socket/stream_socket.h"
17 18
18 namespace blimp { 19 namespace blimp {
19 20
20 std::ostream& operator<<(std::ostream& out, 21 std::ostream& operator<<(std::ostream& out,
21 const StreamPacketReader::ReadState state) { 22 const StreamPacketReader::ReadState state) {
(...skipping 13 matching lines...) Expand all
35 36
36 StreamPacketReader::StreamPacketReader(net::StreamSocket* socket) 37 StreamPacketReader::StreamPacketReader(net::StreamSocket* socket)
37 : read_state_(ReadState::IDLE), socket_(socket), weak_factory_(this) { 38 : read_state_(ReadState::IDLE), socket_(socket), weak_factory_(this) {
38 DCHECK(socket_); 39 DCHECK(socket_);
39 header_buffer_ = new net::GrowableIOBuffer; 40 header_buffer_ = new net::GrowableIOBuffer;
40 header_buffer_->SetCapacity(kPacketHeaderSizeBytes); 41 header_buffer_->SetCapacity(kPacketHeaderSizeBytes);
41 } 42 }
42 43
43 StreamPacketReader::~StreamPacketReader() {} 44 StreamPacketReader::~StreamPacketReader() {}
44 45
45 int StreamPacketReader::ReadPacket( 46 void StreamPacketReader::ReadPacket(
46 const scoped_refptr<net::GrowableIOBuffer>& buf, 47 const scoped_refptr<net::GrowableIOBuffer>& buf,
47 const net::CompletionCallback& callback) { 48 const net::CompletionCallback& callback) {
48 DCHECK_EQ(ReadState::IDLE, read_state_); 49 DCHECK_EQ(ReadState::IDLE, read_state_);
49 DCHECK_GT(buf->capacity(), 0); 50 DCHECK_GT(buf->capacity(), 0);
50 51
51 header_buffer_->set_offset(0); 52 header_buffer_->set_offset(0);
52 payload_buffer_ = buf; 53 payload_buffer_ = buf;
53 payload_buffer_->set_offset(0); 54 payload_buffer_->set_offset(0);
54 read_state_ = ReadState::HEADER; 55 read_state_ = ReadState::HEADER;
55 56
56 int result = DoReadLoop(net::OK); 57 int result = DoReadLoop(net::OK);
57 if (result == net::ERR_IO_PENDING) { 58 if (result != net::ERR_IO_PENDING) {
58 // Store the completion callback to invoke when read completes
59 // asynchronously.
60 callback_ = callback;
61 } else {
62 // Release the payload buffer, since the read operation has completed 59 // Release the payload buffer, since the read operation has completed
63 // synchronously. 60 // synchronously.
64 payload_buffer_ = nullptr; 61 payload_buffer_ = nullptr;
62
63 // Adapt synchronous completion to an asynchronous style.
64 base::MessageLoop::current()->PostTask(FROM_HERE,
65 base::Bind(callback, result));
66 } else {
67 callback_ = callback;
65 } 68 }
66
67 return result;
68 } 69 }
69 70
70 int StreamPacketReader::DoReadLoop(int result) { 71 int StreamPacketReader::DoReadLoop(int result) {
71 DCHECK_NE(net::ERR_IO_PENDING, result); 72 DCHECK_NE(net::ERR_IO_PENDING, result);
72 DCHECK_GE(result, 0); 73 DCHECK_GE(result, 0);
73 DCHECK_NE(ReadState::IDLE, read_state_); 74 DCHECK_NE(ReadState::IDLE, read_state_);
74 75
75 while (result >= 0 && read_state_ != ReadState::IDLE) { 76 while (result >= 0 && read_state_ != ReadState::IDLE) {
76 VLOG(2) << "DoReadLoop (state=" << read_state_ << ", result=" << result 77 VLOG(2) << "DoReadLoop (state=" << read_state_ << ", result=" << result
77 << ")"; 78 << ")";
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
127 payload_buffer_->set_offset(payload_buffer_->offset() + result); 128 payload_buffer_->set_offset(payload_buffer_->offset() + result);
128 if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) { 129 if (static_cast<size_t>(payload_buffer_->offset()) < payload_size_) {
129 return socket_->Read(payload_buffer_.get(), 130 return socket_->Read(payload_buffer_.get(),
130 payload_size_ - payload_buffer_->offset(), 131 payload_size_ - payload_buffer_->offset(),
131 base::Bind(&StreamPacketReader::OnReadComplete, 132 base::Bind(&StreamPacketReader::OnReadComplete,
132 weak_factory_.GetWeakPtr())); 133 weak_factory_.GetWeakPtr()));
133 } 134 }
134 135
135 // Finished reading the payload. 136 // Finished reading the payload.
136 read_state_ = ReadState::IDLE; 137 read_state_ = ReadState::IDLE;
137 return payload_size_; 138 return net::OK;
138 } 139 }
139 140
140 void StreamPacketReader::OnReadComplete(int result) { 141 void StreamPacketReader::OnReadComplete(int result) {
141 DCHECK_NE(net::ERR_IO_PENDING, result); 142 DCHECK_NE(net::ERR_IO_PENDING, result);
142 143
143 // If the read was succesful, then process the result. 144 // If the read was succesful, then process the result.
144 if (result > 0) { 145 if (result > 0) {
145 result = DoReadLoop(result); 146 result = DoReadLoop(result);
146 } 147 }
147 148
148 // If all reading completed, either successfully or by error, inform the 149 // If all reading completed, either successfully or by error, inform the
149 // caller. 150 // caller.
150 if (result != net::ERR_IO_PENDING) { 151 if (result != net::ERR_IO_PENDING) {
151 payload_buffer_ = nullptr; 152 payload_buffer_ = nullptr;
152 base::ResetAndReturn(&callback_).Run(result); 153 base::ResetAndReturn(&callback_).Run(result);
153 } 154 }
154 } 155 }
155 156
156 } // namespace blimp 157 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698