Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(686)

Side by Side Diff: third_party/grpc/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 #region Copyright notice and license
2
3 // Copyright 2015-2016, Google Inc.
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are
8 // met:
9 //
10 // * Redistributions of source code must retain the above copyright
11 // notice, this list of conditions and the following disclaimer.
12 // * Redistributions in binary form must reproduce the above
13 // copyright notice, this list of conditions and the following disclaimer
14 // in the documentation and/or other materials provided with the
15 // distribution.
16 // * Neither the name of Google Inc. nor the names of its
17 // contributors may be used to endorse or promote products derived from
18 // this software without specific prior written permission.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32 #endregion
33
34 using System;
35 using System.Diagnostics;
36 using System.IO;
37 using System.Runtime.CompilerServices;
38 using System.Runtime.InteropServices;
39 using System.Threading;
40 using System.Threading.Tasks;
41
42 using Grpc.Core.Internal;
43 using Grpc.Core.Logging;
44 using Grpc.Core.Profiling;
45 using Grpc.Core.Utils;
46
47 namespace Grpc.Core.Internal
48 {
49 /// <summary>
50 /// Base for handling both client side and server side calls.
51 /// Manages native call lifecycle and provides convenience methods.
52 /// </summary>
53 internal abstract class AsyncCallBase<TWrite, TRead>
54 {
55 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCal lBase<TWrite, TRead>>();
56 protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
57
58 readonly Func<TWrite, byte[]> serializer;
59 readonly Func<byte[], TRead> deserializer;
60
61 protected readonly GrpcEnvironment environment;
62 protected readonly object myLock = new object();
63
64 protected INativeCall call;
65 protected bool disposed;
66
67 protected bool started;
68 protected bool cancelRequested;
69
70 protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Co mpletion of a pending send or sendclose if not null.
71 protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Com pletion of a pending send or sendclose if not null.
72
73 protected bool readingDone; // True if last read (i.e. read with null p ayload) was already received.
74 protected bool halfcloseRequested; // True if send close have been init iated.
75 protected bool finished; // True if close has been received from the pe er.
76
77 protected bool initialMetadataSent;
78 protected long streamingWritesCounter; // Number of streaming send oper ations started so far.
79
80 public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead > deserializer, GrpcEnvironment environment)
81 {
82 this.serializer = GrpcPreconditions.CheckNotNull(serializer);
83 this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
84 this.environment = GrpcPreconditions.CheckNotNull(environment);
85 }
86
87 /// <summary>
88 /// Requests cancelling the call.
89 /// </summary>
90 public void Cancel()
91 {
92 lock (myLock)
93 {
94 GrpcPreconditions.CheckState(started);
95 cancelRequested = true;
96
97 if (!disposed)
98 {
99 call.Cancel();
100 }
101 }
102 }
103
104 /// <summary>
105 /// Requests cancelling the call with given status.
106 /// </summary>
107 protected void CancelWithStatus(Status status)
108 {
109 lock (myLock)
110 {
111 cancelRequested = true;
112
113 if (!disposed)
114 {
115 call.CancelWithStatus(status);
116 }
117 }
118 }
119
120 protected void InitializeInternal(INativeCall call)
121 {
122 lock (myLock)
123 {
124 this.call = call;
125 }
126 }
127
128 /// <summary>
129 /// Initiates sending a message. Only one send operation can be active a t a time.
130 /// completionDelegate is invoked upon completion.
131 /// </summary>
132 protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlag s, AsyncCompletionDelegate<object> completionDelegate)
133 {
134 byte[] payload = UnsafeSerialize(msg);
135
136 lock (myLock)
137 {
138 GrpcPreconditions.CheckNotNull(completionDelegate, "Completion d elegate cannot be null");
139 CheckSendingAllowed();
140
141 call.StartSendMessage(HandleSendFinished, payload, writeFlags, ! initialMetadataSent);
142
143 sendCompletionDelegate = completionDelegate;
144 initialMetadataSent = true;
145 streamingWritesCounter++;
146 }
147 }
148
149 /// <summary>
150 /// Initiates reading a message. Only one read operation can be active a t a time.
151 /// completionDelegate is invoked upon completion.
152 /// </summary>
153 protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> c ompletionDelegate)
154 {
155 lock (myLock)
156 {
157 GrpcPreconditions.CheckNotNull(completionDelegate, "Completion d elegate cannot be null");
158 CheckReadingAllowed();
159
160 call.StartReceiveMessage(HandleReadFinished);
161 readCompletionDelegate = completionDelegate;
162 }
163 }
164
165 /// <summary>
166 /// If there are no more pending actions and no new actions can be start ed, releases
167 /// the underlying native resources.
168 /// </summary>
169 protected bool ReleaseResourcesIfPossible()
170 {
171 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseR esourcesIfPossible"))
172 {
173 if (!disposed && call != null)
174 {
175 bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
176 if (noMoreSendCompletions && readingDone && finished)
177 {
178 ReleaseResources();
179 return true;
180 }
181 }
182 return false;
183 }
184 }
185
186 protected abstract bool IsClient
187 {
188 get;
189 }
190
191 private void ReleaseResources()
192 {
193 if (call != null)
194 {
195 call.Dispose();
196 }
197 disposed = true;
198 OnAfterReleaseResources();
199 }
200
201 protected virtual void OnAfterReleaseResources()
202 {
203 }
204
205 protected void CheckSendingAllowed()
206 {
207 GrpcPreconditions.CheckState(started);
208 CheckNotCancelled();
209 GrpcPreconditions.CheckState(!disposed);
210
211 GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclose d.");
212 GrpcPreconditions.CheckState(!finished, "Already finished.");
213 GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only o ne write can be pending at a time");
214 }
215
216 protected virtual void CheckReadingAllowed()
217 {
218 GrpcPreconditions.CheckState(started);
219 GrpcPreconditions.CheckState(!disposed);
220
221 GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
222 GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only o ne read can be pending at a time");
223 }
224
225 protected void CheckNotCancelled()
226 {
227 if (cancelRequested)
228 {
229 throw new OperationCanceledException("Remote call has been cance lled.");
230 }
231 }
232
233 protected byte[] UnsafeSerialize(TWrite msg)
234 {
235 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSe rialize"))
236 {
237 return serializer(msg);
238 }
239 }
240
241 protected Exception TryDeserialize(byte[] payload, out TRead msg)
242 {
243 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeser ialize"))
244 {
245 try
246 {
247
248 msg = deserializer(payload);
249 return null;
250
251 }
252 catch (Exception e)
253 {
254 msg = default(TRead);
255 return e;
256 }
257 }
258 }
259
260 protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDe legate, T value, Exception error)
261 {
262 try
263 {
264 completionDelegate(value, error);
265 }
266 catch (Exception e)
267 {
268 Logger.Error(e, "Exception occured while invoking completion del egate.");
269 }
270 }
271
272 /// <summary>
273 /// Handles send completion.
274 /// </summary>
275 protected void HandleSendFinished(bool success)
276 {
277 AsyncCompletionDelegate<object> origCompletionDelegate = null;
278 lock (myLock)
279 {
280 origCompletionDelegate = sendCompletionDelegate;
281 sendCompletionDelegate = null;
282
283 ReleaseResourcesIfPossible();
284 }
285
286 if (!success)
287 {
288 FireCompletion(origCompletionDelegate, null, new InvalidOperatio nException("Send failed"));
289 }
290 else
291 {
292 FireCompletion(origCompletionDelegate, null, null);
293 }
294 }
295
296 /// <summary>
297 /// Handles halfclose completion.
298 /// </summary>
299 protected void HandleHalfclosed(bool success)
300 {
301 AsyncCompletionDelegate<object> origCompletionDelegate = null;
302 lock (myLock)
303 {
304 origCompletionDelegate = sendCompletionDelegate;
305 sendCompletionDelegate = null;
306
307 ReleaseResourcesIfPossible();
308 }
309
310 if (!success)
311 {
312 FireCompletion(origCompletionDelegate, null, new InvalidOperatio nException("Halfclose failed"));
313 }
314 else
315 {
316 FireCompletion(origCompletionDelegate, null, null);
317 }
318 }
319
320 /// <summary>
321 /// Handles streaming read completion.
322 /// </summary>
323 protected void HandleReadFinished(bool success, byte[] receivedMessage)
324 {
325 TRead msg = default(TRead);
326 var deserializeException = (success && receivedMessage != null) ? Tr yDeserialize(receivedMessage, out msg) : null;
327
328 AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
329 lock (myLock)
330 {
331 origCompletionDelegate = readCompletionDelegate;
332 readCompletionDelegate = null;
333
334 if (receivedMessage == null)
335 {
336 // This was the last read.
337 readingDone = true;
338 }
339
340 if (deserializeException != null && IsClient)
341 {
342 readingDone = true;
343 CancelWithStatus(DeserializeResponseFailureStatus);
344 }
345
346 ReleaseResourcesIfPossible();
347 }
348
349 // TODO: handle the case when success==false
350
351 if (deserializeException != null && !IsClient)
352 {
353 FireCompletion(origCompletionDelegate, default(TRead), new IOExc eption("Failed to deserialize request message.", deserializeException));
354 return;
355 }
356 FireCompletion(origCompletionDelegate, msg, null);
357 }
358 }
359 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698