| Index: blimp/net/blimp_message_pump.cc
|
| diff --git a/blimp/net/blimp_message_pump.cc b/blimp/net/blimp_message_pump.cc
|
| index d9a250a89765a7b1cf04d88a01bca9a0c44df9ec..79082e078d22c8a924bd418734ce1b15d6c36a51 100644
|
| --- a/blimp/net/blimp_message_pump.cc
|
| +++ b/blimp/net/blimp_message_pump.cc
|
| @@ -21,58 +21,46 @@ BlimpMessagePump::BlimpMessagePump(PacketReader* reader)
|
| processor_(nullptr),
|
| buffer_(new net::GrowableIOBuffer) {
|
| DCHECK(reader_);
|
| + buffer_->SetCapacity(kMaxPacketPayloadSizeBytes);
|
| }
|
|
|
| BlimpMessagePump::~BlimpMessagePump() {}
|
|
|
| void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) {
|
| - process_msg_callback_.Cancel();
|
| + DCHECK(!processor_);
|
| processor_ = processor;
|
| - if (!processor_) {
|
| - read_packet_callback_.Cancel();
|
| - } else if (read_packet_callback_.IsCancelled()) {
|
| - buffer_->SetCapacity(kMaxPacketPayloadSizeBytes);
|
| - ReadNextPacket();
|
| - }
|
| + ReadNextPacket();
|
| }
|
|
|
| void BlimpMessagePump::ReadNextPacket() {
|
| DCHECK(processor_);
|
| buffer_->set_offset(0);
|
| - read_packet_callback_.Reset(base::Bind(
|
| - &BlimpMessagePump::OnReadPacketComplete, base::Unretained(this)));
|
| - int result =
|
| - reader_->ReadPacket(buffer_.get(), read_packet_callback_.callback());
|
| - if (result != net::ERR_IO_PENDING) {
|
| - // Read completed synchronously.
|
| - OnReadPacketComplete(result);
|
| - }
|
| + read_callback_.Reset(base::Bind(&BlimpMessagePump::OnReadPacketComplete,
|
| + base::Unretained(this)));
|
| + reader_->ReadPacket(buffer_.get(), read_callback_.callback());
|
| }
|
|
|
| void BlimpMessagePump::OnReadPacketComplete(int result) {
|
| - read_packet_callback_.Cancel();
|
| - if (result > 0) {
|
| - // The result is the size of the packet in bytes.
|
| + if (result == net::OK) {
|
| scoped_ptr<BlimpMessage> message(new BlimpMessage);
|
| - bool parse_result =
|
| - message->ParseFromArray(buffer_->StartOfBuffer(), result);
|
| - if (parse_result) {
|
| + if (message->ParseFromArray(buffer_->StartOfBuffer(), buffer_->offset())) {
|
| process_msg_callback_.Reset(base::Bind(
|
| &BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this)));
|
| processor_->ProcessMessage(std::move(message),
|
| process_msg_callback_.callback());
|
| - return;
|
| + } else {
|
| + result = net::ERR_FAILED;
|
| }
|
| - result = net::ERR_FAILED;
|
| }
|
| - if (error_observer_)
|
| +
|
| + if (result != net::OK) {
|
| error_observer_->OnConnectionError(result);
|
| + }
|
| }
|
|
|
| void BlimpMessagePump::OnProcessMessageComplete(int result) {
|
| // No error is expected from the message receiver.
|
| - DCHECK_EQ(result, net::OK);
|
| - process_msg_callback_.Cancel();
|
| + DCHECK_EQ(net::OK, result);
|
| ReadNextPacket();
|
| }
|
|
|
|
|