OLD | NEW |
(Empty) | |
| 1 #region Copyright notice and license |
| 2 |
| 3 // Copyright 2015-2016, Google Inc. |
| 4 // All rights reserved. |
| 5 // |
| 6 // Redistribution and use in source and binary forms, with or without |
| 7 // modification, are permitted provided that the following conditions are |
| 8 // met: |
| 9 // |
| 10 // * Redistributions of source code must retain the above copyright |
| 11 // notice, this list of conditions and the following disclaimer. |
| 12 // * Redistributions in binary form must reproduce the above |
| 13 // copyright notice, this list of conditions and the following disclaimer |
| 14 // in the documentation and/or other materials provided with the |
| 15 // distribution. |
| 16 // * Neither the name of Google Inc. nor the names of its |
| 17 // contributors may be used to endorse or promote products derived from |
| 18 // this software without specific prior written permission. |
| 19 // |
| 20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 |
| 32 #endregion |
| 33 |
| 34 using System; |
| 35 using System.Collections.Generic; |
| 36 using System.Linq; |
| 37 using System.Threading; |
| 38 using System.Threading.Tasks; |
| 39 using Grpc.Core.Internal; |
| 40 using Grpc.Core.Logging; |
| 41 using Grpc.Core.Utils; |
| 42 |
| 43 namespace Grpc.Core.Internal |
| 44 { |
| 45 internal interface IServerCallHandler |
| 46 { |
| 47 Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment); |
| 48 } |
| 49 |
| 50 internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHand
ler |
| 51 where TRequest : class |
| 52 where TResponse : class |
| 53 { |
| 54 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<UnarySer
verCallHandler<TRequest, TResponse>>(); |
| 55 |
| 56 readonly Method<TRequest, TResponse> method; |
| 57 readonly UnaryServerMethod<TRequest, TResponse> handler; |
| 58 |
| 59 public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryS
erverMethod<TRequest, TResponse> handler) |
| 60 { |
| 61 this.method = method; |
| 62 this.handler = handler; |
| 63 } |
| 64 |
| 65 public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment enviro
nment) |
| 66 { |
| 67 var asyncCall = new AsyncCallServer<TRequest, TResponse>( |
| 68 method.ResponseMarshaller.Serializer, |
| 69 method.RequestMarshaller.Deserializer, |
| 70 environment, newRpc.Server); |
| 71 |
| 72 asyncCall.Initialize(newRpc.Call); |
| 73 var finishedTask = asyncCall.ServerSideCallAsync(); |
| 74 var requestStream = new ServerRequestStream<TRequest, TResponse>(asy
ncCall); |
| 75 var responseStream = new ServerResponseStream<TRequest, TResponse>(a
syncCall); |
| 76 |
| 77 Status status; |
| 78 var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, respon
seStream, asyncCall.CancellationToken); |
| 79 try |
| 80 { |
| 81 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().C
onfigureAwait(false)); |
| 82 var request = requestStream.Current; |
| 83 // TODO(jtattermusch): we need to read the full stream so that n
ative callhandle gets deallocated. |
| 84 GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().
ConfigureAwait(false)); |
| 85 var result = await handler(request, context).ConfigureAwait(fals
e); |
| 86 status = context.Status; |
| 87 await responseStream.WriteAsync(result).ConfigureAwait(false); |
| 88 } |
| 89 catch (Exception e) |
| 90 { |
| 91 Logger.Error(e, "Exception occured in handler."); |
| 92 status = HandlerUtils.StatusFromException(e); |
| 93 } |
| 94 try |
| 95 { |
| 96 await responseStream.WriteStatusAsync(status, context.ResponseTr
ailers).ConfigureAwait(false); |
| 97 } |
| 98 catch (OperationCanceledException) |
| 99 { |
| 100 // Call has been already cancelled. |
| 101 } |
| 102 await finishedTask.ConfigureAwait(false); |
| 103 } |
| 104 } |
| 105 |
| 106 internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServ
erCallHandler |
| 107 where TRequest : class |
| 108 where TResponse : class |
| 109 { |
| 110 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerSt
reamingServerCallHandler<TRequest, TResponse>>(); |
| 111 |
| 112 readonly Method<TRequest, TResponse> method; |
| 113 readonly ServerStreamingServerMethod<TRequest, TResponse> handler; |
| 114 |
| 115 public ServerStreamingServerCallHandler(Method<TRequest, TResponse> meth
od, ServerStreamingServerMethod<TRequest, TResponse> handler) |
| 116 { |
| 117 this.method = method; |
| 118 this.handler = handler; |
| 119 } |
| 120 |
| 121 public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment enviro
nment) |
| 122 { |
| 123 var asyncCall = new AsyncCallServer<TRequest, TResponse>( |
| 124 method.ResponseMarshaller.Serializer, |
| 125 method.RequestMarshaller.Deserializer, |
| 126 environment, newRpc.Server); |
| 127 |
| 128 asyncCall.Initialize(newRpc.Call); |
| 129 var finishedTask = asyncCall.ServerSideCallAsync(); |
| 130 var requestStream = new ServerRequestStream<TRequest, TResponse>(asy
ncCall); |
| 131 var responseStream = new ServerResponseStream<TRequest, TResponse>(a
syncCall); |
| 132 |
| 133 Status status; |
| 134 var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, respon
seStream, asyncCall.CancellationToken); |
| 135 try |
| 136 { |
| 137 GrpcPreconditions.CheckArgument(await requestStream.MoveNext().C
onfigureAwait(false)); |
| 138 var request = requestStream.Current; |
| 139 // TODO(jtattermusch): we need to read the full stream so that n
ative callhandle gets deallocated. |
| 140 GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().
ConfigureAwait(false)); |
| 141 await handler(request, responseStream, context).ConfigureAwait(f
alse); |
| 142 status = context.Status; |
| 143 } |
| 144 catch (Exception e) |
| 145 { |
| 146 Logger.Error(e, "Exception occured in handler."); |
| 147 status = HandlerUtils.StatusFromException(e); |
| 148 } |
| 149 |
| 150 try |
| 151 { |
| 152 await responseStream.WriteStatusAsync(status, context.ResponseTr
ailers).ConfigureAwait(false); |
| 153 } |
| 154 catch (OperationCanceledException) |
| 155 { |
| 156 // Call has been already cancelled. |
| 157 } |
| 158 await finishedTask.ConfigureAwait(false); |
| 159 } |
| 160 } |
| 161 |
| 162 internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServ
erCallHandler |
| 163 where TRequest : class |
| 164 where TResponse : class |
| 165 { |
| 166 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientSt
reamingServerCallHandler<TRequest, TResponse>>(); |
| 167 |
| 168 readonly Method<TRequest, TResponse> method; |
| 169 readonly ClientStreamingServerMethod<TRequest, TResponse> handler; |
| 170 |
| 171 public ClientStreamingServerCallHandler(Method<TRequest, TResponse> meth
od, ClientStreamingServerMethod<TRequest, TResponse> handler) |
| 172 { |
| 173 this.method = method; |
| 174 this.handler = handler; |
| 175 } |
| 176 |
| 177 public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment enviro
nment) |
| 178 { |
| 179 var asyncCall = new AsyncCallServer<TRequest, TResponse>( |
| 180 method.ResponseMarshaller.Serializer, |
| 181 method.RequestMarshaller.Deserializer, |
| 182 environment, newRpc.Server); |
| 183 |
| 184 asyncCall.Initialize(newRpc.Call); |
| 185 var finishedTask = asyncCall.ServerSideCallAsync(); |
| 186 var requestStream = new ServerRequestStream<TRequest, TResponse>(asy
ncCall); |
| 187 var responseStream = new ServerResponseStream<TRequest, TResponse>(a
syncCall); |
| 188 |
| 189 Status status; |
| 190 var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, respon
seStream, asyncCall.CancellationToken); |
| 191 try |
| 192 { |
| 193 var result = await handler(requestStream, context).ConfigureAwai
t(false); |
| 194 status = context.Status; |
| 195 try |
| 196 { |
| 197 await responseStream.WriteAsync(result).ConfigureAwait(false
); |
| 198 } |
| 199 catch (OperationCanceledException) |
| 200 { |
| 201 status = Status.DefaultCancelled; |
| 202 } |
| 203 } |
| 204 catch (Exception e) |
| 205 { |
| 206 Logger.Error(e, "Exception occured in handler."); |
| 207 status = HandlerUtils.StatusFromException(e); |
| 208 } |
| 209 |
| 210 try |
| 211 { |
| 212 await responseStream.WriteStatusAsync(status, context.ResponseTr
ailers).ConfigureAwait(false); |
| 213 } |
| 214 catch (OperationCanceledException) |
| 215 { |
| 216 // Call has been already cancelled. |
| 217 } |
| 218 await finishedTask.ConfigureAwait(false); |
| 219 } |
| 220 } |
| 221 |
| 222 internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServ
erCallHandler |
| 223 where TRequest : class |
| 224 where TResponse : class |
| 225 { |
| 226 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<DuplexSt
reamingServerCallHandler<TRequest, TResponse>>(); |
| 227 |
| 228 readonly Method<TRequest, TResponse> method; |
| 229 readonly DuplexStreamingServerMethod<TRequest, TResponse> handler; |
| 230 |
| 231 public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> meth
od, DuplexStreamingServerMethod<TRequest, TResponse> handler) |
| 232 { |
| 233 this.method = method; |
| 234 this.handler = handler; |
| 235 } |
| 236 |
| 237 public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment enviro
nment) |
| 238 { |
| 239 var asyncCall = new AsyncCallServer<TRequest, TResponse>( |
| 240 method.ResponseMarshaller.Serializer, |
| 241 method.RequestMarshaller.Deserializer, |
| 242 environment, newRpc.Server); |
| 243 |
| 244 asyncCall.Initialize(newRpc.Call); |
| 245 var finishedTask = asyncCall.ServerSideCallAsync(); |
| 246 var requestStream = new ServerRequestStream<TRequest, TResponse>(asy
ncCall); |
| 247 var responseStream = new ServerResponseStream<TRequest, TResponse>(a
syncCall); |
| 248 |
| 249 Status status; |
| 250 var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, respon
seStream, asyncCall.CancellationToken); |
| 251 try |
| 252 { |
| 253 await handler(requestStream, responseStream, context).ConfigureA
wait(false); |
| 254 status = context.Status; |
| 255 } |
| 256 catch (Exception e) |
| 257 { |
| 258 Logger.Error(e, "Exception occured in handler."); |
| 259 status = HandlerUtils.StatusFromException(e); |
| 260 } |
| 261 try |
| 262 { |
| 263 await responseStream.WriteStatusAsync(status, context.ResponseTr
ailers).ConfigureAwait(false); |
| 264 } |
| 265 catch (OperationCanceledException) |
| 266 { |
| 267 // Call has been already cancelled. |
| 268 } |
| 269 await finishedTask.ConfigureAwait(false); |
| 270 } |
| 271 } |
| 272 |
| 273 internal class NoSuchMethodCallHandler : IServerCallHandler |
| 274 { |
| 275 public static readonly NoSuchMethodCallHandler Instance = new NoSuchMeth
odCallHandler(); |
| 276 |
| 277 public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment enviro
nment) |
| 278 { |
| 279 // We don't care about the payload type here. |
| 280 var asyncCall = new AsyncCallServer<byte[], byte[]>( |
| 281 (payload) => payload, (payload) => payload, environment, newRpc.
Server); |
| 282 |
| 283 asyncCall.Initialize(newRpc.Call); |
| 284 var finishedTask = asyncCall.ServerSideCallAsync(); |
| 285 var responseStream = new ServerResponseStream<byte[], byte[]>(asyncC
all); |
| 286 |
| 287 await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplem
ented, ""), Metadata.Empty).ConfigureAwait(false); |
| 288 await finishedTask.ConfigureAwait(false); |
| 289 } |
| 290 } |
| 291 |
| 292 internal static class HandlerUtils |
| 293 { |
| 294 public static Status StatusFromException(Exception e) |
| 295 { |
| 296 var rpcException = e as RpcException; |
| 297 if (rpcException != null) |
| 298 { |
| 299 // use the status thrown by handler. |
| 300 return rpcException.Status; |
| 301 } |
| 302 |
| 303 // TODO(jtattermusch): what is the right status code here? |
| 304 return new Status(StatusCode.Unknown, "Exception was thrown by handl
er."); |
| 305 } |
| 306 |
| 307 public static ServerCallContext NewContext<TRequest, TResponse>(ServerRp
cNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverRespon
seStream, CancellationToken cancellationToken) |
| 308 where TRequest : class |
| 309 where TResponse : class |
| 310 { |
| 311 DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType
.Realtime).ToDateTime(); |
| 312 |
| 313 return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host
, peer, realtimeDeadline, |
| 314 newRpc.RequestMetadata, cancellationToken, serverResponseStream.
WriteResponseHeadersAsync, serverResponseStream); |
| 315 } |
| 316 } |
| 317 } |
OLD | NEW |