Index: blimp/net/blimp_message_pump.cc |
diff --git a/blimp/net/blimp_message_pump.cc b/blimp/net/blimp_message_pump.cc |
index d9a250a89765a7b1cf04d88a01bca9a0c44df9ec..79082e078d22c8a924bd418734ce1b15d6c36a51 100644 |
--- a/blimp/net/blimp_message_pump.cc |
+++ b/blimp/net/blimp_message_pump.cc |
@@ -21,58 +21,46 @@ BlimpMessagePump::BlimpMessagePump(PacketReader* reader) |
processor_(nullptr), |
buffer_(new net::GrowableIOBuffer) { |
DCHECK(reader_); |
+ buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); |
} |
BlimpMessagePump::~BlimpMessagePump() {} |
void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) { |
- process_msg_callback_.Cancel(); |
+ DCHECK(!processor_); |
processor_ = processor; |
- if (!processor_) { |
- read_packet_callback_.Cancel(); |
- } else if (read_packet_callback_.IsCancelled()) { |
- buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); |
- ReadNextPacket(); |
- } |
+ ReadNextPacket(); |
} |
void BlimpMessagePump::ReadNextPacket() { |
DCHECK(processor_); |
buffer_->set_offset(0); |
- read_packet_callback_.Reset(base::Bind( |
- &BlimpMessagePump::OnReadPacketComplete, base::Unretained(this))); |
- int result = |
- reader_->ReadPacket(buffer_.get(), read_packet_callback_.callback()); |
- if (result != net::ERR_IO_PENDING) { |
- // Read completed synchronously. |
- OnReadPacketComplete(result); |
- } |
+ read_callback_.Reset(base::Bind(&BlimpMessagePump::OnReadPacketComplete, |
+ base::Unretained(this))); |
+ reader_->ReadPacket(buffer_.get(), read_callback_.callback()); |
} |
void BlimpMessagePump::OnReadPacketComplete(int result) { |
- read_packet_callback_.Cancel(); |
- if (result > 0) { |
- // The result is the size of the packet in bytes. |
+ if (result == net::OK) { |
scoped_ptr<BlimpMessage> message(new BlimpMessage); |
- bool parse_result = |
- message->ParseFromArray(buffer_->StartOfBuffer(), result); |
- if (parse_result) { |
+ if (message->ParseFromArray(buffer_->StartOfBuffer(), buffer_->offset())) { |
process_msg_callback_.Reset(base::Bind( |
&BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this))); |
processor_->ProcessMessage(std::move(message), |
process_msg_callback_.callback()); |
- return; |
+ } else { |
+ result = net::ERR_FAILED; |
} |
- result = net::ERR_FAILED; |
} |
- if (error_observer_) |
+ |
+ if (result != net::OK) { |
error_observer_->OnConnectionError(result); |
+ } |
} |
void BlimpMessagePump::OnProcessMessageComplete(int result) { |
// No error is expected from the message receiver. |
- DCHECK_EQ(result, net::OK); |
- process_msg_callback_.Cancel(); |
+ DCHECK_EQ(net::OK, result); |
ReadNextPacket(); |
} |