| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "blimp/net/blimp_message_output_buffer.h" | 5 #include "blimp/net/blimp_message_output_buffer.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 | 8 |
| 9 #include "base/macros.h" | 9 #include "base/macros.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| 11 #include "blimp/common/proto/blimp_message.pb.h" | 11 #include "blimp/common/proto/blimp_message.pb.h" |
| 12 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
| 13 | 13 |
| 14 namespace blimp { | 14 namespace blimp { |
| 15 | 15 |
| 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) | 16 BlimpMessageOutputBuffer::BlimpMessageOutputBuffer(int max_buffer_size_bytes) |
| 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} | 17 : max_buffer_size_bytes_(max_buffer_size_bytes) {} |
| 18 | 18 |
| 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} | 19 BlimpMessageOutputBuffer::~BlimpMessageOutputBuffer() {} |
| 20 | 20 |
| 21 void BlimpMessageOutputBuffer::SetOutputProcessor( | 21 void BlimpMessageOutputBuffer::SetOutputProcessor( |
| 22 BlimpMessageProcessor* processor) { | 22 BlimpMessageProcessor* processor) { |
| 23 DVLOG(1) << "SetOutputProcessor " << processor; |
| 23 // Check that we are setting or removing the processor, not replacing it. | 24 // Check that we are setting or removing the processor, not replacing it. |
| 24 if (processor) { | 25 if (processor) { |
| 25 DCHECK(!output_processor_); | 26 DCHECK(!output_processor_); |
| 26 output_processor_ = processor; | 27 output_processor_ = processor; |
| 27 write_complete_cb_.Reset(base::Bind( | 28 write_complete_cb_.Reset(base::Bind( |
| 28 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); | 29 &BlimpMessageOutputBuffer::OnWriteComplete, base::Unretained(this))); |
| 29 WriteNextMessageIfReady(); | 30 WriteNextMessageIfReady(); |
| 30 } else { | 31 } else { |
| 31 DCHECK(output_processor_); | 32 DCHECK(output_processor_); |
| 32 output_processor_ = nullptr; | 33 output_processor_ = nullptr; |
| 33 write_complete_cb_.Cancel(); | 34 write_complete_cb_.Cancel(); |
| 34 } | 35 } |
| 35 } | 36 } |
| 36 | 37 |
| 37 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { | 38 void BlimpMessageOutputBuffer::RetransmitBufferedMessages() { |
| 38 DCHECK(output_processor_); | 39 DCHECK(output_processor_); |
| 40 DVLOG(1) << "RetransmitBufferedMessages()"; |
| 39 | 41 |
| 40 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. | 42 // Prepend the entirety of |ack_buffer_| to |write_buffer_|. |
| 41 write_buffer_.insert(write_buffer_.begin(), | 43 write_buffer_.insert(write_buffer_.begin(), |
| 42 std::make_move_iterator(ack_buffer_.begin()), | 44 std::make_move_iterator(ack_buffer_.begin()), |
| 43 std::make_move_iterator(ack_buffer_.end())); | 45 std::make_move_iterator(ack_buffer_.end())); |
| 44 ack_buffer_.clear(); | 46 ack_buffer_.clear(); |
| 45 | 47 |
| 46 WriteNextMessageIfReady(); | 48 WriteNextMessageIfReady(); |
| 47 } | 49 } |
| 48 | 50 |
| 49 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { | 51 int BlimpMessageOutputBuffer::GetBufferByteSizeForTest() const { |
| 50 return write_buffer_.size() + ack_buffer_.size(); | 52 return write_buffer_.size() + ack_buffer_.size(); |
| 51 } | 53 } |
| 52 | 54 |
| 53 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { | 55 int BlimpMessageOutputBuffer::GetUnacknowledgedMessageCountForTest() const { |
| 54 return ack_buffer_.size(); | 56 return ack_buffer_.size(); |
| 55 } | 57 } |
| 56 | 58 |
| 57 void BlimpMessageOutputBuffer::ProcessMessage( | 59 void BlimpMessageOutputBuffer::ProcessMessage( |
| 58 scoped_ptr<BlimpMessage> message, | 60 scoped_ptr<BlimpMessage> message, |
| 59 const net::CompletionCallback& callback) { | 61 const net::CompletionCallback& callback) { |
| 60 VLOG(2) << "ProcessMessage (id=" << message->message_id() | 62 DVLOG(2) << "OutputBuffer::ProcessMessage " << message; |
| 61 << ", type=" << message->type() << ")"; | |
| 62 | 63 |
| 63 message->set_message_id(++prev_message_id_); | 64 message->set_message_id(++prev_message_id_); |
| 64 | 65 |
| 65 current_buffer_size_bytes_ += message->ByteSize(); | 66 current_buffer_size_bytes_ += message->ByteSize(); |
| 66 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); | 67 DCHECK_GE(max_buffer_size_bytes_, current_buffer_size_bytes_); |
| 67 | 68 |
| 68 write_buffer_.push_back( | 69 write_buffer_.push_back( |
| 69 make_scoped_ptr(new BufferEntry(std::move(message), callback))); | 70 make_scoped_ptr(new BufferEntry(std::move(message), callback))); |
| 70 | 71 |
| 71 // Write the message | 72 // Write the message |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 113 } | 114 } |
| 114 | 115 |
| 115 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( | 116 BlimpMessageOutputBuffer::BufferEntry::BufferEntry( |
| 116 scoped_ptr<BlimpMessage> message, | 117 scoped_ptr<BlimpMessage> message, |
| 117 net::CompletionCallback callback) | 118 net::CompletionCallback callback) |
| 118 : message(std::move(message)), callback(callback) {} | 119 : message(std::move(message)), callback(callback) {} |
| 119 | 120 |
| 120 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} | 121 BlimpMessageOutputBuffer::BufferEntry::~BufferEntry() {} |
| 121 | 122 |
| 122 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { | 123 void BlimpMessageOutputBuffer::WriteNextMessageIfReady() { |
| 124 DVLOG(3) << "WriteNextMessageIfReady"; |
| 123 if (write_buffer_.empty()) { | 125 if (write_buffer_.empty()) { |
| 124 VLOG(2) << "Nothing to write."; | 126 DVLOG(3) << "Nothing to write."; |
| 125 return; | 127 return; |
| 126 } | 128 } |
| 127 | 129 |
| 128 scoped_ptr<BlimpMessage> message_to_write( | 130 scoped_ptr<BlimpMessage> message_to_write( |
| 129 new BlimpMessage(*write_buffer_.front()->message)); | 131 new BlimpMessage(*write_buffer_.front()->message)); |
| 130 VLOG(3) << "Writing message (id=" | 132 DVLOG(3) << "Writing message (id=" |
| 131 << write_buffer_.front()->message->message_id() | 133 << write_buffer_.front()->message->message_id() |
| 132 << ", type=" << message_to_write->type() << ")"; | 134 << ", type=" << message_to_write->type() << ")"; |
| 133 | 135 |
| 134 output_processor_->ProcessMessage(std::move(message_to_write), | 136 output_processor_->ProcessMessage(std::move(message_to_write), |
| 135 write_complete_cb_.callback()); | 137 write_complete_cb_.callback()); |
| 136 VLOG(3) << "Queue size: " << write_buffer_.size(); | 138 DVLOG(3) << "Queue size: " << write_buffer_.size(); |
| 137 } | 139 } |
| 138 | 140 |
| 139 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { | 141 void BlimpMessageOutputBuffer::OnWriteComplete(int result) { |
| 140 DCHECK_LE(result, net::OK); | 142 DCHECK_LE(result, net::OK); |
| 141 VLOG(2) << "Write complete, result=" << result; | 143 VLOG(2) << "Write complete, result=" << result; |
| 142 | 144 |
| 143 if (result == net::OK) { | 145 if (result == net::OK) { |
| 144 ack_buffer_.push_back(std::move(write_buffer_.front())); | 146 ack_buffer_.push_back(std::move(write_buffer_.front())); |
| 145 write_buffer_.pop_front(); | 147 write_buffer_.pop_front(); |
| 146 WriteNextMessageIfReady(); | 148 WriteNextMessageIfReady(); |
| 147 } else { | 149 } else { |
| 148 // An error occurred while writing to the network connection. | 150 // An error occurred while writing to the network connection. |
| 149 // Stop writing more messages until a new connection is established. | 151 // Stop writing more messages until a new connection is established. |
| 150 DLOG(WARNING) << "Write error (result=" << result << ")"; | 152 DLOG(WARNING) << "Write error (result=" << result << ")"; |
| 151 } | 153 } |
| 152 } | 154 } |
| 153 | 155 |
| 154 } // namespace blimp | 156 } // namespace blimp |
| OLD | NEW |