Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: blimp/net/blimp_connection.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_connection.h" 5 #include "blimp/net/blimp_connection.h"
6 6
7 #include "base/callback_helpers.h" 7 #include "base/callback_helpers.h"
8 #include "base/logging.h" 8 #include "base/logging.h"
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/memory/weak_ptr.h"
10 #include "base/message_loop/message_loop.h" 11 #include "base/message_loop/message_loop.h"
11 #include "blimp/common/proto/blimp_message.pb.h" 12 #include "blimp/common/proto/blimp_message.pb.h"
12 #include "blimp/net/blimp_message_processor.h" 13 #include "blimp/net/blimp_message_processor.h"
13 #include "blimp/net/blimp_message_pump.h" 14 #include "blimp/net/blimp_message_pump.h"
14 #include "blimp/net/common.h" 15 #include "blimp/net/common.h"
15 #include "blimp/net/connection_error_observer.h" 16 #include "blimp/net/connection_error_observer.h"
16 #include "blimp/net/packet_reader.h" 17 #include "blimp/net/packet_reader.h"
17 #include "blimp/net/packet_writer.h" 18 #include "blimp/net/packet_writer.h"
18 #include "net/base/completion_callback.h" 19 #include "net/base/completion_callback.h"
19 20
(...skipping 11 matching lines...) Expand all
31 } 32 }
32 33
33 // BlimpMessageProcessor implementation. 34 // BlimpMessageProcessor implementation.
34 void ProcessMessage(scoped_ptr<BlimpMessage> message, 35 void ProcessMessage(scoped_ptr<BlimpMessage> message,
35 const net::CompletionCallback& callback) override; 36 const net::CompletionCallback& callback) override;
36 37
37 private: 38 private:
38 void OnWritePacketComplete(int result); 39 void OnWritePacketComplete(int result);
39 40
40 PacketWriter* writer_; 41 PacketWriter* writer_;
41 ConnectionErrorObserver* error_observer_; 42 ConnectionErrorObserver* error_observer_ = nullptr;
42 scoped_refptr<net::DrainableIOBuffer> buffer_; 43 scoped_refptr<net::IOBuffer> buffer_;
haibinlu 2015/12/29 00:51:45 can you sync to the head? The base seems out-of-da
Kevin M 2015/12/30 23:08:49 Done.
43 net::CompletionCallback pending_process_msg_callback_; 44 net::CompletionCallback pending_process_msg_callback_;
45 base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
44 46
45 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); 47 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
46 }; 48 };
47 49
48 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) 50 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
49 : writer_(writer), 51 : writer_(writer),
50 buffer_(new net::DrainableIOBuffer( 52 buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
51 new net::IOBuffer(kMaxPacketPayloadSizeBytes), 53 weak_factory_(this) {
52 kMaxPacketPayloadSizeBytes)) {
53 DCHECK(writer_); 54 DCHECK(writer_);
54 } 55 }
55 56
56 BlimpMessageSender::~BlimpMessageSender() {} 57 BlimpMessageSender::~BlimpMessageSender() {
58 DVLOG(1) << "BlimpMessageSender destroyed.";
59 }
57 60
58 void BlimpMessageSender::ProcessMessage( 61 void BlimpMessageSender::ProcessMessage(
59 scoped_ptr<BlimpMessage> message, 62 scoped_ptr<BlimpMessage> message,
60 const net::CompletionCallback& callback) { 63 const net::CompletionCallback& callback) {
64 DCHECK(error_observer_);
65 DVLOG(2) << "ProcessMessage, size=" << message->ByteSize();
61 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { 66 if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
62 DLOG(ERROR) << "Message is too big, size=" << message->ByteSize(); 67 DLOG(ERROR) << "Message is too big, size=" << message->ByteSize();
63 callback.Run(net::ERR_MSG_TOO_BIG); 68 callback.Run(net::ERR_MSG_TOO_BIG);
64 return; 69 return;
65 } 70 }
66 71
67 buffer_->SetOffset(0); 72 scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
68 if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) { 73 new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()));
74 if (!message->SerializeToArray(drainable_buffer->data(),
75 message->GetCachedSize())) {
69 DLOG(ERROR) << "Failed to serialize message."; 76 DLOG(ERROR) << "Failed to serialize message.";
70 callback.Run(net::ERR_INVALID_ARGUMENT); 77 callback.Run(net::ERR_INVALID_ARGUMENT);
71 return; 78 return;
72 } 79 }
73 80
81 // Check that no other message writes are in-flight at this time.
82 DCHECK(pending_process_msg_callback_.is_null());
74 pending_process_msg_callback_ = callback; 83 pending_process_msg_callback_ = callback;
75 writer_->WritePacket(buffer_, 84
85 writer_->WritePacket(drainable_buffer,
76 base::Bind(&BlimpMessageSender::OnWritePacketComplete, 86 base::Bind(&BlimpMessageSender::OnWritePacketComplete,
77 base::Unretained(this))); 87 weak_factory_.GetWeakPtr()));
78 } 88 }
79 89
80 void BlimpMessageSender::OnWritePacketComplete(int result) { 90 void BlimpMessageSender::OnWritePacketComplete(int result) {
91 DVLOG(2) << "OnWritePacketComplete, result=" << result;
81 DCHECK_NE(net::ERR_IO_PENDING, result); 92 DCHECK_NE(net::ERR_IO_PENDING, result);
82 base::ResetAndReturn(&pending_process_msg_callback_).Run(result); 93 base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
83 if (result != net::OK) { 94 if (result != net::OK) {
84 error_observer_->OnConnectionError(result); 95 error_observer_->OnConnectionError(result);
85 } 96 }
86 } 97 }
87 98
88 } // namespace 99 } // namespace
89 100
90 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader, 101 BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
91 scoped_ptr<PacketWriter> writer) 102 scoped_ptr<PacketWriter> writer)
92 : reader_(std::move(reader)), 103 : reader_(std::move(reader)),
93 message_pump_(new BlimpMessagePump(reader_.get())), 104 message_pump_(new BlimpMessagePump(reader_.get())),
94 writer_(std::move(writer)), 105 writer_(std::move(writer)),
95 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) { 106 outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
96 DCHECK(writer_); 107 DCHECK(writer_);
97 } 108 }
98 109
99 BlimpConnection::BlimpConnection() {} 110 BlimpConnection::BlimpConnection() {}
100 111
101 BlimpConnection::~BlimpConnection() {} 112 BlimpConnection::~BlimpConnection() {
113 DVLOG(1) << "BlimpConnection destroyed.";
114 }
102 115
103 void BlimpConnection::SetConnectionErrorObserver( 116 void BlimpConnection::SetConnectionErrorObserver(
104 ConnectionErrorObserver* observer) { 117 ConnectionErrorObserver* observer) {
105 message_pump_->set_error_observer(observer); 118 message_pump_->set_error_observer(observer);
106 BlimpMessageSender* sender = 119 BlimpMessageSender* sender =
107 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get()); 120 static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
108 sender->set_error_observer(observer); 121 sender->set_error_observer(observer);
109 } 122 }
110 123
111 void BlimpConnection::SetIncomingMessageProcessor( 124 void BlimpConnection::SetIncomingMessageProcessor(
112 BlimpMessageProcessor* processor) { 125 BlimpMessageProcessor* processor) {
113 message_pump_->SetMessageProcessor(processor); 126 message_pump_->SetMessageProcessor(processor);
114 } 127 }
115 128
116 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { 129 BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
117 return outgoing_msg_processor_.get(); 130 return outgoing_msg_processor_.get();
118 } 131 }
119 132
120 } // namespace blimp 133 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698