Index: blimp/net/blimp_connection.cc |
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc |
index dee90c139c53665efc4e92d1c5c0c555c23bbe2b..391d49b0c313c352aaae486d1c9a8bd1aa831ec8 100644 |
--- a/blimp/net/blimp_connection.cc |
+++ b/blimp/net/blimp_connection.cc |
@@ -7,6 +7,7 @@ |
#include "base/callback_helpers.h" |
#include "base/logging.h" |
#include "base/macros.h" |
+#include "base/memory/weak_ptr.h" |
#include "base/message_loop/message_loop.h" |
#include "blimp/common/proto/blimp_message.pb.h" |
#include "blimp/net/blimp_message_processor.h" |
@@ -38,43 +39,57 @@ class BlimpMessageSender : public BlimpMessageProcessor { |
void OnWritePacketComplete(int result); |
PacketWriter* writer_; |
- ConnectionErrorObserver* error_observer_; |
+ ConnectionErrorObserver* error_observer_ = nullptr; |
scoped_refptr<net::IOBuffer> buffer_; |
net::CompletionCallback pending_process_msg_callback_; |
+ base::WeakPtrFactory<BlimpMessageSender> weak_factory_; |
DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender); |
}; |
BlimpMessageSender::BlimpMessageSender(PacketWriter* writer) |
- : writer_(writer), buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)) { |
+ : writer_(writer), |
+ buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)), |
+ weak_factory_(this) { |
DCHECK(writer_); |
} |
-BlimpMessageSender::~BlimpMessageSender() {} |
+BlimpMessageSender::~BlimpMessageSender() { |
+ DVLOG(1) << "BlimpMessageSender destroyed."; |
+} |
void BlimpMessageSender::ProcessMessage( |
scoped_ptr<BlimpMessage> message, |
const net::CompletionCallback& callback) { |
+ DCHECK(error_observer_); |
+ DVLOG(2) << "Sender::ProcessMessage " << *message; |
+ |
if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) { |
- DLOG(ERROR) << "Message is too big, size=" << message->ByteSize(); |
+ DLOG(ERROR) << "Message rejected (too large): " << *message; |
haibinlu
2016/01/04 19:45:45
this generates big logs. the size of the message i
Kevin M
2016/01/04 20:42:12
I think that knowing the message type would also b
|
callback.Run(net::ERR_MSG_TOO_BIG); |
return; |
} |
- if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) { |
+ scoped_refptr<net::DrainableIOBuffer> drainable_buffer( |
+ new net::DrainableIOBuffer(buffer_.get(), message->ByteSize())); |
+ if (!message->SerializeToArray(drainable_buffer->data(), |
+ message->GetCachedSize())) { |
DLOG(ERROR) << "Failed to serialize message."; |
callback.Run(net::ERR_INVALID_ARGUMENT); |
return; |
} |
+ // Check that no other message writes are in-flight at this time. |
+ DCHECK(pending_process_msg_callback_.is_null()); |
pending_process_msg_callback_ = callback; |
- writer_->WritePacket( |
- new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()), |
- base::Bind(&BlimpMessageSender::OnWritePacketComplete, |
- base::Unretained(this))); |
+ |
+ writer_->WritePacket(drainable_buffer, |
haibinlu
2016/01/04 19:45:45
why not create DrainableIOBuffer here?
Kevin M
2016/01/04 20:42:13
Good point! Done.
|
+ base::Bind(&BlimpMessageSender::OnWritePacketComplete, |
+ weak_factory_.GetWeakPtr())); |
} |
void BlimpMessageSender::OnWritePacketComplete(int result) { |
+ DVLOG(2) << "OnWritePacketComplete, result=" << result; |
DCHECK_NE(net::ERR_IO_PENDING, result); |
base::ResetAndReturn(&pending_process_msg_callback_).Run(result); |
if (result != net::OK) { |
@@ -91,18 +106,33 @@ BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader, |
writer_(std::move(writer)), |
outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) { |
DCHECK(writer_); |
+ |
+ // Observe the connection errors received by any of this connection's network |
+ // objects. |
+ message_pump_->set_error_observer(this); |
+ BlimpMessageSender* sender = |
+ static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get()); |
+ sender->set_error_observer(this); |
} |
BlimpConnection::BlimpConnection() {} |
-BlimpConnection::~BlimpConnection() {} |
+BlimpConnection::~BlimpConnection() { |
+ DVLOG(1) << "BlimpConnection destroyed."; |
+} |
-void BlimpConnection::SetConnectionErrorObserver( |
+void BlimpConnection::AddConnectionErrorObserver( |
ConnectionErrorObserver* observer) { |
- message_pump_->set_error_observer(observer); |
- BlimpMessageSender* sender = |
- static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get()); |
- sender->set_error_observer(observer); |
+ error_observers_.AddObserver(observer); |
+} |
+ |
+void BlimpConnection::ClearConnectionErrorObservers() { |
+ error_observers_.Clear(); |
+} |
+ |
+void BlimpConnection::RemoveConnectionErrorObserver( |
+ ConnectionErrorObserver* observer) { |
+ error_observers_.RemoveObserver(observer); |
} |
void BlimpConnection::SetIncomingMessageProcessor( |
@@ -114,4 +144,12 @@ BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() { |
return outgoing_msg_processor_.get(); |
} |
+void BlimpConnection::OnConnectionError(int error) { |
+ VLOG(1) << "OnConnectionError, error=" << error; |
+ |
+ // Propagate the error to all observers. |
+ FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_, |
+ OnConnectionError(error)); |
+} |
+ |
} // namespace blimp |