Index: blimp/net/grpc_stream.cc |
diff --git a/blimp/net/grpc_stream.cc b/blimp/net/grpc_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..7cfee91253169822b13e9283083bdccac1336ee3 |
--- /dev/null |
+++ b/blimp/net/grpc_stream.cc |
@@ -0,0 +1,259 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "blimp/net/grpc_stream.h" |
+ |
+#include <memory> |
+#include <utility> |
+ |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "base/callback_helpers.h" |
+#include "base/command_line.h" |
+#include "base/memory/ptr_util.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/synchronization/lock.h" |
+#include "base/threading/thread.h" |
+#include "base/threading/thread_restrictions.h" |
+#include "base/threading/thread_task_runner_handle.h" |
+ |
+namespace blimp { |
+ |
+// GrpcStream::SharedData code. It's intended to be used as a simple |
+// data-structure. |
+GrpcStream::SharedData::SharedData() |
+ : callback_task_runner(base::ThreadTaskRunnerHandle::Get()), |
+ event_count(0), |
+ grpc_thread(new base::Thread("GrpcThread")) {} |
+ |
+GrpcStream::SharedData::~SharedData() { |
+ // When the shared data is being destructed, the gRPC thread object is being |
+ // explicitly relinquished beforehand so as to not have any circular |
+ // references. |
+ DCHECK_EQ(nullptr, grpc_thread); |
+ |
+ // The thread checker is purely for verification inside the completion queue |
+ // thread. |
+ DCHECK_EQ(nullptr, grpc_thread_checker.get()); |
+} |
+ |
+// A wrapper class around a gRPC tag targeted for use by the completion queue. |
+struct GrpcTag { |
+ public: |
+ GrpcTag(); |
+ ~GrpcTag(); |
+ |
+ // This method calls the correct callback based on the given tag-type. |
+ void ApplyCallback(bool ok); |
+ |
+ GrpcStream::TagType tag_type; |
+ |
+ // For initial connection. |
+ net::CompletionCallback connection_cb; |
+ |
+ // For sending. |
+ Stream::SendMessageCallback sent_cb; |
+ |
+ // For receiving. |
+ std::unique_ptr<HeliumWrapper> received_msg; |
+ Stream::ReceiveMessageCallback received_cb; |
+ |
+ base::ThreadChecker thread_checker; |
+ |
+ friend class GrpcStream; |
+}; |
+ |
+GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) { |
+ DVLOG(3) << "Created Tag " << this; |
+} |
+ |
+// Applies the callback (on the IO thread). |
+void GrpcTag::ApplyCallback(bool ok) { |
+ DCHECK(thread_checker.CalledOnValidThread()); |
+ Result status_code = |
+ (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR); |
+ DVLOG(3) << "Processing GrpcTag (on IO thread): " |
+ << static_cast<int>(tag_type) |
+ << " status_code = " << static_cast<int>(status_code); |
+ switch (tag_type) { |
+ case GrpcStream::TagType::CONNECT: |
+ DVLOG(1) << "gRPC Stream is now setup."; |
+ if (!connection_cb.is_null()) { |
+ base::ResetAndReturn(&connection_cb).Run(status_code); |
+ } |
+ break; |
+ case GrpcStream::TagType::READ: |
+ DVLOG(1) << "Received Helium Message = " |
+ << "; size = " << received_msg->ByteSize(); |
+ |
+ base::ResetAndReturn(&received_cb) |
+ .Run(std::move(received_msg), status_code); |
+ break; |
+ case GrpcStream::TagType::WRITE: |
+ // TODO(perumaal): Apply correct helium results here. |
+ base::ResetAndReturn(&sent_cb).Run(status_code); |
+ break; |
+ default: |
+ LOG(FATAL) << "GrpcTag has invalid tag type : " |
+ << static_cast<int>(tag_type) << "; this = " << this; |
+ } |
+} |
+ |
+GrpcTag::~GrpcTag() { |
+ DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this)); |
+} |
+ |
+// GrpcStream class. |
+GrpcStream::GrpcStream() : weak_factory_(this) {} |
+ |
+GrpcStream::~GrpcStream() {} |
+ |
+// Various ways of creating tags. Note that the tag is an opaque structure for |
+// the users of GrpcStream including the sub-classes so all the implementation |
+// is self-contained in the cc file without exposing the details. All the tags |
+// must be created on the same thread as the |GrpcStream| - i.e. IO thread. |
+ |
+// Tag used during initial connection setup. |
+GrpcTag* GrpcStream::ConnectTag( |
+ const net::CompletionCallback& connection_callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ GrpcTag* tag = new GrpcTag(); |
+ tag->tag_type = GrpcStream::TagType::CONNECT; |
+ tag->connection_cb = connection_callback; |
+ return tag; |
+} |
+ |
+// Tag that delivers a read-message from the completion queue. |
+GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb, |
+ HeliumWrapper** received_msg) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ GrpcTag* tag = new GrpcTag(); |
+ tag->tag_type = GrpcStream::TagType::READ; |
+ tag->received_msg = base::MakeUnique<HeliumWrapper>(); |
+ tag->received_cb = received_cb; |
+ *received_msg = tag->received_msg.get(); |
+ return tag; |
+} |
+ |
+// Tag that indicates when a message has been sent (i.e. in the completion |
+// queue). |
+GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ GrpcTag* tag = new GrpcTag(); |
+ tag->tag_type = GrpcStream::TagType::WRITE; |
+ tag->sent_cb = sent_cb; |
+ return tag; |
+} |
+ |
+// Starts the completion queue with the provided |shared_data| that is owned by |
+// both the completion queue thread and the IO thread. |
+void GrpcStream::StartCompletionQueueThread( |
+ scoped_refptr<SharedData> shared_data) { |
+ if (shared_data->grpc_thread == nullptr) { |
+ LOG(FATAL) << "Completion queue thread can only be started exactly once."; |
+ return; |
+ } |
+ |
+ base::Thread::Options options; |
+ options.message_loop_type = base::MessageLoop::TYPE_IO; |
+ shared_data->grpc_thread->StartWithOptions(options); |
+ shared_data->grpc_thread->task_runner()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&GrpcStream::CompletionQueueThread, |
+ base::Passed(std::move(shared_data)), TagType::UNKNOWN)); |
+} |
+ |
+// Ensures that the given "tag-type to tag-type transition" is OK. |
+/* static */ |
+GrpcStream::TagType GrpcStream::CheckTagType( |
+ GrpcStream::TagType current_tag_type, |
+ GrpcStream::TagType previous_tag_type) { |
+ switch (current_tag_type) { |
+ case TagType::CONNECT: |
+ DCHECK_EQ(TagType::UNKNOWN, previous_tag_type); |
+ break; |
+ case TagType::WRITE: |
+ case TagType::READ: |
+ DCHECK(previous_tag_type != TagType::UNKNOWN); |
+ break; |
+ default: |
+ LOG(FATAL) << "Unrecognized tag-type transition; current: " |
+ << static_cast<int>(current_tag_type) |
+ << "; previous: " << static_cast<int>(previous_tag_type); |
+ break; |
+ } |
+ return current_tag_type; |
+} |
+ |
+// This is the core Completion queue thread that processes gRPC tasks and |
+// delivers callbacks in the right callback thread (i.e. IO thread). This will |
+// continue scheduling further tasks until (a) either the completion queue is no |
+// longer active or (b) the completion queue encounters a problem (i.e. it's |
+// shutting down). |
+/* static */ |
+void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data, |
+ TagType previous_tag_type) { |
+ base::ThreadRestrictions::SetIOAllowed(true); |
+ |
+ if (shared_data->grpc_thread_checker == nullptr) { |
+ shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>(); |
+ } |
+ |
+ // Create once in the completion queue thread and ensure we always get called |
+ // into the same thread until the completion queue is shutdown. |
+ DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread()); |
+ bool ok = false; |
+ void* tag = nullptr; |
+ DVLOG(3) << "Waiting for next event (grpc stream; count so far = " |
+ << shared_data->event_count << ")."; |
+ ++shared_data->event_count; |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner = |
+ base::ThreadTaskRunnerHandle::Get(); |
+ |
+ // This is a blocking call. Waits until (a) either a tag is available for |
+ // processing or (b) when the completion queue is being shutdown. |
+ if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag), |
+ &ok)) { |
+ if (tag != nullptr) { |
+ std::unique_ptr<GrpcTag> grpc_tag = |
+ base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag)); |
+ if (shared_data->callback_task_runner != nullptr) { |
+ DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " |
+ << static_cast<int>(grpc_tag->tag_type) |
+ << "; Result: " << (ok ? "OK" : "ERROR"); |
+ previous_tag_type = |
+ GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type); |
+ shared_data->callback_task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok)); |
+ } else { |
+ LOG(ERROR) << "Unable to trigger gRPC callbacks."; |
+ } |
+ } |
+ |
+ if (!ok) { |
+ DVLOG(3) << "Tag " << tag |
+ << " had problems. Continuing to process remaining tags."; |
+ } |
+ task_runner->PostTask( |
+ FROM_HERE, |
+ base::Bind(&GrpcStream::CompletionQueueThread, |
+ base::Passed(std::move(shared_data)), previous_tag_type)); |
+ return; |
+ } else { |
+ LOG(ERROR) << "Completion queue thread has no more events."; |
+ } |
+ |
+ // We reach here once the completion queue has shutdown or when there is an |
+ // unrecoverable error. Destroy the thread and the shared_data now. |
+ |
+ // Ensure that the |shared_data| thread checker is destructed in the same |
+ // thread as it was created in. |
+ shared_data->grpc_thread_checker = nullptr; |
+ |
+ // TODO(perumaal): Figure out how to clean up the thread cleanly. |
+ shared_data->grpc_thread = nullptr; |
+} |
+ |
+} // namespace blimp |