Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(572)

Side by Side Diff: third_party/grpc/src/ruby/lib/grpc/generic/active_call.rb

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 # * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 require 'forwardable'
31 require 'grpc/generic/bidi_call'
32
33 class Struct
34 # BatchResult is the struct returned by calls to call#start_batch.
35 class BatchResult
36 # check_status returns the status, raising an error if the status
37 # is non-nil and not OK.
38 def check_status
39 return nil if status.nil?
40 fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
41 if status.code != GRPC::Core::StatusCodes::OK
42 GRPC.logger.debug("Failing with status #{status}")
43 # raise BadStatus, propagating the metadata if present.
44 md = status.metadata
45 with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
46 fail GRPC::BadStatus.new(status.code, status.details, **with_sym_keys)
47 end
48 status
49 end
50 end
51 end
52
53 # GRPC contains the General RPC module.
54 module GRPC
55 # The ActiveCall class provides simple methods for sending marshallable
56 # data to a call
57 class ActiveCall
58 include Core::TimeConsts
59 include Core::CallOps
60 extend Forwardable
61 attr_reader(:deadline)
62 def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
63
64 # client_invoke begins a client invocation.
65 #
66 # Flow Control note: this blocks until flow control accepts that client
67 # request can go ahead.
68 #
69 # deadline is the absolute deadline for the call.
70 #
71 # == Keyword Arguments ==
72 # any keyword arguments are treated as metadata to be sent to the server
73 # if a keyword value is a list, multiple metadata for it's key are sent
74 #
75 # @param call [Call] a call on which to start and invocation
76 # @param q [CompletionQueue] the completion queue
77 def self.client_invoke(call, q, **kw)
78 fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
79 unless q.is_a? Core::CompletionQueue
80 fail(TypeError, '!Core::CompletionQueue')
81 end
82 metadata_tag = Object.new
83 call.run_batch(q, metadata_tag, INFINITE_FUTURE,
84 SEND_INITIAL_METADATA => kw)
85 metadata_tag
86 end
87
88 # Creates an ActiveCall.
89 #
90 # ActiveCall should only be created after a call is accepted. That
91 # means different things on a client and a server. On the client, the
92 # call is accepted after calling call.invoke. On the server, this is
93 # after call.accept.
94 #
95 # #initialize cannot determine if the call is accepted or not; so if a
96 # call that's not accepted is used here, the error won't be visible until
97 # the ActiveCall methods are called.
98 #
99 # deadline is the absolute deadline for the call.
100 #
101 # @param call [Call] the call used by the ActiveCall
102 # @param q [CompletionQueue] the completion queue used to accept
103 # the call
104 # @param marshal [Function] f(obj)->string that marshal requests
105 # @param unmarshal [Function] f(string)->obj that unmarshals responses
106 # @param deadline [Fixnum] the deadline for the call to complete
107 # @param metadata_tag [Object] the object use obtain metadata for clients
108 # @param started [true|false] indicates if the call has begun
109 def initialize(call, q, marshal, unmarshal, deadline, started: true,
110 metadata_tag: nil)
111 fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
112 unless q.is_a? Core::CompletionQueue
113 fail(TypeError, '!Core::CompletionQueue')
114 end
115 @call = call
116 @cq = q
117 @deadline = deadline
118 @marshal = marshal
119 @started = started
120 @unmarshal = unmarshal
121 @metadata_tag = metadata_tag
122 @op_notifier = nil
123 end
124
125 # output_metadata are provides access to hash that can be used to
126 # save metadata to be sent as trailer
127 def output_metadata
128 @output_metadata ||= {}
129 end
130
131 # cancelled indicates if the call was cancelled
132 def cancelled
133 !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
134 end
135
136 # multi_req_view provides a restricted view of this ActiveCall for use
137 # in a server client-streaming handler.
138 def multi_req_view
139 MultiReqView.new(self)
140 end
141
142 # single_req_view provides a restricted view of this ActiveCall for use in
143 # a server request-response handler.
144 def single_req_view
145 SingleReqView.new(self)
146 end
147
148 # operation provides a restricted view of this ActiveCall for use as
149 # a Operation.
150 def operation
151 @op_notifier = Notifier.new
152 Operation.new(self)
153 end
154
155 # writes_done indicates that all writes are completed.
156 #
157 # It blocks until the remote endpoint acknowledges with at status unless
158 # assert_finished is set to false. Any calls to #remote_send after this
159 # call will fail.
160 #
161 # @param assert_finished [true, false] when true(default), waits for
162 # FINISHED.
163 def writes_done(assert_finished = true)
164 ops = {
165 SEND_CLOSE_FROM_CLIENT => nil
166 }
167 ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
168 batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
169 return unless assert_finished
170 @call.status = batch_result.status
171 op_is_done
172 batch_result.check_status
173 end
174
175 # finished waits until a client call is completed.
176 #
177 # It blocks until the remote endpoint acknowledges by sending a status.
178 def finished
179 batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
180 RECV_STATUS_ON_CLIENT => nil)
181 unless batch_result.status.nil?
182 if @call.metadata.nil?
183 @call.metadata = batch_result.status.metadata
184 else
185 @call.metadata.merge!(batch_result.status.metadata)
186 end
187 end
188 @call.status = batch_result.status
189 op_is_done
190 batch_result.check_status
191 end
192
193 # remote_send sends a request to the remote endpoint.
194 #
195 # It blocks until the remote endpoint accepts the message.
196 #
197 # @param req [Object, String] the object to send or it's marshal form.
198 # @param marshalled [false, true] indicates if the object is already
199 # marshalled.
200 def remote_send(req, marshalled = false)
201 GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
202 payload = marshalled ? req : @marshal.call(req)
203 @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
204 end
205
206 # send_status sends a status to the remote endpoint.
207 #
208 # @param code [int] the status code to send
209 # @param details [String] details
210 # @param assert_finished [true, false] when true(default), waits for
211 # FINISHED.
212 #
213 # == Keyword Arguments ==
214 # any keyword arguments are treated as metadata to be sent to the server
215 # if a keyword value is a list, multiple metadata for it's key are sent
216 def send_status(code = OK, details = '', assert_finished = false, **kw)
217 ops = {
218 SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, kw)
219 }
220 ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
221 @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
222 nil
223 end
224
225 # remote_read reads a response from the remote endpoint.
226 #
227 # It blocks until the remote endpoint replies with a message or status.
228 # On receiving a message, it returns the response after unmarshalling it.
229 # On receiving a status, it returns nil if the status is OK, otherwise
230 # raising BadStatus
231 def remote_read
232 ops = { RECV_MESSAGE => nil }
233 ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
234 batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
235 unless @metadata_tag.nil?
236 @call.metadata = batch_result.metadata
237 @metadata_tag = nil
238 end
239 GRPC.logger.debug("received req: #{batch_result}")
240 unless batch_result.nil? || batch_result.message.nil?
241 GRPC.logger.debug("received req.to_s: #{batch_result.message}")
242 res = @unmarshal.call(batch_result.message)
243 GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
244 return res
245 end
246 GRPC.logger.debug('found nil; the final response has been sent')
247 nil
248 end
249
250 # each_remote_read passes each response to the given block or returns an
251 # enumerator the responses if no block is given.
252 #
253 # == Enumerator ==
254 #
255 # * #next blocks until the remote endpoint sends a READ or FINISHED
256 # * for each read, enumerator#next yields the response
257 # * on status
258 # * if it's is OK, enumerator#next raises StopException
259 # * if is not OK, enumerator#next raises RuntimeException
260 #
261 # == Block ==
262 #
263 # * if provided it is executed for each response
264 # * the call blocks until no more responses are provided
265 #
266 # @return [Enumerator] if no block was given
267 def each_remote_read
268 return enum_for(:each_remote_read) unless block_given?
269 loop do
270 resp = remote_read
271 break if resp.nil? # the last response was received
272 yield resp
273 end
274 end
275
276 # each_remote_read_then_finish passes each response to the given block or
277 # returns an enumerator of the responses if no block is given.
278 #
279 # It is like each_remote_read, but it blocks on finishing on detecting
280 # the final message.
281 #
282 # == Enumerator ==
283 #
284 # * #next blocks until the remote endpoint sends a READ or FINISHED
285 # * for each read, enumerator#next yields the response
286 # * on status
287 # * if it's is OK, enumerator#next raises StopException
288 # * if is not OK, enumerator#next raises RuntimeException
289 #
290 # == Block ==
291 #
292 # * if provided it is executed for each response
293 # * the call blocks until no more responses are provided
294 #
295 # @return [Enumerator] if no block was given
296 def each_remote_read_then_finish
297 return enum_for(:each_remote_read_then_finish) unless block_given?
298 loop do
299 resp = remote_read
300 break if resp.is_a? Struct::Status # is an OK status
301 if resp.nil? # the last response was received, but not finished yet
302 finished
303 break
304 end
305 yield resp
306 end
307 end
308
309 # request_response sends a request to a GRPC server, and returns the
310 # response.
311 #
312 # == Keyword Arguments ==
313 # any keyword arguments are treated as metadata to be sent to the server
314 # if a keyword value is a list, multiple metadata for it's key are sent
315 #
316 # @param req [Object] the request sent to the server
317 # @return [Object] the response received from the server
318 def request_response(req, **kw)
319 start_call(**kw) unless @started
320 remote_send(req)
321 writes_done(false)
322 response = remote_read
323 finished unless response.is_a? Struct::Status
324 response
325 rescue GRPC::Core::CallError => e
326 finished # checks for Cancelled
327 raise e
328 end
329
330 # client_streamer sends a stream of requests to a GRPC server, and
331 # returns a single response.
332 #
333 # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
334 # #each enumeration protocol. In the simplest case, requests will be an
335 # array of marshallable objects; in typical case it will be an Enumerable
336 # that allows dynamic construction of the marshallable objects.
337 #
338 # == Keyword Arguments ==
339 # any keyword arguments are treated as metadata to be sent to the server
340 # if a keyword value is a list, multiple metadata for it's key are sent
341 #
342 # @param requests [Object] an Enumerable of requests to send
343 # @return [Object] the response received from the server
344 def client_streamer(requests, **kw)
345 start_call(**kw) unless @started
346 requests.each { |r| remote_send(r) }
347 writes_done(false)
348 response = remote_read
349 finished unless response.is_a? Struct::Status
350 response
351 rescue GRPC::Core::CallError => e
352 finished # checks for Cancelled
353 raise e
354 end
355
356 # server_streamer sends one request to the GRPC server, which yields a
357 # stream of responses.
358 #
359 # responses provides an enumerator over the streamed responses, i.e. it
360 # follows Ruby's #each iteration protocol. The enumerator blocks while
361 # waiting for each response, stops when the server signals that no
362 # further responses will be supplied. If the implicit block is provided,
363 # it is executed with each response as the argument and no result is
364 # returned.
365 #
366 # == Keyword Arguments ==
367 # any keyword arguments are treated as metadata to be sent to the server
368 # if a keyword value is a list, multiple metadata for it's key are sent
369 # any keyword arguments are treated as metadata to be sent to the server.
370 #
371 # @param req [Object] the request sent to the server
372 # @return [Enumerator|nil] a response Enumerator
373 def server_streamer(req, **kw)
374 start_call(**kw) unless @started
375 remote_send(req)
376 writes_done(false)
377 replies = enum_for(:each_remote_read_then_finish)
378 return replies unless block_given?
379 replies.each { |r| yield r }
380 rescue GRPC::Core::CallError => e
381 finished # checks for Cancelled
382 raise e
383 end
384
385 # bidi_streamer sends a stream of requests to the GRPC server, and yields
386 # a stream of responses.
387 #
388 # This method takes an Enumerable of requests, and returns and enumerable
389 # of responses.
390 #
391 # == requests ==
392 #
393 # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
394 # #each enumeration protocol. In the simplest case, requests will be an
395 # array of marshallable objects; in typical case it will be an
396 # Enumerable that allows dynamic construction of the marshallable
397 # objects.
398 #
399 # == responses ==
400 #
401 # This is an enumerator of responses. I.e, its #next method blocks
402 # waiting for the next response. Also, if at any point the block needs
403 # to consume all the remaining responses, this can be done using #each or
404 # #collect. Calling #each or #collect should only be done if
405 # the_call#writes_done has been called, otherwise the block will loop
406 # forever.
407 #
408 # == Keyword Arguments ==
409 # any keyword arguments are treated as metadata to be sent to the server
410 # if a keyword value is a list, multiple metadata for it's key are sent
411 #
412 # @param requests [Object] an Enumerable of requests to send
413 # @return [Enumerator, nil] a response Enumerator
414 def bidi_streamer(requests, **kw, &blk)
415 start_call(**kw) unless @started
416 bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
417 metadata_tag: @metadata_tag)
418 @metadata_tag = nil # run_on_client ensures metadata is read
419 bd.run_on_client(requests, @op_notifier, &blk)
420 end
421
422 # run_server_bidi orchestrates a BiDi stream processing on a server.
423 #
424 # N.B. gen_each_reply is a func(Enumerable<Requests>)
425 #
426 # It takes an enumerable of requests as an arg, in case there is a
427 # relationship between the stream of requests and the stream of replies.
428 #
429 # This does not mean that must necessarily be one. E.g, the replies
430 # produced by gen_each_reply could ignore the received_msgs
431 #
432 # @param gen_each_reply [Proc] generates the BiDi stream replies
433 def run_server_bidi(gen_each_reply)
434 bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
435 bd.run_on_server(gen_each_reply)
436 end
437
438 # Waits till an operation completes
439 def wait
440 return if @op_notifier.nil?
441 GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
442 @op_notifier.wait
443 end
444
445 # Signals that an operation is done
446 def op_is_done
447 return if @op_notifier.nil?
448 @op_notifier.notify(self)
449 end
450
451 private
452
453 # Starts the call if not already started
454 def start_call(**kw)
455 return if @started
456 @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw)
457 @started = true
458 end
459
460 def self.view_class(*visible_methods)
461 Class.new do
462 extend ::Forwardable
463 def_delegators :@wrapped, *visible_methods
464
465 # @param wrapped [ActiveCall] the call whose methods are shielded
466 def initialize(wrapped)
467 @wrapped = wrapped
468 end
469 end
470 end
471
472 # SingleReqView limits access to an ActiveCall's methods for use in server
473 # handlers that receive just one request.
474 SingleReqView = view_class(:cancelled, :deadline, :metadata,
475 :output_metadata)
476
477 # MultiReqView limits access to an ActiveCall's methods for use in
478 # server client_streamer handlers.
479 MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
480 :each_remote_read, :metadata, :output_metadata)
481
482 # Operation limits access to an ActiveCall's methods for use as
483 # a Operation on the client.
484 Operation = view_class(:cancel, :cancelled, :deadline, :execute,
485 :metadata, :status, :start_call, :wait, :write_flag,
486 :write_flag=)
487 end
488 end
OLDNEW
« no previous file with comments | « third_party/grpc/src/ruby/lib/grpc/errors.rb ('k') | third_party/grpc/src/ruby/lib/grpc/generic/bidi_call.rb » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698