| Index: blimp/net/grpc_stream.cc
|
| diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7cfee91253169822b13e9283083bdccac1336ee3
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_stream.cc
|
| @@ -0,0 +1,259 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "blimp/net/grpc_stream.h"
|
| +
|
| +#include <memory>
|
| +#include <utility>
|
| +
|
| +#include "base/bind_helpers.h"
|
| +#include "base/callback.h"
|
| +#include "base/callback_helpers.h"
|
| +#include "base/command_line.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/threading/thread.h"
|
| +#include "base/threading/thread_restrictions.h"
|
| +#include "base/threading/thread_task_runner_handle.h"
|
| +
|
| +namespace blimp {
|
| +
|
| +// GrpcStream::SharedData code. It's intended to be used as a simple
|
| +// data-structure.
|
| +GrpcStream::SharedData::SharedData()
|
| + : callback_task_runner(base::ThreadTaskRunnerHandle::Get()),
|
| + event_count(0),
|
| + grpc_thread(new base::Thread("GrpcThread")) {}
|
| +
|
| +GrpcStream::SharedData::~SharedData() {
|
| + // When the shared data is being destructed, the gRPC thread object is being
|
| + // explicitly relinquished beforehand so as to not have any circular
|
| + // references.
|
| + DCHECK_EQ(nullptr, grpc_thread);
|
| +
|
| + // The thread checker is purely for verification inside the completion queue
|
| + // thread.
|
| + DCHECK_EQ(nullptr, grpc_thread_checker.get());
|
| +}
|
| +
|
| +// A wrapper class around a gRPC tag targeted for use by the completion queue.
|
| +struct GrpcTag {
|
| + public:
|
| + GrpcTag();
|
| + ~GrpcTag();
|
| +
|
| + // This method calls the correct callback based on the given tag-type.
|
| + void ApplyCallback(bool ok);
|
| +
|
| + GrpcStream::TagType tag_type;
|
| +
|
| + // For initial connection.
|
| + net::CompletionCallback connection_cb;
|
| +
|
| + // For sending.
|
| + Stream::SendMessageCallback sent_cb;
|
| +
|
| + // For receiving.
|
| + std::unique_ptr<HeliumWrapper> received_msg;
|
| + Stream::ReceiveMessageCallback received_cb;
|
| +
|
| + base::ThreadChecker thread_checker;
|
| +
|
| + friend class GrpcStream;
|
| +};
|
| +
|
| +GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) {
|
| + DVLOG(3) << "Created Tag " << this;
|
| +}
|
| +
|
| +// Applies the callback (on the IO thread).
|
| +void GrpcTag::ApplyCallback(bool ok) {
|
| + DCHECK(thread_checker.CalledOnValidThread());
|
| + Result status_code =
|
| + (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
|
| + DVLOG(3) << "Processing GrpcTag (on IO thread): "
|
| + << static_cast<int>(tag_type)
|
| + << " status_code = " << static_cast<int>(status_code);
|
| + switch (tag_type) {
|
| + case GrpcStream::TagType::CONNECT:
|
| + DVLOG(1) << "gRPC Stream is now setup.";
|
| + if (!connection_cb.is_null()) {
|
| + base::ResetAndReturn(&connection_cb).Run(status_code);
|
| + }
|
| + break;
|
| + case GrpcStream::TagType::READ:
|
| + DVLOG(1) << "Received Helium Message = "
|
| + << "; size = " << received_msg->ByteSize();
|
| +
|
| + base::ResetAndReturn(&received_cb)
|
| + .Run(std::move(received_msg), status_code);
|
| + break;
|
| + case GrpcStream::TagType::WRITE:
|
| + // TODO(perumaal): Apply correct helium results here.
|
| + base::ResetAndReturn(&sent_cb).Run(status_code);
|
| + break;
|
| + default:
|
| + LOG(FATAL) << "GrpcTag has invalid tag type : "
|
| + << static_cast<int>(tag_type) << "; this = " << this;
|
| + }
|
| +}
|
| +
|
| +GrpcTag::~GrpcTag() {
|
| + DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this));
|
| +}
|
| +
|
| +// GrpcStream class.
|
| +GrpcStream::GrpcStream() : weak_factory_(this) {}
|
| +
|
| +GrpcStream::~GrpcStream() {}
|
| +
|
| +// Various ways of creating tags. Note that the tag is an opaque structure for
|
| +// the users of GrpcStream including the sub-classes so all the implementation
|
| +// is self-contained in the cc file without exposing the details. All the tags
|
| +// must be created on the same thread as the |GrpcStream| - i.e. IO thread.
|
| +
|
| +// Tag used during initial connection setup.
|
| +GrpcTag* GrpcStream::ConnectTag(
|
| + const net::CompletionCallback& connection_callback) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + GrpcTag* tag = new GrpcTag();
|
| + tag->tag_type = GrpcStream::TagType::CONNECT;
|
| + tag->connection_cb = connection_callback;
|
| + return tag;
|
| +}
|
| +
|
| +// Tag that delivers a read-message from the completion queue.
|
| +GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb,
|
| + HeliumWrapper** received_msg) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + GrpcTag* tag = new GrpcTag();
|
| + tag->tag_type = GrpcStream::TagType::READ;
|
| + tag->received_msg = base::MakeUnique<HeliumWrapper>();
|
| + tag->received_cb = received_cb;
|
| + *received_msg = tag->received_msg.get();
|
| + return tag;
|
| +}
|
| +
|
| +// Tag that indicates when a message has been sent (i.e. in the completion
|
| +// queue).
|
| +GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + GrpcTag* tag = new GrpcTag();
|
| + tag->tag_type = GrpcStream::TagType::WRITE;
|
| + tag->sent_cb = sent_cb;
|
| + return tag;
|
| +}
|
| +
|
| +// Starts the completion queue with the provided |shared_data| that is owned by
|
| +// both the completion queue thread and the IO thread.
|
| +void GrpcStream::StartCompletionQueueThread(
|
| + scoped_refptr<SharedData> shared_data) {
|
| + if (shared_data->grpc_thread == nullptr) {
|
| + LOG(FATAL) << "Completion queue thread can only be started exactly once.";
|
| + return;
|
| + }
|
| +
|
| + base::Thread::Options options;
|
| + options.message_loop_type = base::MessageLoop::TYPE_IO;
|
| + shared_data->grpc_thread->StartWithOptions(options);
|
| + shared_data->grpc_thread->task_runner()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&GrpcStream::CompletionQueueThread,
|
| + base::Passed(std::move(shared_data)), TagType::UNKNOWN));
|
| +}
|
| +
|
| +// Ensures that the given "tag-type to tag-type transition" is OK.
|
| +/* static */
|
| +GrpcStream::TagType GrpcStream::CheckTagType(
|
| + GrpcStream::TagType current_tag_type,
|
| + GrpcStream::TagType previous_tag_type) {
|
| + switch (current_tag_type) {
|
| + case TagType::CONNECT:
|
| + DCHECK_EQ(TagType::UNKNOWN, previous_tag_type);
|
| + break;
|
| + case TagType::WRITE:
|
| + case TagType::READ:
|
| + DCHECK(previous_tag_type != TagType::UNKNOWN);
|
| + break;
|
| + default:
|
| + LOG(FATAL) << "Unrecognized tag-type transition; current: "
|
| + << static_cast<int>(current_tag_type)
|
| + << "; previous: " << static_cast<int>(previous_tag_type);
|
| + break;
|
| + }
|
| + return current_tag_type;
|
| +}
|
| +
|
| +// This is the core Completion queue thread that processes gRPC tasks and
|
| +// delivers callbacks in the right callback thread (i.e. IO thread). This will
|
| +// continue scheduling further tasks until (a) either the completion queue is no
|
| +// longer active or (b) the completion queue encounters a problem (i.e. it's
|
| +// shutting down).
|
| +/* static */
|
| +void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data,
|
| + TagType previous_tag_type) {
|
| + base::ThreadRestrictions::SetIOAllowed(true);
|
| +
|
| + if (shared_data->grpc_thread_checker == nullptr) {
|
| + shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>();
|
| + }
|
| +
|
| + // Create once in the completion queue thread and ensure we always get called
|
| + // into the same thread until the completion queue is shutdown.
|
| + DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread());
|
| + bool ok = false;
|
| + void* tag = nullptr;
|
| + DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
|
| + << shared_data->event_count << ").";
|
| + ++shared_data->event_count;
|
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner =
|
| + base::ThreadTaskRunnerHandle::Get();
|
| +
|
| + // This is a blocking call. Waits until (a) either a tag is available for
|
| + // processing or (b) when the completion queue is being shutdown.
|
| + if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag),
|
| + &ok)) {
|
| + if (tag != nullptr) {
|
| + std::unique_ptr<GrpcTag> grpc_tag =
|
| + base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag));
|
| + if (shared_data->callback_task_runner != nullptr) {
|
| + DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
|
| + << static_cast<int>(grpc_tag->tag_type)
|
| + << "; Result: " << (ok ? "OK" : "ERROR");
|
| + previous_tag_type =
|
| + GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type);
|
| + shared_data->callback_task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok));
|
| + } else {
|
| + LOG(ERROR) << "Unable to trigger gRPC callbacks.";
|
| + }
|
| + }
|
| +
|
| + if (!ok) {
|
| + DVLOG(3) << "Tag " << tag
|
| + << " had problems. Continuing to process remaining tags.";
|
| + }
|
| + task_runner->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&GrpcStream::CompletionQueueThread,
|
| + base::Passed(std::move(shared_data)), previous_tag_type));
|
| + return;
|
| + } else {
|
| + LOG(ERROR) << "Completion queue thread has no more events.";
|
| + }
|
| +
|
| + // We reach here once the completion queue has shutdown or when there is an
|
| + // unrecoverable error. Destroy the thread and the shared_data now.
|
| +
|
| + // Ensure that the |shared_data| thread checker is destructed in the same
|
| + // thread as it was created in.
|
| + shared_data->grpc_thread_checker = nullptr;
|
| +
|
| + // TODO(perumaal): Figure out how to clean up the thread cleanly.
|
| + shared_data->grpc_thread = nullptr;
|
| +}
|
| +
|
| +} // namespace blimp
|
|
|