Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(99)

Side by Side Diff: blimp/net/blimp_connection.cc

Issue 2439403003: Refactor BlimpConnection to TCPConnection (Closed)
Patch Set: Added missing Engine Transport Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_connection.h" 5 #include "blimp/net/blimp_connection.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/callback_helpers.h" 9 #include "base/callback_helpers.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/macros.h" 11 #include "base/macros.h"
12 #include "base/memory/weak_ptr.h" 12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "blimp/common/create_blimp_message.h" 14 #include "blimp/common/create_blimp_message.h"
15 #include "blimp/common/logging.h" 15 #include "blimp/common/logging.h"
16 #include "blimp/common/proto/blimp_message.pb.h" 16 #include "blimp/common/proto/blimp_message.pb.h"
17 #include "blimp/net/blimp_message_processor.h" 17 #include "blimp/net/blimp_message_processor.h"
18 #include "blimp/net/blimp_message_pump.h" 18 #include "blimp/net/blimp_message_pump.h"
19 #include "blimp/net/common.h" 19 #include "blimp/net/common.h"
20 #include "blimp/net/connection_error_observer.h" 20 #include "blimp/net/connection_error_observer.h"
21 #include "blimp/net/message_port.h" 21 #include "blimp/net/message_port.h"
22 #include "blimp/net/packet_writer.h" 22 #include "blimp/net/packet_writer.h"
23 #include "net/base/completion_callback.h" 23 #include "net/base/completion_callback.h"
24 24
25 namespace blimp { 25 namespace blimp {
26 26
27 // Forwards incoming blimp messages to PacketWriter.
28 class BlimpMessageSender : public BlimpMessageProcessor {
29 public:
30 explicit BlimpMessageSender(PacketWriter* writer);
31 ~BlimpMessageSender() override;
32
33 void set_error_observer(ConnectionErrorObserver* observer) {
34 error_observer_ = observer;
35 }
36
37 // BlimpMessageProcessor implementation.
38 // |callback| receives net::OK on write success, or receives an error code
39 // otherwise.
40 void ProcessMessage(std::unique_ptr<BlimpMessage> message,
41 const net::CompletionCallback& callback) override;
42
43 private:
44 void OnWritePacketComplete(int result);
45
46 PacketWriter* writer_;
47 ConnectionErrorObserver* error_observer_ = nullptr;
48 scoped_refptr<net::IOBuffer> buffer_;
49 net::CompletionCallback pending_process_msg_callback_;
50 base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
51
52 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
53 };
54
55 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
56 : writer_(writer),
57 buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
58 weak_factory_(this) {
59 DCHECK(writer_);
60 }
61
62 BlimpMessageSender::~BlimpMessageSender() {
63 DVLOG(1) << "BlimpMessageSender destroyed.";
64 }
65
66 void BlimpMessageSender::ProcessMessage(
67 std::unique_ptr<BlimpMessage> message,
68 const net::CompletionCallback& callback) {
69 DCHECK(error_observer_);
70 VLOG(1) << "Sending " << *message;
71
72 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
73 DLOG(ERROR) << "Message rejected (too large): " << *message;
74 callback.Run(net::ERR_MSG_TOO_BIG);
75 return;
76 }
77 if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) {
78 DLOG(ERROR) << "Failed to serialize message.";
79 callback.Run(net::ERR_INVALID_ARGUMENT);
80 return;
81 }
82
83 // Check that no other message writes are in-flight at this time.
84 DCHECK(pending_process_msg_callback_.is_null());
85 pending_process_msg_callback_ = callback;
86
87 writer_->WritePacket(
88 scoped_refptr<net::DrainableIOBuffer>(
89 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())),
90 base::Bind(&BlimpMessageSender::OnWritePacketComplete,
91 weak_factory_.GetWeakPtr()));
92 }
93
94 void BlimpMessageSender::OnWritePacketComplete(int result) {
95 DVLOG(2) << "OnWritePacketComplete, result=" << result;
96 DCHECK_NE(net::ERR_IO_PENDING, result);
97
98 // Create a stack-local copy of |pending_process_msg_callback_|, in case an
99 // observer deletes |this|.
100 net::CompletionCallback process_callback =
101 base::ResetAndReturn(&pending_process_msg_callback_);
102
103 if (result != net::OK) {
104 error_observer_->OnConnectionError(result);
105 }
106
107 process_callback.Run(result);
108 }
109
110 // MessageProcessor filter used to route EndConnection messages through to 27 // MessageProcessor filter used to route EndConnection messages through to
111 // OnConnectionError notifications on the owning BlimpConnection. 28 // OnConnectionError notifications on the owning BlimpConnection.
112 class BlimpConnection::EndConnectionFilter : public BlimpMessageProcessor { 29 class BlimpConnection::EndConnectionFilter : public BlimpMessageProcessor {
113 public: 30 public:
114 explicit EndConnectionFilter(BlimpConnection* connection); 31 explicit EndConnectionFilter(BlimpConnection* connection);
115 32
116 void set_message_handler(BlimpMessageProcessor* message_handler) { 33 void set_message_handler(BlimpMessageProcessor* message_handler) {
117 message_handler_ = message_handler; 34 message_handler_ = message_handler;
118 } 35 }
119 36
(...skipping 25 matching lines...) Expand all
145 message->protocol_control().end_connection().reason()); 62 message->protocol_control().end_connection().reason());
146 63
147 // Caller must ensure |callback| safe to call after OnConnectionError. 64 // Caller must ensure |callback| safe to call after OnConnectionError.
148 callback.Run(message->protocol_control().end_connection().reason()); 65 callback.Run(message->protocol_control().end_connection().reason());
149 return; 66 return;
150 } 67 }
151 68
152 message_handler_->ProcessMessage(std::move(message), callback); 69 message_handler_->ProcessMessage(std::move(message), callback);
153 } 70 }
154 71
155 BlimpConnection::BlimpConnection(std::unique_ptr<MessagePort> message_port) 72 BlimpConnection::BlimpConnection()
156 : message_port_(std::move(message_port)), 73 : end_connection_filter_(new EndConnectionFilter(this)) {}
157 message_pump_(new BlimpMessagePump(message_port_->reader())),
158 outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())),
159 end_connection_filter_(new EndConnectionFilter(this)) {
160 message_pump_->set_error_observer(this);
161 outgoing_msg_processor_->set_error_observer(this);
162 }
163
164 BlimpConnection::BlimpConnection() {}
165 74
166 BlimpConnection::~BlimpConnection() { 75 BlimpConnection::~BlimpConnection() {
167 VLOG(1) << "BlimpConnection destroyed."; 76 VLOG(1) << "BlimpConnection destroyed.";
168 } 77 }
169 78
170 void BlimpConnection::AddConnectionErrorObserver( 79 void BlimpConnection::AddConnectionErrorObserver(
171 ConnectionErrorObserver* observer) { 80 ConnectionErrorObserver* observer) {
172 error_observers_.AddObserver(observer); 81 error_observers_.AddObserver(observer);
173 } 82 }
174 83
175 void BlimpConnection::RemoveConnectionErrorObserver( 84 void BlimpConnection::RemoveConnectionErrorObserver(
176 ConnectionErrorObserver* observer) { 85 ConnectionErrorObserver* observer) {
177 error_observers_.RemoveObserver(observer); 86 error_observers_.RemoveObserver(observer);
178 } 87 }
179 88
180 void BlimpConnection::SetIncomingMessageProcessor( 89 void BlimpConnection::AddEndConnectionProcessor(
181 BlimpMessageProcessor* processor) { 90 BlimpMessageProcessor* processor) {
182 end_connection_filter_->set_message_handler(processor); 91 end_connection_filter_->set_message_handler(processor);
183 message_pump_->SetMessageProcessor(processor ? end_connection_filter_.get()
184 : nullptr);
185 } 92 }
186 93
187 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { 94 BlimpMessageProcessor* BlimpConnection::GetEndConnectionProcessor() const {
188 return outgoing_msg_processor_.get(); 95 return end_connection_filter_.get();
Wez 2016/11/09 22:47:17 This isn't symmetric with AddEndConnectionProcesso
189 } 96 }
190 97
191 void BlimpConnection::OnConnectionError(int error) { 98 void BlimpConnection::OnConnectionError(int error) {
192 VLOG(1) << "OnConnectionError, error=" << error; 99 VLOG(1) << "OnConnectionError, error=" << error;
193 100
194 // Propagate the error to all observers. 101 // Propagate the error to all observers.
195 for (auto& observer : error_observers_) 102 for (auto& observer : error_observers_) {
196 observer.OnConnectionError(error); 103 observer.OnConnectionError(error);
104 }
197 } 105 }
198 106
199 } // namespace blimp 107 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698