Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(261)

Side by Side Diff: third_party/grpc/include/grpc++/impl/codegen/sync_stream.h

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015-2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
36
37 #include <grpc++/impl/codegen/call.h>
38 #include <grpc++/impl/codegen/channel_interface.h>
39 #include <grpc++/impl/codegen/client_context.h>
40 #include <grpc++/impl/codegen/completion_queue.h>
41 #include <grpc++/impl/codegen/server_context.h>
42 #include <grpc++/impl/codegen/service_type.h>
43 #include <grpc++/impl/codegen/status.h>
44 #include <grpc/impl/codegen/log.h>
45
46 namespace grpc {
47
48 /// Common interface for all synchronous client side streaming.
49 class ClientStreamingInterface {
50 public:
51 virtual ~ClientStreamingInterface() {}
52
53 /// Wait until the stream finishes, and return the final status. When the
54 /// client side declares it has no more message to send, either implicitly or
55 /// by calling \a WritesDone(), it needs to make sure there is no more message
56 /// to be received from the server, either implicitly or by getting a false
57 /// from a \a Read().
58 ///
59 /// This function will return either:
60 /// - when all incoming messages have been read and the server has returned
61 /// status.
62 /// - OR when the server has returned a non-OK status.
63 virtual Status Finish() = 0;
64 };
65
66 /// An interface that yields a sequence of messages of type \a R.
67 template <class R>
68 class ReaderInterface {
69 public:
70 virtual ~ReaderInterface() {}
71
72 /// Blocking read a message and parse to \a msg. Returns \a true on success.
73 ///
74 /// \param[out] msg The read message.
75 ///
76 /// \return \a false when there will be no more incoming messages, either
77 /// because the other side has called \a WritesDone() or the stream has failed
78 /// (or been cancelled).
79 virtual bool Read(R* msg) = 0;
80 };
81
82 /// An interface that can be fed a sequence of messages of type \a W.
83 template <class W>
84 class WriterInterface {
85 public:
86 virtual ~WriterInterface() {}
87
88 /// Blocking write \a msg to the stream with options.
89 ///
90 /// \param msg The message to be written to the stream.
91 /// \param options Options affecting the write operation.
92 ///
93 /// \return \a true on success, \a false when the stream has been closed.
94 virtual bool Write(const W& msg, const WriteOptions& options) = 0;
95
96 /// Blocking write \a msg to the stream with default options.
97 ///
98 /// \param msg The message to be written to the stream.
99 ///
100 /// \return \a true on success, \a false when the stream has been closed.
101 inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
102 };
103
104 /// Client-side interface for streaming reads of message of type \a R.
105 template <class R>
106 class ClientReaderInterface : public ClientStreamingInterface,
107 public ReaderInterface<R> {
108 public:
109 /// Blocking wait for initial metadata from server. The received metadata
110 /// can only be accessed after this call returns. Should only be called before
111 /// the first read. Calling this method is optional, and if it is not called
112 /// the metadata will be available in ClientContext after the first read.
113 virtual void WaitForInitialMetadata() = 0;
114 };
115
116 template <class R>
117 class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
118 public:
119 /// Blocking create a stream and write the first request out.
120 template <class W>
121 ClientReader(ChannelInterface* channel, const RpcMethod& method,
122 ClientContext* context, const W& request)
123 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
124 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
125 CallOpClientSendClose> ops;
126 ops.SendInitialMetadata(context->send_initial_metadata_);
127 // TODO(ctiller): don't assert
128 GPR_ASSERT(ops.SendMessage(request).ok());
129 ops.ClientSendClose();
130 call_.PerformOps(&ops);
131 cq_.Pluck(&ops);
132 }
133
134 void WaitForInitialMetadata() GRPC_OVERRIDE {
135 GPR_ASSERT(!context_->initial_metadata_received_);
136
137 CallOpSet<CallOpRecvInitialMetadata> ops;
138 ops.RecvInitialMetadata(context_);
139 call_.PerformOps(&ops);
140 cq_.Pluck(&ops); /// status ignored
141 }
142
143 bool Read(R* msg) GRPC_OVERRIDE {
144 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
145 if (!context_->initial_metadata_received_) {
146 ops.RecvInitialMetadata(context_);
147 }
148 ops.RecvMessage(msg);
149 call_.PerformOps(&ops);
150 return cq_.Pluck(&ops) && ops.got_message;
151 }
152
153 Status Finish() GRPC_OVERRIDE {
154 CallOpSet<CallOpClientRecvStatus> ops;
155 Status status;
156 ops.ClientRecvStatus(context_, &status);
157 call_.PerformOps(&ops);
158 GPR_ASSERT(cq_.Pluck(&ops));
159 return status;
160 }
161
162 private:
163 ClientContext* context_;
164 CompletionQueue cq_;
165 Call call_;
166 };
167
168 /// Client-side interface for streaming writes of message of type \a W.
169 template <class W>
170 class ClientWriterInterface : public ClientStreamingInterface,
171 public WriterInterface<W> {
172 public:
173 /// Half close writing from the client.
174 /// Block until writes are completed.
175 ///
176 /// \return Whether the writes were successful.
177 virtual bool WritesDone() = 0;
178 };
179
180 template <class W>
181 class ClientWriter : public ClientWriterInterface<W> {
182 public:
183 /// Blocking create a stream.
184 template <class R>
185 ClientWriter(ChannelInterface* channel, const RpcMethod& method,
186 ClientContext* context, R* response)
187 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
188 finish_ops_.RecvMessage(response);
189
190 CallOpSet<CallOpSendInitialMetadata> ops;
191 ops.SendInitialMetadata(context->send_initial_metadata_);
192 call_.PerformOps(&ops);
193 cq_.Pluck(&ops);
194 }
195
196 void WaitForInitialMetadata() {
197 GPR_ASSERT(!context_->initial_metadata_received_);
198
199 CallOpSet<CallOpRecvInitialMetadata> ops;
200 ops.RecvInitialMetadata(context_);
201 call_.PerformOps(&ops);
202 cq_.Pluck(&ops); // status ignored
203 }
204
205 using WriterInterface<W>::Write;
206 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
207 CallOpSet<CallOpSendMessage> ops;
208 if (!ops.SendMessage(msg, options).ok()) {
209 return false;
210 }
211 call_.PerformOps(&ops);
212 return cq_.Pluck(&ops);
213 }
214
215 bool WritesDone() GRPC_OVERRIDE {
216 CallOpSet<CallOpClientSendClose> ops;
217 ops.ClientSendClose();
218 call_.PerformOps(&ops);
219 return cq_.Pluck(&ops);
220 }
221
222 /// Read the final response and wait for the final status.
223 Status Finish() GRPC_OVERRIDE {
224 Status status;
225 if (!context_->initial_metadata_received_) {
226 finish_ops_.RecvInitialMetadata(context_);
227 }
228 finish_ops_.ClientRecvStatus(context_, &status);
229 call_.PerformOps(&finish_ops_);
230 GPR_ASSERT(cq_.Pluck(&finish_ops_));
231 return status;
232 }
233
234 private:
235 ClientContext* context_;
236 CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
237 CallOpClientRecvStatus> finish_ops_;
238 CompletionQueue cq_;
239 Call call_;
240 };
241
242 /// Client-side interface for bi-directional streaming.
243 template <class W, class R>
244 class ClientReaderWriterInterface : public ClientStreamingInterface,
245 public WriterInterface<W>,
246 public ReaderInterface<R> {
247 public:
248 /// Blocking wait for initial metadata from server. The received metadata
249 /// can only be accessed after this call returns. Should only be called before
250 /// the first read. Calling this method is optional, and if it is not called
251 /// the metadata will be available in ClientContext after the first read.
252 virtual void WaitForInitialMetadata() = 0;
253
254 /// Block until writes are completed.
255 ///
256 /// \return Whether the writes were successful.
257 virtual bool WritesDone() = 0;
258 };
259
260 template <class W, class R>
261 class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
262 public:
263 /// Blocking create a stream.
264 ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
265 ClientContext* context)
266 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
267 CallOpSet<CallOpSendInitialMetadata> ops;
268 ops.SendInitialMetadata(context->send_initial_metadata_);
269 call_.PerformOps(&ops);
270 cq_.Pluck(&ops);
271 }
272
273 void WaitForInitialMetadata() GRPC_OVERRIDE {
274 GPR_ASSERT(!context_->initial_metadata_received_);
275
276 CallOpSet<CallOpRecvInitialMetadata> ops;
277 ops.RecvInitialMetadata(context_);
278 call_.PerformOps(&ops);
279 cq_.Pluck(&ops); // status ignored
280 }
281
282 bool Read(R* msg) GRPC_OVERRIDE {
283 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
284 if (!context_->initial_metadata_received_) {
285 ops.RecvInitialMetadata(context_);
286 }
287 ops.RecvMessage(msg);
288 call_.PerformOps(&ops);
289 return cq_.Pluck(&ops) && ops.got_message;
290 }
291
292 using WriterInterface<W>::Write;
293 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
294 CallOpSet<CallOpSendMessage> ops;
295 if (!ops.SendMessage(msg, options).ok()) return false;
296 call_.PerformOps(&ops);
297 return cq_.Pluck(&ops);
298 }
299
300 bool WritesDone() GRPC_OVERRIDE {
301 CallOpSet<CallOpClientSendClose> ops;
302 ops.ClientSendClose();
303 call_.PerformOps(&ops);
304 return cq_.Pluck(&ops);
305 }
306
307 Status Finish() GRPC_OVERRIDE {
308 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
309 if (!context_->initial_metadata_received_) {
310 ops.RecvInitialMetadata(context_);
311 }
312 Status status;
313 ops.ClientRecvStatus(context_, &status);
314 call_.PerformOps(&ops);
315 GPR_ASSERT(cq_.Pluck(&ops));
316 return status;
317 }
318
319 private:
320 ClientContext* context_;
321 CompletionQueue cq_;
322 Call call_;
323 };
324
325 template <class R>
326 class ServerReader GRPC_FINAL : public ReaderInterface<R> {
327 public:
328 ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
329
330 void SendInitialMetadata() {
331 GPR_ASSERT(!ctx_->sent_initial_metadata_);
332
333 CallOpSet<CallOpSendInitialMetadata> ops;
334 ops.SendInitialMetadata(ctx_->initial_metadata_);
335 ctx_->sent_initial_metadata_ = true;
336 call_->PerformOps(&ops);
337 call_->cq()->Pluck(&ops);
338 }
339
340 bool Read(R* msg) GRPC_OVERRIDE {
341 CallOpSet<CallOpRecvMessage<R>> ops;
342 ops.RecvMessage(msg);
343 call_->PerformOps(&ops);
344 return call_->cq()->Pluck(&ops) && ops.got_message;
345 }
346
347 private:
348 Call* const call_;
349 ServerContext* const ctx_;
350 };
351
352 template <class W>
353 class ServerWriter GRPC_FINAL : public WriterInterface<W> {
354 public:
355 ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
356
357 void SendInitialMetadata() {
358 GPR_ASSERT(!ctx_->sent_initial_metadata_);
359
360 CallOpSet<CallOpSendInitialMetadata> ops;
361 ops.SendInitialMetadata(ctx_->initial_metadata_);
362 ctx_->sent_initial_metadata_ = true;
363 call_->PerformOps(&ops);
364 call_->cq()->Pluck(&ops);
365 }
366
367 using WriterInterface<W>::Write;
368 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
369 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
370 if (!ops.SendMessage(msg, options).ok()) {
371 return false;
372 }
373 if (!ctx_->sent_initial_metadata_) {
374 ops.SendInitialMetadata(ctx_->initial_metadata_);
375 ctx_->sent_initial_metadata_ = true;
376 }
377 call_->PerformOps(&ops);
378 return call_->cq()->Pluck(&ops);
379 }
380
381 private:
382 Call* const call_;
383 ServerContext* const ctx_;
384 };
385
386 /// Server-side interface for bi-directional streaming.
387 template <class W, class R>
388 class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
389 public ReaderInterface<R> {
390 public:
391 ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
392
393 void SendInitialMetadata() {
394 GPR_ASSERT(!ctx_->sent_initial_metadata_);
395
396 CallOpSet<CallOpSendInitialMetadata> ops;
397 ops.SendInitialMetadata(ctx_->initial_metadata_);
398 ctx_->sent_initial_metadata_ = true;
399 call_->PerformOps(&ops);
400 call_->cq()->Pluck(&ops);
401 }
402
403 bool Read(R* msg) GRPC_OVERRIDE {
404 CallOpSet<CallOpRecvMessage<R>> ops;
405 ops.RecvMessage(msg);
406 call_->PerformOps(&ops);
407 return call_->cq()->Pluck(&ops) && ops.got_message;
408 }
409
410 using WriterInterface<W>::Write;
411 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
412 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
413 if (!ops.SendMessage(msg, options).ok()) {
414 return false;
415 }
416 if (!ctx_->sent_initial_metadata_) {
417 ops.SendInitialMetadata(ctx_->initial_metadata_);
418 ctx_->sent_initial_metadata_ = true;
419 }
420 call_->PerformOps(&ops);
421 return call_->cq()->Pluck(&ops);
422 }
423
424 private:
425 Call* const call_;
426 ServerContext* const ctx_;
427 };
428
429 } // namespace grpc
430
431 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H
OLDNEW
« no previous file with comments | « third_party/grpc/include/grpc++/impl/codegen/sync_no_cxx11.h ('k') | third_party/grpc/include/grpc++/impl/codegen/time.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698