| Index: third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCall.cs
|
| diff --git a/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCall.cs
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..2caba260b3a243d43bf9fe72ff627ed5a474acd7
|
| --- /dev/null
|
| +++ b/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCall.cs
|
| @@ -0,0 +1,459 @@
|
| +#region Copyright notice and license
|
| +
|
| +// Copyright 2015-2016, Google Inc.
|
| +// All rights reserved.
|
| +//
|
| +// Redistribution and use in source and binary forms, with or without
|
| +// modification, are permitted provided that the following conditions are
|
| +// met:
|
| +//
|
| +// * Redistributions of source code must retain the above copyright
|
| +// notice, this list of conditions and the following disclaimer.
|
| +// * Redistributions in binary form must reproduce the above
|
| +// copyright notice, this list of conditions and the following disclaimer
|
| +// in the documentation and/or other materials provided with the
|
| +// distribution.
|
| +// * Neither the name of Google Inc. nor the names of its
|
| +// contributors may be used to endorse or promote products derived from
|
| +// this software without specific prior written permission.
|
| +//
|
| +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +#endregion
|
| +
|
| +using System;
|
| +using System.Diagnostics;
|
| +using System.Runtime.CompilerServices;
|
| +using System.Runtime.InteropServices;
|
| +using System.Threading;
|
| +using System.Threading.Tasks;
|
| +using Grpc.Core.Internal;
|
| +using Grpc.Core.Logging;
|
| +using Grpc.Core.Profiling;
|
| +using Grpc.Core.Utils;
|
| +
|
| +namespace Grpc.Core.Internal
|
| +{
|
| + /// <summary>
|
| + /// Manages client side native call lifecycle.
|
| + /// </summary>
|
| + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
|
| + {
|
| + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
|
| +
|
| + readonly CallInvocationDetails<TRequest, TResponse> details;
|
| + readonly INativeCall injectedNativeCall; // for testing
|
| +
|
| + // Completion of a pending unary response if not null.
|
| + TaskCompletionSource<TResponse> unaryResponseTcs;
|
| +
|
| + // Indicates that steaming call has finished.
|
| + TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
|
| +
|
| + // Response headers set here once received.
|
| + TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
|
| +
|
| + // Set after status is received. Used for both unary and streaming response calls.
|
| + ClientSideStatus? finishedStatus;
|
| +
|
| + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
|
| + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
|
| + {
|
| + this.details = callDetails.WithOptions(callDetails.Options.Normalize());
|
| + this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
|
| + }
|
| +
|
| + /// <summary>
|
| + /// This constructor should only be used for testing.
|
| + /// </summary>
|
| + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
|
| + {
|
| + this.injectedNativeCall = injectedNativeCall;
|
| + }
|
| +
|
| + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
|
| + // it is reusing fair amount of code in this class, so we are leaving it here.
|
| + /// <summary>
|
| + /// Blocking unary request - unary response call.
|
| + /// </summary>
|
| + public TResponse UnaryCall(TRequest msg)
|
| + {
|
| + var profiler = Profilers.ForCurrentThread();
|
| +
|
| + using (profiler.NewScope("AsyncCall.UnaryCall"))
|
| + using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
|
| + {
|
| + byte[] payload = UnsafeSerialize(msg);
|
| +
|
| + unaryResponseTcs = new TaskCompletionSource<TResponse>();
|
| +
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(!started);
|
| + started = true;
|
| + Initialize(cq);
|
| +
|
| + halfcloseRequested = true;
|
| + readingDone = true;
|
| + }
|
| +
|
| + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
|
| + using (var ctx = BatchContextSafeHandle.Create())
|
| + {
|
| + call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
|
| +
|
| + var ev = cq.Pluck(ctx.Handle);
|
| +
|
| + bool success = (ev.success != 0);
|
| + try
|
| + {
|
| + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
|
| + {
|
| + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
|
| + }
|
| + }
|
| + catch (Exception e)
|
| + {
|
| + Logger.Error(e, "Exception occured while invoking completion delegate.");
|
| + }
|
| + }
|
| +
|
| + // Once the blocking call returns, the result should be available synchronously.
|
| + // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
|
| + return unaryResponseTcs.Task.GetAwaiter().GetResult();
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Starts a unary request - unary response call.
|
| + /// </summary>
|
| + public Task<TResponse> UnaryCallAsync(TRequest msg)
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(!started);
|
| + started = true;
|
| +
|
| + Initialize(environment.CompletionQueue);
|
| +
|
| + halfcloseRequested = true;
|
| + readingDone = true;
|
| +
|
| + byte[] payload = UnsafeSerialize(msg);
|
| +
|
| + unaryResponseTcs = new TaskCompletionSource<TResponse>();
|
| + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
|
| + {
|
| + call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
|
| + }
|
| + return unaryResponseTcs.Task;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Starts a streamed request - unary response call.
|
| + /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
|
| + /// </summary>
|
| + public Task<TResponse> ClientStreamingCallAsync()
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(!started);
|
| + started = true;
|
| +
|
| + Initialize(environment.CompletionQueue);
|
| +
|
| + readingDone = true;
|
| +
|
| + unaryResponseTcs = new TaskCompletionSource<TResponse>();
|
| + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
|
| + {
|
| + call.StartClientStreaming(HandleUnaryResponse, metadataArray);
|
| + }
|
| +
|
| + return unaryResponseTcs.Task;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Starts a unary request - streamed response call.
|
| + /// </summary>
|
| + public void StartServerStreamingCall(TRequest msg)
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(!started);
|
| + started = true;
|
| +
|
| + Initialize(environment.CompletionQueue);
|
| +
|
| + halfcloseRequested = true;
|
| +
|
| + byte[] payload = UnsafeSerialize(msg);
|
| +
|
| + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
|
| + {
|
| + call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
|
| + }
|
| + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Starts a streaming request - streaming response call.
|
| + /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
|
| + /// </summary>
|
| + public void StartDuplexStreamingCall()
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(!started);
|
| + started = true;
|
| +
|
| + Initialize(environment.CompletionQueue);
|
| +
|
| + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
|
| + {
|
| + call.StartDuplexStreaming(HandleFinished, metadataArray);
|
| + }
|
| + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Sends a streaming request. Only one pending send action is allowed at any given time.
|
| + /// completionDelegate is called when the operation finishes.
|
| + /// </summary>
|
| + public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
|
| + {
|
| + StartSendMessageInternal(msg, writeFlags, completionDelegate);
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Receives a streaming response. Only one pending read action is allowed at any given time.
|
| + /// completionDelegate is called when the operation finishes.
|
| + /// </summary>
|
| + public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
|
| + {
|
| + StartReadMessageInternal(completionDelegate);
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Sends halfclose, indicating client is done with streaming requests.
|
| + /// Only one pending send action is allowed at any given time.
|
| + /// completionDelegate is called when the operation finishes.
|
| + /// </summary>
|
| + public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
|
| + CheckSendingAllowed();
|
| +
|
| + call.StartSendCloseFromClient(HandleHalfclosed);
|
| +
|
| + halfcloseRequested = true;
|
| + sendCompletionDelegate = completionDelegate;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
|
| + /// </summary>
|
| + public Task StreamingCallFinishedTask
|
| + {
|
| + get
|
| + {
|
| + return streamingCallFinishedTcs.Task;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Get the task that completes once response headers are received.
|
| + /// </summary>
|
| + public Task<Metadata> ResponseHeadersAsync
|
| + {
|
| + get
|
| + {
|
| + return responseHeadersTcs.Task;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Gets the resulting status if the call has already finished.
|
| + /// Throws InvalidOperationException otherwise.
|
| + /// </summary>
|
| + public Status GetStatus()
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
|
| + return finishedStatus.Value.Status;
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Gets the trailing metadata if the call has already finished.
|
| + /// Throws InvalidOperationException otherwise.
|
| + /// </summary>
|
| + public Metadata GetTrailers()
|
| + {
|
| + lock (myLock)
|
| + {
|
| + GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
|
| + return finishedStatus.Value.Trailers;
|
| + }
|
| + }
|
| +
|
| + public CallInvocationDetails<TRequest, TResponse> Details
|
| + {
|
| + get
|
| + {
|
| + return this.details;
|
| + }
|
| + }
|
| +
|
| + protected override void OnAfterReleaseResources()
|
| + {
|
| + details.Channel.RemoveCallReference(this);
|
| + }
|
| +
|
| + protected override bool IsClient
|
| + {
|
| + get { return true; }
|
| + }
|
| +
|
| + private void Initialize(CompletionQueueSafeHandle cq)
|
| + {
|
| + using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
|
| + {
|
| + var call = CreateNativeCall(cq);
|
| +
|
| + details.Channel.AddCallReference(this);
|
| + InitializeInternal(call);
|
| + RegisterCancellationCallback();
|
| + }
|
| + }
|
| +
|
| + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
|
| + {
|
| + using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall"))
|
| + {
|
| + if (injectedNativeCall != null)
|
| + {
|
| + return injectedNativeCall; // allows injecting a mock INativeCall in tests.
|
| + }
|
| +
|
| + var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
|
| +
|
| + var credentials = details.Options.Credentials;
|
| + using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
|
| + {
|
| + var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
|
| + parentCall, ContextPropagationToken.DefaultMask, cq,
|
| + details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
|
| + return result;
|
| + }
|
| + }
|
| + }
|
| +
|
| + // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
|
| + private void RegisterCancellationCallback()
|
| + {
|
| + var token = details.Options.CancellationToken;
|
| + if (token.CanBeCanceled)
|
| + {
|
| + token.Register(() => this.Cancel());
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Gets WriteFlags set in callDetails.Options.WriteOptions
|
| + /// </summary>
|
| + private WriteFlags GetWriteFlagsForCall()
|
| + {
|
| + var writeOptions = details.Options.WriteOptions;
|
| + return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Handles receive status completion for calls with streaming response.
|
| + /// </summary>
|
| + private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
|
| + {
|
| + responseHeadersTcs.SetResult(responseHeaders);
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Handler for unary response completion.
|
| + /// </summary>
|
| + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
|
| + {
|
| + using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
|
| + {
|
| + TResponse msg = default(TResponse);
|
| + var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
|
| +
|
| + lock (myLock)
|
| + {
|
| + finished = true;
|
| +
|
| + if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
|
| + {
|
| + receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
|
| + }
|
| + finishedStatus = receivedStatus;
|
| +
|
| + ReleaseResourcesIfPossible();
|
| +
|
| + }
|
| +
|
| + responseHeadersTcs.SetResult(responseHeaders);
|
| +
|
| + var status = receivedStatus.Status;
|
| +
|
| + if (!success || status.StatusCode != StatusCode.OK)
|
| + {
|
| + unaryResponseTcs.SetException(new RpcException(status));
|
| + return;
|
| + }
|
| +
|
| + unaryResponseTcs.SetResult(msg);
|
| + }
|
| + }
|
| +
|
| + /// <summary>
|
| + /// Handles receive status completion for calls with streaming response.
|
| + /// </summary>
|
| + private void HandleFinished(bool success, ClientSideStatus receivedStatus)
|
| + {
|
| + lock (myLock)
|
| + {
|
| + finished = true;
|
| + finishedStatus = receivedStatus;
|
| +
|
| + ReleaseResourcesIfPossible();
|
| + }
|
| +
|
| + var status = receivedStatus.Status;
|
| +
|
| + if (!success || status.StatusCode != StatusCode.OK)
|
| + {
|
| + streamingCallFinishedTcs.SetException(new RpcException(status));
|
| + return;
|
| + }
|
| +
|
| + streamingCallFinishedTcs.SetResult(null);
|
| + }
|
| + }
|
| +}
|
|
|