Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1149)

Side by Side Diff: blimp/net/blimp_connection.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Fixed misplaced EXPORT directive Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/blimp_connection.h ('k') | blimp/net/blimp_connection_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_connection.h" 5 #include "blimp/net/blimp_connection.h"
6 6
7 #include "base/callback_helpers.h" 7 #include "base/callback_helpers.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/memory/weak_ptr.h"
10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
11 #include "blimp/common/proto/blimp_message.pb.h" 12 #include "blimp/common/proto/blimp_message.pb.h"
12 #include "blimp/net/blimp_message_processor.h" 13 #include "blimp/net/blimp_message_processor.h"
13 #include "blimp/net/blimp_message_pump.h" 14 #include "blimp/net/blimp_message_pump.h"
14 #include "blimp/net/common.h" 15 #include "blimp/net/common.h"
15 #include "blimp/net/connection_error_observer.h" 16 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/packet_reader.h" 17 #include "blimp/net/packet_reader.h"
17 #include "blimp/net/packet_writer.h" 18 #include "blimp/net/packet_writer.h"
18 #include "net/base/completion_callback.h" 19 #include "net/base/completion_callback.h"
19 20
(...skipping 11 matching lines...) Expand all
31 } 32 }
32 33
33 // BlimpMessageProcessor implementation. 34 // BlimpMessageProcessor implementation.
34 void ProcessMessage(scoped_ptr<BlimpMessage> message, 35 void ProcessMessage(scoped_ptr<BlimpMessage> message,
35 const net::CompletionCallback& callback) override; 36 const net::CompletionCallback& callback) override;
36 37
37 private: 38 private:
38 void OnWritePacketComplete(int result); 39 void OnWritePacketComplete(int result);
39 40
40 PacketWriter* writer_; 41 PacketWriter* writer_;
41 ConnectionErrorObserver* error_observer_; 42 ConnectionErrorObserver* error_observer_ = nullptr;
42 scoped_refptr<net::IOBuffer> buffer_; 43 scoped_refptr<net::IOBuffer> buffer_;
43 net::CompletionCallback pending_process_msg_callback_; 44 net::CompletionCallback pending_process_msg_callback_;
45 base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
44 46
45 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); 47 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
46 }; 48 };
47 49
48 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) 50 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
49 : writer_(writer), buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)) { 51 : writer_(writer),
52 buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
53 weak_factory_(this) {
50 DCHECK(writer_); 54 DCHECK(writer_);
51 } 55 }
52 56
53 BlimpMessageSender::~BlimpMessageSender() {} 57 BlimpMessageSender::~BlimpMessageSender() {
58 DVLOG(1) << "BlimpMessageSender destroyed.";
59 }
54 60
55 void BlimpMessageSender::ProcessMessage( 61 void BlimpMessageSender::ProcessMessage(
56 scoped_ptr<BlimpMessage> message, 62 scoped_ptr<BlimpMessage> message,
57 const net::CompletionCallback& callback) { 63 const net::CompletionCallback& callback) {
64 DCHECK(error_observer_);
65 DVLOG(2) << "Sender::ProcessMessage " << *message;
66
58 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { 67 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
59 DLOG(ERROR) << "Message is too big, size=" << message->ByteSize(); 68 DLOG(ERROR) << "Message rejected (too large): " << *message;
60 callback.Run(net::ERR_MSG_TOO_BIG); 69 callback.Run(net::ERR_MSG_TOO_BIG);
61 return; 70 return;
62 } 71 }
63 72
64 if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) { 73 if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) {
65 DLOG(ERROR) << "Failed to serialize message."; 74 DLOG(ERROR) << "Failed to serialize message.";
66 callback.Run(net::ERR_INVALID_ARGUMENT); 75 callback.Run(net::ERR_INVALID_ARGUMENT);
67 return; 76 return;
68 } 77 }
69 78
79 // Check that no other message writes are in-flight at this time.
80 DCHECK(pending_process_msg_callback_.is_null());
70 pending_process_msg_callback_ = callback; 81 pending_process_msg_callback_ = callback;
82
71 writer_->WritePacket( 83 writer_->WritePacket(
72 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()), 84 scoped_refptr<net::DrainableIOBuffer>(
85 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())),
73 base::Bind(&BlimpMessageSender::OnWritePacketComplete, 86 base::Bind(&BlimpMessageSender::OnWritePacketComplete,
74 base::Unretained(this))); 87 weak_factory_.GetWeakPtr()));
75 } 88 }
76 89
77 void BlimpMessageSender::OnWritePacketComplete(int result) { 90 void BlimpMessageSender::OnWritePacketComplete(int result) {
91 DVLOG(2) << "OnWritePacketComplete, result=" << result;
78 DCHECK_NE(net::ERR_IO_PENDING, result); 92 DCHECK_NE(net::ERR_IO_PENDING, result);
79 base::ResetAndReturn(&pending_process_msg_callback_).Run(result); 93 base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
80 if (result != net::OK) { 94 if (result != net::OK) {
81 error_observer_->OnConnectionError(result); 95 error_observer_->OnConnectionError(result);
82 } 96 }
83 } 97 }
84 98
85 } // namespace 99 } // namespace
86 100
87 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader, 101 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
88 scoped_ptr<PacketWriter> writer) 102 scoped_ptr<PacketWriter> writer)
89 : reader_(std::move(reader)), 103 : reader_(std::move(reader)),
90 message_pump_(new BlimpMessagePump(reader_.get())), 104 message_pump_(new BlimpMessagePump(reader_.get())),
91 writer_(std::move(writer)), 105 writer_(std::move(writer)),
92 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) { 106 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
93 DCHECK(writer_); 107 DCHECK(writer_);
108
109 // Observe the connection errors received by any of this connection's network
110 // objects.
111 message_pump_->set_error_observer(this);
112 BlimpMessageSender* sender =
113 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
114 sender->set_error_observer(this);
94 } 115 }
95 116
96 BlimpConnection::BlimpConnection() {} 117 BlimpConnection::BlimpConnection() {}
97 118
98 BlimpConnection::~BlimpConnection() {} 119 BlimpConnection::~BlimpConnection() {
120 DVLOG(1) << "BlimpConnection destroyed.";
121 }
99 122
100 void BlimpConnection::SetConnectionErrorObserver( 123 void BlimpConnection::AddConnectionErrorObserver(
101 ConnectionErrorObserver* observer) { 124 ConnectionErrorObserver* observer) {
102 message_pump_->set_error_observer(observer); 125 error_observers_.AddObserver(observer);
103 BlimpMessageSender* sender = 126 }
104 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get()); 127
105 sender->set_error_observer(observer); 128 void BlimpConnection::RemoveConnectionErrorObserver(
129 ConnectionErrorObserver* observer) {
130 error_observers_.RemoveObserver(observer);
106 } 131 }
107 132
108 void BlimpConnection::SetIncomingMessageProcessor( 133 void BlimpConnection::SetIncomingMessageProcessor(
109 BlimpMessageProcessor* processor) { 134 BlimpMessageProcessor* processor) {
110 message_pump_->SetMessageProcessor(processor); 135 message_pump_->SetMessageProcessor(processor);
111 } 136 }
112 137
113 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { 138 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
114 return outgoing_msg_processor_.get(); 139 return outgoing_msg_processor_.get();
115 } 140 }
116 141
142 void BlimpConnection::OnConnectionError(int error) {
143 VLOG(1) << "OnConnectionError, error=" << error;
144
145 // Propagate the error to all observers.
146 FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_,
147 OnConnectionError(error));
148 }
149
117 } // namespace blimp 150 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/blimp_connection.h ('k') | blimp/net/blimp_connection_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698