Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(198)

Side by Side Diff: blimp/net/blimp_message_output_buffer.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Created 4 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_message_output_buffer.h" 5 #include "blimp/net/blimp_message_output_buffer.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
11 #include "blimp/common/proto/blimp_message.pb.h" 11 #include "blimp/common/proto/blimp_message.pb.h"
12 #include "net/base/net_errors.h" 12 #include "net/base/net_errors.h"
13 13
14 namespace blimp { 14 namespace blimp {
15 15
16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes)
17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {}
18 18
19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {}
20 20
21 void BlimpMessageOutputBuffer::SetOutputProcessor( 21 void BlimpMessageOutputBuffer::SetOutputProcessor(
22 BlimpMessageProcessor* processor) { 22 BlimpMessageProcessor* processor) {
23 DVLOG(1) << "SetOutputProcessor " << processor;
23 // Check that we are setting or removing the processor, not replacing it. 24 // Check that we are setting or removing the processor, not replacing it.
24 if (processor) { 25 if (processor) {
25 DCHECK(!output_processor_); 26 DCHECK(!output_processor_);
26 output_processor_ = processor; 27 output_processor_ = processor;
27 write_complete_cb_.Reset(base::Bind( 28 write_complete_cb_.Reset(base::Bind(
28 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); 29 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this)));
29 WriteNextMessageIfReady(); 30 WriteNextMessageIfReady();
30 } else { 31 } else {
31 DCHECK(output_processor_); 32 DCHECK(output_processor_);
32 output_processor_ = nullptr; 33 output_processor_ = nullptr;
33 write_complete_cb_.Cancel(); 34 write_complete_cb_.Cancel();
34 } 35 }
35 } 36 }
36 37
37 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { 38 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() {
38 DCHECK(output_processor_); 39 DCHECK(output_processor_);
40 DVLOG(1) << "RetransmitBufferedMessages()";
39 41
40 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. 42 // Prepend the entirety of |ack_buffer_| to |write_buffer_|.
41 write_buffer_.insert(write_buffer_.begin(), 43 write_buffer_.insert(write_buffer_.begin(),
42 std::make_move_iterator(ack_buffer_.begin()), 44 std::make_move_iterator(ack_buffer_.begin()),
43 std::make_move_iterator(ack_buffer_.end())); 45 std::make_move_iterator(ack_buffer_.end()));
44 ack_buffer_.clear(); 46 ack_buffer_.clear();
45 47
46 WriteNextMessageIfReady(); 48 WriteNextMessageIfReady();
47 } 49 }
48 50
49 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { 51 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const {
50 return write_buffer_.size() + ack_buffer_.size(); 52 return write_buffer_.size() + ack_buffer_.size();
51 } 53 }
52 54
53 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { 55 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const {
54 return ack_buffer_.size(); 56 return ack_buffer_.size();
55 } 57 }
56 58
57 void BlimpMessageOutputBuffer::ProcessMessage( 59 void BlimpMessageOutputBuffer::ProcessMessage(
58 scoped_ptr<BlimpMessage> message, 60 scoped_ptr<BlimpMessage> message,
59 const net::CompletionCallback& callback) { 61 const net::CompletionCallback& callback) {
60 VLOG(2) << "ProcessMessage (id=" << message->message_id() 62 VLOG(2) << "OutputBuffer ProcessMessage (id=" << message->message_id()
haibinlu 2015/12/29 00:51:45 DVLOG instead?
Kevin M 2015/12/30 23:08:49 Done.
61 << ", type=" << message->type() << ")"; 63 << ", type=" << message->type() << ", size=" << message->ByteSize()
64 << ")";
62 65
63 message->set_message_id(++prev_message_id_); 66 message->set_message_id(++prev_message_id_);
64 67
65 current_buffer_size_bytes_ += message->ByteSize(); 68 current_buffer_size_bytes_ += message->ByteSize();
66 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); 69 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_);
67 70
68 write_buffer_.push_back( 71 write_buffer_.push_back(
69 make_scoped_ptr(new BufferEntry(std::move(message), callback))); 72 make_scoped_ptr(new BufferEntry(std::move(message), callback)));
70 73
71 // Write the message 74 // Write the message
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 } 116 }
114 117
115 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( 118 BlimpMessageOutputBuffer::BufferEntry::BufferEntry(
116 scoped_ptr<BlimpMessage> message, 119 scoped_ptr<BlimpMessage> message,
117 net::CompletionCallback callback) 120 net::CompletionCallback callback)
118 : message(std::move(message)), callback(callback) {} 121 : message(std::move(message)), callback(callback) {}
119 122
120 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} 123 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {}
121 124
122 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { 125 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() {
126 DVLOG(3) << "WriteNextMessageIfReady";
123 if (write_buffer_.empty()) { 127 if (write_buffer_.empty()) {
124 VLOG(2) << "Nothing to write."; 128 DVLOG(3) << "Nothing to write.";
125 return; 129 return;
126 } 130 }
127 131
128 scoped_ptr<BlimpMessage> message_to_write( 132 scoped_ptr<BlimpMessage> message_to_write(
129 new BlimpMessage(*write_buffer_.front()->message)); 133 new BlimpMessage(*write_buffer_.front()->message));
130 VLOG(3) << "Writing message (id=" 134 DVLOG(3) << "Writing message (id="
131 << write_buffer_.front()->message->message_id() 135 << write_buffer_.front()->message->message_id()
132 << ", type=" << message_to_write->type() << ")"; 136 << ", type=" << message_to_write->type() << ")";
133 137
134 output_processor_->ProcessMessage(std::move(message_to_write), 138 output_processor_->ProcessMessage(std::move(message_to_write),
135 write_complete_cb_.callback()); 139 write_complete_cb_.callback());
136 VLOG(3) << "Queue size: " << write_buffer_.size(); 140 DVLOG(3) << "Queue size: " << write_buffer_.size();
137 } 141 }
138 142
139 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { 143 void BlimpMessageOutputBuffer::OnWriteComplete(int result) {
140 DCHECK_LE(result, net::OK); 144 DCHECK_LE(result, net::OK);
141 VLOG(2) << "Write complete, result=" << result; 145 VLOG(2) << "Write complete, result=" << result;
142 146
143 if (result == net::OK) { 147 if (result == net::OK) {
144 ack_buffer_.push_back(std::move(write_buffer_.front())); 148 ack_buffer_.push_back(std::move(write_buffer_.front()));
145 write_buffer_.pop_front(); 149 write_buffer_.pop_front();
146 WriteNextMessageIfReady(); 150 WriteNextMessageIfReady();
147 } else { 151 } else {
148 // An error occurred while writing to the network connection. 152 // An error occurred while writing to the network connection.
149 // Stop writing more messages until a new connection is established. 153 // Stop writing more messages until a new connection is established.
150 DLOG(WARNING) << "Write error (result=" << result << ")"; 154 DLOG(WARNING) << "Write error (result=" << result << ")";
151 } 155 }
152 } 156 }
153 157
154 } // namespace blimp 158 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698