Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(118)

Side by Side Diff: blimp/net/tcp_connection.cc

Issue 2439403003: Refactor BlimpConnection to TCPConnection (Closed)
Patch Set: Added missing Engine Transport Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_connection.h" 5 #include "blimp/net/tcp_connection.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/macros.h" 11 #include "base/macros.h"
12 #include "base/memory/weak_ptr.h" 12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "blimp/common/create_blimp_message.h" 14 #include "blimp/common/create_blimp_message.h"
15 #include "blimp/common/logging.h" 15 #include "blimp/common/logging.h"
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
62 BlimpMessageSender::~BlimpMessageSender() { 62 BlimpMessageSender::~BlimpMessageSender() {
63 DVLOG(1) << "BlimpMessageSender destroyed."; 63 DVLOG(1) << "BlimpMessageSender destroyed.";
64 } 64 }
65 65
66 void BlimpMessageSender::ProcessMessage( 66 void BlimpMessageSender::ProcessMessage(
67 std::unique_ptr<BlimpMessage> message, 67 std::unique_ptr<BlimpMessage> message,
68 const net::CompletionCallback& callback) { 68 const net::CompletionCallback& callback) {
69 DCHECK(error_observer_); 69 DCHECK(error_observer_);
70 VLOG(1) << "Sending " << *message; 70 VLOG(1) << "Sending " << *message;
71 71
72 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { 72 const int msg_byte_size = message->ByteSize();
Wez 2016/11/09 22:47:17 nit: Per style-guide, this should be message_bytes
73 if (msg_byte_size > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
73 DLOG(ERROR) << "Message rejected (too large): " << *message; 74 DLOG(ERROR) << "Message rejected (too large): " << *message;
74 callback.Run(net::ERR_MSG_TOO_BIG); 75 callback.Run(net::ERR_MSG_TOO_BIG);
75 return; 76 return;
76 } 77 }
77 if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) { 78 if (!message->SerializeToArray(buffer_->data(), msg_byte_size)) {
78 DLOG(ERROR) << "Failed to serialize message."; 79 DLOG(ERROR) << "Failed to serialize message.";
79 callback.Run(net::ERR_INVALID_ARGUMENT); 80 callback.Run(net::ERR_INVALID_ARGUMENT);
80 return; 81 return;
81 } 82 }
82 83
83 // Check that no other message writes are in-flight at this time. 84 // Check that no other message writes are in-flight at this time.
84 DCHECK(pending_process_msg_callback_.is_null()); 85 DCHECK(pending_process_msg_callback_.is_null());
85 pending_process_msg_callback_ = callback; 86 pending_process_msg_callback_ = callback;
86 87
87 writer_->WritePacket( 88 writer_->WritePacket(
88 scoped_refptr<net::DrainableIOBuffer>( 89 scoped_refptr<net::DrainableIOBuffer>(
89 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())), 90 new net::DrainableIOBuffer(buffer_.get(), msg_byte_size)),
90 base::Bind(&BlimpMessageSender::OnWritePacketComplete, 91 base::Bind(&BlimpMessageSender::OnWritePacketComplete,
91 weak_factory_.GetWeakPtr())); 92 weak_factory_.GetWeakPtr()));
92 } 93 }
93 94
94 void BlimpMessageSender::OnWritePacketComplete(int result) { 95 void BlimpMessageSender::OnWritePacketComplete(int result) {
95 DVLOG(2) << "OnWritePacketComplete, result=" << result; 96 DVLOG(2) << "OnWritePacketComplete, result=" << result;
96 DCHECK_NE(net::ERR_IO_PENDING, result); 97 DCHECK_NE(net::ERR_IO_PENDING, result);
97 98
98 // Create a stack-local copy of |pending_process_msg_callback_|, in case an 99 // Create a stack-local copy of |pending_process_msg_callback_|, in case an
99 // observer deletes |this|. 100 // observer deletes |this|.
100 net::CompletionCallback process_callback = 101 net::CompletionCallback process_callback =
101 base::ResetAndReturn(&pending_process_msg_callback_); 102 base::ResetAndReturn(&pending_process_msg_callback_);
102 103
103 if (result != net::OK) { 104 if (result != net::OK) {
104 error_observer_->OnConnectionError(result); 105 error_observer_->OnConnectionError(result);
105 } 106 }
106 107
107 process_callback.Run(result); 108 process_callback.Run(result);
108 } 109 }
109 110
110 // MessageProcessor filter used to route EndConnection messages through to 111 TCPConnection::TCPConnection(std::unique_ptr<MessagePort> message_port)
111 // OnConnectionError notifications on the owning BlimpConnection. 112 : BlimpConnection(),
112 class BlimpConnection::EndConnectionFilter : public BlimpMessageProcessor { 113 message_port_(std::move(message_port)),
113 public:
114 explicit EndConnectionFilter(BlimpConnection* connection);
115
116 void set_message_handler(BlimpMessageProcessor* message_handler) {
117 message_handler_ = message_handler;
118 }
119
120 // BlimpMessageProcessor implementation.
121 void ProcessMessage(std::unique_ptr<BlimpMessage> message,
122 const net::CompletionCallback& callback) override;
123
124 private:
125 // Owning BlimpConnection, on which to call OnConnectionError.
126 BlimpConnection* connection_;
127
128 // Caller-provided message handler to forward non-EndConnection messages to.
129 BlimpMessageProcessor* message_handler_;
130
131 DISALLOW_COPY_AND_ASSIGN(EndConnectionFilter);
132 };
133
134 BlimpConnection::EndConnectionFilter::EndConnectionFilter(
135 BlimpConnection* connection)
136 : connection_(connection), message_handler_(nullptr) {}
137
138 void BlimpConnection::EndConnectionFilter::ProcessMessage(
139 std::unique_ptr<BlimpMessage> message,
140 const net::CompletionCallback& callback) {
141 if (message->has_protocol_control() &&
142 message->protocol_control().has_end_connection()) {
143 // Report the EndConnection reason to connection error observers.
144 connection_->OnConnectionError(
145 message->protocol_control().end_connection().reason());
146
147 // Caller must ensure |callback| safe to call after OnConnectionError.
148 callback.Run(message->protocol_control().end_connection().reason());
149 return;
150 }
151
152 message_handler_->ProcessMessage(std::move(message), callback);
153 }
154
155 BlimpConnection::BlimpConnection(std::unique_ptr<MessagePort> message_port)
156 : message_port_(std::move(message_port)),
157 message_pump_(new BlimpMessagePump(message_port_->reader())), 114 message_pump_(new BlimpMessagePump(message_port_->reader())),
158 outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())), 115 outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())) {
159 end_connection_filter_(new EndConnectionFilter(this)) {
160 message_pump_->set_error_observer(this); 116 message_pump_->set_error_observer(this);
161 outgoing_msg_processor_->set_error_observer(this); 117 outgoing_msg_processor_->set_error_observer(this);
162 } 118 }
163 119
164 BlimpConnection::BlimpConnection() {} 120 TCPConnection::~TCPConnection() {
165 121 VLOG(1) << "TCPConnection destroyed.";
166 BlimpConnection::~BlimpConnection() {
167 VLOG(1) << "BlimpConnection destroyed.";
168 } 122 }
169 123
170 void BlimpConnection::AddConnectionErrorObserver( 124 void TCPConnection::SetIncomingMessageProcessor(
171 ConnectionErrorObserver* observer) { 125 BlimpMessageProcessor* processor) {
172 error_observers_.AddObserver(observer); 126 AddEndConnectionProcessor(processor);
127 message_pump_->SetMessageProcessor(
128 (processor != nullptr) ? GetEndConnectionProcessor() : nullptr);
Wez 2016/11/09 22:47:17 nit: You don't need the != nullptr here.
173 } 129 }
174 130
175 void BlimpConnection::RemoveConnectionErrorObserver( 131 BlimpMessageProcessor* TCPConnection::GetOutgoingMessageProcessor() {
176 ConnectionErrorObserver* observer) {
177 error_observers_.RemoveObserver(observer);
178 }
179
180 void BlimpConnection::SetIncomingMessageProcessor(
181 BlimpMessageProcessor* processor) {
182 end_connection_filter_->set_message_handler(processor);
183 message_pump_->SetMessageProcessor(processor ? end_connection_filter_.get()
184 : nullptr);
185 }
186
187 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
188 return outgoing_msg_processor_.get(); 132 return outgoing_msg_processor_.get();
189 } 133 }
190 134
191 void BlimpConnection::OnConnectionError(int error) {
192 VLOG(1) << "OnConnectionError, error=" << error;
193
194 // Propagate the error to all observers.
195 for (auto& observer : error_observers_)
196 observer.OnConnectionError(error);
197 }
198
199 } // namespace blimp 135 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698