Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(487)

Side by Side Diff: third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCall.cs

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #region Copyright notice and license
2
3 // Copyright 2015-2016, Google Inc.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are
8 // met:
9 //
10 // * Redistributions of source code must retain the above copyright
11 // notice, this list of conditions and the following disclaimer.
12 // * Redistributions in binary form must reproduce the above
13 // copyright notice, this list of conditions and the following disclaimer
14 // in the documentation and/or other materials provided with the
15 // distribution.
16 // * Neither the name of Google Inc. nor the names of its
17 // contributors may be used to endorse or promote products derived from
18 // this software without specific prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 #endregion
33
34 using System;
35 using System.Diagnostics;
36 using System.Runtime.CompilerServices;
37 using System.Runtime.InteropServices;
38 using System.Threading;
39 using System.Threading.Tasks;
40 using Grpc.Core.Internal;
41 using Grpc.Core.Logging;
42 using Grpc.Core.Profiling;
43 using Grpc.Core.Utils;
44
45 namespace Grpc.Core.Internal
46 {
47 /// <summary>
48 /// Manages client side native call lifecycle.
49 /// </summary>
50 internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TRes ponse>
51 {
52 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCal l<TRequest, TResponse>>();
53
54 readonly CallInvocationDetails<TRequest, TResponse> details;
55 readonly INativeCall injectedNativeCall; // for testing
56
57 // Completion of a pending unary response if not null.
58 TaskCompletionSource<TResponse> unaryResponseTcs;
59
60 // Indicates that steaming call has finished.
61 TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompleti onSource<object>();
62
63 // Response headers set here once received.
64 TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSo urce<Metadata>();
65
66 // Set after status is received. Used for both unary and streaming respo nse calls.
67 ClientSideStatus? finishedStatus;
68
69 public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
70 : base(callDetails.RequestMarshaller.Serializer, callDetails.Respons eMarshaller.Deserializer, callDetails.Channel.Environment)
71 {
72 this.details = callDetails.WithOptions(callDetails.Options.Normalize ());
73 this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
74 }
75
76 /// <summary>
77 /// This constructor should only be used for testing.
78 /// </summary>
79 public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
80 {
81 this.injectedNativeCall = injectedNativeCall;
82 }
83
84 // TODO: this method is not Async, so it shouldn't be in AsyncCall class , but
85 // it is reusing fair amount of code in this class, so we are leaving it here.
86 /// <summary>
87 /// Blocking unary request - unary response call.
88 /// </summary>
89 public TResponse UnaryCall(TRequest msg)
90 {
91 var profiler = Profilers.ForCurrentThread();
92
93 using (profiler.NewScope("AsyncCall.UnaryCall"))
94 using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Crea te())
95 {
96 byte[] payload = UnsafeSerialize(msg);
97
98 unaryResponseTcs = new TaskCompletionSource<TResponse>();
99
100 lock (myLock)
101 {
102 GrpcPreconditions.CheckState(!started);
103 started = true;
104 Initialize(cq);
105
106 halfcloseRequested = true;
107 readingDone = true;
108 }
109
110 using (var metadataArray = MetadataArraySafeHandle.Create(detail s.Options.Headers))
111 using (var ctx = BatchContextSafeHandle.Create())
112 {
113 call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsFo rCall());
114
115 var ev = cq.Pluck(ctx.Handle);
116
117 bool success = (ev.success != 0);
118 try
119 {
120 using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatc h"))
121 {
122 HandleUnaryResponse(success, ctx.GetReceivedStatusOn Client(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
123 }
124 }
125 catch (Exception e)
126 {
127 Logger.Error(e, "Exception occured while invoking comple tion delegate.");
128 }
129 }
130
131 // Once the blocking call returns, the result should be availabl e synchronously.
132 // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
133 return unaryResponseTcs.Task.GetAwaiter().GetResult();
134 }
135 }
136
137 /// <summary>
138 /// Starts a unary request - unary response call.
139 /// </summary>
140 public Task<TResponse> UnaryCallAsync(TRequest msg)
141 {
142 lock (myLock)
143 {
144 GrpcPreconditions.CheckState(!started);
145 started = true;
146
147 Initialize(environment.CompletionQueue);
148
149 halfcloseRequested = true;
150 readingDone = true;
151
152 byte[] payload = UnsafeSerialize(msg);
153
154 unaryResponseTcs = new TaskCompletionSource<TResponse>();
155 using (var metadataArray = MetadataArraySafeHandle.Create(detail s.Options.Headers))
156 {
157 call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
158 }
159 return unaryResponseTcs.Task;
160 }
161 }
162
163 /// <summary>
164 /// Starts a streamed request - unary response call.
165 /// Use StartSendMessage and StartSendCloseFromClient to stream requests .
166 /// </summary>
167 public Task<TResponse> ClientStreamingCallAsync()
168 {
169 lock (myLock)
170 {
171 GrpcPreconditions.CheckState(!started);
172 started = true;
173
174 Initialize(environment.CompletionQueue);
175
176 readingDone = true;
177
178 unaryResponseTcs = new TaskCompletionSource<TResponse>();
179 using (var metadataArray = MetadataArraySafeHandle.Create(detail s.Options.Headers))
180 {
181 call.StartClientStreaming(HandleUnaryResponse, metadataArray );
182 }
183
184 return unaryResponseTcs.Task;
185 }
186 }
187
188 /// <summary>
189 /// Starts a unary request - streamed response call.
190 /// </summary>
191 public void StartServerStreamingCall(TRequest msg)
192 {
193 lock (myLock)
194 {
195 GrpcPreconditions.CheckState(!started);
196 started = true;
197
198 Initialize(environment.CompletionQueue);
199
200 halfcloseRequested = true;
201
202 byte[] payload = UnsafeSerialize(msg);
203
204 using (var metadataArray = MetadataArraySafeHandle.Create(detail s.Options.Headers))
205 {
206 call.StartServerStreaming(HandleFinished, payload, metadataA rray, GetWriteFlagsForCall());
207 }
208 call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
209 }
210 }
211
212 /// <summary>
213 /// Starts a streaming request - streaming response call.
214 /// Use StartSendMessage and StartSendCloseFromClient to stream requests .
215 /// </summary>
216 public void StartDuplexStreamingCall()
217 {
218 lock (myLock)
219 {
220 GrpcPreconditions.CheckState(!started);
221 started = true;
222
223 Initialize(environment.CompletionQueue);
224
225 using (var metadataArray = MetadataArraySafeHandle.Create(detail s.Options.Headers))
226 {
227 call.StartDuplexStreaming(HandleFinished, metadataArray);
228 }
229 call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
230 }
231 }
232
233 /// <summary>
234 /// Sends a streaming request. Only one pending send action is allowed a t any given time.
235 /// completionDelegate is called when the operation finishes.
236 /// </summary>
237 public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncC ompletionDelegate<object> completionDelegate)
238 {
239 StartSendMessageInternal(msg, writeFlags, completionDelegate);
240 }
241
242 /// <summary>
243 /// Receives a streaming response. Only one pending read action is allow ed at any given time.
244 /// completionDelegate is called when the operation finishes.
245 /// </summary>
246 public void StartReadMessage(AsyncCompletionDelegate<TResponse> completi onDelegate)
247 {
248 StartReadMessageInternal(completionDelegate);
249 }
250
251 /// <summary>
252 /// Sends halfclose, indicating client is done with streaming requests.
253 /// Only one pending send action is allowed at any given time.
254 /// completionDelegate is called when the operation finishes.
255 /// </summary>
256 public void StartSendCloseFromClient(AsyncCompletionDelegate<object> com pletionDelegate)
257 {
258 lock (myLock)
259 {
260 GrpcPreconditions.CheckNotNull(completionDelegate, "Completion d elegate cannot be null");
261 CheckSendingAllowed();
262
263 call.StartSendCloseFromClient(HandleHalfclosed);
264
265 halfcloseRequested = true;
266 sendCompletionDelegate = completionDelegate;
267 }
268 }
269
270 /// <summary>
271 /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
272 /// </summary>
273 public Task StreamingCallFinishedTask
274 {
275 get
276 {
277 return streamingCallFinishedTcs.Task;
278 }
279 }
280
281 /// <summary>
282 /// Get the task that completes once response headers are received.
283 /// </summary>
284 public Task<Metadata> ResponseHeadersAsync
285 {
286 get
287 {
288 return responseHeadersTcs.Task;
289 }
290 }
291
292 /// <summary>
293 /// Gets the resulting status if the call has already finished.
294 /// Throws InvalidOperationException otherwise.
295 /// </summary>
296 public Status GetStatus()
297 {
298 lock (myLock)
299 {
300 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Status ca n only be accessed once the call has finished.");
301 return finishedStatus.Value.Status;
302 }
303 }
304
305 /// <summary>
306 /// Gets the trailing metadata if the call has already finished.
307 /// Throws InvalidOperationException otherwise.
308 /// </summary>
309 public Metadata GetTrailers()
310 {
311 lock (myLock)
312 {
313 GrpcPreconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
314 return finishedStatus.Value.Trailers;
315 }
316 }
317
318 public CallInvocationDetails<TRequest, TResponse> Details
319 {
320 get
321 {
322 return this.details;
323 }
324 }
325
326 protected override void OnAfterReleaseResources()
327 {
328 details.Channel.RemoveCallReference(this);
329 }
330
331 protected override bool IsClient
332 {
333 get { return true; }
334 }
335
336 private void Initialize(CompletionQueueSafeHandle cq)
337 {
338 using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize") )
339 {
340 var call = CreateNativeCall(cq);
341
342 details.Channel.AddCallReference(this);
343 InitializeInternal(call);
344 RegisterCancellationCallback();
345 }
346 }
347
348 private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
349 {
350 using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNative Call"))
351 {
352 if (injectedNativeCall != null)
353 {
354 return injectedNativeCall; // allows injecting a mock INati veCall in tests.
355 }
356
357 var parentCall = details.Options.PropagationToken != null ? deta ils.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
358
359 var credentials = details.Options.Credentials;
360 using (var nativeCredentials = credentials != null ? credentials .ToNativeCredentials() : null)
361 {
362 var result = details.Channel.Handle.CreateCall(environment.C ompletionRegistry,
363 parentCall, ContextPropagationToken.DefaultMask , cq,
364 details.Method, details.Host, Timespec.FromDate Time(details.Options.Deadline.Value), nativeCredentials);
365 return result;
366 }
367 }
368 }
369
370 // Make sure that once cancellationToken for this call is cancelled, Can cel() will be called.
371 private void RegisterCancellationCallback()
372 {
373 var token = details.Options.CancellationToken;
374 if (token.CanBeCanceled)
375 {
376 token.Register(() => this.Cancel());
377 }
378 }
379
380 /// <summary>
381 /// Gets WriteFlags set in callDetails.Options.WriteOptions
382 /// </summary>
383 private WriteFlags GetWriteFlagsForCall()
384 {
385 var writeOptions = details.Options.WriteOptions;
386 return writeOptions != null ? writeOptions.Flags : default(WriteFlag s);
387 }
388
389 /// <summary>
390 /// Handles receive status completion for calls with streaming response.
391 /// </summary>
392 private void HandleReceivedResponseHeaders(bool success, Metadata respon seHeaders)
393 {
394 responseHeadersTcs.SetResult(responseHeaders);
395 }
396
397 /// <summary>
398 /// Handler for unary response completion.
399 /// </summary>
400 private void HandleUnaryResponse(bool success, ClientSideStatus received Status, byte[] receivedMessage, Metadata responseHeaders)
401 {
402 using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryR esponse"))
403 {
404 TResponse msg = default(TResponse);
405 var deserializeException = success ? TryDeserialize(receivedMess age, out msg) : null;
406
407 lock (myLock)
408 {
409 finished = true;
410
411 if (deserializeException != null && receivedStatus.Status.St atusCode == StatusCode.OK)
412 {
413 receivedStatus = new ClientSideStatus(DeserializeRespons eFailureStatus, receivedStatus.Trailers);
414 }
415 finishedStatus = receivedStatus;
416
417 ReleaseResourcesIfPossible();
418
419 }
420
421 responseHeadersTcs.SetResult(responseHeaders);
422
423 var status = receivedStatus.Status;
424
425 if (!success || status.StatusCode != StatusCode.OK)
426 {
427 unaryResponseTcs.SetException(new RpcException(status));
428 return;
429 }
430
431 unaryResponseTcs.SetResult(msg);
432 }
433 }
434
435 /// <summary>
436 /// Handles receive status completion for calls with streaming response.
437 /// </summary>
438 private void HandleFinished(bool success, ClientSideStatus receivedStatu s)
439 {
440 lock (myLock)
441 {
442 finished = true;
443 finishedStatus = receivedStatus;
444
445 ReleaseResourcesIfPossible();
446 }
447
448 var status = receivedStatus.Status;
449
450 if (!success || status.StatusCode != StatusCode.OK)
451 {
452 streamingCallFinishedTcs.SetException(new RpcException(status));
453 return;
454 }
455
456 streamingCallFinishedTcs.SetResult(null);
457 }
458 }
459 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698