Index: blimp/net/blimp_connection.cc |
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc |
index 57a3580d74394de21c9ab30ecba1141a29171e29..a27a9fa0526642e88b3793dcb540f88d18200cab 100644 |
--- a/blimp/net/blimp_connection.cc |
+++ b/blimp/net/blimp_connection.cc |
@@ -24,89 +24,6 @@ |
namespace blimp { |
-// Forwards incoming blimp messages to PacketWriter. |
-class BlimpMessageSender : public BlimpMessageProcessor { |
- public: |
- explicit BlimpMessageSender(PacketWriter* writer); |
- ~BlimpMessageSender() override; |
- |
- void set_error_observer(ConnectionErrorObserver* observer) { |
- error_observer_ = observer; |
- } |
- |
- // BlimpMessageProcessor implementation. |
- // |callback| receives net::OK on write success, or receives an error code |
- // otherwise. |
- void ProcessMessage(std::unique_ptr<BlimpMessage> message, |
- const net::CompletionCallback& callback) override; |
- |
- private: |
- void OnWritePacketComplete(int result); |
- |
- PacketWriter* writer_; |
- ConnectionErrorObserver* error_observer_ = nullptr; |
- scoped_refptr<net::IOBuffer> buffer_; |
- net::CompletionCallback pending_process_msg_callback_; |
- base::WeakPtrFactory<BlimpMessageSender> weak_factory_; |
- |
- DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); |
-}; |
- |
-BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) |
- : writer_(writer), |
- buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)), |
- weak_factory_(this) { |
- DCHECK(writer_); |
-} |
- |
-BlimpMessageSender::~BlimpMessageSender() { |
- DVLOG(1) << "BlimpMessageSender destroyed."; |
-} |
- |
-void BlimpMessageSender::ProcessMessage( |
- std::unique_ptr<BlimpMessage> message, |
- const net::CompletionCallback& callback) { |
- DCHECK(error_observer_); |
- VLOG(1) << "Sending " << *message; |
- |
- if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { |
- DLOG(ERROR) << "Message rejected (too large): " << *message; |
- callback.Run(net::ERR_MSG_TOO_BIG); |
- return; |
- } |
- if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) { |
- DLOG(ERROR) << "Failed to serialize message."; |
- callback.Run(net::ERR_INVALID_ARGUMENT); |
- return; |
- } |
- |
- // Check that no other message writes are in-flight at this time. |
- DCHECK(pending_process_msg_callback_.is_null()); |
- pending_process_msg_callback_ = callback; |
- |
- writer_->WritePacket( |
- scoped_refptr<net::DrainableIOBuffer>( |
- new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())), |
- base::Bind(&BlimpMessageSender::OnWritePacketComplete, |
- weak_factory_.GetWeakPtr())); |
-} |
- |
-void BlimpMessageSender::OnWritePacketComplete(int result) { |
- DVLOG(2) << "OnWritePacketComplete, result=" << result; |
- DCHECK_NE(net::ERR_IO_PENDING, result); |
- |
- // Create a stack-local copy of |pending_process_msg_callback_|, in case an |
- // observer deletes |this|. |
- net::CompletionCallback process_callback = |
- base::ResetAndReturn(&pending_process_msg_callback_); |
- |
- if (result != net::OK) { |
- error_observer_->OnConnectionError(result); |
- } |
- |
- process_callback.Run(result); |
-} |
- |
// MessageProcessor filter used to route EndConnection messages through to |
// OnConnectionError notifications on the owning BlimpConnection. |
class BlimpConnection::EndConnectionFilter : public BlimpMessageProcessor { |
@@ -152,16 +69,8 @@ void BlimpConnection::EndConnectionFilter::ProcessMessage( |
message_handler_->ProcessMessage(std::move(message), callback); |
} |
-BlimpConnection::BlimpConnection(std::unique_ptr<MessagePort> message_port) |
- : message_port_(std::move(message_port)), |
- message_pump_(new BlimpMessagePump(message_port_->reader())), |
- outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())), |
- end_connection_filter_(new EndConnectionFilter(this)) { |
- message_pump_->set_error_observer(this); |
- outgoing_msg_processor_->set_error_observer(this); |
-} |
- |
-BlimpConnection::BlimpConnection() {} |
+BlimpConnection::BlimpConnection() |
+ : end_connection_filter_(new EndConnectionFilter(this)) {} |
BlimpConnection::~BlimpConnection() { |
VLOG(1) << "BlimpConnection destroyed."; |
@@ -177,23 +86,22 @@ void BlimpConnection::RemoveConnectionErrorObserver( |
error_observers_.RemoveObserver(observer); |
} |
-void BlimpConnection::SetIncomingMessageProcessor( |
+void BlimpConnection::AddEndConnectionProcessor( |
BlimpMessageProcessor* processor) { |
end_connection_filter_->set_message_handler(processor); |
- message_pump_->SetMessageProcessor(processor ? end_connection_filter_.get() |
- : nullptr); |
} |
-BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { |
- return outgoing_msg_processor_.get(); |
+BlimpMessageProcessor* BlimpConnection::GetEndConnectionProcessor() const { |
+ return end_connection_filter_.get(); |
Wez
2016/11/09 22:47:17
This isn't symmetric with AddEndConnectionProcesso
|
} |
void BlimpConnection::OnConnectionError(int error) { |
VLOG(1) << "OnConnectionError, error=" << error; |
// Propagate the error to all observers. |
- for (auto& observer : error_observers_) |
+ for (auto& observer : error_observers_) { |
observer.OnConnectionError(error); |
+ } |
} |
} // namespace blimp |