Index: third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs |
diff --git a/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs |
new file mode 100644 |
index 0000000000000000000000000000000000000000..45d4c3e078c4102996dbaae693a7c10c20457efd |
--- /dev/null |
+++ b/third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs |
@@ -0,0 +1,359 @@ |
+#region Copyright notice and license |
+ |
+// Copyright 2015-2016, Google Inc. |
+// All rights reserved. |
+// |
+// Redistribution and use in source and binary forms, with or without |
+// modification, are permitted provided that the following conditions are |
+// met: |
+// |
+// * Redistributions of source code must retain the above copyright |
+// notice, this list of conditions and the following disclaimer. |
+// * Redistributions in binary form must reproduce the above |
+// copyright notice, this list of conditions and the following disclaimer |
+// in the documentation and/or other materials provided with the |
+// distribution. |
+// * Neither the name of Google Inc. nor the names of its |
+// contributors may be used to endorse or promote products derived from |
+// this software without specific prior written permission. |
+// |
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+#endregion |
+ |
+using System; |
+using System.Diagnostics; |
+using System.IO; |
+using System.Runtime.CompilerServices; |
+using System.Runtime.InteropServices; |
+using System.Threading; |
+using System.Threading.Tasks; |
+ |
+using Grpc.Core.Internal; |
+using Grpc.Core.Logging; |
+using Grpc.Core.Profiling; |
+using Grpc.Core.Utils; |
+ |
+namespace Grpc.Core.Internal |
+{ |
+ /// <summary> |
+ /// Base for handling both client side and server side calls. |
+ /// Manages native call lifecycle and provides convenience methods. |
+ /// </summary> |
+ internal abstract class AsyncCallBase<TWrite, TRead> |
+ { |
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>(); |
+ protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); |
+ |
+ readonly Func<TWrite, byte[]> serializer; |
+ readonly Func<byte[], TRead> deserializer; |
+ |
+ protected readonly GrpcEnvironment environment; |
+ protected readonly object myLock = new object(); |
+ |
+ protected INativeCall call; |
+ protected bool disposed; |
+ |
+ protected bool started; |
+ protected bool cancelRequested; |
+ |
+ protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. |
+ protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. |
+ |
+ protected bool readingDone; // True if last read (i.e. read with null payload) was already received. |
+ protected bool halfcloseRequested; // True if send close have been initiated. |
+ protected bool finished; // True if close has been received from the peer. |
+ |
+ protected bool initialMetadataSent; |
+ protected long streamingWritesCounter; // Number of streaming send operations started so far. |
+ |
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) |
+ { |
+ this.serializer = GrpcPreconditions.CheckNotNull(serializer); |
+ this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); |
+ this.environment = GrpcPreconditions.CheckNotNull(environment); |
+ } |
+ |
+ /// <summary> |
+ /// Requests cancelling the call. |
+ /// </summary> |
+ public void Cancel() |
+ { |
+ lock (myLock) |
+ { |
+ GrpcPreconditions.CheckState(started); |
+ cancelRequested = true; |
+ |
+ if (!disposed) |
+ { |
+ call.Cancel(); |
+ } |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Requests cancelling the call with given status. |
+ /// </summary> |
+ protected void CancelWithStatus(Status status) |
+ { |
+ lock (myLock) |
+ { |
+ cancelRequested = true; |
+ |
+ if (!disposed) |
+ { |
+ call.CancelWithStatus(status); |
+ } |
+ } |
+ } |
+ |
+ protected void InitializeInternal(INativeCall call) |
+ { |
+ lock (myLock) |
+ { |
+ this.call = call; |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Initiates sending a message. Only one send operation can be active at a time. |
+ /// completionDelegate is invoked upon completion. |
+ /// </summary> |
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) |
+ { |
+ byte[] payload = UnsafeSerialize(msg); |
+ |
+ lock (myLock) |
+ { |
+ GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); |
+ CheckSendingAllowed(); |
+ |
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); |
+ |
+ sendCompletionDelegate = completionDelegate; |
+ initialMetadataSent = true; |
+ streamingWritesCounter++; |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Initiates reading a message. Only one read operation can be active at a time. |
+ /// completionDelegate is invoked upon completion. |
+ /// </summary> |
+ protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate) |
+ { |
+ lock (myLock) |
+ { |
+ GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); |
+ CheckReadingAllowed(); |
+ |
+ call.StartReceiveMessage(HandleReadFinished); |
+ readCompletionDelegate = completionDelegate; |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// If there are no more pending actions and no new actions can be started, releases |
+ /// the underlying native resources. |
+ /// </summary> |
+ protected bool ReleaseResourcesIfPossible() |
+ { |
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible")) |
+ { |
+ if (!disposed && call != null) |
+ { |
+ bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); |
+ if (noMoreSendCompletions && readingDone && finished) |
+ { |
+ ReleaseResources(); |
+ return true; |
+ } |
+ } |
+ return false; |
+ } |
+ } |
+ |
+ protected abstract bool IsClient |
+ { |
+ get; |
+ } |
+ |
+ private void ReleaseResources() |
+ { |
+ if (call != null) |
+ { |
+ call.Dispose(); |
+ } |
+ disposed = true; |
+ OnAfterReleaseResources(); |
+ } |
+ |
+ protected virtual void OnAfterReleaseResources() |
+ { |
+ } |
+ |
+ protected void CheckSendingAllowed() |
+ { |
+ GrpcPreconditions.CheckState(started); |
+ CheckNotCancelled(); |
+ GrpcPreconditions.CheckState(!disposed); |
+ |
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); |
+ GrpcPreconditions.CheckState(!finished, "Already finished."); |
+ GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); |
+ } |
+ |
+ protected virtual void CheckReadingAllowed() |
+ { |
+ GrpcPreconditions.CheckState(started); |
+ GrpcPreconditions.CheckState(!disposed); |
+ |
+ GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); |
+ GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); |
+ } |
+ |
+ protected void CheckNotCancelled() |
+ { |
+ if (cancelRequested) |
+ { |
+ throw new OperationCanceledException("Remote call has been cancelled."); |
+ } |
+ } |
+ |
+ protected byte[] UnsafeSerialize(TWrite msg) |
+ { |
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize")) |
+ { |
+ return serializer(msg); |
+ } |
+ } |
+ |
+ protected Exception TryDeserialize(byte[] payload, out TRead msg) |
+ { |
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize")) |
+ { |
+ try |
+ { |
+ |
+ msg = deserializer(payload); |
+ return null; |
+ |
+ } |
+ catch (Exception e) |
+ { |
+ msg = default(TRead); |
+ return e; |
+ } |
+ } |
+ } |
+ |
+ protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error) |
+ { |
+ try |
+ { |
+ completionDelegate(value, error); |
+ } |
+ catch (Exception e) |
+ { |
+ Logger.Error(e, "Exception occured while invoking completion delegate."); |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Handles send completion. |
+ /// </summary> |
+ protected void HandleSendFinished(bool success) |
+ { |
+ AsyncCompletionDelegate<object> origCompletionDelegate = null; |
+ lock (myLock) |
+ { |
+ origCompletionDelegate = sendCompletionDelegate; |
+ sendCompletionDelegate = null; |
+ |
+ ReleaseResourcesIfPossible(); |
+ } |
+ |
+ if (!success) |
+ { |
+ FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); |
+ } |
+ else |
+ { |
+ FireCompletion(origCompletionDelegate, null, null); |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Handles halfclose completion. |
+ /// </summary> |
+ protected void HandleHalfclosed(bool success) |
+ { |
+ AsyncCompletionDelegate<object> origCompletionDelegate = null; |
+ lock (myLock) |
+ { |
+ origCompletionDelegate = sendCompletionDelegate; |
+ sendCompletionDelegate = null; |
+ |
+ ReleaseResourcesIfPossible(); |
+ } |
+ |
+ if (!success) |
+ { |
+ FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed")); |
+ } |
+ else |
+ { |
+ FireCompletion(origCompletionDelegate, null, null); |
+ } |
+ } |
+ |
+ /// <summary> |
+ /// Handles streaming read completion. |
+ /// </summary> |
+ protected void HandleReadFinished(bool success, byte[] receivedMessage) |
+ { |
+ TRead msg = default(TRead); |
+ var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; |
+ |
+ AsyncCompletionDelegate<TRead> origCompletionDelegate = null; |
+ lock (myLock) |
+ { |
+ origCompletionDelegate = readCompletionDelegate; |
+ readCompletionDelegate = null; |
+ |
+ if (receivedMessage == null) |
+ { |
+ // This was the last read. |
+ readingDone = true; |
+ } |
+ |
+ if (deserializeException != null && IsClient) |
+ { |
+ readingDone = true; |
+ CancelWithStatus(DeserializeResponseFailureStatus); |
+ } |
+ |
+ ReleaseResourcesIfPossible(); |
+ } |
+ |
+ // TODO: handle the case when success==false |
+ |
+ if (deserializeException != null && !IsClient) |
+ { |
+ FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); |
+ return; |
+ } |
+ FireCompletion(origCompletionDelegate, msg, null); |
+ } |
+ } |
+} |