| Index: blimp/net/blimp_connection.cc
|
| diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc
|
| index 57a3580d74394de21c9ab30ecba1141a29171e29..870215952848609d082b84856ddd4df09339d991 100644
|
| --- a/blimp/net/blimp_connection.cc
|
| +++ b/blimp/net/blimp_connection.cc
|
| @@ -24,89 +24,6 @@
|
|
|
| namespace blimp {
|
|
|
| -// Forwards incoming blimp messages to PacketWriter.
|
| -class BlimpMessageSender : public BlimpMessageProcessor {
|
| - public:
|
| - explicit BlimpMessageSender(PacketWriter* writer);
|
| - ~BlimpMessageSender() override;
|
| -
|
| - void set_error_observer(ConnectionErrorObserver* observer) {
|
| - error_observer_ = observer;
|
| - }
|
| -
|
| - // BlimpMessageProcessor implementation.
|
| - // |callback| receives net::OK on write success, or receives an error code
|
| - // otherwise.
|
| - void ProcessMessage(std::unique_ptr<BlimpMessage> message,
|
| - const net::CompletionCallback& callback) override;
|
| -
|
| - private:
|
| - void OnWritePacketComplete(int result);
|
| -
|
| - PacketWriter* writer_;
|
| - ConnectionErrorObserver* error_observer_ = nullptr;
|
| - scoped_refptr<net::IOBuffer> buffer_;
|
| - net::CompletionCallback pending_process_msg_callback_;
|
| - base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
|
| -
|
| - DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
|
| -};
|
| -
|
| -BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
|
| - : writer_(writer),
|
| - buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
|
| - weak_factory_(this) {
|
| - DCHECK(writer_);
|
| -}
|
| -
|
| -BlimpMessageSender::~BlimpMessageSender() {
|
| - DVLOG(1) << "BlimpMessageSender destroyed.";
|
| -}
|
| -
|
| -void BlimpMessageSender::ProcessMessage(
|
| - std::unique_ptr<BlimpMessage> message,
|
| - const net::CompletionCallback& callback) {
|
| - DCHECK(error_observer_);
|
| - VLOG(1) << "Sending " << *message;
|
| -
|
| - if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
|
| - DLOG(ERROR) << "Message rejected (too large): " << *message;
|
| - callback.Run(net::ERR_MSG_TOO_BIG);
|
| - return;
|
| - }
|
| - if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) {
|
| - DLOG(ERROR) << "Failed to serialize message.";
|
| - callback.Run(net::ERR_INVALID_ARGUMENT);
|
| - return;
|
| - }
|
| -
|
| - // Check that no other message writes are in-flight at this time.
|
| - DCHECK(pending_process_msg_callback_.is_null());
|
| - pending_process_msg_callback_ = callback;
|
| -
|
| - writer_->WritePacket(
|
| - scoped_refptr<net::DrainableIOBuffer>(
|
| - new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())),
|
| - base::Bind(&BlimpMessageSender::OnWritePacketComplete,
|
| - weak_factory_.GetWeakPtr()));
|
| -}
|
| -
|
| -void BlimpMessageSender::OnWritePacketComplete(int result) {
|
| - DVLOG(2) << "OnWritePacketComplete, result=" << result;
|
| - DCHECK_NE(net::ERR_IO_PENDING, result);
|
| -
|
| - // Create a stack-local copy of |pending_process_msg_callback_|, in case an
|
| - // observer deletes |this|.
|
| - net::CompletionCallback process_callback =
|
| - base::ResetAndReturn(&pending_process_msg_callback_);
|
| -
|
| - if (result != net::OK) {
|
| - error_observer_->OnConnectionError(result);
|
| - }
|
| -
|
| - process_callback.Run(result);
|
| -}
|
| -
|
| // MessageProcessor filter used to route EndConnection messages through to
|
| // OnConnectionError notifications on the owning BlimpConnection.
|
| class BlimpConnection::EndConnectionFilter : public BlimpMessageProcessor {
|
| @@ -152,16 +69,9 @@ void BlimpConnection::EndConnectionFilter::ProcessMessage(
|
| message_handler_->ProcessMessage(std::move(message), callback);
|
| }
|
|
|
| -BlimpConnection::BlimpConnection(std::unique_ptr<MessagePort> message_port)
|
| - : message_port_(std::move(message_port)),
|
| - message_pump_(new BlimpMessagePump(message_port_->reader())),
|
| - outgoing_msg_processor_(new BlimpMessageSender(message_port_->writer())),
|
| - end_connection_filter_(new EndConnectionFilter(this)) {
|
| - message_pump_->set_error_observer(this);
|
| - outgoing_msg_processor_->set_error_observer(this);
|
| -}
|
| +BlimpConnection::BlimpConnection()
|
| + : end_connection_filter_(new EndConnectionFilter(this)) {}
|
|
|
| -BlimpConnection::BlimpConnection() {}
|
|
|
| BlimpConnection::~BlimpConnection() {
|
| VLOG(1) << "BlimpConnection destroyed.";
|
| @@ -177,23 +87,22 @@ void BlimpConnection::RemoveConnectionErrorObserver(
|
| error_observers_.RemoveObserver(observer);
|
| }
|
|
|
| -void BlimpConnection::SetIncomingMessageProcessor(
|
| +void BlimpConnection::AddEndConnectionProcessor(
|
| BlimpMessageProcessor* processor) {
|
| end_connection_filter_->set_message_handler(processor);
|
| - message_pump_->SetMessageProcessor(processor ? end_connection_filter_.get()
|
| - : nullptr);
|
| }
|
|
|
| -BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
|
| - return outgoing_msg_processor_.get();
|
| +BlimpMessageProcessor* BlimpConnection::GetEndConnectionProcessor() const {
|
| + return end_connection_filter_.get();
|
| }
|
|
|
| void BlimpConnection::OnConnectionError(int error) {
|
| VLOG(1) << "OnConnectionError, error=" << error;
|
|
|
| // Propagate the error to all observers.
|
| - for (auto& observer : error_observers_)
|
| + for (auto& observer : error_observers_) {
|
| observer.OnConnectionError(error);
|
| + }
|
| }
|
|
|
| } // namespace blimp
|
|
|