Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(840)

Unified Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« blimp/net/grpc_stream.h ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: blimp/net/grpc_stream.cc
diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8ebb96af656c4ddc1c3d24b73922c48fa42372c2
--- /dev/null
+++ b/blimp/net/grpc_stream.cc
@@ -0,0 +1,186 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/grpc_stream.h"
+
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/callback_helpers.h"
+#include "base/command_line.h"
+#include "base/memory/ptr_util.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
+#include "base/threading/thread_task_runner_handle.h"
+
+#include "content/public/browser/browser_thread.h"
+
+namespace blimp {
+
+GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback)
+ : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ connection_callback_(connection_callback),
+ grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")),
+ weak_factory_(this) {}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() {
+ return GrpcTag::Connect(connection_callback_);
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcReadTag(
+ const HeliumMessageReceivedCb& received_cb) {
+ return GrpcTag::Read(received_cb);
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag(
+ const HeliumMessageSentCb& sent_cb) {
+ return GrpcTag::Write(sent_cb);
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() {
+ return GrpcTag::WritesDone();
+}
+
+void GrpcStream::StartCompletionQueueThread(
+ grpc::CompletionQueue* completion_q) {
+ if (grpc_thread_ == nullptr) {
+ LOG(FATAL) << "Completion queue thread can only be started exactly once.";
+ return;
+ }
+
+ base::Thread::Options options;
+ // options.joinable = false;
+ grpc_thread_->StartWithOptions(options);
+ grpc_thread_.release()->task_runner()->PostTask(
+ FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q,
+ callback_task_runner_));
+}
+
+void GrpcStream::CompletionQueueThread(
+ grpc::CompletionQueue* completion_q,
+ scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) {
+ base::ThreadRestrictions::SetIOAllowed(true);
+
+ DVLOG(3) << "Starting Completion Queue thread.";
+ static int event_count = 0;
+ bool ok = false;
+ void* tag = nullptr;
+ DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
+ << event_count << ").";
+ ++event_count;
+
+ if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) {
+ if (tag != nullptr) {
+ auto grpc_tag = reinterpret_cast<GrpcTag*>(tag);
+ if (callback_task_runner != nullptr) {
+ DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
+ << static_cast<int>(grpc_tag->tag_type_)
+ << "; Result: " << (ok ? "OK" : "ERROR");
+ callback_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread,
+ base::Passed(base::WrapUnique(grpc_tag)), ok));
+ } else {
+ LOG(ERROR) << "Unable to trigger gRPC callbacks.";
+ delete grpc_tag;
+ }
+ } else {
+ LOG(ERROR) << "Completion queue shutting down; tag = " << tag;
+ }
+
+ if (ok) {
+ auto task_runner = base::ThreadTaskRunnerHandle::Get();
+ if (task_runner != nullptr) {
+ task_runner->PostTask(FROM_HERE,
+ base::Bind(&GrpcStream::CompletionQueueThread,
+ completion_q, callback_task_runner));
+ }
+ } else {
+ LOG(INFO) << "Tag " << tag << " had problems. Exiting.";
+ }
+ } else {
+ LOG(ERROR) << "Completion queue thread has no more events.";
+ }
+}
+
+GrpcStream::~GrpcStream() {
+ callback_task_runner_ = nullptr;
+}
+
+GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) {
+ DVLOG(3) << "Created Tag " << this;
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect(
+ const net::CompletionCallback& connection_cb) {
+ auto tag = new GrpcStream::GrpcTag();
+ tag->tag_type_ = TagType::CONNECT;
+ tag->connection_cb_ = connection_cb;
+ return tag;
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write(
+ const GrpcStream::HeliumMessageSentCb& callback) {
+ auto tag = new GrpcStream::GrpcTag();
+ tag->tag_type_ = TagType::WRITE;
+ tag->sent_cb_ = callback;
+ return tag;
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read(
+ const GrpcStream::HeliumMessageReceivedCb& callback) {
+ auto tag = new GrpcStream::GrpcTag();
+ tag->tag_type_ = TagType::READ;
+ tag->received_msg_ = base::MakeUnique<HeliumWrapper>();
+ tag->received_cb_ = callback;
+ return tag;
+}
+
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() {
+ auto tag = new GrpcStream::GrpcTag();
+ tag->tag_type_ = TagType::WRITES_DONE;
+ return tag;
+}
+
+HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const {
+ CHECK_EQ(TagType::READ, tag_type_);
+ return received_msg_.get();
+}
+
+void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) {
+ auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN);
+ DVLOG(3) << "Processing GrpcTag (on IO thread): "
+ << static_cast<int>(tag_type_)
+ << " status_code = " << static_cast<int>(status_code);
+ switch (tag_type_) {
+ case TagType::CONNECT:
+ DVLOG(1) << "gRPC Stream is now setup.";
+ if (!connection_cb_.is_null()) {
+ base::ResetAndReturn(&connection_cb_).Run(status_code);
+ }
+ break;
+ case TagType::READ:
+ DVLOG(1) << "Received Helium Message = "
+ << "; size = " << received_msg_->ByteSize();
+
+ base::ResetAndReturn(&received_cb_)
+ .Run(std::move(received_msg_), status_code);
+ break;
+ case TagType::WRITE:
+ // TODO(perumaal): Apply correct helium results here.
+ base::ResetAndReturn(&sent_cb_).Run(status_code);
+ break;
+ case TagType::WRITES_DONE:
+ break;
+ default:
Kevin M 2016/10/31 21:33:25 General point: don't use default, so that accident
+ LOG(FATAL) << "GrpcTag has invalid tag type : "
+ << static_cast<int>(tag_type_) << "; this = " << this;
+ }
+}
+
+GrpcStream::GrpcTag::~GrpcTag() {
+ DVLOG(3) << "Deleting tag " << ((void*)this);
+}
+
+} // namespace blimp
« blimp/net/grpc_stream.h ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698