Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(327)

Unified Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « blimp/net/grpc_stream.h ('k') | blimp/net/grpc_stream_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: blimp/net/grpc_stream.cc
diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..7cfee91253169822b13e9283083bdccac1336ee3
--- /dev/null
+++ b/blimp/net/grpc_stream.cc
@@ -0,0 +1,259 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "blimp/net/grpc_stream.h"
+
+#include <memory>
+#include <utility>
+
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "base/callback_helpers.h"
+#include "base/command_line.h"
+#include "base/memory/ptr_util.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/thread.h"
+#include "base/threading/thread_restrictions.h"
+#include "base/threading/thread_task_runner_handle.h"
+
+namespace blimp {
+
+// GrpcStream::SharedData code. It's intended to be used as a simple
+// data-structure.
+GrpcStream::SharedData::SharedData()
+ : callback_task_runner(base::ThreadTaskRunnerHandle::Get()),
+ event_count(0),
+ grpc_thread(new base::Thread("GrpcThread")) {}
+
+GrpcStream::SharedData::~SharedData() {
+ // When the shared data is being destructed, the gRPC thread object is being
+ // explicitly relinquished beforehand so as to not have any circular
+ // references.
+ DCHECK_EQ(nullptr, grpc_thread);
+
+ // The thread checker is purely for verification inside the completion queue
+ // thread.
+ DCHECK_EQ(nullptr, grpc_thread_checker.get());
+}
+
+// A wrapper class around a gRPC tag targeted for use by the completion queue.
+struct GrpcTag {
+ public:
+ GrpcTag();
+ ~GrpcTag();
+
+ // This method calls the correct callback based on the given tag-type.
+ void ApplyCallback(bool ok);
+
+ GrpcStream::TagType tag_type;
+
+ // For initial connection.
+ net::CompletionCallback connection_cb;
+
+ // For sending.
+ Stream::SendMessageCallback sent_cb;
+
+ // For receiving.
+ std::unique_ptr<HeliumWrapper> received_msg;
+ Stream::ReceiveMessageCallback received_cb;
+
+ base::ThreadChecker thread_checker;
+
+ friend class GrpcStream;
+};
+
+GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) {
+ DVLOG(3) << "Created Tag " << this;
+}
+
+// Applies the callback (on the IO thread).
+void GrpcTag::ApplyCallback(bool ok) {
+ DCHECK(thread_checker.CalledOnValidThread());
+ Result status_code =
+ (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
+ DVLOG(3) << "Processing GrpcTag (on IO thread): "
+ << static_cast<int>(tag_type)
+ << " status_code = " << static_cast<int>(status_code);
+ switch (tag_type) {
+ case GrpcStream::TagType::CONNECT:
+ DVLOG(1) << "gRPC Stream is now setup.";
+ if (!connection_cb.is_null()) {
+ base::ResetAndReturn(&connection_cb).Run(status_code);
+ }
+ break;
+ case GrpcStream::TagType::READ:
+ DVLOG(1) << "Received Helium Message = "
+ << "; size = " << received_msg->ByteSize();
+
+ base::ResetAndReturn(&received_cb)
+ .Run(std::move(received_msg), status_code);
+ break;
+ case GrpcStream::TagType::WRITE:
+ // TODO(perumaal): Apply correct helium results here.
+ base::ResetAndReturn(&sent_cb).Run(status_code);
+ break;
+ default:
+ LOG(FATAL) << "GrpcTag has invalid tag type : "
+ << static_cast<int>(tag_type) << "; this = " << this;
+ }
+}
+
+GrpcTag::~GrpcTag() {
+ DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this));
+}
+
+// GrpcStream class.
+GrpcStream::GrpcStream() : weak_factory_(this) {}
+
+GrpcStream::~GrpcStream() {}
+
+// Various ways of creating tags. Note that the tag is an opaque structure for
+// the users of GrpcStream including the sub-classes so all the implementation
+// is self-contained in the cc file without exposing the details. All the tags
+// must be created on the same thread as the |GrpcStream| - i.e. IO thread.
+
+// Tag used during initial connection setup.
+GrpcTag* GrpcStream::ConnectTag(
+ const net::CompletionCallback& connection_callback) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ GrpcTag* tag = new GrpcTag();
+ tag->tag_type = GrpcStream::TagType::CONNECT;
+ tag->connection_cb = connection_callback;
+ return tag;
+}
+
+// Tag that delivers a read-message from the completion queue.
+GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb,
+ HeliumWrapper** received_msg) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ GrpcTag* tag = new GrpcTag();
+ tag->tag_type = GrpcStream::TagType::READ;
+ tag->received_msg = base::MakeUnique<HeliumWrapper>();
+ tag->received_cb = received_cb;
+ *received_msg = tag->received_msg.get();
+ return tag;
+}
+
+// Tag that indicates when a message has been sent (i.e. in the completion
+// queue).
+GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ GrpcTag* tag = new GrpcTag();
+ tag->tag_type = GrpcStream::TagType::WRITE;
+ tag->sent_cb = sent_cb;
+ return tag;
+}
+
+// Starts the completion queue with the provided |shared_data| that is owned by
+// both the completion queue thread and the IO thread.
+void GrpcStream::StartCompletionQueueThread(
+ scoped_refptr<SharedData> shared_data) {
+ if (shared_data->grpc_thread == nullptr) {
+ LOG(FATAL) << "Completion queue thread can only be started exactly once.";
+ return;
+ }
+
+ base::Thread::Options options;
+ options.message_loop_type = base::MessageLoop::TYPE_IO;
+ shared_data->grpc_thread->StartWithOptions(options);
+ shared_data->grpc_thread->task_runner()->PostTask(
+ FROM_HERE,
+ base::Bind(&GrpcStream::CompletionQueueThread,
+ base::Passed(std::move(shared_data)), TagType::UNKNOWN));
+}
+
+// Ensures that the given "tag-type to tag-type transition" is OK.
+/* static */
+GrpcStream::TagType GrpcStream::CheckTagType(
+ GrpcStream::TagType current_tag_type,
+ GrpcStream::TagType previous_tag_type) {
+ switch (current_tag_type) {
+ case TagType::CONNECT:
+ DCHECK_EQ(TagType::UNKNOWN, previous_tag_type);
+ break;
+ case TagType::WRITE:
+ case TagType::READ:
+ DCHECK(previous_tag_type != TagType::UNKNOWN);
+ break;
+ default:
+ LOG(FATAL) << "Unrecognized tag-type transition; current: "
+ << static_cast<int>(current_tag_type)
+ << "; previous: " << static_cast<int>(previous_tag_type);
+ break;
+ }
+ return current_tag_type;
+}
+
+// This is the core Completion queue thread that processes gRPC tasks and
+// delivers callbacks in the right callback thread (i.e. IO thread). This will
+// continue scheduling further tasks until (a) either the completion queue is no
+// longer active or (b) the completion queue encounters a problem (i.e. it's
+// shutting down).
+/* static */
+void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data,
+ TagType previous_tag_type) {
+ base::ThreadRestrictions::SetIOAllowed(true);
+
+ if (shared_data->grpc_thread_checker == nullptr) {
+ shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>();
+ }
+
+ // Create once in the completion queue thread and ensure we always get called
+ // into the same thread until the completion queue is shutdown.
+ DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread());
+ bool ok = false;
+ void* tag = nullptr;
+ DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
+ << shared_data->event_count << ").";
+ ++shared_data->event_count;
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner =
+ base::ThreadTaskRunnerHandle::Get();
+
+ // This is a blocking call. Waits until (a) either a tag is available for
+ // processing or (b) when the completion queue is being shutdown.
+ if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag),
+ &ok)) {
+ if (tag != nullptr) {
+ std::unique_ptr<GrpcTag> grpc_tag =
+ base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag));
+ if (shared_data->callback_task_runner != nullptr) {
+ DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
+ << static_cast<int>(grpc_tag->tag_type)
+ << "; Result: " << (ok ? "OK" : "ERROR");
+ previous_tag_type =
+ GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type);
+ shared_data->callback_task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok));
+ } else {
+ LOG(ERROR) << "Unable to trigger gRPC callbacks.";
+ }
+ }
+
+ if (!ok) {
+ DVLOG(3) << "Tag " << tag
+ << " had problems. Continuing to process remaining tags.";
+ }
+ task_runner->PostTask(
+ FROM_HERE,
+ base::Bind(&GrpcStream::CompletionQueueThread,
+ base::Passed(std::move(shared_data)), previous_tag_type));
+ return;
+ } else {
+ LOG(ERROR) << "Completion queue thread has no more events.";
+ }
+
+ // We reach here once the completion queue has shutdown or when there is an
+ // unrecoverable error. Destroy the thread and the shared_data now.
+
+ // Ensure that the |shared_data| thread checker is destructed in the same
+ // thread as it was created in.
+ shared_data->grpc_thread_checker = nullptr;
+
+ // TODO(perumaal): Figure out how to clean up the thread cleanly.
+ shared_data->grpc_thread = nullptr;
+}
+
+} // namespace blimp
« no previous file with comments | « blimp/net/grpc_stream.h ('k') | blimp/net/grpc_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698