Index: blimp/net/grpc_stream.cc |
diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8ebb96af656c4ddc1c3d24b73922c48fa42372c2 |
--- /dev/null |
+++ b/blimp/net/grpc_stream.cc |
@@ -0,0 +1,186 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "blimp/net/grpc_stream.h" |
+ |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "base/callback_helpers.h" |
+#include "base/command_line.h" |
+#include "base/memory/ptr_util.h" |
+#include "base/synchronization/lock.h" |
+#include "base/threading/thread.h" |
+#include "base/threading/thread_restrictions.h" |
+#include "base/threading/thread_task_runner_handle.h" |
+ |
+#include "content/public/browser/browser_thread.h" |
+ |
+namespace blimp { |
+ |
+GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback) |
+ : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
+ connection_callback_(connection_callback), |
+ grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")), |
+ weak_factory_(this) {} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() { |
+ return GrpcTag::Connect(connection_callback_); |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcReadTag( |
+ const HeliumMessageReceivedCb& received_cb) { |
+ return GrpcTag::Read(received_cb); |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag( |
+ const HeliumMessageSentCb& sent_cb) { |
+ return GrpcTag::Write(sent_cb); |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() { |
+ return GrpcTag::WritesDone(); |
+} |
+ |
+void GrpcStream::StartCompletionQueueThread( |
+ grpc::CompletionQueue* completion_q) { |
+ if (grpc_thread_ == nullptr) { |
+ LOG(FATAL) << "Completion queue thread can only be started exactly once."; |
+ return; |
+ } |
+ |
+ base::Thread::Options options; |
+ // options.joinable = false; |
+ grpc_thread_->StartWithOptions(options); |
+ grpc_thread_.release()->task_runner()->PostTask( |
+ FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q, |
+ callback_task_runner_)); |
+} |
+ |
+void GrpcStream::CompletionQueueThread( |
+ grpc::CompletionQueue* completion_q, |
+ scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) { |
+ base::ThreadRestrictions::SetIOAllowed(true); |
+ |
+ DVLOG(3) << "Starting Completion Queue thread."; |
+ static int event_count = 0; |
+ bool ok = false; |
+ void* tag = nullptr; |
+ DVLOG(3) << "Waiting for next event (grpc stream; count so far = " |
+ << event_count << ")."; |
+ ++event_count; |
+ |
+ if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) { |
+ if (tag != nullptr) { |
+ auto grpc_tag = reinterpret_cast<GrpcTag*>(tag); |
+ if (callback_task_runner != nullptr) { |
+ DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " |
+ << static_cast<int>(grpc_tag->tag_type_) |
+ << "; Result: " << (ok ? "OK" : "ERROR"); |
+ callback_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread, |
+ base::Passed(base::WrapUnique(grpc_tag)), ok)); |
+ } else { |
+ LOG(ERROR) << "Unable to trigger gRPC callbacks."; |
+ delete grpc_tag; |
+ } |
+ } else { |
+ LOG(ERROR) << "Completion queue shutting down; tag = " << tag; |
+ } |
+ |
+ if (ok) { |
+ auto task_runner = base::ThreadTaskRunnerHandle::Get(); |
+ if (task_runner != nullptr) { |
+ task_runner->PostTask(FROM_HERE, |
+ base::Bind(&GrpcStream::CompletionQueueThread, |
+ completion_q, callback_task_runner)); |
+ } |
+ } else { |
+ LOG(INFO) << "Tag " << tag << " had problems. Exiting."; |
+ } |
+ } else { |
+ LOG(ERROR) << "Completion queue thread has no more events."; |
+ } |
+} |
+ |
+GrpcStream::~GrpcStream() { |
+ callback_task_runner_ = nullptr; |
+} |
+ |
+GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) { |
+ DVLOG(3) << "Created Tag " << this; |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect( |
+ const net::CompletionCallback& connection_cb) { |
+ auto tag = new GrpcStream::GrpcTag(); |
+ tag->tag_type_ = TagType::CONNECT; |
+ tag->connection_cb_ = connection_cb; |
+ return tag; |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write( |
+ const GrpcStream::HeliumMessageSentCb& callback) { |
+ auto tag = new GrpcStream::GrpcTag(); |
+ tag->tag_type_ = TagType::WRITE; |
+ tag->sent_cb_ = callback; |
+ return tag; |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read( |
+ const GrpcStream::HeliumMessageReceivedCb& callback) { |
+ auto tag = new GrpcStream::GrpcTag(); |
+ tag->tag_type_ = TagType::READ; |
+ tag->received_msg_ = base::MakeUnique<HeliumWrapper>(); |
+ tag->received_cb_ = callback; |
+ return tag; |
+} |
+ |
+GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() { |
+ auto tag = new GrpcStream::GrpcTag(); |
+ tag->tag_type_ = TagType::WRITES_DONE; |
+ return tag; |
+} |
+ |
+HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const { |
+ CHECK_EQ(TagType::READ, tag_type_); |
+ return received_msg_.get(); |
+} |
+ |
+void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) { |
+ auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN); |
+ DVLOG(3) << "Processing GrpcTag (on IO thread): " |
+ << static_cast<int>(tag_type_) |
+ << " status_code = " << static_cast<int>(status_code); |
+ switch (tag_type_) { |
+ case TagType::CONNECT: |
+ DVLOG(1) << "gRPC Stream is now setup."; |
+ if (!connection_cb_.is_null()) { |
+ base::ResetAndReturn(&connection_cb_).Run(status_code); |
+ } |
+ break; |
+ case TagType::READ: |
+ DVLOG(1) << "Received Helium Message = " |
+ << "; size = " << received_msg_->ByteSize(); |
+ |
+ base::ResetAndReturn(&received_cb_) |
+ .Run(std::move(received_msg_), status_code); |
+ break; |
+ case TagType::WRITE: |
+ // TODO(perumaal): Apply correct helium results here. |
+ base::ResetAndReturn(&sent_cb_).Run(status_code); |
+ break; |
+ case TagType::WRITES_DONE: |
+ break; |
+ default: |
Kevin M
2016/10/31 21:33:25
General point: don't use default, so that accident
|
+ LOG(FATAL) << "GrpcTag has invalid tag type : " |
+ << static_cast<int>(tag_type_) << "; this = " << this; |
+ } |
+} |
+ |
+GrpcStream::GrpcTag::~GrpcTag() { |
+ DVLOG(3) << "Deleting tag " << ((void*)this); |
+} |
+ |
+} // namespace blimp |