OLD | NEW |
(Empty) | |
| 1 #region Copyright notice and license |
| 2 |
| 3 // Copyright 2015-2016, Google Inc. |
| 4 // All rights reserved. |
| 5 // |
| 6 // Redistribution and use in source and binary forms, with or without |
| 7 // modification, are permitted provided that the following conditions are |
| 8 // met: |
| 9 // |
| 10 // * Redistributions of source code must retain the above copyright |
| 11 // notice, this list of conditions and the following disclaimer. |
| 12 // * Redistributions in binary form must reproduce the above |
| 13 // copyright notice, this list of conditions and the following disclaimer |
| 14 // in the documentation and/or other materials provided with the |
| 15 // distribution. |
| 16 // * Neither the name of Google Inc. nor the names of its |
| 17 // contributors may be used to endorse or promote products derived from |
| 18 // this software without specific prior written permission. |
| 19 // |
| 20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 |
| 32 #endregion |
| 33 |
| 34 using System; |
| 35 using System.Diagnostics; |
| 36 using System.IO; |
| 37 using System.Runtime.CompilerServices; |
| 38 using System.Runtime.InteropServices; |
| 39 using System.Threading; |
| 40 using System.Threading.Tasks; |
| 41 |
| 42 using Grpc.Core.Internal; |
| 43 using Grpc.Core.Logging; |
| 44 using Grpc.Core.Profiling; |
| 45 using Grpc.Core.Utils; |
| 46 |
| 47 namespace Grpc.Core.Internal |
| 48 { |
| 49 /// <summary> |
| 50 /// Base for handling both client side and server side calls. |
| 51 /// Manages native call lifecycle and provides convenience methods. |
| 52 /// </summary> |
| 53 internal abstract class AsyncCallBase<TWrite, TRead> |
| 54 { |
| 55 static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCal
lBase<TWrite, TRead>>(); |
| 56 protected static readonly Status DeserializeResponseFailureStatus = new
Status(StatusCode.Internal, "Failed to deserialize response message."); |
| 57 |
| 58 readonly Func<TWrite, byte[]> serializer; |
| 59 readonly Func<byte[], TRead> deserializer; |
| 60 |
| 61 protected readonly GrpcEnvironment environment; |
| 62 protected readonly object myLock = new object(); |
| 63 |
| 64 protected INativeCall call; |
| 65 protected bool disposed; |
| 66 |
| 67 protected bool started; |
| 68 protected bool cancelRequested; |
| 69 |
| 70 protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Co
mpletion of a pending send or sendclose if not null. |
| 71 protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Com
pletion of a pending send or sendclose if not null. |
| 72 |
| 73 protected bool readingDone; // True if last read (i.e. read with null p
ayload) was already received. |
| 74 protected bool halfcloseRequested; // True if send close have been init
iated. |
| 75 protected bool finished; // True if close has been received from the pe
er. |
| 76 |
| 77 protected bool initialMetadataSent; |
| 78 protected long streamingWritesCounter; // Number of streaming send oper
ations started so far. |
| 79 |
| 80 public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead
> deserializer, GrpcEnvironment environment) |
| 81 { |
| 82 this.serializer = GrpcPreconditions.CheckNotNull(serializer); |
| 83 this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); |
| 84 this.environment = GrpcPreconditions.CheckNotNull(environment); |
| 85 } |
| 86 |
| 87 /// <summary> |
| 88 /// Requests cancelling the call. |
| 89 /// </summary> |
| 90 public void Cancel() |
| 91 { |
| 92 lock (myLock) |
| 93 { |
| 94 GrpcPreconditions.CheckState(started); |
| 95 cancelRequested = true; |
| 96 |
| 97 if (!disposed) |
| 98 { |
| 99 call.Cancel(); |
| 100 } |
| 101 } |
| 102 } |
| 103 |
| 104 /// <summary> |
| 105 /// Requests cancelling the call with given status. |
| 106 /// </summary> |
| 107 protected void CancelWithStatus(Status status) |
| 108 { |
| 109 lock (myLock) |
| 110 { |
| 111 cancelRequested = true; |
| 112 |
| 113 if (!disposed) |
| 114 { |
| 115 call.CancelWithStatus(status); |
| 116 } |
| 117 } |
| 118 } |
| 119 |
| 120 protected void InitializeInternal(INativeCall call) |
| 121 { |
| 122 lock (myLock) |
| 123 { |
| 124 this.call = call; |
| 125 } |
| 126 } |
| 127 |
| 128 /// <summary> |
| 129 /// Initiates sending a message. Only one send operation can be active a
t a time. |
| 130 /// completionDelegate is invoked upon completion. |
| 131 /// </summary> |
| 132 protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlag
s, AsyncCompletionDelegate<object> completionDelegate) |
| 133 { |
| 134 byte[] payload = UnsafeSerialize(msg); |
| 135 |
| 136 lock (myLock) |
| 137 { |
| 138 GrpcPreconditions.CheckNotNull(completionDelegate, "Completion d
elegate cannot be null"); |
| 139 CheckSendingAllowed(); |
| 140 |
| 141 call.StartSendMessage(HandleSendFinished, payload, writeFlags, !
initialMetadataSent); |
| 142 |
| 143 sendCompletionDelegate = completionDelegate; |
| 144 initialMetadataSent = true; |
| 145 streamingWritesCounter++; |
| 146 } |
| 147 } |
| 148 |
| 149 /// <summary> |
| 150 /// Initiates reading a message. Only one read operation can be active a
t a time. |
| 151 /// completionDelegate is invoked upon completion. |
| 152 /// </summary> |
| 153 protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> c
ompletionDelegate) |
| 154 { |
| 155 lock (myLock) |
| 156 { |
| 157 GrpcPreconditions.CheckNotNull(completionDelegate, "Completion d
elegate cannot be null"); |
| 158 CheckReadingAllowed(); |
| 159 |
| 160 call.StartReceiveMessage(HandleReadFinished); |
| 161 readCompletionDelegate = completionDelegate; |
| 162 } |
| 163 } |
| 164 |
| 165 /// <summary> |
| 166 /// If there are no more pending actions and no new actions can be start
ed, releases |
| 167 /// the underlying native resources. |
| 168 /// </summary> |
| 169 protected bool ReleaseResourcesIfPossible() |
| 170 { |
| 171 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseR
esourcesIfPossible")) |
| 172 { |
| 173 if (!disposed && call != null) |
| 174 { |
| 175 bool noMoreSendCompletions = sendCompletionDelegate == null
&& (halfcloseRequested || cancelRequested || finished); |
| 176 if (noMoreSendCompletions && readingDone && finished) |
| 177 { |
| 178 ReleaseResources(); |
| 179 return true; |
| 180 } |
| 181 } |
| 182 return false; |
| 183 } |
| 184 } |
| 185 |
| 186 protected abstract bool IsClient |
| 187 { |
| 188 get; |
| 189 } |
| 190 |
| 191 private void ReleaseResources() |
| 192 { |
| 193 if (call != null) |
| 194 { |
| 195 call.Dispose(); |
| 196 } |
| 197 disposed = true; |
| 198 OnAfterReleaseResources(); |
| 199 } |
| 200 |
| 201 protected virtual void OnAfterReleaseResources() |
| 202 { |
| 203 } |
| 204 |
| 205 protected void CheckSendingAllowed() |
| 206 { |
| 207 GrpcPreconditions.CheckState(started); |
| 208 CheckNotCancelled(); |
| 209 GrpcPreconditions.CheckState(!disposed); |
| 210 |
| 211 GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclose
d."); |
| 212 GrpcPreconditions.CheckState(!finished, "Already finished."); |
| 213 GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only o
ne write can be pending at a time"); |
| 214 } |
| 215 |
| 216 protected virtual void CheckReadingAllowed() |
| 217 { |
| 218 GrpcPreconditions.CheckState(started); |
| 219 GrpcPreconditions.CheckState(!disposed); |
| 220 |
| 221 GrpcPreconditions.CheckState(!readingDone, "Stream has already been
closed."); |
| 222 GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only o
ne read can be pending at a time"); |
| 223 } |
| 224 |
| 225 protected void CheckNotCancelled() |
| 226 { |
| 227 if (cancelRequested) |
| 228 { |
| 229 throw new OperationCanceledException("Remote call has been cance
lled."); |
| 230 } |
| 231 } |
| 232 |
| 233 protected byte[] UnsafeSerialize(TWrite msg) |
| 234 { |
| 235 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSe
rialize")) |
| 236 { |
| 237 return serializer(msg); |
| 238 } |
| 239 } |
| 240 |
| 241 protected Exception TryDeserialize(byte[] payload, out TRead msg) |
| 242 { |
| 243 using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeser
ialize")) |
| 244 { |
| 245 try |
| 246 { |
| 247 |
| 248 msg = deserializer(payload); |
| 249 return null; |
| 250 |
| 251 } |
| 252 catch (Exception e) |
| 253 { |
| 254 msg = default(TRead); |
| 255 return e; |
| 256 } |
| 257 } |
| 258 } |
| 259 |
| 260 protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDe
legate, T value, Exception error) |
| 261 { |
| 262 try |
| 263 { |
| 264 completionDelegate(value, error); |
| 265 } |
| 266 catch (Exception e) |
| 267 { |
| 268 Logger.Error(e, "Exception occured while invoking completion del
egate."); |
| 269 } |
| 270 } |
| 271 |
| 272 /// <summary> |
| 273 /// Handles send completion. |
| 274 /// </summary> |
| 275 protected void HandleSendFinished(bool success) |
| 276 { |
| 277 AsyncCompletionDelegate<object> origCompletionDelegate = null; |
| 278 lock (myLock) |
| 279 { |
| 280 origCompletionDelegate = sendCompletionDelegate; |
| 281 sendCompletionDelegate = null; |
| 282 |
| 283 ReleaseResourcesIfPossible(); |
| 284 } |
| 285 |
| 286 if (!success) |
| 287 { |
| 288 FireCompletion(origCompletionDelegate, null, new InvalidOperatio
nException("Send failed")); |
| 289 } |
| 290 else |
| 291 { |
| 292 FireCompletion(origCompletionDelegate, null, null); |
| 293 } |
| 294 } |
| 295 |
| 296 /// <summary> |
| 297 /// Handles halfclose completion. |
| 298 /// </summary> |
| 299 protected void HandleHalfclosed(bool success) |
| 300 { |
| 301 AsyncCompletionDelegate<object> origCompletionDelegate = null; |
| 302 lock (myLock) |
| 303 { |
| 304 origCompletionDelegate = sendCompletionDelegate; |
| 305 sendCompletionDelegate = null; |
| 306 |
| 307 ReleaseResourcesIfPossible(); |
| 308 } |
| 309 |
| 310 if (!success) |
| 311 { |
| 312 FireCompletion(origCompletionDelegate, null, new InvalidOperatio
nException("Halfclose failed")); |
| 313 } |
| 314 else |
| 315 { |
| 316 FireCompletion(origCompletionDelegate, null, null); |
| 317 } |
| 318 } |
| 319 |
| 320 /// <summary> |
| 321 /// Handles streaming read completion. |
| 322 /// </summary> |
| 323 protected void HandleReadFinished(bool success, byte[] receivedMessage) |
| 324 { |
| 325 TRead msg = default(TRead); |
| 326 var deserializeException = (success && receivedMessage != null) ? Tr
yDeserialize(receivedMessage, out msg) : null; |
| 327 |
| 328 AsyncCompletionDelegate<TRead> origCompletionDelegate = null; |
| 329 lock (myLock) |
| 330 { |
| 331 origCompletionDelegate = readCompletionDelegate; |
| 332 readCompletionDelegate = null; |
| 333 |
| 334 if (receivedMessage == null) |
| 335 { |
| 336 // This was the last read. |
| 337 readingDone = true; |
| 338 } |
| 339 |
| 340 if (deserializeException != null && IsClient) |
| 341 { |
| 342 readingDone = true; |
| 343 CancelWithStatus(DeserializeResponseFailureStatus); |
| 344 } |
| 345 |
| 346 ReleaseResourcesIfPossible(); |
| 347 } |
| 348 |
| 349 // TODO: handle the case when success==false |
| 350 |
| 351 if (deserializeException != null && !IsClient) |
| 352 { |
| 353 FireCompletion(origCompletionDelegate, default(TRead), new IOExc
eption("Failed to deserialize request message.", deserializeException)); |
| 354 return; |
| 355 } |
| 356 FireCompletion(origCompletionDelegate, msg, null); |
| 357 } |
| 358 } |
| 359 } |
OLD | NEW |