OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "blimp/net/tcp_connection.h" | |
6 | |
7 #include <utility> | |
8 | |
9 #include "base/callback_helpers.h" | |
10 #include "base/logging.h" | |
11 #include "base/macros.h" | |
12 #include "base/memory/weak_ptr.h" | |
13 #include "base/message_loop/message_loop.h" | |
14 #include "blimp/common/create_blimp_message.h" | |
15 #include "blimp/common/logging.h" | |
16 #include "blimp/common/proto/blimp_message.pb.h" | |
17 #include "blimp/net/blimp_message_processor.h" | |
18 #include "blimp/net/blimp_message_pump.h" | |
19 #include "blimp/net/common.h" | |
20 #include "blimp/net/connection_error_observer.h" | |
21 #include "blimp/net/message_port.h" | |
22 #include "blimp/net/packet_writer.h" | |
23 #include "net/base/completion_callback.h" | |
24 | |
25 namespace blimp { | |
26 | |
27 // Forwards incoming blimp messages to PacketWriter. | |
28 class BlimpMessageSender : public BlimpMessageProcessor { | |
29 public: | |
30 explicit BlimpMessageSender(PacketWriter* writer); | |
31 ~BlimpMessageSender() override; | |
32 | |
33 void set_error_observer(ConnectionErrorObserver* observer) { | |
34 error_observer_ = observer; | |
35 } | |
36 | |
37 // BlimpMessageProcessor implementation. | |
38 // |callback| receives net::OK on write success, or receives an error code | |
39 // otherwise. | |
40 void ProcessMessage(std::unique_ptr<BlimpMessage> message, | |
41 const net::CompletionCallback& callback) override; | |
42 | |
43 private: | |
44 void OnWritePacketComplete(int result); | |
45 | |
46 PacketWriter* writer_; | |
47 ConnectionErrorObserver* error_observer_ = nullptr; | |
48 scoped_refptr<net::IOBuffer> buffer_; | |
49 net::CompletionCallback pending_process_msg_callback_; | |
50 base::WeakPtrFactory<BlimpMessageSender> weak_factory_; | |
51 | |
52 DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); | |
53 }; | |
54 | |
55 BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) | |
56 : writer_(writer), | |
57 buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)), | |
58 weak_factory_(this) { | |
59 DCHECK(writer_); | |
60 } | |
61 | |
62 BlimpMessageSender::~BlimpMessageSender() { | |
63 DVLOG(1) << "BlimpMessageSender destroyed."; | |
64 } | |
65 | |
66 void BlimpMessageSender::ProcessMessage( | |
67 std::unique_ptr<BlimpMessage> message, | |
68 const net::CompletionCallback& callback) { | |
69 DCHECK(error_observer_); | |
70 VLOG(1) << "Sending " << *message; | |
71 | |
72 const int msg_byte_size = message->ByteSize(); | |
73 if (msg_byte_size > static_cast<int>(kMaxPacketPayloadSizeBytes)) { | |
74 DLOG(ERROR) << "Message rejected (too large): " << *message; | |
75 callback.Run(net::ERR_MSG_TOO_BIG); | |
76 return; | |
77 } | |
78 if (!message->SerializeToArray(buffer_->data(), msg_byte_size)) { | |
79 DLOG(ERROR) << "Failed to serialize message."; | |
80 callback.Run(net::ERR_INVALID_ARGUMENT); | |
81 return; | |
82 } | |
83 | |
84 // Check that no other message writes are in-flight at this time. | |
85 DCHECK(pending_process_msg_callback_.is_null()); | |
86 pending_process_msg_callback_ = callback; | |
87 | |
88 writer_->WritePacket( | |
89 scoped_refptr<net::DrainableIOBuffer>( | |
90 new net::DrainableIOBuffer(buffer_.get(), msg_byte_size)), | |
91 base::Bind(&BlimpMessageSender::OnWritePacketComplete, | |
92 weak_factory_.GetWeakPtr())); | |
93 } | |
94 | |
95 void BlimpMessageSender::OnWritePacketComplete(int result) { | |
96 DVLOG(2) << "OnWritePacketComplete, result=" << result; | |
97 DCHECK_NE(net::ERR_IO_PENDING, result); | |
98 | |
99 // Create a stack-local copy of |pending_process_msg_callback_|, in case an | |
100 // observer deletes |this|. | |
101 net::CompletionCallback process_callback = | |
102 base::ResetAndReturn(&pending_process_msg_callback_); | |
103 | |
104 if (result != net::OK) { | |
105 error_observer_->OnConnectionError(result); | |
106 } | |
107 | |
108 process_callback.Run(result); | |
109 } | |
110 | |
111 TCPConnection::TCPConnection(std::unique_ptr<MessagePort> message_port) | |
112 : BlimpConnection(), | |
113 message_port_(std::move(message_port)), | |
114 message_pump_(new BlimpMessagePump(message_port_->reader())), | |
115 outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())) { | |
116 message_pump_->set_error_observer(this); | |
117 outgoing_msg_processor_->set_error_observer(this); | |
118 } | |
119 | |
120 TCPConnection::~TCPConnection() { | |
121 VLOG(1) << "TCPConnection destroyed."; | |
122 } | |
123 | |
124 void TCPConnection::SetIncomingMessageProcessor( | |
125 BlimpMessageProcessor* processor) { | |
126 AddEndConnectionProcessor(processor); | |
127 message_pump_->SetMessageProcessor( | |
128 (processor != nullptr) ? GetEndConnectionProcessor() : nullptr); | |
129 } | |
130 | |
131 BlimpMessageProcessor* TCPConnection::GetOutgoingMessageProcessor() { | |
132 return outgoing_msg_processor_.get(); | |
133 } | |
134 | |
135 } // namespace blimp | |
OLD | NEW |