Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(174)

Unified Diff: third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m
diff --git a/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m b/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m
new file mode 100644
index 0000000000000000000000000000000000000000..f79b7d0bc0cf08920c78619eca04c41de2d9112c
--- /dev/null
+++ b/third_party/grpc/src/objective-c/GRPCClient/GRPCCall.m
@@ -0,0 +1,388 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRPCCall.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/time.h>
+#import <RxLibrary/GRXConcurrentWriteable.h>
+
+#import "private/GRPCRequestHeaders.h"
+#import "private/GRPCWrappedCall.h"
+#import "private/NSData+GRPC.h"
+#import "private/NSDictionary+GRPC.h"
+#import "private/NSError+GRPC.h"
+
+NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
+NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
+
+@interface GRPCCall () <GRXWriteable>
+// Make them read-write.
+@property(atomic, strong) NSDictionary *responseHeaders;
+@property(atomic, strong) NSDictionary *responseTrailers;
+@end
+
+// The following methods of a C gRPC call object aren't reentrant, and thus
+// calls to them must be serialized:
+// - start_batch
+// - destroy
+//
+// start_batch with a SEND_MESSAGE argument can only be called after the
+// OP_COMPLETE event for any previous write is received. This is achieved by
+// pausing the requests writer immediately every time it writes a value, and
+// resuming it again when OP_COMPLETE is received.
+//
+// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
+// the OP_COMPLETE event for any previous read is received.This is easier to
+// enforce, as we're writing the received messages into the writeable:
+// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
+// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
+// each RECV_MESSAGE batch.
+@implementation GRPCCall {
+ dispatch_queue_t _callQueue;
+
+ GRPCWrappedCall *_wrappedCall;
+ dispatch_once_t _callAlreadyInvoked;
+
+ // The C gRPC library has less guarantees on the ordering of events than we
+ // do. Particularly, in the face of errors, there's no ordering guarantee at
+ // all. This wrapper over our actual writeable ensures thread-safety and
+ // correct ordering.
+ GRXConcurrentWriteable *_responseWriteable;
+
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
+ GRXWriter *_requestWriter;
+
+ // To create a retain cycle when a call is started, up until it finishes. See
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
+ // reference to the call object if all they're interested in is the handler being executed when
+ // the response arrives.
+ GRPCCall *_retainSelf;
+
+ GRPCRequestHeaders *_requestHeaders;
+}
+
+@synthesize state = _state;
+
+- (instancetype)init {
+ return [self initWithHost:nil path:nil requestsWriter:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithHost:(NSString *)host
+ path:(NSString *)path
+ requestsWriter:(GRXWriter *)requestWriter {
+ if (!host || !path) {
+ [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
+ }
+ if (requestWriter.state != GRXWriterStateNotStarted) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"The requests writer can't be already started."];
+ }
+ if ((self = [super init])) {
+ _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:host path:path];
+ if (!_wrappedCall) {
+ return nil;
+ }
+
+ // Serial queue to invoke the non-reentrant methods of the grpc_call object.
+ _callQueue = dispatch_queue_create("org.grpc.call", NULL);
+
+ _requestWriter = requestWriter;
+
+ _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
+ }
+ return self;
+}
+
+#pragma mark Finish
+
+- (void)finishWithError:(NSError *)errorOrNil {
+ // If the call isn't retained anywhere else, it can be deallocated now.
+ _retainSelf = nil;
+
+ // If there were still request messages coming, stop them.
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStateFinished;
+ }
+
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
+}
+
+- (void)cancelCall {
+ // Can be called from any thread, any number of times.
+ [_wrappedCall cancel];
+}
+
+- (void)cancel {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:nil]];
+ [self cancelCall];
+}
+
+- (void)dealloc {
+ __block GRPCWrappedCall *wrappedCall = _wrappedCall;
+ dispatch_async(_callQueue, ^{
+ wrappedCall = nil;
+ });
+}
+
+#pragma mark Read messages
+
+// Only called from the call queue.
+// The handler will be called from the network queue.
+- (void)startReadWithHandler:(void(^)(grpc_byte_buffer *))handler {
+ // TODO(jcanizales): Add error handlers for async failures
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMessage alloc] initWithHandler:handler]]];
+}
+
+// Called initially from the network queue once response headers are received,
+// then "recursively" from the responseWriteable queue after each response from the
+// server has been written.
+// If the call is currently paused, this is a noop. Restarting the call will invoke this
+// method.
+// TODO(jcanizales): Rename to readResponseIfNotPaused.
+- (void)startNextRead {
+ if (self.state == GRXWriterStatePaused) {
+ return;
+ }
+ __weak GRPCCall *weakSelf = self;
+ __weak GRXConcurrentWriteable *weakWriteable = _responseWriteable;
+
+ dispatch_async(_callQueue, ^{
+ [weakSelf startReadWithHandler:^(grpc_byte_buffer *message) {
+ if (message == NULL) {
+ // No more messages from the server
+ return;
+ }
+ NSData *data = [NSData grpc_dataWithByteBuffer:message];
+ grpc_byte_buffer_destroy(message);
+ if (!data) {
+ // The app doesn't have enough memory to hold the server response. We
+ // don't want to throw, because the app shouldn't crash for a behavior
+ // that's on the hands of any server to have. Instead we finish and ask
+ // the server to cancel.
+ //
+ // TODO(jcanizales): No canonical code is appropriate for this situation
+ // (because it's just a client problem). Use another domain and an
+ // appropriately-documented code.
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ [weakSelf cancelCall];
+ return;
+ }
+ [weakWriteable enqueueValue:data completionHandler:^{
+ [weakSelf startNextRead];
+ }];
+ }];
+ });
+}
+
+#pragma mark Send headers
+
+- (void)sendHeaders:(NSDictionary *)headers {
+ // TODO(jcanizales): Add error handlers for async failures
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ handler:nil]]];
+}
+
+#pragma mark GRXWriteable implementation
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the write didn't succeed.
+- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
+
+ __weak GRPCCall *weakSelf = self;
+ void(^resumingHandler)(void) = ^{
+ // Resume the request writer.
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ @synchronized(strongSelf->_requestWriter) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
+ }
+ };
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
+}
+
+- (void)writeValue:(id)value {
+ // TODO(jcanizales): Throw/assert if value isn't NSData.
+
+ // Pause the input and only resume it when the C layer notifies us that writes
+ // can proceed.
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStatePaused;
+ }
+
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf writeMessage:value withErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+}
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the requests stream couldn't be closed successfully.
+- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]]
+ errorHandler:errorHandler];
+}
+
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
+ if (errorOrNil) {
+ [self cancel];
+ } else {
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf finishRequestWithErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+ }
+}
+
+#pragma mark Invoke
+
+// Both handlers will eventually be called, from the network queue. Writes can start immediately
+// after this.
+// The first one (headersHandler), when the response headers are received.
+// The second one (completionHandler), whenever the RPC finishes for any reason.
+- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
+ completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler {
+ // TODO(jcanizales): Add error handlers for async failures
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
+ initWithHandler:headersHandler]]];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
+ initWithHandler:completionHandler]]];
+}
+
+- (void)invokeCall {
+ __weak GRPCCall *weakSelf = self;
+ [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
+ // Response headers received.
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf.responseHeaders = headers;
+ [strongSelf startNextRead];
+ }
+ } completionHandler:^(NSError *error, NSDictionary *trailers) {
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf.responseTrailers = trailers;
+
+ if (error) {
+ NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
+ if (error.userInfo) {
+ [userInfo addEntriesFromDictionary:error.userInfo];
+ }
+ userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
+ // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
+ // called before this one, so an error might end up with trailers but no headers. We
+ // shouldn't call finishWithError until ater both blocks are called. It is also when this is
+ // done that we can provide a merged view of response headers and trailers in a thread-safe
+ // way.
+ if (strongSelf.responseHeaders) {
+ userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
+ }
+ error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
+ }
+ [strongSelf finishWithError:error];
+ }
+ }];
+ // Now that the RPC has been initiated, request writes can start.
+ @synchronized(_requestWriter) {
+ [_requestWriter startWithWriteable:self];
+ }
+}
+
+#pragma mark GRXWriter implementation
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
+ // This makes RPCs in which the call isn't externally retained possible (as long as it is started
+ // before being autoreleased).
+ // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
+ // that the life of the instance is determined by this retain cycle.
+ _retainSelf = self;
+
+ _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
+ [self sendHeaders:_requestHeaders];
+ [self invokeCall];
+}
+
+- (void)setState:(GRXWriterState)newState {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ _state = newState;
+ // Per GRXWriter's contract, setting the state to Finished manually
+ // means one doesn't wish the writeable to be messaged anymore.
+ [_responseWriteable cancelSilently];
+ _responseWriteable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self startNextRead];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
+}
+@end

Powered by Google App Engine
This is Rietveld 408576698