Chromium Code Reviews| Index: blimp/net/grpc_stream.cc |
| diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8ebb96af656c4ddc1c3d24b73922c48fa42372c2 |
| --- /dev/null |
| +++ b/blimp/net/grpc_stream.cc |
| @@ -0,0 +1,186 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "blimp/net/grpc_stream.h" |
| + |
| +#include "base/bind_helpers.h" |
| +#include "base/callback.h" |
| +#include "base/callback_helpers.h" |
| +#include "base/command_line.h" |
| +#include "base/memory/ptr_util.h" |
| +#include "base/synchronization/lock.h" |
| +#include "base/threading/thread.h" |
| +#include "base/threading/thread_restrictions.h" |
| +#include "base/threading/thread_task_runner_handle.h" |
| + |
| +#include "content/public/browser/browser_thread.h" |
| + |
| +namespace blimp { |
| + |
| +GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback) |
| + : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| + connection_callback_(connection_callback), |
| + grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")), |
| + weak_factory_(this) {} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() { |
| + return GrpcTag::Connect(connection_callback_); |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcReadTag( |
| + const HeliumMessageReceivedCb& received_cb) { |
| + return GrpcTag::Read(received_cb); |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag( |
| + const HeliumMessageSentCb& sent_cb) { |
| + return GrpcTag::Write(sent_cb); |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() { |
| + return GrpcTag::WritesDone(); |
| +} |
| + |
| +void GrpcStream::StartCompletionQueueThread( |
| + grpc::CompletionQueue* completion_q) { |
| + if (grpc_thread_ == nullptr) { |
| + LOG(FATAL) << "Completion queue thread can only be started exactly once."; |
| + return; |
| + } |
| + |
| + base::Thread::Options options; |
| + // options.joinable = false; |
| + grpc_thread_->StartWithOptions(options); |
| + grpc_thread_.release()->task_runner()->PostTask( |
| + FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q, |
| + callback_task_runner_)); |
| +} |
| + |
| +void GrpcStream::CompletionQueueThread( |
| + grpc::CompletionQueue* completion_q, |
| + scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) { |
| + base::ThreadRestrictions::SetIOAllowed(true); |
| + |
| + DVLOG(3) << "Starting Completion Queue thread."; |
| + static int event_count = 0; |
| + bool ok = false; |
| + void* tag = nullptr; |
| + DVLOG(3) << "Waiting for next event (grpc stream; count so far = " |
| + << event_count << ")."; |
| + ++event_count; |
| + |
| + if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) { |
| + if (tag != nullptr) { |
| + auto grpc_tag = reinterpret_cast<GrpcTag*>(tag); |
| + if (callback_task_runner != nullptr) { |
| + DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " |
| + << static_cast<int>(grpc_tag->tag_type_) |
| + << "; Result: " << (ok ? "OK" : "ERROR"); |
| + callback_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread, |
| + base::Passed(base::WrapUnique(grpc_tag)), ok)); |
| + } else { |
| + LOG(ERROR) << "Unable to trigger gRPC callbacks."; |
| + delete grpc_tag; |
| + } |
| + } else { |
| + LOG(ERROR) << "Completion queue shutting down; tag = " << tag; |
| + } |
| + |
| + if (ok) { |
| + auto task_runner = base::ThreadTaskRunnerHandle::Get(); |
| + if (task_runner != nullptr) { |
| + task_runner->PostTask(FROM_HERE, |
| + base::Bind(&GrpcStream::CompletionQueueThread, |
| + completion_q, callback_task_runner)); |
| + } |
| + } else { |
| + LOG(INFO) << "Tag " << tag << " had problems. Exiting."; |
| + } |
| + } else { |
| + LOG(ERROR) << "Completion queue thread has no more events."; |
| + } |
| +} |
| + |
| +GrpcStream::~GrpcStream() { |
| + callback_task_runner_ = nullptr; |
| +} |
| + |
| +GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) { |
| + DVLOG(3) << "Created Tag " << this; |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect( |
| + const net::CompletionCallback& connection_cb) { |
| + auto tag = new GrpcStream::GrpcTag(); |
| + tag->tag_type_ = TagType::CONNECT; |
| + tag->connection_cb_ = connection_cb; |
| + return tag; |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write( |
| + const GrpcStream::HeliumMessageSentCb& callback) { |
| + auto tag = new GrpcStream::GrpcTag(); |
| + tag->tag_type_ = TagType::WRITE; |
| + tag->sent_cb_ = callback; |
| + return tag; |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read( |
| + const GrpcStream::HeliumMessageReceivedCb& callback) { |
| + auto tag = new GrpcStream::GrpcTag(); |
| + tag->tag_type_ = TagType::READ; |
| + tag->received_msg_ = base::MakeUnique<HeliumWrapper>(); |
| + tag->received_cb_ = callback; |
| + return tag; |
| +} |
| + |
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() { |
| + auto tag = new GrpcStream::GrpcTag(); |
| + tag->tag_type_ = TagType::WRITES_DONE; |
| + return tag; |
| +} |
| + |
| +HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const { |
| + CHECK_EQ(TagType::READ, tag_type_); |
| + return received_msg_.get(); |
| +} |
| + |
| +void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) { |
| + auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN); |
| + DVLOG(3) << "Processing GrpcTag (on IO thread): " |
| + << static_cast<int>(tag_type_) |
| + << " status_code = " << static_cast<int>(status_code); |
| + switch (tag_type_) { |
| + case TagType::CONNECT: |
| + DVLOG(1) << "gRPC Stream is now setup."; |
| + if (!connection_cb_.is_null()) { |
| + base::ResetAndReturn(&connection_cb_).Run(status_code); |
| + } |
| + break; |
| + case TagType::READ: |
| + DVLOG(1) << "Received Helium Message = " |
| + << "; size = " << received_msg_->ByteSize(); |
| + |
| + base::ResetAndReturn(&received_cb_) |
| + .Run(std::move(received_msg_), status_code); |
| + break; |
| + case TagType::WRITE: |
| + // TODO(perumaal): Apply correct helium results here. |
| + base::ResetAndReturn(&sent_cb_).Run(status_code); |
| + break; |
| + case TagType::WRITES_DONE: |
| + break; |
| + default: |
|
Kevin M
2016/10/31 21:33:25
General point: don't use default, so that accident
|
| + LOG(FATAL) << "GrpcTag has invalid tag type : " |
| + << static_cast<int>(tag_type_) << "; this = " << this; |
| + } |
| +} |
| + |
| +GrpcStream::GrpcTag::~GrpcTag() { |
| + DVLOG(3) << "Deleting tag " << ((void*)this); |
| +} |
| + |
| +} // namespace blimp |