Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(501)

Side by Side Diff: blimp/net/grpc_stream.h

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/grpc_engine_transport.cc ('k') | blimp/net/grpc_stream.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef BLIMP_NET_GRPC_STREAM_H_
6 #define BLIMP_NET_GRPC_STREAM_H_
7
8 #include <list>
9 #include <memory>
10
11 #include "base/callback.h"
12 #include "base/command_line.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_checker.h"
19
20 #include "blimp/common/proto/helium_service.grpc.pb.h"
21 #include "blimp/helium/stream.h"
22 #include "blimp/net/blimp_net_export.h"
23 #include "net/base/completion_callback.h"
24
25 namespace blimp {
26
27 using helium::Stream;
28 using helium::Result;
29
30 struct GrpcTag;
31 class GrpcEngineStream;
32 class GrpcClientStream;
33
34 class BLIMP_NET_EXPORT GrpcStream : public helium::Stream {
35 public:
36 GrpcStream();
37 ~GrpcStream() override;
38
39 protected:
40 // Tags used by the completion queue. Callers are expected to use this as an
41 // opaque object and hence the implementation detail is hidden in the
42 // sourc-file.
43 GrpcTag* ConnectTag(const net::CompletionCallback& connection_callback);
44 GrpcTag* ReadTag(const Stream::ReceiveMessageCallback& received_cb,
45 HeliumWrapper** received_msg);
46 GrpcTag* WriteTag(const Stream::SendMessageCallback& sent_cb);
47 GrpcTag* WritesDoneTag();
48
49 // Data that is shared between the main (IO) thread as well as by the
50 // completion queue. It is ref-counted and memory managed in a thread-safe
51 // manner. Note: This object is meant to be accessed from either thread!
52 // Properties:
53 // * IO thread accesses this to send and receive messages.
54 // * Completion queue thread accesses this to process low-level gRPC messages.
55 // * Memory is freed when the IO thread has stopped sending/receiving messages
56 // and the completion queue has processed any pending gRPC queue tasks.
57 // Order in which the threads relinquish this object is explicitly unspecified
58 // and non-deterministic given that the completion queue may be stopped at any
59 // time (if the connection drops) and that the IO thread can also stop any
60 // time due to user action (closing the connection explicitly).
61 // This also cleanly separates the concern between what the completion queue
62 // and the subclass(es) know / operate on (as opposed to making the entire
63 // class instance be passed to the completion queue which is not clean).
64 struct SharedData : public base::RefCountedThreadSafe<SharedData> {
65 public:
66 SharedData();
67
68 // ThreadChecker to ensure that the completion queue is invoked only on the
69 // completion queue thread.
70 std::unique_ptr<base::ThreadChecker> grpc_thread_checker;
71
72 // The completion queue (for either the client or the server).
73 std::unique_ptr<grpc::CompletionQueue> completion_queue;
74
75 // The |TaskRunner| to post the gRPC callbacks on.
76 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner;
77
78 // For debugging purposes only. Counts the number of tags processed thus
79 // far.
80 int event_count;
81
82 // State specific to GrpcStream independent of client/engine.
83 base::Thread* grpc_thread;
84
85 protected:
86 virtual ~SharedData();
87
88 private:
89 friend class base::RefCountedThreadSafe<SharedData>;
90 };
91
92 // Starts the completion queue thread with shared data between the IO and the
93 // completion queue thread.
94 void StartCompletionQueueThread(scoped_refptr<SharedData> shared_data);
95
96 private:
97 enum class TagType {
98 // Invalid tag type.
99 UNKNOWN = 0,
100 // Tag used during initial connection.
101 CONNECT = 1,
102 // Tag used during waiting for a read to finish.
103 READ = 2,
104 // Tag used during waiting for a single write to finish.
105 WRITE = 3,
106 };
107
108 // GrpcTag is a data/utility class for processing the completion queue.
109 friend struct GrpcTag;
110
111 static void CompletionQueueThread(scoped_refptr<SharedData> shared_data,
112 TagType previous_tag_type);
113
114 static TagType CheckTagType(TagType current_tag_type,
115 TagType previous_tag_type);
116
117 base::ThreadChecker thread_checker_;
118
119 base::WeakPtrFactory<GrpcStream> weak_factory_;
120 };
121
122 } // namespace blimp
123
124 #endif // BLIMP_NET_GRPC_STREAM_H_
OLDNEW
« no previous file with comments | « blimp/net/grpc_engine_transport.cc ('k') | blimp/net/grpc_stream.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698