| Index: blimp/net/blimp_connection.cc
|
| diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc
|
| index dee90c139c53665efc4e92d1c5c0c555c23bbe2b..d2e2f78803afb6901738ee31b99681aa0c9ec2f6 100644
|
| --- a/blimp/net/blimp_connection.cc
|
| +++ b/blimp/net/blimp_connection.cc
|
| @@ -7,6 +7,7 @@
|
| #include "base/callback_helpers.h"
|
| #include "base/logging.h"
|
| #include "base/macros.h"
|
| +#include "base/memory/weak_ptr.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "blimp/common/proto/blimp_message.pb.h"
|
| #include "blimp/net/blimp_message_processor.h"
|
| @@ -38,43 +39,56 @@ class BlimpMessageSender : public BlimpMessageProcessor {
|
| void OnWritePacketComplete(int result);
|
|
|
| PacketWriter* writer_;
|
| - ConnectionErrorObserver* error_observer_;
|
| + ConnectionErrorObserver* error_observer_ = nullptr;
|
| scoped_refptr<net::IOBuffer> buffer_;
|
| net::CompletionCallback pending_process_msg_callback_;
|
| + base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
|
| };
|
|
|
| BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
|
| - : writer_(writer), buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)) {
|
| + : writer_(writer),
|
| + buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
|
| + weak_factory_(this) {
|
| DCHECK(writer_);
|
| }
|
|
|
| -BlimpMessageSender::~BlimpMessageSender() {}
|
| +BlimpMessageSender::~BlimpMessageSender() {
|
| + DVLOG(1) << "BlimpMessageSender destroyed.";
|
| +}
|
|
|
| void BlimpMessageSender::ProcessMessage(
|
| scoped_ptr<BlimpMessage> message,
|
| const net::CompletionCallback& callback) {
|
| + DCHECK(error_observer_);
|
| + DVLOG(2) << "Sender::ProcessMessage " << *message;
|
| +
|
| if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
|
| - DLOG(ERROR) << "Message is too big, size=" << message->ByteSize();
|
| + DLOG(ERROR) << "Message rejected (too large): " << *message;
|
| callback.Run(net::ERR_MSG_TOO_BIG);
|
| return;
|
| }
|
|
|
| - if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) {
|
| + if (!message->SerializeToArray(buffer_->data(), message->GetCachedSize())) {
|
| DLOG(ERROR) << "Failed to serialize message.";
|
| callback.Run(net::ERR_INVALID_ARGUMENT);
|
| return;
|
| }
|
|
|
| + // Check that no other message writes are in-flight at this time.
|
| + DCHECK(pending_process_msg_callback_.is_null());
|
| pending_process_msg_callback_ = callback;
|
| +
|
| writer_->WritePacket(
|
| - new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()),
|
| + scoped_refptr<net::DrainableIOBuffer>(
|
| + new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())),
|
| base::Bind(&BlimpMessageSender::OnWritePacketComplete,
|
| - base::Unretained(this)));
|
| + weak_factory_.GetWeakPtr()));
|
| }
|
|
|
| void BlimpMessageSender::OnWritePacketComplete(int result) {
|
| + DVLOG(2) << "OnWritePacketComplete, result=" << result;
|
| DCHECK_NE(net::ERR_IO_PENDING, result);
|
| base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
|
| if (result != net::OK) {
|
| @@ -91,18 +105,29 @@ BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
|
| writer_(std::move(writer)),
|
| outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
|
| DCHECK(writer_);
|
| +
|
| + // Observe the connection errors received by any of this connection's network
|
| + // objects.
|
| + message_pump_->set_error_observer(this);
|
| + BlimpMessageSender* sender =
|
| + static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
|
| + sender->set_error_observer(this);
|
| }
|
|
|
| BlimpConnection::BlimpConnection() {}
|
|
|
| -BlimpConnection::~BlimpConnection() {}
|
| +BlimpConnection::~BlimpConnection() {
|
| + DVLOG(1) << "BlimpConnection destroyed.";
|
| +}
|
|
|
| -void BlimpConnection::SetConnectionErrorObserver(
|
| +void BlimpConnection::AddConnectionErrorObserver(
|
| ConnectionErrorObserver* observer) {
|
| - message_pump_->set_error_observer(observer);
|
| - BlimpMessageSender* sender =
|
| - static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
|
| - sender->set_error_observer(observer);
|
| + error_observers_.AddObserver(observer);
|
| +}
|
| +
|
| +void BlimpConnection::RemoveConnectionErrorObserver(
|
| + ConnectionErrorObserver* observer) {
|
| + error_observers_.RemoveObserver(observer);
|
| }
|
|
|
| void BlimpConnection::SetIncomingMessageProcessor(
|
| @@ -114,4 +139,12 @@ BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
|
| return outgoing_msg_processor_.get();
|
| }
|
|
|
| +void BlimpConnection::OnConnectionError(int error) {
|
| + VLOG(1) << "OnConnectionError, error=" << error;
|
| +
|
| + // Propagate the error to all observers.
|
| + FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_,
|
| + OnConnectionError(error));
|
| +}
|
| +
|
| } // namespace blimp
|
|
|