Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(513)

Side by Side Diff: third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34 #import "GRPCCall.h"
35
36 #include <grpc/grpc.h>
37 #include <grpc/support/time.h>
38 #import <RxLibrary/GRXConcurrentWriteable.h>
39
40 #import "private/GRPCRequestHeaders.h"
41 #import "private/GRPCWrappedCall.h"
42 #import "private/NSData+GRPC.h"
43 #import "private/NSDictionary+GRPC.h"
44 #import "private/NSError+GRPC.h"
45
46 NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
47 NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
48
49 @interface GRPCCall () <GRXWriteable>
50 // Make them read-write.
51 @property(atomic, strong) NSDictionary *responseHeaders;
52 @property(atomic, strong) NSDictionary *responseTrailers;
53 @end
54
55 // The following methods of a C gRPC call object aren't reentrant, and thus
56 // calls to them must be serialized:
57 // - start_batch
58 // - destroy
59 //
60 // start_batch with a SEND_MESSAGE argument can only be called after the
61 // OP_COMPLETE event for any previous write is received. This is achieved by
62 // pausing the requests writer immediately every time it writes a value, and
63 // resuming it again when OP_COMPLETE is received.
64 //
65 // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
66 // the OP_COMPLETE event for any previous read is received.This is easier to
67 // enforce, as we're writing the received messages into the writeable:
68 // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
69 // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
70 // each RECV_MESSAGE batch.
71 @implementation GRPCCall {
72 dispatch_queue_t _callQueue;
73
74 GRPCWrappedCall *_wrappedCall;
75 dispatch_once_t _callAlreadyInvoked;
76
77 // The C gRPC library has less guarantees on the ordering of events than we
78 // do. Particularly, in the face of errors, there's no ordering guarantee at
79 // all. This wrapper over our actual writeable ensures thread-safety and
80 // correct ordering.
81 GRXConcurrentWriteable *_responseWriteable;
82
83 // The network thread wants the requestWriter to resume (when the server is re ady for more input),
84 // or to stop (on errors), concurrently with user threads that want to start i t, pause it or stop
85 // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
86 // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
87 // writesFinishedWithError: on this GRPCCall as part of those operations. We w ant to be able to
88 // pause the writer immediately on writeValue:, so we need our locking to be r ecursive.
89 GRXWriter *_requestWriter;
90
91 // To create a retain cycle when a call is started, up until it finishes. See
92 // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
93 // reference to the call object if all they're interested in is the handler be ing executed when
94 // the response arrives.
95 GRPCCall *_retainSelf;
96
97 GRPCRequestHeaders *_requestHeaders;
98 }
99
100 @synthesize state = _state;
101
102 - (instancetype)init {
103 return [self initWithHost:nil path:nil requestsWriter:nil];
104 }
105
106 // Designated initializer
107 - (instancetype)initWithHost:(NSString *)host
108 path:(NSString *)path
109 requestsWriter:(GRXWriter *)requestWriter {
110 if (!host || !path) {
111 [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
112 }
113 if (requestWriter.state != GRXWriterStateNotStarted) {
114 [NSException raise:NSInvalidArgumentException
115 format:@"The requests writer can't be already started."];
116 }
117 if ((self = [super init])) {
118 _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:host path:path];
119 if (!_wrappedCall) {
120 return nil;
121 }
122
123 // Serial queue to invoke the non-reentrant methods of the grpc_call object.
124 _callQueue = dispatch_queue_create("org.grpc.call", NULL);
125
126 _requestWriter = requestWriter;
127
128 _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
129 }
130 return self;
131 }
132
133 #pragma mark Finish
134
135 - (void)finishWithError:(NSError *)errorOrNil {
136 // If the call isn't retained anywhere else, it can be deallocated now.
137 _retainSelf = nil;
138
139 // If there were still request messages coming, stop them.
140 @synchronized(_requestWriter) {
141 _requestWriter.state = GRXWriterStateFinished;
142 }
143
144 if (errorOrNil) {
145 [_responseWriteable cancelWithError:errorOrNil];
146 } else {
147 [_responseWriteable enqueueSuccessfulCompletion];
148 }
149 }
150
151 - (void)cancelCall {
152 // Can be called from any thread, any number of times.
153 [_wrappedCall cancel];
154 }
155
156 - (void)cancel {
157 [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
158 code:GRPCErrorCodeCancelled
159 userInfo:nil]];
160 [self cancelCall];
161 }
162
163 - (void)dealloc {
164 __block GRPCWrappedCall *wrappedCall = _wrappedCall;
165 dispatch_async(_callQueue, ^{
166 wrappedCall = nil;
167 });
168 }
169
170 #pragma mark Read messages
171
172 // Only called from the call queue.
173 // The handler will be called from the network queue.
174 - (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler {
175 // TODO(jcanizales): Add error handlers for async failures
176 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHa ndler:handler]]];
177 }
178
179 // Called initially from the network queue once response headers are received,
180 // then "recursively" from the responseWriteable queue after each response from the
181 // server has been written.
182 // If the call is currently paused, this is a noop. Restarting the call will inv oke this
183 // method.
184 // TODO(jcanizales): Rename to readResponseIfNotPaused.
185 - (void)startNextRead {
186 if (self.state == GRXWriterStatePaused) {
187 return;
188 }
189 __weak GRPCCall *weakSelf = self;
190 __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
191
192 dispatch_async(_callQueue, ^{
193 [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
194 if (message == NULL) {
195 // No more messages from the server
196 return;
197 }
198 NSData *data = [NSData grpc_dataWithByteBuffer:message];
199 grpc_byte_buffer_destroy(message);
200 if (!data) {
201 // The app doesn't have enough memory to hold the server response. We
202 // don't want to throw, because the app shouldn't crash for a behavior
203 // that's on the hands of any server to have. Instead we finish and ask
204 // the server to cancel.
205 //
206 // TODO(jcanizales): No canonical code is appropriate for this situation
207 // (because it's just a client problem). Use another domain and an
208 // appropriately-documented code.
209 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
210 code:GRPCErrorCodeInternal
211 userInfo:nil]];
212 [weakSelf cancelCall];
213 return;
214 }
215 [weakWriteable enqueueValue:data completionHandler:^{
216 [weakSelf startNextRead];
217 }];
218 }];
219 });
220 }
221
222 #pragma mark Send headers
223
224 - (void)sendHeaders:(NSDictionary *)headers {
225 // TODO(jcanizales): Add error handlers for async failures
226 [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithM etadata:headers
227 handler:nil]]];
228 }
229
230 #pragma mark GRXWriteable implementation
231
232 // Only called from the call queue. The error handler will be called from the
233 // network queue if the write didn't succeed.
234 - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
235
236 __weak GRPCCall *weakSelf = self;
237 void(^resumingHandler)(void) = ^{
238 // Resume the request writer.
239 GRPCCall *strongSelf = weakSelf;
240 if (strongSelf) {
241 @synchronized(strongSelf->_requestWriter) {
242 strongSelf->_requestWriter.state = GRXWriterStateStarted;
243 }
244 }
245 };
246 [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMe ssage:message
247 ha ndler:resumingHandler]]
248 errorHandler:errorHandler];
249 }
250
251 - (void)writeValue:(id)value {
252 // TODO(jcanizales): Throw/assert if value isn't NSData.
253
254 // Pause the input and only resume it when the C layer notifies us that writes
255 // can proceed.
256 @synchronized(_requestWriter) {
257 _requestWriter.state = GRXWriterStatePaused;
258 }
259
260 __weak GRPCCall *weakSelf = self;
261 dispatch_async(_callQueue, ^{
262 [weakSelf writeMessage:value withErrorHandler:^{
263 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
264 code:GRPCErrorCodeInternal
265 userInfo:nil]];
266 }];
267 });
268 }
269
270 // Only called from the call queue. The error handler will be called from the
271 // network queue if the requests stream couldn't be closed successfully.
272 - (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
273 [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
274 errorHandler:errorHandler];
275 }
276
277 - (void)writesFinishedWithError:(NSError *)errorOrNil {
278 if (errorOrNil) {
279 [self cancel];
280 } else {
281 __weak GRPCCall *weakSelf = self;
282 dispatch_async(_callQueue, ^{
283 [weakSelf finishRequestWithErrorHandler:^{
284 [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
285 code:GRPCErrorCodeInternal
286 userInfo:nil]];
287 }];
288 });
289 }
290 }
291
292 #pragma mark Invoke
293
294 // Both handlers will eventually be called, from the network queue. Writes can s tart immediately
295 // after this.
296 // The first one (headersHandler), when the response headers are received.
297 // The second one (completionHandler), whenever the RPC finishes for any reason.
298 - (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
299 completionHandler:(void(^)(NSError *, NSDictionary *))comple tionHandler {
300 // TODO(jcanizales): Add error handlers for async failures
301 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
302 initWithHandler:headersHandler]]];
303 [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
304 initWithHandler:completionHandler]]] ;
305 }
306
307 - (void)invokeCall {
308 __weak GRPCCall *weakSelf = self;
309 [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
310 // Response headers received.
311 GRPCCall *strongSelf = weakSelf;
312 if (strongSelf) {
313 strongSelf.responseHeaders = headers;
314 [strongSelf startNextRead];
315 }
316 } completionHandler:^(NSError *error, NSDictionary *trailers) {
317 GRPCCall *strongSelf = weakSelf;
318 if (strongSelf) {
319 strongSelf.responseTrailers = trailers;
320
321 if (error) {
322 NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
323 if (error.userInfo) {
324 [userInfo addEntriesFromDictionary:error.userInfo];
325 }
326 userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
327 // TODO(jcanizales): The C gRPC library doesn't guarantee that the heade rs block will be
328 // called before this one, so an error might end up with trailers but no headers. We
329 // shouldn't call finishWithError until ater both blocks are called. It is also when this is
330 // done that we can provide a merged view of response headers and traile rs in a thread-safe
331 // way.
332 if (strongSelf.responseHeaders) {
333 userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
334 }
335 error = [NSError errorWithDomain:error.domain code:error.code userInfo:u serInfo];
336 }
337 [strongSelf finishWithError:error];
338 }
339 }];
340 // Now that the RPC has been initiated, request writes can start.
341 @synchronized(_requestWriter) {
342 [_requestWriter startWithWriteable:self];
343 }
344 }
345
346 #pragma mark GRXWriter implementation
347
348 - (void)startWithWriteable:(id<GRXWriteable>)writeable {
349 // Create a retain cycle so that this instance lives until the RPC finishes (o r is cancelled).
350 // This makes RPCs in which the call isn't externally retained possible (as lo ng as it is started
351 // before being autoreleased).
352 // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
353 // that the life of the instance is determined by this retain cycle.
354 _retainSelf = self;
355
356 _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeab le];
357 [self sendHeaders:_requestHeaders];
358 [self invokeCall];
359 }
360
361 - (void)setState:(GRXWriterState)newState {
362 // Manual transitions are only allowed from the started or paused states.
363 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
364 return;
365 }
366
367 switch (newState) {
368 case GRXWriterStateFinished:
369 _state = newState;
370 // Per GRXWriter's contract, setting the state to Finished manually
371 // means one doesn't wish the writeable to be messaged anymore.
372 [_responseWriteable cancelSilently];
373 _responseWriteable = nil;
374 return;
375 case GRXWriterStatePaused:
376 _state = newState;
377 return;
378 case GRXWriterStateStarted:
379 if (_state == GRXWriterStatePaused) {
380 _state = newState;
381 [self startNextRead];
382 }
383 return;
384 case GRXWriterStateNotStarted:
385 return;
386 }
387 }
388 @end
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698