Index: blimp/net/grpc_stream.h |
diff --git a/blimp/net/grpc_stream.h b/blimp/net/grpc_stream.h |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2a98be6ad6379b24af99fed54e8beb06a6ac59bb |
--- /dev/null |
+++ b/blimp/net/grpc_stream.h |
@@ -0,0 +1,124 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#ifndef BLIMP_NET_GRPC_STREAM_H_ |
+#define BLIMP_NET_GRPC_STREAM_H_ |
+ |
+#include <list> |
+#include <memory> |
+ |
+#include "base/callback.h" |
+#include "base/command_line.h" |
+#include "base/macros.h" |
+#include "base/memory/ptr_util.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/synchronization/lock.h" |
+#include "base/threading/thread.h" |
+#include "base/threading/thread_checker.h" |
+ |
+#include "blimp/common/proto/helium_service.grpc.pb.h" |
+#include "blimp/helium/stream.h" |
+#include "blimp/net/blimp_net_export.h" |
+#include "net/base/completion_callback.h" |
+ |
+namespace blimp { |
+ |
+using helium::Stream; |
+using helium::Result; |
+ |
+struct GrpcTag; |
+class GrpcEngineStream; |
+class GrpcClientStream; |
+ |
+class BLIMP_NET_EXPORT GrpcStream : public helium::Stream { |
+ public: |
+ GrpcStream(); |
+ ~GrpcStream() override; |
+ |
+ protected: |
+ // Tags used by the completion queue. Callers are expected to use this as an |
+ // opaque object and hence the implementation detail is hidden in the |
+ // sourc-file. |
+ GrpcTag* ConnectTag(const net::CompletionCallback& connection_callback); |
+ GrpcTag* ReadTag(const Stream::ReceiveMessageCallback& received_cb, |
+ HeliumWrapper** received_msg); |
+ GrpcTag* WriteTag(const Stream::SendMessageCallback& sent_cb); |
+ GrpcTag* WritesDoneTag(); |
+ |
+ // Data that is shared between the main (IO) thread as well as by the |
+ // completion queue. It is ref-counted and memory managed in a thread-safe |
+ // manner. Note: This object is meant to be accessed from either thread! |
+ // Properties: |
+ // * IO thread accesses this to send and receive messages. |
+ // * Completion queue thread accesses this to process low-level gRPC messages. |
+ // * Memory is freed when the IO thread has stopped sending/receiving messages |
+ // and the completion queue has processed any pending gRPC queue tasks. |
+ // Order in which the threads relinquish this object is explicitly unspecified |
+ // and non-deterministic given that the completion queue may be stopped at any |
+ // time (if the connection drops) and that the IO thread can also stop any |
+ // time due to user action (closing the connection explicitly). |
+ // This also cleanly separates the concern between what the completion queue |
+ // and the subclass(es) know / operate on (as opposed to making the entire |
+ // class instance be passed to the completion queue which is not clean). |
+ struct SharedData : public base::RefCountedThreadSafe<SharedData> { |
+ public: |
+ SharedData(); |
+ |
+ // ThreadChecker to ensure that the completion queue is invoked only on the |
+ // completion queue thread. |
+ std::unique_ptr<base::ThreadChecker> grpc_thread_checker; |
+ |
+ // The completion queue (for either the client or the server). |
+ std::unique_ptr<grpc::CompletionQueue> completion_queue; |
+ |
+ // The |TaskRunner| to post the gRPC callbacks on. |
+ scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner; |
+ |
+ // For debugging purposes only. Counts the number of tags processed thus |
+ // far. |
+ int event_count; |
+ |
+ // State specific to GrpcStream independent of client/engine. |
+ base::Thread* grpc_thread; |
+ |
+ protected: |
+ virtual ~SharedData(); |
+ |
+ private: |
+ friend class base::RefCountedThreadSafe<SharedData>; |
+ }; |
+ |
+ // Starts the completion queue thread with shared data between the IO and the |
+ // completion queue thread. |
+ void StartCompletionQueueThread(scoped_refptr<SharedData> shared_data); |
+ |
+ private: |
+ enum class TagType { |
+ // Invalid tag type. |
+ UNKNOWN = 0, |
+ // Tag used during initial connection. |
+ CONNECT = 1, |
+ // Tag used during waiting for a read to finish. |
+ READ = 2, |
+ // Tag used during waiting for a single write to finish. |
+ WRITE = 3, |
+ }; |
+ |
+ // GrpcTag is a data/utility class for processing the completion queue. |
+ friend struct GrpcTag; |
+ |
+ static void CompletionQueueThread(scoped_refptr<SharedData> shared_data, |
+ TagType previous_tag_type); |
+ |
+ static TagType CheckTagType(TagType current_tag_type, |
+ TagType previous_tag_type); |
+ |
+ base::ThreadChecker thread_checker_; |
+ |
+ base::WeakPtrFactory<GrpcStream> weak_factory_; |
+}; |
+ |
+} // namespace blimp |
+ |
+#endif // BLIMP_NET_GRPC_STREAM_H_ |