| Index: blimp/net/grpc_stream.cc
|
| diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..960c49c0cb453af304ea322fcb2f7178baa08c0e
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_stream.cc
|
| @@ -0,0 +1,186 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "blimp/net/grpc_stream.h"
|
| +
|
| +#include "base/bind_helpers.h"
|
| +#include "base/callback.h"
|
| +#include "base/callback_helpers.h"
|
| +#include "base/command_line.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/threading/thread.h"
|
| +#include "base/threading/thread_restrictions.h"
|
| +#include "base/threading/thread_task_runner_handle.h"
|
| +
|
| +#include "content/public/browser/browser_thread.h"
|
| +
|
| +namespace blimp {
|
| +
|
| +GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback)
|
| + : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()),
|
| + connection_callback_(connection_callback),
|
| + grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")),
|
| + weak_factory_(this) {}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() {
|
| + return GrpcTag::Connect(connection_callback_);
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcReadTag(
|
| + const Stream::ReceiveMessageCallback& received_cb) {
|
| + return GrpcTag::Read(received_cb);
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag(
|
| + const Stream::SendMessageCallback& sent_cb) {
|
| + return GrpcTag::Write(sent_cb);
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() {
|
| + return GrpcTag::WritesDone();
|
| +}
|
| +
|
| +void GrpcStream::StartCompletionQueueThread(
|
| + grpc::CompletionQueue* completion_q) {
|
| + if (grpc_thread_ == nullptr) {
|
| + LOG(FATAL) << "Completion queue thread can only be started exactly once.";
|
| + return;
|
| + }
|
| +
|
| + base::Thread::Options options;
|
| + grpc_thread_->StartWithOptions(options);
|
| + grpc_thread_.release()->task_runner()->PostTask(
|
| + FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q,
|
| + callback_task_runner_));
|
| +}
|
| +
|
| +void GrpcStream::CompletionQueueThread(
|
| + grpc::CompletionQueue* completion_q,
|
| + scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) {
|
| + base::ThreadRestrictions::SetIOAllowed(true);
|
| +
|
| + DVLOG(3) << "Starting Completion Queue thread.";
|
| + static int event_count = 0;
|
| + bool ok = false;
|
| + void* tag = nullptr;
|
| + DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
|
| + << event_count << ").";
|
| + ++event_count;
|
| +
|
| + if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) {
|
| + if (tag != nullptr) {
|
| + GrpcTag* grpc_tag = reinterpret_cast<GrpcTag*>(tag);
|
| + if (callback_task_runner != nullptr) {
|
| + DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
|
| + << static_cast<int>(grpc_tag->tag_type_)
|
| + << "; Result: " << (ok ? "OK" : "ERROR");
|
| + callback_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread,
|
| + base::Passed(base::WrapUnique(grpc_tag)), ok));
|
| + } else {
|
| + LOG(ERROR) << "Unable to trigger gRPC callbacks.";
|
| + delete grpc_tag;
|
| + }
|
| + } else {
|
| + LOG(ERROR) << "Completion queue shutting down; tag = " << tag;
|
| + }
|
| +
|
| + if (ok) {
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner =
|
| + base::ThreadTaskRunnerHandle::Get();
|
| + if (task_runner != nullptr) {
|
| + task_runner->PostTask(FROM_HERE,
|
| + base::Bind(&GrpcStream::CompletionQueueThread,
|
| + completion_q, callback_task_runner));
|
| + }
|
| + } else {
|
| + LOG(INFO) << "Tag " << tag << " had problems. Exiting.";
|
| + }
|
| + } else {
|
| + LOG(ERROR) << "Completion queue thread has no more events.";
|
| + }
|
| +}
|
| +
|
| +GrpcStream::~GrpcStream() { callback_task_runner_ = nullptr; }
|
| +
|
| +GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) {
|
| + DVLOG(3) << "Created Tag " << this;
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect(
|
| + const net::CompletionCallback& connection_cb) {
|
| + GrpcTag* tag = new GrpcStream::GrpcTag();
|
| + tag->tag_type_ = TagType::CONNECT;
|
| + tag->connection_cb_ = connection_cb;
|
| + return tag;
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write(
|
| + const Stream::SendMessageCallback& callback) {
|
| + GrpcTag* tag = new GrpcStream::GrpcTag();
|
| + tag->tag_type_ = TagType::WRITE;
|
| + tag->sent_cb_ = callback;
|
| + return tag;
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read(
|
| + const Stream::ReceiveMessageCallback& callback) {
|
| + GrpcTag* tag = new GrpcStream::GrpcTag();
|
| + tag->tag_type_ = TagType::READ;
|
| + tag->received_msg_ = base::MakeUnique<HeliumWrapper>();
|
| + tag->received_cb_ = callback;
|
| + return tag;
|
| +}
|
| +
|
| +GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() {
|
| + GrpcTag* tag = new GrpcStream::GrpcTag();
|
| + tag->tag_type_ = TagType::WRITES_DONE;
|
| + return tag;
|
| +}
|
| +
|
| +HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const {
|
| + CHECK_EQ(TagType::READ, tag_type_);
|
| + return received_msg_.get();
|
| +}
|
| +
|
| +void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) {
|
| + Result status_code =
|
| + (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
|
| + DVLOG(3) << "Processing GrpcTag (on IO thread): "
|
| + << static_cast<int>(tag_type_)
|
| + << " status_code = " << static_cast<int>(status_code);
|
| + switch (tag_type_) {
|
| + case TagType::CONNECT:
|
| + DVLOG(1) << "gRPC Stream is now setup.";
|
| + if (!connection_cb_.is_null()) {
|
| + base::ResetAndReturn(&connection_cb_).Run(status_code);
|
| + }
|
| + break;
|
| + case TagType::READ:
|
| + DVLOG(1) << "Received Helium Message = "
|
| + << "; size = " << received_msg_->ByteSize();
|
| +
|
| + base::ResetAndReturn(&received_cb_)
|
| + .Run(std::move(received_msg_), status_code);
|
| + break;
|
| + case TagType::WRITE:
|
| + // TODO(perumaal): Apply correct helium results here.
|
| + base::ResetAndReturn(&sent_cb_).Run(status_code);
|
| + break;
|
| + case TagType::WRITES_DONE:
|
| + break;
|
| + default:
|
| + LOG(FATAL) << "GrpcTag has invalid tag type : "
|
| + << static_cast<int>(tag_type_) << "; this = " << this;
|
| + }
|
| +}
|
| +
|
| +GrpcStream::GrpcTag::~GrpcTag() {
|
| + DVLOG(3) << "Deleting tag " << ((void*)this);
|
| +}
|
| +
|
| +} // namespace blimp
|
|
|