Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: blimp/net/blimp_message_output_buffer.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Fixed misplaced EXPORT directive Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/blimp_message_demultiplexer_unittest.cc ('k') | blimp/net/blimp_message_pump.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "blimp/net/blimp_message_output_buffer.h" 5 #include "blimp/net/blimp_message_output_buffer.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/message_loop/message_loop.h" 10 #include "base/message_loop/message_loop.h"
11 #include "blimp/common/proto/blimp_message.pb.h" 11 #include "blimp/common/proto/blimp_message.pb.h"
12 #include "net/base/net_errors.h" 12 #include "net/base/net_errors.h"
13 13
14 namespace blimp { 14 namespace blimp {
15 15
16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes)
17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {}
18 18
19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {}
20 20
21 void BlimpMessageOutputBuffer::SetOutputProcessor( 21 void BlimpMessageOutputBuffer::SetOutputProcessor(
22 BlimpMessageProcessor* processor) { 22 BlimpMessageProcessor* processor) {
23 DVLOG(1) << "SetOutputProcessor " << processor;
23 // Check that we are setting or removing the processor, not replacing it. 24 // Check that we are setting or removing the processor, not replacing it.
24 if (processor) { 25 if (processor) {
25 DCHECK(!output_processor_); 26 DCHECK(!output_processor_);
26 output_processor_ = processor; 27 output_processor_ = processor;
27 write_complete_cb_.Reset(base::Bind( 28 write_complete_cb_.Reset(base::Bind(
28 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); 29 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this)));
29 WriteNextMessageIfReady(); 30 WriteNextMessageIfReady();
30 } else { 31 } else {
31 DCHECK(output_processor_); 32 DCHECK(output_processor_);
32 output_processor_ = nullptr; 33 output_processor_ = nullptr;
33 write_complete_cb_.Cancel(); 34 write_complete_cb_.Cancel();
34 } 35 }
35 } 36 }
36 37
37 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { 38 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() {
38 DCHECK(output_processor_); 39 DCHECK(output_processor_);
40 DVLOG(1) << "RetransmitBufferedMessages()";
39 41
40 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. 42 // Prepend the entirety of |ack_buffer_| to |write_buffer_|.
41 write_buffer_.insert(write_buffer_.begin(), 43 write_buffer_.insert(write_buffer_.begin(),
42 std::make_move_iterator(ack_buffer_.begin()), 44 std::make_move_iterator(ack_buffer_.begin()),
43 std::make_move_iterator(ack_buffer_.end())); 45 std::make_move_iterator(ack_buffer_.end()));
44 ack_buffer_.clear(); 46 ack_buffer_.clear();
45 47
46 WriteNextMessageIfReady(); 48 WriteNextMessageIfReady();
47 } 49 }
48 50
49 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { 51 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const {
50 return write_buffer_.size() + ack_buffer_.size(); 52 return write_buffer_.size() + ack_buffer_.size();
51 } 53 }
52 54
53 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { 55 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const {
54 return ack_buffer_.size(); 56 return ack_buffer_.size();
55 } 57 }
56 58
57 void BlimpMessageOutputBuffer::ProcessMessage( 59 void BlimpMessageOutputBuffer::ProcessMessage(
58 scoped_ptr<BlimpMessage> message, 60 scoped_ptr<BlimpMessage> message,
59 const net::CompletionCallback& callback) { 61 const net::CompletionCallback& callback) {
60 VLOG(2) << "ProcessMessage (id=" << message->message_id() 62 DVLOG(2) << "OutputBuffer::ProcessMessage " << message;
61 << ", type=" << message->type() << ")";
62 63
63 message->set_message_id(++prev_message_id_); 64 message->set_message_id(++prev_message_id_);
64 65
65 current_buffer_size_bytes_ += message->ByteSize(); 66 current_buffer_size_bytes_ += message->ByteSize();
66 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); 67 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_);
67 68
68 write_buffer_.push_back( 69 write_buffer_.push_back(
69 make_scoped_ptr(new BufferEntry(std::move(message), callback))); 70 make_scoped_ptr(new BufferEntry(std::move(message), callback)));
70 71
71 // Write the message 72 // Write the message
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
113 } 114 }
114 115
115 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( 116 BlimpMessageOutputBuffer::BufferEntry::BufferEntry(
116 scoped_ptr<BlimpMessage> message, 117 scoped_ptr<BlimpMessage> message,
117 net::CompletionCallback callback) 118 net::CompletionCallback callback)
118 : message(std::move(message)), callback(callback) {} 119 : message(std::move(message)), callback(callback) {}
119 120
120 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} 121 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {}
121 122
122 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { 123 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() {
124 DVLOG(3) << "WriteNextMessageIfReady";
123 if (write_buffer_.empty()) { 125 if (write_buffer_.empty()) {
124 VLOG(2) << "Nothing to write."; 126 DVLOG(3) << "Nothing to write.";
125 return; 127 return;
126 } 128 }
127 129
128 scoped_ptr<BlimpMessage> message_to_write( 130 scoped_ptr<BlimpMessage> message_to_write(
129 new BlimpMessage(*write_buffer_.front()->message)); 131 new BlimpMessage(*write_buffer_.front()->message));
130 VLOG(3) << "Writing message (id=" 132 DVLOG(3) << "Writing message (id="
131 << write_buffer_.front()->message->message_id() 133 << write_buffer_.front()->message->message_id()
132 << ", type=" << message_to_write->type() << ")"; 134 << ", type=" << message_to_write->type() << ")";
133 135
134 output_processor_->ProcessMessage(std::move(message_to_write), 136 output_processor_->ProcessMessage(std::move(message_to_write),
135 write_complete_cb_.callback()); 137 write_complete_cb_.callback());
136 VLOG(3) << "Queue size: " << write_buffer_.size(); 138 DVLOG(3) << "Queue size: " << write_buffer_.size();
137 } 139 }
138 140
139 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { 141 void BlimpMessageOutputBuffer::OnWriteComplete(int result) {
140 DCHECK_LE(result, net::OK); 142 DCHECK_LE(result, net::OK);
141 VLOG(2) << "Write complete, result=" << result; 143 VLOG(2) << "Write complete, result=" << result;
142 144
143 if (result == net::OK) { 145 if (result == net::OK) {
144 ack_buffer_.push_back(std::move(write_buffer_.front())); 146 ack_buffer_.push_back(std::move(write_buffer_.front()));
145 write_buffer_.pop_front(); 147 write_buffer_.pop_front();
146 WriteNextMessageIfReady(); 148 WriteNextMessageIfReady();
147 } else { 149 } else {
148 // An error occurred while writing to the network connection. 150 // An error occurred while writing to the network connection.
149 // Stop writing more messages until a new connection is established. 151 // Stop writing more messages until a new connection is established.
150 DLOG(WARNING) << "Write error (result=" << result << ")"; 152 DLOG(WARNING) << "Write error (result=" << result << ")";
151 } 153 }
152 } 154 }
153 155
154 } // namespace blimp 156 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/blimp_message_demultiplexer_unittest.cc ('k') | blimp/net/blimp_message_pump.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698