Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: blimp/net/grpc_stream.h

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Fixed a few minor comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef BLIMP_NET_GRPC_STREAM_H_
6 #define BLIMP_NET_GRPC_STREAM_H_
7
8 #include <list>
9 #include <memory>
10
11 #include "base/callback.h"
12 #include "base/command_line.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_checker.h"
19
20 #include "blimp/common/proto/helium_service.grpc.pb.h"
21 #include "blimp/helium/stream.h"
22 #include "blimp/net/blimp_net_export.h"
23 #include "net/base/completion_callback.h"
24
25 namespace blimp {
26
27 using helium::Stream;
28 using helium::Result;
29
30 struct GrpcTag;
31 class GrpcEngineStream;
32 class GrpcClientStream;
33
34 class BLIMP_NET_EXPORT GrpcStream : public helium::Stream {
35 public:
36 GrpcStream();
37 ~GrpcStream() override;
38
39 protected:
40 // Tags used by the completion queue. Callers are expected to use this as an
41 // opaque object and hence the implementation detail is hidden in the
42 // sourc-file.
43 GrpcTag* ConnectTag(const net::CompletionCallback& connection_callback);
44 GrpcTag* ReadTag(const Stream::ReceiveMessageCallback& received_cb,
45 HeliumWrapper** received_msg);
46 GrpcTag* WriteTag(const Stream::SendMessageCallback& sent_cb);
47 GrpcTag* WritesDoneTag();
48
49 // Data that is shared between the main (IO) thread as well as by the
50 // completion queue. It is ref-counted and memory managed in a thread-safe
51 // manner. Note: This object is meant to be accessed from either thread!
52 // Properties:
53 // * IO thread accesses this to send and receive messages.
54 // * Completion queue thread accesses this to process low-level gRPC messages.
55 // * Memory is freed when the IO thread has stopped sending/receiving messages
56 // and the completion queue has processed any pending gRPC queue tasks.
57 // Order in which the threads relinquish this object is explicitly unspecified
58 // and non-deterministic given that the completion queue may be stopped at any
59 // time (if the connection drops) and that the IO thread can also stop any
60 // time due to user action (closing the connection explicitly).
61 // This also cleanly separates the concern between what the completion queue
62 // and the subclass(es) know / operate on (as opposed to making the entire
63 // class instance be passed to the completion queue which is not clean).
64 struct SharedData : public base::RefCountedThreadSafe<SharedData> {
65 public:
66 SharedData();
67
68 // State specific to GrpcStream independent of client/engine.
69
70 std::unique_ptr<base::Thread> grpc_thread;
71
72 // ThreadChecker to ensure that the completion queue is invoked only on the
73 // completion queue thread.
74 std::unique_ptr<base::ThreadChecker> grpc_thread_checker;
75
76 // The completion queue (for either the client or the server).
77 std::unique_ptr<grpc::CompletionQueue> completion_queue;
78
79 // The |TaskRunner| to post the gRPC callbacks on.
80 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner;
81
82 // For debugging purposes only. Counts the number of tags processed thus
83 // far.
84 int event_count;
85
86 protected:
87 virtual ~SharedData();
88
89 private:
90 friend class base::RefCountedThreadSafe<SharedData>;
91 };
92
93 // Starts the completion queue thread with shared data between the IO and the
94 // completion queue thread.
95 void StartCompletionQueueThread(scoped_refptr<SharedData> shared_data);
96
97 private:
98 enum class TagType {
99 // Invalid tag type.
100 UNKNOWN = 0,
101 // Tag used during initial connection.
102 CONNECT = 1,
103 // Tag used during waiting for a read to finish.
104 READ = 2,
105 // Tag used during waiting for a single write to finish.
106 WRITE = 3,
107 };
108
109 // GrpcTag is a data/utility class for processing the completion queue.
110 friend struct GrpcTag;
111
112 static void CompletionQueueThread(scoped_refptr<SharedData> shared_data,
113 TagType previous_tag_type);
114
115 static TagType CheckTagType(TagType current_tag_type,
116 TagType previous_tag_type);
117
118 base::ThreadChecker thread_checker_;
119
120 base::WeakPtrFactory<GrpcStream> weak_factory_;
121 };
122
123 } // namespace blimp
124
125 #endif // BLIMP_NET_GRPC_STREAM_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698