OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "blimp/net/blimp_message_output_buffer.h" | 5 #include "blimp/net/blimp_message_output_buffer.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/macros.h" | 9 #include "base/macros.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
11 #include "blimp/common/proto/blimp_message.pb.h" | 11 #include "blimp/common/proto/blimp_message.pb.h" |
12 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
13 | 13 |
14 namespace blimp { | 14 namespace blimp { |
15 | 15 |
16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) | 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) |
17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} | 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} |
18 | 18 |
19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} | 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} |
20 | 20 |
21 void BlimpMessageOutputBuffer::SetOutputProcessor( | 21 void BlimpMessageOutputBuffer::SetOutputProcessor( |
22 BlimpMessageProcessor* processor) { | 22 BlimpMessageProcessor* processor) { |
| 23 DVLOG(1) << "SetOutputProcessor " << processor; |
23 // Check that we are setting or removing the processor, not replacing it. | 24 // Check that we are setting or removing the processor, not replacing it. |
24 if (processor) { | 25 if (processor) { |
25 DCHECK(!output_processor_); | 26 DCHECK(!output_processor_); |
26 output_processor_ = processor; | 27 output_processor_ = processor; |
27 write_complete_cb_.Reset(base::Bind( | 28 write_complete_cb_.Reset(base::Bind( |
28 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); | 29 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); |
29 WriteNextMessageIfReady(); | 30 WriteNextMessageIfReady(); |
30 } else { | 31 } else { |
31 DCHECK(output_processor_); | 32 DCHECK(output_processor_); |
32 output_processor_ = nullptr; | 33 output_processor_ = nullptr; |
33 write_complete_cb_.Cancel(); | 34 write_complete_cb_.Cancel(); |
34 } | 35 } |
35 } | 36 } |
36 | 37 |
37 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { | 38 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { |
38 DCHECK(output_processor_); | 39 DCHECK(output_processor_); |
| 40 DVLOG(1) << "RetransmitBufferedMessages()"; |
39 | 41 |
40 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. | 42 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. |
41 write_buffer_.insert(write_buffer_.begin(), | 43 write_buffer_.insert(write_buffer_.begin(), |
42 std::make_move_iterator(ack_buffer_.begin()), | 44 std::make_move_iterator(ack_buffer_.begin()), |
43 std::make_move_iterator(ack_buffer_.end())); | 45 std::make_move_iterator(ack_buffer_.end())); |
44 ack_buffer_.clear(); | 46 ack_buffer_.clear(); |
45 | 47 |
46 WriteNextMessageIfReady(); | 48 WriteNextMessageIfReady(); |
47 } | 49 } |
48 | 50 |
49 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { | 51 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { |
50 return write_buffer_.size() + ack_buffer_.size(); | 52 return write_buffer_.size() + ack_buffer_.size(); |
51 } | 53 } |
52 | 54 |
53 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { | 55 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { |
54 return ack_buffer_.size(); | 56 return ack_buffer_.size(); |
55 } | 57 } |
56 | 58 |
57 void BlimpMessageOutputBuffer::ProcessMessage( | 59 void BlimpMessageOutputBuffer::ProcessMessage( |
58 scoped_ptr<BlimpMessage> message, | 60 scoped_ptr<BlimpMessage> message, |
59 const net::CompletionCallback& callback) { | 61 const net::CompletionCallback& callback) { |
60 VLOG(2) << "ProcessMessage (id=" << message->message_id() | 62 DVLOG(2) << "OutputBuffer::ProcessMessage " << message; |
61 << ", type=" << message->type() << ")"; | |
62 | 63 |
63 message->set_message_id(++prev_message_id_); | 64 message->set_message_id(++prev_message_id_); |
64 | 65 |
65 current_buffer_size_bytes_ += message->ByteSize(); | 66 current_buffer_size_bytes_ += message->ByteSize(); |
66 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); | 67 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); |
67 | 68 |
68 write_buffer_.push_back( | 69 write_buffer_.push_back( |
69 make_scoped_ptr(new BufferEntry(std::move(message), callback))); | 70 make_scoped_ptr(new BufferEntry(std::move(message), callback))); |
70 | 71 |
71 // Write the message | 72 // Write the message |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
113 } | 114 } |
114 | 115 |
115 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( | 116 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( |
116 scoped_ptr<BlimpMessage> message, | 117 scoped_ptr<BlimpMessage> message, |
117 net::CompletionCallback callback) | 118 net::CompletionCallback callback) |
118 : message(std::move(message)), callback(callback) {} | 119 : message(std::move(message)), callback(callback) {} |
119 | 120 |
120 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} | 121 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} |
121 | 122 |
122 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { | 123 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { |
| 124 DVLOG(3) << "WriteNextMessageIfReady"; |
123 if (write_buffer_.empty()) { | 125 if (write_buffer_.empty()) { |
124 VLOG(2) << "Nothing to write."; | 126 DVLOG(3) << "Nothing to write."; |
125 return; | 127 return; |
126 } | 128 } |
127 | 129 |
128 scoped_ptr<BlimpMessage> message_to_write( | 130 scoped_ptr<BlimpMessage> message_to_write( |
129 new BlimpMessage(*write_buffer_.front()->message)); | 131 new BlimpMessage(*write_buffer_.front()->message)); |
130 VLOG(3) << "Writing message (id=" | 132 DVLOG(3) << "Writing message (id=" |
131 << write_buffer_.front()->message->message_id() | 133 << write_buffer_.front()->message->message_id() |
132 << ", type=" << message_to_write->type() << ")"; | 134 << ", type=" << message_to_write->type() << ")"; |
133 | 135 |
134 output_processor_->ProcessMessage(std::move(message_to_write), | 136 output_processor_->ProcessMessage(std::move(message_to_write), |
135 write_complete_cb_.callback()); | 137 write_complete_cb_.callback()); |
136 VLOG(3) << "Queue size: " << write_buffer_.size(); | 138 DVLOG(3) << "Queue size: " << write_buffer_.size(); |
137 } | 139 } |
138 | 140 |
139 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { | 141 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { |
140 DCHECK_LE(result, net::OK); | 142 DCHECK_LE(result, net::OK); |
141 VLOG(2) << "Write complete, result=" << result; | 143 VLOG(2) << "Write complete, result=" << result; |
142 | 144 |
143 if (result == net::OK) { | 145 if (result == net::OK) { |
144 ack_buffer_.push_back(std::move(write_buffer_.front())); | 146 ack_buffer_.push_back(std::move(write_buffer_.front())); |
145 write_buffer_.pop_front(); | 147 write_buffer_.pop_front(); |
146 WriteNextMessageIfReady(); | 148 WriteNextMessageIfReady(); |
147 } else { | 149 } else { |
148 // An error occurred while writing to the network connection. | 150 // An error occurred while writing to the network connection. |
149 // Stop writing more messages until a new connection is established. | 151 // Stop writing more messages until a new connection is established. |
150 DLOG(WARNING) << "Write error (result=" << result << ")"; | 152 DLOG(WARNING) << "Write error (result=" << result << ")"; |
151 } | 153 } |
152 } | 154 } |
153 | 155 |
154 } // namespace blimp | 156 } // namespace blimp |
OLD | NEW |