Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "blimp/net/blimp_message_output_buffer.h" | 5 #include "blimp/net/blimp_message_output_buffer.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 | 8 |
| 9 #include "base/macros.h" | 9 #include "base/macros.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 #include "blimp/common/proto/blimp_message.pb.h" | 11 #include "blimp/common/proto/blimp_message.pb.h" |
| 12 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
| 13 | 13 |
| 14 namespace blimp { | 14 namespace blimp { |
| 15 | 15 |
| 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) | 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) |
| 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} | 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} |
| 18 | 18 |
| 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} | 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} |
| 20 | 20 |
| 21 void BlimpMessageOutputBuffer::SetOutputProcessor( | 21 void BlimpMessageOutputBuffer::SetOutputProcessor( |
| 22 BlimpMessageProcessor* processor) { | 22 BlimpMessageProcessor* processor) { |
| 23 DVLOG(1) << "SetOutputProcessor " << processor; | |
| 23 // Check that we are setting or removing the processor, not replacing it. | 24 // Check that we are setting or removing the processor, not replacing it. |
| 24 if (processor) { | 25 if (processor) { |
| 25 DCHECK(!output_processor_); | 26 DCHECK(!output_processor_); |
| 26 output_processor_ = processor; | 27 output_processor_ = processor; |
| 27 write_complete_cb_.Reset(base::Bind( | 28 write_complete_cb_.Reset(base::Bind( |
| 28 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); | 29 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); |
| 29 WriteNextMessageIfReady(); | 30 WriteNextMessageIfReady(); |
| 30 } else { | 31 } else { |
| 31 DCHECK(output_processor_); | 32 DCHECK(output_processor_); |
| 32 output_processor_ = nullptr; | 33 output_processor_ = nullptr; |
| 33 write_complete_cb_.Cancel(); | 34 write_complete_cb_.Cancel(); |
| 34 } | 35 } |
| 35 } | 36 } |
| 36 | 37 |
| 37 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { | 38 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { |
| 38 DCHECK(output_processor_); | 39 DCHECK(output_processor_); |
| 40 DVLOG(1) << "RetransmitBufferedMessages()"; | |
| 39 | 41 |
| 40 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. | 42 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. |
| 41 write_buffer_.insert(write_buffer_.begin(), | 43 write_buffer_.insert(write_buffer_.begin(), |
| 42 std::make_move_iterator(ack_buffer_.begin()), | 44 std::make_move_iterator(ack_buffer_.begin()), |
| 43 std::make_move_iterator(ack_buffer_.end())); | 45 std::make_move_iterator(ack_buffer_.end())); |
| 44 ack_buffer_.clear(); | 46 ack_buffer_.clear(); |
| 45 | 47 |
| 46 WriteNextMessageIfReady(); | 48 WriteNextMessageIfReady(); |
| 47 } | 49 } |
| 48 | 50 |
| 49 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { | 51 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { |
| 50 return write_buffer_.size() + ack_buffer_.size(); | 52 return write_buffer_.size() + ack_buffer_.size(); |
| 51 } | 53 } |
| 52 | 54 |
| 53 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { | 55 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { |
| 54 return ack_buffer_.size(); | 56 return ack_buffer_.size(); |
| 55 } | 57 } |
| 56 | 58 |
| 57 void BlimpMessageOutputBuffer::ProcessMessage( | 59 void BlimpMessageOutputBuffer::ProcessMessage( |
| 58 scoped_ptr<BlimpMessage> message, | 60 scoped_ptr<BlimpMessage> message, |
| 59 const net::CompletionCallback& callback) { | 61 const net::CompletionCallback& callback) { |
| 60 VLOG(2) << "ProcessMessage (id=" << message->message_id() | 62 VLOG(2) << "OutputBuffer ProcessMessage (id=" << message->message_id() |
|
haibinlu
2015/12/29 00:51:45
DVLOG instead?
Kevin M
2015/12/30 23:08:49
Done.
| |
| 61 << ", type=" << message->type() << ")"; | 63 << ", type=" << message->type() << ", size=" << message->ByteSize() |
| 64 << ")"; | |
| 62 | 65 |
| 63 message->set_message_id(++prev_message_id_); | 66 message->set_message_id(++prev_message_id_); |
| 64 | 67 |
| 65 current_buffer_size_bytes_ += message->ByteSize(); | 68 current_buffer_size_bytes_ += message->ByteSize(); |
| 66 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); | 69 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); |
| 67 | 70 |
| 68 write_buffer_.push_back( | 71 write_buffer_.push_back( |
| 69 make_scoped_ptr(new BufferEntry(std::move(message), callback))); | 72 make_scoped_ptr(new BufferEntry(std::move(message), callback))); |
| 70 | 73 |
| 71 // Write the message | 74 // Write the message |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 113 } | 116 } |
| 114 | 117 |
| 115 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( | 118 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( |
| 116 scoped_ptr<BlimpMessage> message, | 119 scoped_ptr<BlimpMessage> message, |
| 117 net::CompletionCallback callback) | 120 net::CompletionCallback callback) |
| 118 : message(std::move(message)), callback(callback) {} | 121 : message(std::move(message)), callback(callback) {} |
| 119 | 122 |
| 120 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} | 123 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} |
| 121 | 124 |
| 122 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { | 125 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { |
| 126 DVLOG(3) << "WriteNextMessageIfReady"; | |
| 123 if (write_buffer_.empty()) { | 127 if (write_buffer_.empty()) { |
| 124 VLOG(2) << "Nothing to write."; | 128 DVLOG(3) << "Nothing to write."; |
| 125 return; | 129 return; |
| 126 } | 130 } |
| 127 | 131 |
| 128 scoped_ptr<BlimpMessage> message_to_write( | 132 scoped_ptr<BlimpMessage> message_to_write( |
| 129 new BlimpMessage(*write_buffer_.front()->message)); | 133 new BlimpMessage(*write_buffer_.front()->message)); |
| 130 VLOG(3) << "Writing message (id=" | 134 DVLOG(3) << "Writing message (id=" |
| 131 << write_buffer_.front()->message->message_id() | 135 << write_buffer_.front()->message->message_id() |
| 132 << ", type=" << message_to_write->type() << ")"; | 136 << ", type=" << message_to_write->type() << ")"; |
| 133 | 137 |
| 134 output_processor_->ProcessMessage(std::move(message_to_write), | 138 output_processor_->ProcessMessage(std::move(message_to_write), |
| 135 write_complete_cb_.callback()); | 139 write_complete_cb_.callback()); |
| 136 VLOG(3) << "Queue size: " << write_buffer_.size(); | 140 DVLOG(3) << "Queue size: " << write_buffer_.size(); |
| 137 } | 141 } |
| 138 | 142 |
| 139 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { | 143 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { |
| 140 DCHECK_LE(result, net::OK); | 144 DCHECK_LE(result, net::OK); |
| 141 VLOG(2) << "Write complete, result=" << result; | 145 VLOG(2) << "Write complete, result=" << result; |
| 142 | 146 |
| 143 if (result == net::OK) { | 147 if (result == net::OK) { |
| 144 ack_buffer_.push_back(std::move(write_buffer_.front())); | 148 ack_buffer_.push_back(std::move(write_buffer_.front())); |
| 145 write_buffer_.pop_front(); | 149 write_buffer_.pop_front(); |
| 146 WriteNextMessageIfReady(); | 150 WriteNextMessageIfReady(); |
| 147 } else { | 151 } else { |
| 148 // An error occurred while writing to the network connection. | 152 // An error occurred while writing to the network connection. |
| 149 // Stop writing more messages until a new connection is established. | 153 // Stop writing more messages until a new connection is established. |
| 150 DLOG(WARNING) << "Write error (result=" << result << ")"; | 154 DLOG(WARNING) << "Write error (result=" << result << ")"; |
| 151 } | 155 } |
| 152 } | 156 } |
| 153 | 157 |
| 154 } // namespace blimp | 158 } // namespace blimp |
| OLD | NEW |