OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "blimp/net/blimp_message_pump.h" | 5 #include "blimp/net/blimp_message_pump.h" |
6 | 6 |
7 #include "base/macros.h" | 7 #include "base/macros.h" |
8 #include "blimp/common/proto/blimp_message.pb.h" | 8 #include "blimp/common/proto/blimp_message.pb.h" |
9 #include "blimp/net/blimp_message_processor.h" | 9 #include "blimp/net/blimp_message_processor.h" |
10 #include "blimp/net/common.h" | 10 #include "blimp/net/common.h" |
11 #include "blimp/net/connection_error_observer.h" | 11 #include "blimp/net/connection_error_observer.h" |
12 #include "blimp/net/packet_reader.h" | 12 #include "blimp/net/packet_reader.h" |
13 #include "net/base/io_buffer.h" | 13 #include "net/base/io_buffer.h" |
14 #include "net/base/net_errors.h" | 14 #include "net/base/net_errors.h" |
15 | 15 |
16 namespace blimp { | 16 namespace blimp { |
17 | 17 |
18 BlimpMessagePump::BlimpMessagePump(PacketReader* reader) | 18 BlimpMessagePump::BlimpMessagePump(PacketReader* reader) |
19 : reader_(reader), | 19 : reader_(reader), |
20 error_observer_(nullptr), | 20 error_observer_(nullptr), |
21 processor_(nullptr), | 21 processor_(nullptr), |
22 buffer_(new net::GrowableIOBuffer) { | 22 buffer_(new net::GrowableIOBuffer) { |
23 DCHECK(reader_); | 23 DCHECK(reader_); |
| 24 buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); |
24 } | 25 } |
25 | 26 |
26 BlimpMessagePump::~BlimpMessagePump() {} | 27 BlimpMessagePump::~BlimpMessagePump() {} |
27 | 28 |
28 void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) { | 29 void BlimpMessagePump::SetMessageProcessor(BlimpMessageProcessor* processor) { |
29 process_msg_callback_.Cancel(); | 30 DCHECK(!processor_); |
30 processor_ = processor; | 31 processor_ = processor; |
31 if (!processor_) { | 32 ReadNextPacket(); |
32 read_packet_callback_.Cancel(); | |
33 } else if (read_packet_callback_.IsCancelled()) { | |
34 buffer_->SetCapacity(kMaxPacketPayloadSizeBytes); | |
35 ReadNextPacket(); | |
36 } | |
37 } | 33 } |
38 | 34 |
39 void BlimpMessagePump::ReadNextPacket() { | 35 void BlimpMessagePump::ReadNextPacket() { |
40 DCHECK(processor_); | 36 DCHECK(processor_); |
41 buffer_->set_offset(0); | 37 buffer_->set_offset(0); |
42 read_packet_callback_.Reset(base::Bind( | 38 read_callback_.Reset(base::Bind(&BlimpMessagePump::OnReadPacketComplete, |
43 &BlimpMessagePump::OnReadPacketComplete, base::Unretained(this))); | 39 base::Unretained(this))); |
44 int result = | 40 reader_->ReadPacket(buffer_.get(), read_callback_.callback()); |
45 reader_->ReadPacket(buffer_.get(), read_packet_callback_.callback()); | |
46 if (result != net::ERR_IO_PENDING) { | |
47 // Read completed synchronously. | |
48 OnReadPacketComplete(result); | |
49 } | |
50 } | 41 } |
51 | 42 |
52 void BlimpMessagePump::OnReadPacketComplete(int result) { | 43 void BlimpMessagePump::OnReadPacketComplete(int result) { |
53 read_packet_callback_.Cancel(); | 44 if (result == net::OK) { |
54 if (result > 0) { | |
55 // The result is the size of the packet in bytes. | |
56 scoped_ptr<BlimpMessage> message(new BlimpMessage); | 45 scoped_ptr<BlimpMessage> message(new BlimpMessage); |
57 bool parse_result = | 46 if (message->ParseFromArray(buffer_->StartOfBuffer(), buffer_->offset())) { |
58 message->ParseFromArray(buffer_->StartOfBuffer(), result); | |
59 if (parse_result) { | |
60 process_msg_callback_.Reset(base::Bind( | 47 process_msg_callback_.Reset(base::Bind( |
61 &BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this))); | 48 &BlimpMessagePump::OnProcessMessageComplete, base::Unretained(this))); |
62 processor_->ProcessMessage(std::move(message), | 49 processor_->ProcessMessage(std::move(message), |
63 process_msg_callback_.callback()); | 50 process_msg_callback_.callback()); |
64 return; | 51 } else { |
| 52 result = net::ERR_FAILED; |
65 } | 53 } |
66 result = net::ERR_FAILED; | |
67 } | 54 } |
68 if (error_observer_) | 55 |
| 56 if (result != net::OK) { |
69 error_observer_->OnConnectionError(result); | 57 error_observer_->OnConnectionError(result); |
| 58 } |
70 } | 59 } |
71 | 60 |
72 void BlimpMessagePump::OnProcessMessageComplete(int result) { | 61 void BlimpMessagePump::OnProcessMessageComplete(int result) { |
73 // No error is expected from the message receiver. | 62 // No error is expected from the message receiver. |
74 DCHECK_EQ(result, net::OK); | 63 DCHECK_EQ(net::OK, result); |
75 process_msg_callback_.Cancel(); | |
76 ReadNextPacket(); | 64 ReadNextPacket(); |
77 } | 65 } |
78 | 66 |
79 } // namespace blimp | 67 } // namespace blimp |
OLD | NEW |