OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #ifndef GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H |
| 35 #define GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H |
| 36 |
| 37 #include <grpc++/impl/codegen/channel_interface.h> |
| 38 #include <grpc++/impl/codegen/call.h> |
| 39 #include <grpc++/impl/codegen/service_type.h> |
| 40 #include <grpc++/impl/codegen/server_context.h> |
| 41 #include <grpc++/impl/codegen/status.h> |
| 42 |
| 43 namespace grpc { |
| 44 |
| 45 class CompletionQueue; |
| 46 |
| 47 /// Common interface for all client side asynchronous streaming. |
| 48 class ClientAsyncStreamingInterface { |
| 49 public: |
| 50 virtual ~ClientAsyncStreamingInterface() {} |
| 51 |
| 52 /// Request notification of the reading of the initial metadata. Completion |
| 53 /// will be notified by \a tag on the associated completion queue. |
| 54 /// |
| 55 /// \param[in] tag Tag identifying this request. |
| 56 virtual void ReadInitialMetadata(void* tag) = 0; |
| 57 |
| 58 /// Request notification completion. |
| 59 /// |
| 60 /// \param[out] status To be updated with the operation status. |
| 61 /// \param[in] tag Tag identifying this request. |
| 62 virtual void Finish(Status* status, void* tag) = 0; |
| 63 }; |
| 64 |
| 65 /// An interface that yields a sequence of messages of type \a R. |
| 66 template <class R> |
| 67 class AsyncReaderInterface { |
| 68 public: |
| 69 virtual ~AsyncReaderInterface() {} |
| 70 |
| 71 /// Read a message of type \a R into \a msg. Completion will be notified by \a |
| 72 /// tag on the associated completion queue. |
| 73 /// |
| 74 /// \param[out] msg Where to eventually store the read message. |
| 75 /// \param[in] tag The tag identifying the operation. |
| 76 virtual void Read(R* msg, void* tag) = 0; |
| 77 }; |
| 78 |
| 79 /// An interface that can be fed a sequence of messages of type \a W. |
| 80 template <class W> |
| 81 class AsyncWriterInterface { |
| 82 public: |
| 83 virtual ~AsyncWriterInterface() {} |
| 84 |
| 85 /// Request the writing of \a msg with identifying tag \a tag. |
| 86 /// |
| 87 /// Only one write may be outstanding at any given time. This means that |
| 88 /// after calling Write, one must wait to receive \a tag from the completion |
| 89 /// queue BEFORE calling Write again. |
| 90 /// |
| 91 /// \param[in] msg The message to be written. |
| 92 /// \param[in] tag The tag identifying the operation. |
| 93 virtual void Write(const W& msg, void* tag) = 0; |
| 94 }; |
| 95 |
| 96 template <class R> |
| 97 class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, |
| 98 public AsyncReaderInterface<R> {}; |
| 99 |
| 100 template <class R> |
| 101 class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { |
| 102 public: |
| 103 /// Create a stream and write the first request out. |
| 104 template <class W> |
| 105 ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, |
| 106 const RpcMethod& method, ClientContext* context, |
| 107 const W& request, void* tag) |
| 108 : context_(context), call_(channel->CreateCall(method, context, cq)) { |
| 109 init_ops_.set_output_tag(tag); |
| 110 init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
| 111 // TODO(ctiller): don't assert |
| 112 GPR_ASSERT(init_ops_.SendMessage(request).ok()); |
| 113 init_ops_.ClientSendClose(); |
| 114 call_.PerformOps(&init_ops_); |
| 115 } |
| 116 |
| 117 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 118 GPR_ASSERT(!context_->initial_metadata_received_); |
| 119 |
| 120 meta_ops_.set_output_tag(tag); |
| 121 meta_ops_.RecvInitialMetadata(context_); |
| 122 call_.PerformOps(&meta_ops_); |
| 123 } |
| 124 |
| 125 void Read(R* msg, void* tag) GRPC_OVERRIDE { |
| 126 read_ops_.set_output_tag(tag); |
| 127 if (!context_->initial_metadata_received_) { |
| 128 read_ops_.RecvInitialMetadata(context_); |
| 129 } |
| 130 read_ops_.RecvMessage(msg); |
| 131 call_.PerformOps(&read_ops_); |
| 132 } |
| 133 |
| 134 void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
| 135 finish_ops_.set_output_tag(tag); |
| 136 if (!context_->initial_metadata_received_) { |
| 137 finish_ops_.RecvInitialMetadata(context_); |
| 138 } |
| 139 finish_ops_.ClientRecvStatus(context_, status); |
| 140 call_.PerformOps(&finish_ops_); |
| 141 } |
| 142 |
| 143 private: |
| 144 ClientContext* context_; |
| 145 Call call_; |
| 146 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> |
| 147 init_ops_; |
| 148 CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
| 149 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; |
| 150 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; |
| 151 }; |
| 152 |
| 153 /// Common interface for client side asynchronous writing. |
| 154 template <class W> |
| 155 class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, |
| 156 public AsyncWriterInterface<W> { |
| 157 public: |
| 158 /// Signal the client is done with the writes. |
| 159 /// |
| 160 /// \param[in] tag The tag identifying the operation. |
| 161 virtual void WritesDone(void* tag) = 0; |
| 162 }; |
| 163 |
| 164 template <class W> |
| 165 class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { |
| 166 public: |
| 167 template <class R> |
| 168 ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, |
| 169 const RpcMethod& method, ClientContext* context, |
| 170 R* response, void* tag) |
| 171 : context_(context), call_(channel->CreateCall(method, context, cq)) { |
| 172 finish_ops_.RecvMessage(response); |
| 173 |
| 174 init_ops_.set_output_tag(tag); |
| 175 init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
| 176 call_.PerformOps(&init_ops_); |
| 177 } |
| 178 |
| 179 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 180 GPR_ASSERT(!context_->initial_metadata_received_); |
| 181 |
| 182 meta_ops_.set_output_tag(tag); |
| 183 meta_ops_.RecvInitialMetadata(context_); |
| 184 call_.PerformOps(&meta_ops_); |
| 185 } |
| 186 |
| 187 void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
| 188 write_ops_.set_output_tag(tag); |
| 189 // TODO(ctiller): don't assert |
| 190 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
| 191 call_.PerformOps(&write_ops_); |
| 192 } |
| 193 |
| 194 void WritesDone(void* tag) GRPC_OVERRIDE { |
| 195 writes_done_ops_.set_output_tag(tag); |
| 196 writes_done_ops_.ClientSendClose(); |
| 197 call_.PerformOps(&writes_done_ops_); |
| 198 } |
| 199 |
| 200 void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
| 201 finish_ops_.set_output_tag(tag); |
| 202 if (!context_->initial_metadata_received_) { |
| 203 finish_ops_.RecvInitialMetadata(context_); |
| 204 } |
| 205 finish_ops_.ClientRecvStatus(context_, status); |
| 206 call_.PerformOps(&finish_ops_); |
| 207 } |
| 208 |
| 209 private: |
| 210 ClientContext* context_; |
| 211 Call call_; |
| 212 CallOpSet<CallOpSendInitialMetadata> init_ops_; |
| 213 CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
| 214 CallOpSet<CallOpSendMessage> write_ops_; |
| 215 CallOpSet<CallOpClientSendClose> writes_done_ops_; |
| 216 CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, |
| 217 CallOpClientRecvStatus> finish_ops_; |
| 218 }; |
| 219 |
| 220 /// Client-side interface for asynchronous bi-directional streaming. |
| 221 template <class W, class R> |
| 222 class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface, |
| 223 public AsyncWriterInterface<W>, |
| 224 public AsyncReaderInterface<R> { |
| 225 public: |
| 226 /// Signal the client is done with the writes. |
| 227 /// |
| 228 /// \param[in] tag The tag identifying the operation. |
| 229 virtual void WritesDone(void* tag) = 0; |
| 230 }; |
| 231 |
| 232 template <class W, class R> |
| 233 class ClientAsyncReaderWriter GRPC_FINAL |
| 234 : public ClientAsyncReaderWriterInterface<W, R> { |
| 235 public: |
| 236 ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq, |
| 237 const RpcMethod& method, ClientContext* context, |
| 238 void* tag) |
| 239 : context_(context), call_(channel->CreateCall(method, context, cq)) { |
| 240 init_ops_.set_output_tag(tag); |
| 241 init_ops_.SendInitialMetadata(context->send_initial_metadata_); |
| 242 call_.PerformOps(&init_ops_); |
| 243 } |
| 244 |
| 245 void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 246 GPR_ASSERT(!context_->initial_metadata_received_); |
| 247 |
| 248 meta_ops_.set_output_tag(tag); |
| 249 meta_ops_.RecvInitialMetadata(context_); |
| 250 call_.PerformOps(&meta_ops_); |
| 251 } |
| 252 |
| 253 void Read(R* msg, void* tag) GRPC_OVERRIDE { |
| 254 read_ops_.set_output_tag(tag); |
| 255 if (!context_->initial_metadata_received_) { |
| 256 read_ops_.RecvInitialMetadata(context_); |
| 257 } |
| 258 read_ops_.RecvMessage(msg); |
| 259 call_.PerformOps(&read_ops_); |
| 260 } |
| 261 |
| 262 void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
| 263 write_ops_.set_output_tag(tag); |
| 264 // TODO(ctiller): don't assert |
| 265 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
| 266 call_.PerformOps(&write_ops_); |
| 267 } |
| 268 |
| 269 void WritesDone(void* tag) GRPC_OVERRIDE { |
| 270 writes_done_ops_.set_output_tag(tag); |
| 271 writes_done_ops_.ClientSendClose(); |
| 272 call_.PerformOps(&writes_done_ops_); |
| 273 } |
| 274 |
| 275 void Finish(Status* status, void* tag) GRPC_OVERRIDE { |
| 276 finish_ops_.set_output_tag(tag); |
| 277 if (!context_->initial_metadata_received_) { |
| 278 finish_ops_.RecvInitialMetadata(context_); |
| 279 } |
| 280 finish_ops_.ClientRecvStatus(context_, status); |
| 281 call_.PerformOps(&finish_ops_); |
| 282 } |
| 283 |
| 284 private: |
| 285 ClientContext* context_; |
| 286 Call call_; |
| 287 CallOpSet<CallOpSendInitialMetadata> init_ops_; |
| 288 CallOpSet<CallOpRecvInitialMetadata> meta_ops_; |
| 289 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; |
| 290 CallOpSet<CallOpSendMessage> write_ops_; |
| 291 CallOpSet<CallOpClientSendClose> writes_done_ops_; |
| 292 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; |
| 293 }; |
| 294 |
| 295 template <class W, class R> |
| 296 class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, |
| 297 public AsyncReaderInterface<R> { |
| 298 public: |
| 299 explicit ServerAsyncReader(ServerContext* ctx) |
| 300 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} |
| 301 |
| 302 void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 303 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 304 |
| 305 meta_ops_.set_output_tag(tag); |
| 306 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 307 ctx_->sent_initial_metadata_ = true; |
| 308 call_.PerformOps(&meta_ops_); |
| 309 } |
| 310 |
| 311 void Read(R* msg, void* tag) GRPC_OVERRIDE { |
| 312 read_ops_.set_output_tag(tag); |
| 313 read_ops_.RecvMessage(msg); |
| 314 call_.PerformOps(&read_ops_); |
| 315 } |
| 316 |
| 317 void Finish(const W& msg, const Status& status, void* tag) { |
| 318 finish_ops_.set_output_tag(tag); |
| 319 if (!ctx_->sent_initial_metadata_) { |
| 320 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 321 ctx_->sent_initial_metadata_ = true; |
| 322 } |
| 323 // The response is dropped if the status is not OK. |
| 324 if (status.ok()) { |
| 325 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, |
| 326 finish_ops_.SendMessage(msg)); |
| 327 } else { |
| 328 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
| 329 } |
| 330 call_.PerformOps(&finish_ops_); |
| 331 } |
| 332 |
| 333 void FinishWithError(const Status& status, void* tag) { |
| 334 GPR_ASSERT(!status.ok()); |
| 335 finish_ops_.set_output_tag(tag); |
| 336 if (!ctx_->sent_initial_metadata_) { |
| 337 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 338 ctx_->sent_initial_metadata_ = true; |
| 339 } |
| 340 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
| 341 call_.PerformOps(&finish_ops_); |
| 342 } |
| 343 |
| 344 private: |
| 345 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } |
| 346 |
| 347 Call call_; |
| 348 ServerContext* ctx_; |
| 349 CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
| 350 CallOpSet<CallOpRecvMessage<R>> read_ops_; |
| 351 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, |
| 352 CallOpServerSendStatus> finish_ops_; |
| 353 }; |
| 354 |
| 355 template <class W> |
| 356 class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
| 357 public AsyncWriterInterface<W> { |
| 358 public: |
| 359 explicit ServerAsyncWriter(ServerContext* ctx) |
| 360 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} |
| 361 |
| 362 void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 363 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 364 |
| 365 meta_ops_.set_output_tag(tag); |
| 366 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 367 ctx_->sent_initial_metadata_ = true; |
| 368 call_.PerformOps(&meta_ops_); |
| 369 } |
| 370 |
| 371 void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
| 372 write_ops_.set_output_tag(tag); |
| 373 if (!ctx_->sent_initial_metadata_) { |
| 374 write_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 375 ctx_->sent_initial_metadata_ = true; |
| 376 } |
| 377 // TODO(ctiller): don't assert |
| 378 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
| 379 call_.PerformOps(&write_ops_); |
| 380 } |
| 381 |
| 382 void Finish(const Status& status, void* tag) { |
| 383 finish_ops_.set_output_tag(tag); |
| 384 if (!ctx_->sent_initial_metadata_) { |
| 385 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 386 ctx_->sent_initial_metadata_ = true; |
| 387 } |
| 388 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
| 389 call_.PerformOps(&finish_ops_); |
| 390 } |
| 391 |
| 392 private: |
| 393 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } |
| 394 |
| 395 Call call_; |
| 396 ServerContext* ctx_; |
| 397 CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
| 398 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; |
| 399 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; |
| 400 }; |
| 401 |
| 402 /// Server-side interface for asynchronous bi-directional streaming. |
| 403 template <class W, class R> |
| 404 class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, |
| 405 public AsyncWriterInterface<W>, |
| 406 public AsyncReaderInterface<R> { |
| 407 public: |
| 408 explicit ServerAsyncReaderWriter(ServerContext* ctx) |
| 409 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} |
| 410 |
| 411 void SendInitialMetadata(void* tag) GRPC_OVERRIDE { |
| 412 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 413 |
| 414 meta_ops_.set_output_tag(tag); |
| 415 meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 416 ctx_->sent_initial_metadata_ = true; |
| 417 call_.PerformOps(&meta_ops_); |
| 418 } |
| 419 |
| 420 void Read(R* msg, void* tag) GRPC_OVERRIDE { |
| 421 read_ops_.set_output_tag(tag); |
| 422 read_ops_.RecvMessage(msg); |
| 423 call_.PerformOps(&read_ops_); |
| 424 } |
| 425 |
| 426 void Write(const W& msg, void* tag) GRPC_OVERRIDE { |
| 427 write_ops_.set_output_tag(tag); |
| 428 if (!ctx_->sent_initial_metadata_) { |
| 429 write_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 430 ctx_->sent_initial_metadata_ = true; |
| 431 } |
| 432 // TODO(ctiller): don't assert |
| 433 GPR_ASSERT(write_ops_.SendMessage(msg).ok()); |
| 434 call_.PerformOps(&write_ops_); |
| 435 } |
| 436 |
| 437 void Finish(const Status& status, void* tag) { |
| 438 finish_ops_.set_output_tag(tag); |
| 439 if (!ctx_->sent_initial_metadata_) { |
| 440 finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); |
| 441 ctx_->sent_initial_metadata_ = true; |
| 442 } |
| 443 finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); |
| 444 call_.PerformOps(&finish_ops_); |
| 445 } |
| 446 |
| 447 private: |
| 448 friend class ::grpc::Server; |
| 449 |
| 450 void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; } |
| 451 |
| 452 Call call_; |
| 453 ServerContext* ctx_; |
| 454 CallOpSet<CallOpSendInitialMetadata> meta_ops_; |
| 455 CallOpSet<CallOpRecvMessage<R>> read_ops_; |
| 456 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; |
| 457 CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; |
| 458 }; |
| 459 |
| 460 } // namespace grpc |
| 461 |
| 462 #endif // GRPCXX_IMPL_CODEGEN_ASYNC_STREAM_H |
OLD | NEW |