Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Unified Diff: third_party/grpc/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
diff --git a/third_party/grpc/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/third_party/grpc/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
new file mode 100644
index 0000000000000000000000000000000000000000..ccf144de2def6e31819a88331f08c25998ad1459
--- /dev/null
+++ b/third_party/grpc/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -0,0 +1,317 @@
+#region Copyright notice and license
+
+// Copyright 2015-2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Logging;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ internal interface IServerCallHandler
+ {
+ Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
+ }
+
+ internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ where TRequest : class
+ where TResponse : class
+ {
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnaryServerCallHandler<TRequest, TResponse>>();
+
+ readonly Method<TRequest, TResponse> method;
+ readonly UnaryServerMethod<TRequest, TResponse> handler;
+
+ public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer,
+ environment, newRpc.Server);
+
+ asyncCall.Initialize(newRpc.Call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status;
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
+ try
+ {
+ GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
+ var request = requestStream.Current;
+ // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
+ GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
+ var result = await handler(request, context).ConfigureAwait(false);
+ status = context.Status;
+ await responseStream.WriteAsync(result).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured in handler.");
+ status = HandlerUtils.StatusFromException(e);
+ }
+ try
+ {
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask.ConfigureAwait(false);
+ }
+ }
+
+ internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ where TRequest : class
+ where TResponse : class
+ {
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerStreamingServerCallHandler<TRequest, TResponse>>();
+
+ readonly Method<TRequest, TResponse> method;
+ readonly ServerStreamingServerMethod<TRequest, TResponse> handler;
+
+ public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer,
+ environment, newRpc.Server);
+
+ asyncCall.Initialize(newRpc.Call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status;
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
+ try
+ {
+ GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
+ var request = requestStream.Current;
+ // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
+ GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
+ await handler(request, responseStream, context).ConfigureAwait(false);
+ status = context.Status;
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured in handler.");
+ status = HandlerUtils.StatusFromException(e);
+ }
+
+ try
+ {
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask.ConfigureAwait(false);
+ }
+ }
+
+ internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ where TRequest : class
+ where TResponse : class
+ {
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientStreamingServerCallHandler<TRequest, TResponse>>();
+
+ readonly Method<TRequest, TResponse> method;
+ readonly ClientStreamingServerMethod<TRequest, TResponse> handler;
+
+ public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer,
+ environment, newRpc.Server);
+
+ asyncCall.Initialize(newRpc.Call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status;
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
+ try
+ {
+ var result = await handler(requestStream, context).ConfigureAwait(false);
+ status = context.Status;
+ try
+ {
+ await responseStream.WriteAsync(result).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ status = Status.DefaultCancelled;
+ }
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured in handler.");
+ status = HandlerUtils.StatusFromException(e);
+ }
+
+ try
+ {
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask.ConfigureAwait(false);
+ }
+ }
+
+ internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ where TRequest : class
+ where TResponse : class
+ {
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexStreamingServerCallHandler<TRequest, TResponse>>();
+
+ readonly Method<TRequest, TResponse> method;
+ readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;
+
+ public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer,
+ environment, newRpc.Server);
+
+ asyncCall.Initialize(newRpc.Call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status;
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
+ try
+ {
+ await handler(requestStream, responseStream, context).ConfigureAwait(false);
+ status = context.Status;
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured in handler.");
+ status = HandlerUtils.StatusFromException(e);
+ }
+ try
+ {
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask.ConfigureAwait(false);
+ }
+ }
+
+ internal class NoSuchMethodCallHandler : IServerCallHandler
+ {
+ public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
+
+ public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ {
+ // We don't care about the payload type here.
+ var asyncCall = new AsyncCallServer<byte[], byte[]>(
+ (payload) => payload, (payload) => payload, environment, newRpc.Server);
+
+ asyncCall.Initialize(newRpc.Call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
+
+ await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await finishedTask.ConfigureAwait(false);
+ }
+ }
+
+ internal static class HandlerUtils
+ {
+ public static Status StatusFromException(Exception e)
+ {
+ var rpcException = e as RpcException;
+ if (rpcException != null)
+ {
+ // use the status thrown by handler.
+ return rpcException.Status;
+ }
+
+ // TODO(jtattermusch): what is the right status code here?
+ return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
+ }
+
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
+ {
+ DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
+
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
+ }
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698