OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015-2016, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #ifndef GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H |
| 35 #define GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H |
| 36 |
| 37 #include <grpc++/impl/codegen/call.h> |
| 38 #include <grpc++/impl/codegen/channel_interface.h> |
| 39 #include <grpc++/impl/codegen/client_context.h> |
| 40 #include <grpc++/impl/codegen/completion_queue.h> |
| 41 #include <grpc++/impl/codegen/server_context.h> |
| 42 #include <grpc++/impl/codegen/service_type.h> |
| 43 #include <grpc++/impl/codegen/status.h> |
| 44 #include <grpc/impl/codegen/log.h> |
| 45 |
| 46 namespace grpc { |
| 47 |
| 48 /// Common interface for all synchronous client side streaming. |
| 49 class ClientStreamingInterface { |
| 50 public: |
| 51 virtual ~ClientStreamingInterface() {} |
| 52 |
| 53 /// Wait until the stream finishes, and return the final status. When the |
| 54 /// client side declares it has no more message to send, either implicitly or |
| 55 /// by calling \a WritesDone(), it needs to make sure there is no more message |
| 56 /// to be received from the server, either implicitly or by getting a false |
| 57 /// from a \a Read(). |
| 58 /// |
| 59 /// This function will return either: |
| 60 /// - when all incoming messages have been read and the server has returned |
| 61 /// status. |
| 62 /// - OR when the server has returned a non-OK status. |
| 63 virtual Status Finish() = 0; |
| 64 }; |
| 65 |
| 66 /// An interface that yields a sequence of messages of type \a R. |
| 67 template <class R> |
| 68 class ReaderInterface { |
| 69 public: |
| 70 virtual ~ReaderInterface() {} |
| 71 |
| 72 /// Blocking read a message and parse to \a msg. Returns \a true on success. |
| 73 /// |
| 74 /// \param[out] msg The read message. |
| 75 /// |
| 76 /// \return \a false when there will be no more incoming messages, either |
| 77 /// because the other side has called \a WritesDone() or the stream has failed |
| 78 /// (or been cancelled). |
| 79 virtual bool Read(R* msg) = 0; |
| 80 }; |
| 81 |
| 82 /// An interface that can be fed a sequence of messages of type \a W. |
| 83 template <class W> |
| 84 class WriterInterface { |
| 85 public: |
| 86 virtual ~WriterInterface() {} |
| 87 |
| 88 /// Blocking write \a msg to the stream with options. |
| 89 /// |
| 90 /// \param msg The message to be written to the stream. |
| 91 /// \param options Options affecting the write operation. |
| 92 /// |
| 93 /// \return \a true on success, \a false when the stream has been closed. |
| 94 virtual bool Write(const W& msg, const WriteOptions& options) = 0; |
| 95 |
| 96 /// Blocking write \a msg to the stream with default options. |
| 97 /// |
| 98 /// \param msg The message to be written to the stream. |
| 99 /// |
| 100 /// \return \a true on success, \a false when the stream has been closed. |
| 101 inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } |
| 102 }; |
| 103 |
| 104 /// Client-side interface for streaming reads of message of type \a R. |
| 105 template <class R> |
| 106 class ClientReaderInterface : public ClientStreamingInterface, |
| 107 public ReaderInterface<R> { |
| 108 public: |
| 109 /// Blocking wait for initial metadata from server. The received metadata |
| 110 /// can only be accessed after this call returns. Should only be called before |
| 111 /// the first read. Calling this method is optional, and if it is not called |
| 112 /// the metadata will be available in ClientContext after the first read. |
| 113 virtual void WaitForInitialMetadata() = 0; |
| 114 }; |
| 115 |
| 116 template <class R> |
| 117 class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { |
| 118 public: |
| 119 /// Blocking create a stream and write the first request out. |
| 120 template <class W> |
| 121 ClientReader(ChannelInterface* channel, const RpcMethod& method, |
| 122 ClientContext* context, const W& request) |
| 123 : context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
| 124 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, |
| 125 CallOpClientSendClose> ops; |
| 126 ops.SendInitialMetadata(context->send_initial_metadata_); |
| 127 // TODO(ctiller): don't assert |
| 128 GPR_ASSERT(ops.SendMessage(request).ok()); |
| 129 ops.ClientSendClose(); |
| 130 call_.PerformOps(&ops); |
| 131 cq_.Pluck(&ops); |
| 132 } |
| 133 |
| 134 void WaitForInitialMetadata() GRPC_OVERRIDE { |
| 135 GPR_ASSERT(!context_->initial_metadata_received_); |
| 136 |
| 137 CallOpSet<CallOpRecvInitialMetadata> ops; |
| 138 ops.RecvInitialMetadata(context_); |
| 139 call_.PerformOps(&ops); |
| 140 cq_.Pluck(&ops); /// status ignored |
| 141 } |
| 142 |
| 143 bool Read(R* msg) GRPC_OVERRIDE { |
| 144 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; |
| 145 if (!context_->initial_metadata_received_) { |
| 146 ops.RecvInitialMetadata(context_); |
| 147 } |
| 148 ops.RecvMessage(msg); |
| 149 call_.PerformOps(&ops); |
| 150 return cq_.Pluck(&ops) && ops.got_message; |
| 151 } |
| 152 |
| 153 Status Finish() GRPC_OVERRIDE { |
| 154 CallOpSet<CallOpClientRecvStatus> ops; |
| 155 Status status; |
| 156 ops.ClientRecvStatus(context_, &status); |
| 157 call_.PerformOps(&ops); |
| 158 GPR_ASSERT(cq_.Pluck(&ops)); |
| 159 return status; |
| 160 } |
| 161 |
| 162 private: |
| 163 ClientContext* context_; |
| 164 CompletionQueue cq_; |
| 165 Call call_; |
| 166 }; |
| 167 |
| 168 /// Client-side interface for streaming writes of message of type \a W. |
| 169 template <class W> |
| 170 class ClientWriterInterface : public ClientStreamingInterface, |
| 171 public WriterInterface<W> { |
| 172 public: |
| 173 /// Half close writing from the client. |
| 174 /// Block until writes are completed. |
| 175 /// |
| 176 /// \return Whether the writes were successful. |
| 177 virtual bool WritesDone() = 0; |
| 178 }; |
| 179 |
| 180 template <class W> |
| 181 class ClientWriter : public ClientWriterInterface<W> { |
| 182 public: |
| 183 /// Blocking create a stream. |
| 184 template <class R> |
| 185 ClientWriter(ChannelInterface* channel, const RpcMethod& method, |
| 186 ClientContext* context, R* response) |
| 187 : context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
| 188 finish_ops_.RecvMessage(response); |
| 189 |
| 190 CallOpSet<CallOpSendInitialMetadata> ops; |
| 191 ops.SendInitialMetadata(context->send_initial_metadata_); |
| 192 call_.PerformOps(&ops); |
| 193 cq_.Pluck(&ops); |
| 194 } |
| 195 |
| 196 void WaitForInitialMetadata() { |
| 197 GPR_ASSERT(!context_->initial_metadata_received_); |
| 198 |
| 199 CallOpSet<CallOpRecvInitialMetadata> ops; |
| 200 ops.RecvInitialMetadata(context_); |
| 201 call_.PerformOps(&ops); |
| 202 cq_.Pluck(&ops); // status ignored |
| 203 } |
| 204 |
| 205 using WriterInterface<W>::Write; |
| 206 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { |
| 207 CallOpSet<CallOpSendMessage> ops; |
| 208 if (!ops.SendMessage(msg, options).ok()) { |
| 209 return false; |
| 210 } |
| 211 call_.PerformOps(&ops); |
| 212 return cq_.Pluck(&ops); |
| 213 } |
| 214 |
| 215 bool WritesDone() GRPC_OVERRIDE { |
| 216 CallOpSet<CallOpClientSendClose> ops; |
| 217 ops.ClientSendClose(); |
| 218 call_.PerformOps(&ops); |
| 219 return cq_.Pluck(&ops); |
| 220 } |
| 221 |
| 222 /// Read the final response and wait for the final status. |
| 223 Status Finish() GRPC_OVERRIDE { |
| 224 Status status; |
| 225 if (!context_->initial_metadata_received_) { |
| 226 finish_ops_.RecvInitialMetadata(context_); |
| 227 } |
| 228 finish_ops_.ClientRecvStatus(context_, &status); |
| 229 call_.PerformOps(&finish_ops_); |
| 230 GPR_ASSERT(cq_.Pluck(&finish_ops_)); |
| 231 return status; |
| 232 } |
| 233 |
| 234 private: |
| 235 ClientContext* context_; |
| 236 CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, |
| 237 CallOpClientRecvStatus> finish_ops_; |
| 238 CompletionQueue cq_; |
| 239 Call call_; |
| 240 }; |
| 241 |
| 242 /// Client-side interface for bi-directional streaming. |
| 243 template <class W, class R> |
| 244 class ClientReaderWriterInterface : public ClientStreamingInterface, |
| 245 public WriterInterface<W>, |
| 246 public ReaderInterface<R> { |
| 247 public: |
| 248 /// Blocking wait for initial metadata from server. The received metadata |
| 249 /// can only be accessed after this call returns. Should only be called before |
| 250 /// the first read. Calling this method is optional, and if it is not called |
| 251 /// the metadata will be available in ClientContext after the first read. |
| 252 virtual void WaitForInitialMetadata() = 0; |
| 253 |
| 254 /// Block until writes are completed. |
| 255 /// |
| 256 /// \return Whether the writes were successful. |
| 257 virtual bool WritesDone() = 0; |
| 258 }; |
| 259 |
| 260 template <class W, class R> |
| 261 class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { |
| 262 public: |
| 263 /// Blocking create a stream. |
| 264 ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, |
| 265 ClientContext* context) |
| 266 : context_(context), call_(channel->CreateCall(method, context, &cq_)) { |
| 267 CallOpSet<CallOpSendInitialMetadata> ops; |
| 268 ops.SendInitialMetadata(context->send_initial_metadata_); |
| 269 call_.PerformOps(&ops); |
| 270 cq_.Pluck(&ops); |
| 271 } |
| 272 |
| 273 void WaitForInitialMetadata() GRPC_OVERRIDE { |
| 274 GPR_ASSERT(!context_->initial_metadata_received_); |
| 275 |
| 276 CallOpSet<CallOpRecvInitialMetadata> ops; |
| 277 ops.RecvInitialMetadata(context_); |
| 278 call_.PerformOps(&ops); |
| 279 cq_.Pluck(&ops); // status ignored |
| 280 } |
| 281 |
| 282 bool Read(R* msg) GRPC_OVERRIDE { |
| 283 CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; |
| 284 if (!context_->initial_metadata_received_) { |
| 285 ops.RecvInitialMetadata(context_); |
| 286 } |
| 287 ops.RecvMessage(msg); |
| 288 call_.PerformOps(&ops); |
| 289 return cq_.Pluck(&ops) && ops.got_message; |
| 290 } |
| 291 |
| 292 using WriterInterface<W>::Write; |
| 293 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { |
| 294 CallOpSet<CallOpSendMessage> ops; |
| 295 if (!ops.SendMessage(msg, options).ok()) return false; |
| 296 call_.PerformOps(&ops); |
| 297 return cq_.Pluck(&ops); |
| 298 } |
| 299 |
| 300 bool WritesDone() GRPC_OVERRIDE { |
| 301 CallOpSet<CallOpClientSendClose> ops; |
| 302 ops.ClientSendClose(); |
| 303 call_.PerformOps(&ops); |
| 304 return cq_.Pluck(&ops); |
| 305 } |
| 306 |
| 307 Status Finish() GRPC_OVERRIDE { |
| 308 CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops; |
| 309 if (!context_->initial_metadata_received_) { |
| 310 ops.RecvInitialMetadata(context_); |
| 311 } |
| 312 Status status; |
| 313 ops.ClientRecvStatus(context_, &status); |
| 314 call_.PerformOps(&ops); |
| 315 GPR_ASSERT(cq_.Pluck(&ops)); |
| 316 return status; |
| 317 } |
| 318 |
| 319 private: |
| 320 ClientContext* context_; |
| 321 CompletionQueue cq_; |
| 322 Call call_; |
| 323 }; |
| 324 |
| 325 template <class R> |
| 326 class ServerReader GRPC_FINAL : public ReaderInterface<R> { |
| 327 public: |
| 328 ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} |
| 329 |
| 330 void SendInitialMetadata() { |
| 331 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 332 |
| 333 CallOpSet<CallOpSendInitialMetadata> ops; |
| 334 ops.SendInitialMetadata(ctx_->initial_metadata_); |
| 335 ctx_->sent_initial_metadata_ = true; |
| 336 call_->PerformOps(&ops); |
| 337 call_->cq()->Pluck(&ops); |
| 338 } |
| 339 |
| 340 bool Read(R* msg) GRPC_OVERRIDE { |
| 341 CallOpSet<CallOpRecvMessage<R>> ops; |
| 342 ops.RecvMessage(msg); |
| 343 call_->PerformOps(&ops); |
| 344 return call_->cq()->Pluck(&ops) && ops.got_message; |
| 345 } |
| 346 |
| 347 private: |
| 348 Call* const call_; |
| 349 ServerContext* const ctx_; |
| 350 }; |
| 351 |
| 352 template <class W> |
| 353 class ServerWriter GRPC_FINAL : public WriterInterface<W> { |
| 354 public: |
| 355 ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} |
| 356 |
| 357 void SendInitialMetadata() { |
| 358 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 359 |
| 360 CallOpSet<CallOpSendInitialMetadata> ops; |
| 361 ops.SendInitialMetadata(ctx_->initial_metadata_); |
| 362 ctx_->sent_initial_metadata_ = true; |
| 363 call_->PerformOps(&ops); |
| 364 call_->cq()->Pluck(&ops); |
| 365 } |
| 366 |
| 367 using WriterInterface<W>::Write; |
| 368 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { |
| 369 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; |
| 370 if (!ops.SendMessage(msg, options).ok()) { |
| 371 return false; |
| 372 } |
| 373 if (!ctx_->sent_initial_metadata_) { |
| 374 ops.SendInitialMetadata(ctx_->initial_metadata_); |
| 375 ctx_->sent_initial_metadata_ = true; |
| 376 } |
| 377 call_->PerformOps(&ops); |
| 378 return call_->cq()->Pluck(&ops); |
| 379 } |
| 380 |
| 381 private: |
| 382 Call* const call_; |
| 383 ServerContext* const ctx_; |
| 384 }; |
| 385 |
| 386 /// Server-side interface for bi-directional streaming. |
| 387 template <class W, class R> |
| 388 class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, |
| 389 public ReaderInterface<R> { |
| 390 public: |
| 391 ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} |
| 392 |
| 393 void SendInitialMetadata() { |
| 394 GPR_ASSERT(!ctx_->sent_initial_metadata_); |
| 395 |
| 396 CallOpSet<CallOpSendInitialMetadata> ops; |
| 397 ops.SendInitialMetadata(ctx_->initial_metadata_); |
| 398 ctx_->sent_initial_metadata_ = true; |
| 399 call_->PerformOps(&ops); |
| 400 call_->cq()->Pluck(&ops); |
| 401 } |
| 402 |
| 403 bool Read(R* msg) GRPC_OVERRIDE { |
| 404 CallOpSet<CallOpRecvMessage<R>> ops; |
| 405 ops.RecvMessage(msg); |
| 406 call_->PerformOps(&ops); |
| 407 return call_->cq()->Pluck(&ops) && ops.got_message; |
| 408 } |
| 409 |
| 410 using WriterInterface<W>::Write; |
| 411 bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { |
| 412 CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; |
| 413 if (!ops.SendMessage(msg, options).ok()) { |
| 414 return false; |
| 415 } |
| 416 if (!ctx_->sent_initial_metadata_) { |
| 417 ops.SendInitialMetadata(ctx_->initial_metadata_); |
| 418 ctx_->sent_initial_metadata_ = true; |
| 419 } |
| 420 call_->PerformOps(&ops); |
| 421 return call_->cq()->Pluck(&ops); |
| 422 } |
| 423 |
| 424 private: |
| 425 Call* const call_; |
| 426 ServerContext* const ctx_; |
| 427 }; |
| 428 |
| 429 } // namespace grpc |
| 430 |
| 431 #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H |
OLD | NEW |