Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(811)

Side by Side Diff: blimp/net/blimp_connection.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Addressed haibin's feedback, made an ObserverList for ConnectionErrorObserver Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_connection.h" 5 #include "blimp/net/blimp_connection.h"
6 6
7 #include "base/callback_helpers.h" 7 #include "base/callback_helpers.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/memory/weak_ptr.h"
10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
11 #include "blimp/common/proto/blimp_message.pb.h" 12 #include "blimp/common/proto/blimp_message.pb.h"
12 #include "blimp/net/blimp_message_processor.h" 13 #include "blimp/net/blimp_message_processor.h"
13 #include "blimp/net/blimp_message_pump.h" 14 #include "blimp/net/blimp_message_pump.h"
14 #include "blimp/net/common.h" 15 #include "blimp/net/common.h"
15 #include "blimp/net/connection_error_observer.h" 16 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/packet_reader.h" 17 #include "blimp/net/packet_reader.h"
17 #include "blimp/net/packet_writer.h" 18 #include "blimp/net/packet_writer.h"
18 #include "net/base/completion_callback.h" 19 #include "net/base/completion_callback.h"
19 20
(...skipping 11 matching lines...) Expand all
31 } 32 }
32 33
33 // BlimpMessageProcessor implementation. 34 // BlimpMessageProcessor implementation.
34 void ProcessMessage(scoped_ptr<BlimpMessage> message, 35 void ProcessMessage(scoped_ptr<BlimpMessage> message,
35 const net::CompletionCallback& callback) override; 36 const net::CompletionCallback& callback) override;
36 37
37 private: 38 private:
38 void OnWritePacketComplete(int result); 39 void OnWritePacketComplete(int result);
39 40
40 PacketWriter* writer_; 41 PacketWriter* writer_;
41 ConnectionErrorObserver* error_observer_; 42 ConnectionErrorObserver* error_observer_ = nullptr;
42 scoped_refptr<net::IOBuffer> buffer_; 43 scoped_refptr<net::IOBuffer> buffer_;
43 net::CompletionCallback pending_process_msg_callback_; 44 net::CompletionCallback pending_process_msg_callback_;
45 base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
44 46
45 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); 47 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
46 }; 48 };
47 49
48 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) 50 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
49 : writer_(writer), buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)) { 51 : writer_(writer),
52 buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
53 weak_factory_(this) {
50 DCHECK(writer_); 54 DCHECK(writer_);
51 } 55 }
52 56
53 BlimpMessageSender::~BlimpMessageSender() {} 57 BlimpMessageSender::~BlimpMessageSender() {
58 DVLOG(1) << "BlimpMessageSender destroyed.";
59 }
54 60
55 void BlimpMessageSender::ProcessMessage( 61 void BlimpMessageSender::ProcessMessage(
56 scoped_ptr<BlimpMessage> message, 62 scoped_ptr<BlimpMessage> message,
57 const net::CompletionCallback& callback) { 63 const net::CompletionCallback& callback) {
64 DCHECK(error_observer_);
65 DVLOG(2) << "Sender::ProcessMessage " << *message;
66
58 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { 67 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
59 DLOG(ERROR) << "Message is too big, size=" << message->ByteSize(); 68 DLOG(ERROR) << "Message rejected (too large): " << *message;
haibinlu 2016/01/04 19:45:45 this generates big logs. the size of the message i
Kevin M 2016/01/04 20:42:12 I think that knowing the message type would also b
60 callback.Run(net::ERR_MSG_TOO_BIG); 69 callback.Run(net::ERR_MSG_TOO_BIG);
61 return; 70 return;
62 } 71 }
63 72
64 if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) { 73 scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
74 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()));
75 if (!message->SerializeToArray(drainable_buffer->data(),
76 message->GetCachedSize())) {
65 DLOG(ERROR) << "Failed to serialize message."; 77 DLOG(ERROR) << "Failed to serialize message.";
66 callback.Run(net::ERR_INVALID_ARGUMENT); 78 callback.Run(net::ERR_INVALID_ARGUMENT);
67 return; 79 return;
68 } 80 }
69 81
82 // Check that no other message writes are in-flight at this time.
83 DCHECK(pending_process_msg_callback_.is_null());
70 pending_process_msg_callback_ = callback; 84 pending_process_msg_callback_ = callback;
71 writer_->WritePacket( 85
72 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()), 86 writer_->WritePacket(drainable_buffer,
haibinlu 2016/01/04 19:45:45 why not create DrainableIOBuffer here?
Kevin M 2016/01/04 20:42:13 Good point! Done.
73 base::Bind(&BlimpMessageSender::OnWritePacketComplete, 87 base::Bind(&BlimpMessageSender::OnWritePacketComplete,
74 base::Unretained(this))); 88 weak_factory_.GetWeakPtr()));
75 } 89 }
76 90
77 void BlimpMessageSender::OnWritePacketComplete(int result) { 91 void BlimpMessageSender::OnWritePacketComplete(int result) {
92 DVLOG(2) << "OnWritePacketComplete, result=" << result;
78 DCHECK_NE(net::ERR_IO_PENDING, result); 93 DCHECK_NE(net::ERR_IO_PENDING, result);
79 base::ResetAndReturn(&pending_process_msg_callback_).Run(result); 94 base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
80 if (result != net::OK) { 95 if (result != net::OK) {
81 error_observer_->OnConnectionError(result); 96 error_observer_->OnConnectionError(result);
82 } 97 }
83 } 98 }
84 99
85 } // namespace 100 } // namespace
86 101
87 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader, 102 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
88 scoped_ptr<PacketWriter> writer) 103 scoped_ptr<PacketWriter> writer)
89 : reader_(std::move(reader)), 104 : reader_(std::move(reader)),
90 message_pump_(new BlimpMessagePump(reader_.get())), 105 message_pump_(new BlimpMessagePump(reader_.get())),
91 writer_(std::move(writer)), 106 writer_(std::move(writer)),
92 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) { 107 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
93 DCHECK(writer_); 108 DCHECK(writer_);
109
110 // Observe the connection errors received by any of this connection's network
111 // objects.
112 message_pump_->set_error_observer(this);
113 BlimpMessageSender* sender =
114 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
115 sender->set_error_observer(this);
94 } 116 }
95 117
96 BlimpConnection::BlimpConnection() {} 118 BlimpConnection::BlimpConnection() {}
97 119
98 BlimpConnection::~BlimpConnection() {} 120 BlimpConnection::~BlimpConnection() {
121 DVLOG(1) << "BlimpConnection destroyed.";
122 }
99 123
100 void BlimpConnection::SetConnectionErrorObserver( 124 void BlimpConnection::AddConnectionErrorObserver(
101 ConnectionErrorObserver* observer) { 125 ConnectionErrorObserver* observer) {
102 message_pump_->set_error_observer(observer); 126 error_observers_.AddObserver(observer);
103 BlimpMessageSender* sender = 127 }
104 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get()); 128
105 sender->set_error_observer(observer); 129 void BlimpConnection::ClearConnectionErrorObservers() {
130 error_observers_.Clear();
131 }
132
133 void BlimpConnection::RemoveConnectionErrorObserver(
134 ConnectionErrorObserver* observer) {
135 error_observers_.RemoveObserver(observer);
106 } 136 }
107 137
108 void BlimpConnection::SetIncomingMessageProcessor( 138 void BlimpConnection::SetIncomingMessageProcessor(
109 BlimpMessageProcessor* processor) { 139 BlimpMessageProcessor* processor) {
110 message_pump_->SetMessageProcessor(processor); 140 message_pump_->SetMessageProcessor(processor);
111 } 141 }
112 142
113 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { 143 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
114 return outgoing_msg_processor_.get(); 144 return outgoing_msg_processor_.get();
115 } 145 }
116 146
147 void BlimpConnection::OnConnectionError(int error) {
148 VLOG(1) << "OnConnectionError, error=" << error;
149
150 // Propagate the error to all observers.
151 FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_,
152 OnConnectionError(error));
153 }
154
117 } // namespace blimp 155 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698