Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(781)

Side by Side Diff: third_party/grpc/include/grpc++/impl/codegen/async_stream.h

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
36
37 #include <grpc++/impl/codegen/channel_interface.h>
38 #include <grpc++/impl/codegen/call.h>
39 #include <grpc++/impl/codegen/service_type.h>
40 #include <grpc++/impl/codegen/server_context.h>
41 #include <grpc++/impl/codegen/status.h>
42
43 namespace grpc {
44
45 class CompletionQueue;
46
47 /// Common interface for all client side asynchronous streaming.
48 class ClientAsyncStreamingInterface {
49 public:
50 virtual ~ClientAsyncStreamingInterface() {}
51
52 /// Request notification of the reading of the initial metadata. Completion
53 /// will be notified by \a tag on the associated completion queue.
54 ///
55 /// \param[in] tag Tag identifying this request.
56 virtual void ReadInitialMetadata(void* tag) = 0;
57
58 /// Request notification completion.
59 ///
60 /// \param[out] status To be updated with the operation status.
61 /// \param[in] tag Tag identifying this request.
62 virtual void Finish(Status* status, void* tag) = 0;
63 };
64
65 /// An interface that yields a sequence of messages of type \a R.
66 template <class R>
67 class AsyncReaderInterface {
68 public:
69 virtual ~AsyncReaderInterface() {}
70
71 /// Read a message of type \a R into \a msg. Completion will be notified by \a
72 /// tag on the associated completion queue.
73 ///
74 /// \param[out] msg Where to eventually store the read message.
75 /// \param[in] tag The tag identifying the operation.
76 virtual void Read(R* msg, void* tag) = 0;
77 };
78
79 /// An interface that can be fed a sequence of messages of type \a W.
80 template <class W>
81 class AsyncWriterInterface {
82 public:
83 virtual ~AsyncWriterInterface() {}
84
85 /// Request the writing of \a msg with identifying tag \a tag.
86 ///
87 /// Only one write may be outstanding at any given time. This means that
88 /// after calling Write, one must wait to receive \a tag from the completion
89 /// queue BEFORE calling Write again.
90 ///
91 /// \param[in] msg The message to be written.
92 /// \param[in] tag The tag identifying the operation.
93 virtual void Write(const W& msg, void* tag) = 0;
94 };
95
96 template <class R>
97 class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
98 public AsyncReaderInterface<R> {};
99
100 template <class R>
101 class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
102 public:
103 /// Create a stream and write the first request out.
104 template <class W>
105 ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
106 const RpcMethod& method, ClientContext* context,
107 const W& request, void* tag)
108 : context_(context), call_(channel->CreateCall(method, context, cq)) {
109 init_ops_.set_output_tag(tag);
110 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
111 // TODO(ctiller): don't assert
112 GPR_ASSERT(init_ops_.SendMessage(request).ok());
113 init_ops_.ClientSendClose();
114 call_.PerformOps(&init_ops_);
115 }
116
117 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
118 GPR_ASSERT(!context_->initial_metadata_received_);
119
120 meta_ops_.set_output_tag(tag);
121 meta_ops_.RecvInitialMetadata(context_);
122 call_.PerformOps(&meta_ops_);
123 }
124
125 void Read(R* msg, void* tag) GRPC_OVERRIDE {
126 read_ops_.set_output_tag(tag);
127 if (!context_->initial_metadata_received_) {
128 read_ops_.RecvInitialMetadata(context_);
129 }
130 read_ops_.RecvMessage(msg);
131 call_.PerformOps(&read_ops_);
132 }
133
134 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
135 finish_ops_.set_output_tag(tag);
136 if (!context_->initial_metadata_received_) {
137 finish_ops_.RecvInitialMetadata(context_);
138 }
139 finish_ops_.ClientRecvStatus(context_, status);
140 call_.PerformOps(&finish_ops_);
141 }
142
143 private:
144 ClientContext* context_;
145 Call call_;
146 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
147 init_ops_;
148 CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
149 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
150 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
151 };
152
153 /// Common interface for client side asynchronous writing.
154 template <class W>
155 class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
156 public AsyncWriterInterface<W> {
157 public:
158 /// Signal the client is done with the writes.
159 ///
160 /// \param[in] tag The tag identifying the operation.
161 virtual void WritesDone(void* tag) = 0;
162 };
163
164 template <class W>
165 class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
166 public:
167 template <class R>
168 ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
169 const RpcMethod& method, ClientContext* context,
170 R* response, void* tag)
171 : context_(context), call_(channel->CreateCall(method, context, cq)) {
172 finish_ops_.RecvMessage(response);
173
174 init_ops_.set_output_tag(tag);
175 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
176 call_.PerformOps(&init_ops_);
177 }
178
179 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
180 GPR_ASSERT(!context_->initial_metadata_received_);
181
182 meta_ops_.set_output_tag(tag);
183 meta_ops_.RecvInitialMetadata(context_);
184 call_.PerformOps(&meta_ops_);
185 }
186
187 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
188 write_ops_.set_output_tag(tag);
189 // TODO(ctiller): don't assert
190 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
191 call_.PerformOps(&write_ops_);
192 }
193
194 void WritesDone(void* tag) GRPC_OVERRIDE {
195 writes_done_ops_.set_output_tag(tag);
196 writes_done_ops_.ClientSendClose();
197 call_.PerformOps(&writes_done_ops_);
198 }
199
200 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
201 finish_ops_.set_output_tag(tag);
202 if (!context_->initial_metadata_received_) {
203 finish_ops_.RecvInitialMetadata(context_);
204 }
205 finish_ops_.ClientRecvStatus(context_, status);
206 call_.PerformOps(&finish_ops_);
207 }
208
209 private:
210 ClientContext* context_;
211 Call call_;
212 CallOpSet<CallOpSendInitialMetadata> init_ops_;
213 CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
214 CallOpSet<CallOpSendMessage> write_ops_;
215 CallOpSet<CallOpClientSendClose> writes_done_ops_;
216 CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
217 CallOpClientRecvStatus> finish_ops_;
218 };
219
220 /// Client-side interface for asynchronous bi-directional streaming.
221 template <class W, class R>
222 class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
223 public AsyncWriterInterface<W>,
224 public AsyncReaderInterface<R> {
225 public:
226 /// Signal the client is done with the writes.
227 ///
228 /// \param[in] tag The tag identifying the operation.
229 virtual void WritesDone(void* tag) = 0;
230 };
231
232 template <class W, class R>
233 class ClientAsyncReaderWriter GRPC_FINAL
234 : public ClientAsyncReaderWriterInterface<W, R> {
235 public:
236 ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
237 const RpcMethod& method, ClientContext* context,
238 void* tag)
239 : context_(context), call_(channel->CreateCall(method, context, cq)) {
240 init_ops_.set_output_tag(tag);
241 init_ops_.SendInitialMetadata(context->send_initial_metadata_);
242 call_.PerformOps(&init_ops_);
243 }
244
245 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
246 GPR_ASSERT(!context_->initial_metadata_received_);
247
248 meta_ops_.set_output_tag(tag);
249 meta_ops_.RecvInitialMetadata(context_);
250 call_.PerformOps(&meta_ops_);
251 }
252
253 void Read(R* msg, void* tag) GRPC_OVERRIDE {
254 read_ops_.set_output_tag(tag);
255 if (!context_->initial_metadata_received_) {
256 read_ops_.RecvInitialMetadata(context_);
257 }
258 read_ops_.RecvMessage(msg);
259 call_.PerformOps(&read_ops_);
260 }
261
262 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
263 write_ops_.set_output_tag(tag);
264 // TODO(ctiller): don't assert
265 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
266 call_.PerformOps(&write_ops_);
267 }
268
269 void WritesDone(void* tag) GRPC_OVERRIDE {
270 writes_done_ops_.set_output_tag(tag);
271 writes_done_ops_.ClientSendClose();
272 call_.PerformOps(&writes_done_ops_);
273 }
274
275 void Finish(Status* status, void* tag) GRPC_OVERRIDE {
276 finish_ops_.set_output_tag(tag);
277 if (!context_->initial_metadata_received_) {
278 finish_ops_.RecvInitialMetadata(context_);
279 }
280 finish_ops_.ClientRecvStatus(context_, status);
281 call_.PerformOps(&finish_ops_);
282 }
283
284 private:
285 ClientContext* context_;
286 Call call_;
287 CallOpSet<CallOpSendInitialMetadata> init_ops_;
288 CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
289 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
290 CallOpSet<CallOpSendMessage> write_ops_;
291 CallOpSet<CallOpClientSendClose> writes_done_ops_;
292 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
293 };
294
295 template <class W, class R>
296 class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
297 public AsyncReaderInterface<R> {
298 public:
299 explicit ServerAsyncReader(ServerContext* ctx)
300 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
301
302 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
303 GPR_ASSERT(!ctx_->sent_initial_metadata_);
304
305 meta_ops_.set_output_tag(tag);
306 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
307 ctx_->sent_initial_metadata_ = true;
308 call_.PerformOps(&meta_ops_);
309 }
310
311 void Read(R* msg, void* tag) GRPC_OVERRIDE {
312 read_ops_.set_output_tag(tag);
313 read_ops_.RecvMessage(msg);
314 call_.PerformOps(&read_ops_);
315 }
316
317 void Finish(const W& msg, const Status& status, void* tag) {
318 finish_ops_.set_output_tag(tag);
319 if (!ctx_->sent_initial_metadata_) {
320 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
321 ctx_->sent_initial_metadata_ = true;
322 }
323 // The response is dropped if the status is not OK.
324 if (status.ok()) {
325 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
326 finish_ops_.SendMessage(msg));
327 } else {
328 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
329 }
330 call_.PerformOps(&finish_ops_);
331 }
332
333 void FinishWithError(const Status& status, void* tag) {
334 GPR_ASSERT(!status.ok());
335 finish_ops_.set_output_tag(tag);
336 if (!ctx_->sent_initial_metadata_) {
337 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
338 ctx_->sent_initial_metadata_ = true;
339 }
340 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
341 call_.PerformOps(&finish_ops_);
342 }
343
344 private:
345 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
346
347 Call call_;
348 ServerContext* ctx_;
349 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
350 CallOpSet<CallOpRecvMessage<R>> read_ops_;
351 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
352 CallOpServerSendStatus> finish_ops_;
353 };
354
355 template <class W>
356 class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
357 public AsyncWriterInterface<W> {
358 public:
359 explicit ServerAsyncWriter(ServerContext* ctx)
360 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
361
362 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
363 GPR_ASSERT(!ctx_->sent_initial_metadata_);
364
365 meta_ops_.set_output_tag(tag);
366 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
367 ctx_->sent_initial_metadata_ = true;
368 call_.PerformOps(&meta_ops_);
369 }
370
371 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
372 write_ops_.set_output_tag(tag);
373 if (!ctx_->sent_initial_metadata_) {
374 write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
375 ctx_->sent_initial_metadata_ = true;
376 }
377 // TODO(ctiller): don't assert
378 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
379 call_.PerformOps(&write_ops_);
380 }
381
382 void Finish(const Status& status, void* tag) {
383 finish_ops_.set_output_tag(tag);
384 if (!ctx_->sent_initial_metadata_) {
385 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
386 ctx_->sent_initial_metadata_ = true;
387 }
388 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
389 call_.PerformOps(&finish_ops_);
390 }
391
392 private:
393 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
394
395 Call call_;
396 ServerContext* ctx_;
397 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
398 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
399 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
400 };
401
402 /// Server-side interface for asynchronous bi-directional streaming.
403 template <class W, class R>
404 class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
405 public AsyncWriterInterface<W>,
406 public AsyncReaderInterface<R> {
407 public:
408 explicit ServerAsyncReaderWriter(ServerContext* ctx)
409 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
410
411 void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
412 GPR_ASSERT(!ctx_->sent_initial_metadata_);
413
414 meta_ops_.set_output_tag(tag);
415 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
416 ctx_->sent_initial_metadata_ = true;
417 call_.PerformOps(&meta_ops_);
418 }
419
420 void Read(R* msg, void* tag) GRPC_OVERRIDE {
421 read_ops_.set_output_tag(tag);
422 read_ops_.RecvMessage(msg);
423 call_.PerformOps(&read_ops_);
424 }
425
426 void Write(const W& msg, void* tag) GRPC_OVERRIDE {
427 write_ops_.set_output_tag(tag);
428 if (!ctx_->sent_initial_metadata_) {
429 write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
430 ctx_->sent_initial_metadata_ = true;
431 }
432 // TODO(ctiller): don't assert
433 GPR_ASSERT(write_ops_.SendMessage(msg).ok());
434 call_.PerformOps(&write_ops_);
435 }
436
437 void Finish(const Status& status, void* tag) {
438 finish_ops_.set_output_tag(tag);
439 if (!ctx_->sent_initial_metadata_) {
440 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
441 ctx_->sent_initial_metadata_ = true;
442 }
443 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
444 call_.PerformOps(&finish_ops_);
445 }
446
447 private:
448 friend class ::grpc::Server;
449
450 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
451
452 Call call_;
453 ServerContext* ctx_;
454 CallOpSet<CallOpSendInitialMetadata> meta_ops_;
455 CallOpSet<CallOpRecvMessage<R>> read_ops_;
456 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
457 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
458 };
459
460 } // namespace grpc
461
462 #endif // GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698