Index: third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m |
diff --git a/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m b/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f79b7d0bc0cf08920c78619eca04c41de2d9112c |
--- /dev/null |
+++ b/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m |
@@ -0,0 +1,388 @@ |
+/* |
+ * |
+ * Copyright 2015, Google Inc. |
+ * All rights reserved. |
+ * |
+ * Redistribution and use in source and binary forms, with or without |
+ * modification, are permitted provided that the following conditions are |
+ * met: |
+ * |
+ * * Redistributions of source code must retain the above copyright |
+ * notice, this list of conditions and the following disclaimer. |
+ * * Redistributions in binary form must reproduce the above |
+ * copyright notice, this list of conditions and the following disclaimer |
+ * in the documentation and/or other materials provided with the |
+ * distribution. |
+ * * Neither the name of Google Inc. nor the names of its |
+ * contributors may be used to endorse or promote products derived from |
+ * this software without specific prior written permission. |
+ * |
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ * |
+ */ |
+ |
+#import "GRPCCall.h" |
+ |
+#include <grpc/grpc.h> |
+#include <grpc/support/time.h> |
+#import <RxLibrary/GRXConcurrentWriteable.h> |
+ |
+#import "private/GRPCRequestHeaders.h" |
+#import "private/GRPCWrappedCall.h" |
+#import "private/NSData+GRPC.h" |
+#import "private/NSDictionary+GRPC.h" |
+#import "private/NSError+GRPC.h" |
+ |
+NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey"; |
+NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; |
+ |
+@interface GRPCCall () <GRXWriteable> |
+// Make them read-write. |
+@property(atomic, strong) NSDictionary *responseHeaders; |
+@property(atomic, strong) NSDictionary *responseTrailers; |
+@end |
+ |
+// The following methods of a C gRPC call object aren't reentrant, and thus |
+// calls to them must be serialized: |
+// - start_batch |
+// - destroy |
+// |
+// start_batch with a SEND_MESSAGE argument can only be called after the |
+// OP_COMPLETE event for any previous write is received. This is achieved by |
+// pausing the requests writer immediately every time it writes a value, and |
+// resuming it again when OP_COMPLETE is received. |
+// |
+// Similarly, start_batch with a RECV_MESSAGE argument can only be called after |
+// the OP_COMPLETE event for any previous read is received.This is easier to |
+// enforce, as we're writing the received messages into the writeable: |
+// start_batch is enqueued once upon receiving the OP_COMPLETE event for the |
+// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for |
+// each RECV_MESSAGE batch. |
+@implementation GRPCCall { |
+ dispatch_queue_t _callQueue; |
+ |
+ GRPCWrappedCall *_wrappedCall; |
+ dispatch_once_t _callAlreadyInvoked; |
+ |
+ // The C gRPC library has less guarantees on the ordering of events than we |
+ // do. Particularly, in the face of errors, there's no ordering guarantee at |
+ // all. This wrapper over our actual writeable ensures thread-safety and |
+ // correct ordering. |
+ GRXConcurrentWriteable *_responseWriteable; |
+ |
+ // The network thread wants the requestWriter to resume (when the server is ready for more input), |
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop |
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it. |
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or |
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to |
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive. |
+ GRXWriter *_requestWriter; |
+ |
+ // To create a retain cycle when a call is started, up until it finishes. See |
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a |
+ // reference to the call object if all they're interested in is the handler being executed when |
+ // the response arrives. |
+ GRPCCall *_retainSelf; |
+ |
+ GRPCRequestHeaders *_requestHeaders; |
+} |
+ |
+@synthesize state = _state; |
+ |
+- (instancetype)init { |
+ return [self initWithHost:nil path:nil requestsWriter:nil]; |
+} |
+ |
+// Designated initializer |
+- (instancetype)initWithHost:(NSString *)host |
+ path:(NSString *)path |
+ requestsWriter:(GRXWriter *)requestWriter { |
+ if (!host || !path) { |
+ [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."]; |
+ } |
+ if (requestWriter.state != GRXWriterStateNotStarted) { |
+ [NSException raise:NSInvalidArgumentException |
+ format:@"The requests writer can't be already started."]; |
+ } |
+ if ((self = [super init])) { |
+ _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:host path:path]; |
+ if (!_wrappedCall) { |
+ return nil; |
+ } |
+ |
+ // Serial queue to invoke the non-reentrant methods of the grpc_call object. |
+ _callQueue = dispatch_queue_create("org.grpc.call", NULL); |
+ |
+ _requestWriter = requestWriter; |
+ |
+ _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self]; |
+ } |
+ return self; |
+} |
+ |
+#pragma mark Finish |
+ |
+- (void)finishWithError:(NSError *)errorOrNil { |
+ // If the call isn't retained anywhere else, it can be deallocated now. |
+ _retainSelf = nil; |
+ |
+ // If there were still request messages coming, stop them. |
+ @synchronized(_requestWriter) { |
+ _requestWriter.state = GRXWriterStateFinished; |
+ } |
+ |
+ if (errorOrNil) { |
+ [_responseWriteable cancelWithError:errorOrNil]; |
+ } else { |
+ [_responseWriteable enqueueSuccessfulCompletion]; |
+ } |
+} |
+ |
+- (void)cancelCall { |
+ // Can be called from any thread, any number of times. |
+ [_wrappedCall cancel]; |
+} |
+ |
+- (void)cancel { |
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
+ code:GRPCErrorCodeCancelled |
+ userInfo:nil]]; |
+ [self cancelCall]; |
+} |
+ |
+- (void)dealloc { |
+ __block GRPCWrappedCall *wrappedCall = _wrappedCall; |
+ dispatch_async(_callQueue, ^{ |
+ wrappedCall = nil; |
+ }); |
+} |
+ |
+#pragma mark Read messages |
+ |
+// Only called from the call queue. |
+// The handler will be called from the network queue. |
+- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler { |
+ // TODO(jcanizales): Add error handlers for async failures |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]]; |
+} |
+ |
+// Called initially from the network queue once response headers are received, |
+// then "recursively" from the responseWriteable queue after each response from the |
+// server has been written. |
+// If the call is currently paused, this is a noop. Restarting the call will invoke this |
+// method. |
+// TODO(jcanizales): Rename to readResponseIfNotPaused. |
+- (void)startNextRead { |
+ if (self.state == GRXWriterStatePaused) { |
+ return; |
+ } |
+ __weak GRPCCall *weakSelf = self; |
+ __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable; |
+ |
+ dispatch_async(_callQueue, ^{ |
+ [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) { |
+ if (message == NULL) { |
+ // No more messages from the server |
+ return; |
+ } |
+ NSData *data = [NSData grpc_dataWithByteBuffer:message]; |
+ grpc_byte_buffer_destroy(message); |
+ if (!data) { |
+ // The app doesn't have enough memory to hold the server response. We |
+ // don't want to throw, because the app shouldn't crash for a behavior |
+ // that's on the hands of any server to have. Instead we finish and ask |
+ // the server to cancel. |
+ // |
+ // TODO(jcanizales): No canonical code is appropriate for this situation |
+ // (because it's just a client problem). Use another domain and an |
+ // appropriately-documented code. |
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
+ code:GRPCErrorCodeInternal |
+ userInfo:nil]]; |
+ [weakSelf cancelCall]; |
+ return; |
+ } |
+ [weakWriteable enqueueValue:data completionHandler:^{ |
+ [weakSelf startNextRead]; |
+ }]; |
+ }]; |
+ }); |
+} |
+ |
+#pragma mark Send headers |
+ |
+- (void)sendHeaders:(NSDictionary *)headers { |
+ // TODO(jcanizales): Add error handlers for async failures |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers |
+ handler:nil]]]; |
+} |
+ |
+#pragma mark GRXWriteable implementation |
+ |
+// Only called from the call queue. The error handler will be called from the |
+// network queue if the write didn't succeed. |
+- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { |
+ |
+ __weak GRPCCall *weakSelf = self; |
+ void(^resumingHandler)(void) = ^{ |
+ // Resume the request writer. |
+ GRPCCall *strongSelf = weakSelf; |
+ if (strongSelf) { |
+ @synchronized(strongSelf->_requestWriter) { |
+ strongSelf->_requestWriter.state = GRXWriterStateStarted; |
+ } |
+ } |
+ }; |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message |
+ handler:resumingHandler]] |
+ errorHandler:errorHandler]; |
+} |
+ |
+- (void)writeValue:(id)value { |
+ // TODO(jcanizales): Throw/assert if value isn't NSData. |
+ |
+ // Pause the input and only resume it when the C layer notifies us that writes |
+ // can proceed. |
+ @synchronized(_requestWriter) { |
+ _requestWriter.state = GRXWriterStatePaused; |
+ } |
+ |
+ __weak GRPCCall *weakSelf = self; |
+ dispatch_async(_callQueue, ^{ |
+ [weakSelf writeMessage:value withErrorHandler:^{ |
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
+ code:GRPCErrorCodeInternal |
+ userInfo:nil]]; |
+ }]; |
+ }); |
+} |
+ |
+// Only called from the call queue. The error handler will be called from the |
+// network queue if the requests stream couldn't be closed successfully. |
+- (void)finishRequestWithErrorHandler:(void (^)())errorHandler { |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] |
+ errorHandler:errorHandler]; |
+} |
+ |
+- (void)writesFinishedWithError:(NSError *)errorOrNil { |
+ if (errorOrNil) { |
+ [self cancel]; |
+ } else { |
+ __weak GRPCCall *weakSelf = self; |
+ dispatch_async(_callQueue, ^{ |
+ [weakSelf finishRequestWithErrorHandler:^{ |
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain |
+ code:GRPCErrorCodeInternal |
+ userInfo:nil]]; |
+ }]; |
+ }); |
+ } |
+} |
+ |
+#pragma mark Invoke |
+ |
+// Both handlers will eventually be called, from the network queue. Writes can start immediately |
+// after this. |
+// The first one (headersHandler), when the response headers are received. |
+// The second one (completionHandler), whenever the RPC finishes for any reason. |
+- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler |
+ completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler { |
+ // TODO(jcanizales): Add error handlers for async failures |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc] |
+ initWithHandler:headersHandler]]]; |
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc] |
+ initWithHandler:completionHandler]]]; |
+} |
+ |
+- (void)invokeCall { |
+ __weak GRPCCall *weakSelf = self; |
+ [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { |
+ // Response headers received. |
+ GRPCCall *strongSelf = weakSelf; |
+ if (strongSelf) { |
+ strongSelf.responseHeaders = headers; |
+ [strongSelf startNextRead]; |
+ } |
+ } completionHandler:^(NSError *error, NSDictionary *trailers) { |
+ GRPCCall *strongSelf = weakSelf; |
+ if (strongSelf) { |
+ strongSelf.responseTrailers = trailers; |
+ |
+ if (error) { |
+ NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; |
+ if (error.userInfo) { |
+ [userInfo addEntriesFromDictionary:error.userInfo]; |
+ } |
+ userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; |
+ // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be |
+ // called before this one, so an error might end up with trailers but no headers. We |
+ // shouldn't call finishWithError until ater both blocks are called. It is also when this is |
+ // done that we can provide a merged view of response headers and trailers in a thread-safe |
+ // way. |
+ if (strongSelf.responseHeaders) { |
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; |
+ } |
+ error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; |
+ } |
+ [strongSelf finishWithError:error]; |
+ } |
+ }]; |
+ // Now that the RPC has been initiated, request writes can start. |
+ @synchronized(_requestWriter) { |
+ [_requestWriter startWithWriteable:self]; |
+ } |
+} |
+ |
+#pragma mark GRXWriter implementation |
+ |
+- (void)startWithWriteable:(id<GRXWriteable>)writeable { |
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). |
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is started |
+ // before being autoreleased). |
+ // Care is taken not to retain self strongly in any of the blocks used in this implementation, so |
+ // that the life of the instance is determined by this retain cycle. |
+ _retainSelf = self; |
+ |
+ _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; |
+ [self sendHeaders:_requestHeaders]; |
+ [self invokeCall]; |
+} |
+ |
+- (void)setState:(GRXWriterState)newState { |
+ // Manual transitions are only allowed from the started or paused states. |
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { |
+ return; |
+ } |
+ |
+ switch (newState) { |
+ case GRXWriterStateFinished: |
+ _state = newState; |
+ // Per GRXWriter's contract, setting the state to Finished manually |
+ // means one doesn't wish the writeable to be messaged anymore. |
+ [_responseWriteable cancelSilently]; |
+ _responseWriteable = nil; |
+ return; |
+ case GRXWriterStatePaused: |
+ _state = newState; |
+ return; |
+ case GRXWriterStateStarted: |
+ if (_state == GRXWriterStatePaused) { |
+ _state = newState; |
+ [self startNextRead]; |
+ } |
+ return; |
+ case GRXWriterStateNotStarted: |
+ return; |
+ } |
+} |
+@end |