Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2709)

Unified Diff: blimp/net/blimp_connection.cc

Issue 1551583003: Implementation and fixes for Blimp client/engine E2E communication. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@dtrainor-linux-cl1528243002
Patch Set: Addressed haibin's feedback, made an ObserverList for ConnectionErrorObserver Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: blimp/net/blimp_connection.cc
diff --git a/blimp/net/blimp_connection.cc b/blimp/net/blimp_connection.cc
index dee90c139c53665efc4e92d1c5c0c555c23bbe2b..391d49b0c313c352aaae486d1c9a8bd1aa831ec8 100644
--- a/blimp/net/blimp_connection.cc
+++ b/blimp/net/blimp_connection.cc
@@ -7,6 +7,7 @@
#include "base/callback_helpers.h"
#include "base/logging.h"
#include "base/macros.h"
+#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "blimp/common/proto/blimp_message.pb.h"
#include "blimp/net/blimp_message_processor.h"
@@ -38,43 +39,57 @@ class BlimpMessageSender : public BlimpMessageProcessor {
void OnWritePacketComplete(int result);
PacketWriter* writer_;
- ConnectionErrorObserver* error_observer_;
+ ConnectionErrorObserver* error_observer_ = nullptr;
scoped_refptr<net::IOBuffer> buffer_;
net::CompletionCallback pending_process_msg_callback_;
+ base::WeakPtrFactory<BlimpMessageSender> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(BlimpMessageSender);
};
BlimpMessageSender::BlimpMessageSender(PacketWriter* writer)
- : writer_(writer), buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)) {
+ : writer_(writer),
+ buffer_(new net::IOBuffer(kMaxPacketPayloadSizeBytes)),
+ weak_factory_(this) {
DCHECK(writer_);
}
-BlimpMessageSender::~BlimpMessageSender() {}
+BlimpMessageSender::~BlimpMessageSender() {
+ DVLOG(1) << "BlimpMessageSender destroyed.";
+}
void BlimpMessageSender::ProcessMessage(
scoped_ptr<BlimpMessage> message,
const net::CompletionCallback& callback) {
+ DCHECK(error_observer_);
+ DVLOG(2) << "Sender::ProcessMessage " << *message;
+
if (message->ByteSize() > static_cast<int>(kMaxPacketPayloadSizeBytes)) {
- DLOG(ERROR) << "Message is too big, size=" << message->ByteSize();
+ DLOG(ERROR) << "Message rejected (too large): " << *message;
haibinlu 2016/01/04 19:45:45 this generates big logs. the size of the message i
Kevin M 2016/01/04 20:42:12 I think that knowing the message type would also b
callback.Run(net::ERR_MSG_TOO_BIG);
return;
}
- if (!message->SerializeToArray(buffer_->data(), message->ByteSize())) {
+ scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
+ new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()));
+ if (!message->SerializeToArray(drainable_buffer->data(),
+ message->GetCachedSize())) {
DLOG(ERROR) << "Failed to serialize message.";
callback.Run(net::ERR_INVALID_ARGUMENT);
return;
}
+ // Check that no other message writes are in-flight at this time.
+ DCHECK(pending_process_msg_callback_.is_null());
pending_process_msg_callback_ = callback;
- writer_->WritePacket(
- new net::DrainableIOBuffer(buffer_.get(), message->ByteSize()),
- base::Bind(&BlimpMessageSender::OnWritePacketComplete,
- base::Unretained(this)));
+
+ writer_->WritePacket(drainable_buffer,
haibinlu 2016/01/04 19:45:45 why not create DrainableIOBuffer here?
Kevin M 2016/01/04 20:42:13 Good point! Done.
+ base::Bind(&BlimpMessageSender::OnWritePacketComplete,
+ weak_factory_.GetWeakPtr()));
}
void BlimpMessageSender::OnWritePacketComplete(int result) {
+ DVLOG(2) << "OnWritePacketComplete, result=" << result;
DCHECK_NE(net::ERR_IO_PENDING, result);
base::ResetAndReturn(&pending_process_msg_callback_).Run(result);
if (result != net::OK) {
@@ -91,18 +106,33 @@ BlimpConnection::BlimpConnection(scoped_ptr<PacketReader> reader,
writer_(std::move(writer)),
outgoing_msg_processor_(new BlimpMessageSender(writer_.get())) {
DCHECK(writer_);
+
+ // Observe the connection errors received by any of this connection's network
+ // objects.
+ message_pump_->set_error_observer(this);
+ BlimpMessageSender* sender =
+ static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
+ sender->set_error_observer(this);
}
BlimpConnection::BlimpConnection() {}
-BlimpConnection::~BlimpConnection() {}
+BlimpConnection::~BlimpConnection() {
+ DVLOG(1) << "BlimpConnection destroyed.";
+}
-void BlimpConnection::SetConnectionErrorObserver(
+void BlimpConnection::AddConnectionErrorObserver(
ConnectionErrorObserver* observer) {
- message_pump_->set_error_observer(observer);
- BlimpMessageSender* sender =
- static_cast<BlimpMessageSender*>(outgoing_msg_processor_.get());
- sender->set_error_observer(observer);
+ error_observers_.AddObserver(observer);
+}
+
+void BlimpConnection::ClearConnectionErrorObservers() {
+ error_observers_.Clear();
+}
+
+void BlimpConnection::RemoveConnectionErrorObserver(
+ ConnectionErrorObserver* observer) {
+ error_observers_.RemoveObserver(observer);
}
void BlimpConnection::SetIncomingMessageProcessor(
@@ -114,4 +144,12 @@ BlimpMessageProcessor* BlimpConnection::GetOutgoingMessageProcessor() {
return outgoing_msg_processor_.get();
}
+void BlimpConnection::OnConnectionError(int error) {
+ VLOG(1) << "OnConnectionError, error=" << error;
+
+ // Propagate the error to all observers.
+ FOR_EACH_OBSERVER(ConnectionErrorObserver, error_observers_,
+ OnConnectionError(error));
+}
+
} // namespace blimp

Powered by Google App Engine
This is Rietveld 408576698