aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs616
1 files changed, 616 insertions, 0 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
new file mode 100644
index 0000000000..5e96092e27
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -0,0 +1,616 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Handles native call lifecycle and provides convenience methods.
+ /// </summary>
+ internal class AsyncCall<TWrite, TRead>
+ {
+ readonly Func<TWrite, byte[]> serializer;
+ readonly Func<byte[], TRead> deserializer;
+
+ readonly CompletionCallbackDelegate unaryResponseHandler;
+ readonly CompletionCallbackDelegate finishedHandler;
+ readonly CompletionCallbackDelegate writeFinishedHandler;
+ readonly CompletionCallbackDelegate readFinishedHandler;
+ readonly CompletionCallbackDelegate halfclosedHandler;
+ readonly CompletionCallbackDelegate finishedServersideHandler;
+
+ object myLock = new object();
+ GCHandle gchandle;
+ CallSafeHandle call;
+ bool disposed;
+
+ bool server;
+
+ bool started;
+ bool errorOccured;
+ bool cancelRequested;
+ bool readingDone;
+ bool halfcloseRequested;
+ bool halfclosed;
+ bool finished;
+
+ // Completion of a pending write if not null.
+ TaskCompletionSource<object> writeTcs;
+
+ // Completion of a pending read if not null.
+ TaskCompletionSource<TRead> readTcs;
+
+ // Completion of a pending halfclose if not null.
+ TaskCompletionSource<object> halfcloseTcs;
+
+ // Completion of a pending unary response if not null.
+ TaskCompletionSource<TRead> unaryResponseTcs;
+
+ // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ Nullable<Status> finishedStatus;
+ TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+
+ // For streaming, the reads will be delivered to this observer.
+ IObserver<TRead> readObserver;
+
+ public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ {
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ this.unaryResponseHandler = HandleUnaryResponse;
+ this.finishedHandler = HandleFinished;
+ this.writeFinishedHandler = HandleWriteFinished;
+ this.readFinishedHandler = HandleReadFinished;
+ this.halfclosedHandler = HandleHalfclosed;
+ this.finishedServersideHandler = HandleFinishedServerside;
+ }
+
+ public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
+ {
+ InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
+ }
+
+ public void InitializeServer(CallSafeHandle call)
+ {
+ InitializeInternal(call, true);
+ }
+
+ public Task<TRead> UnaryCallAsync(TWrite msg)
+ {
+ lock (myLock)
+ {
+ started = true;
+ halfcloseRequested = true;
+ readingDone = true;
+
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
+
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+ call.StartUnary(payload, unaryResponseHandler);
+
+ return unaryResponseTcs.Task;
+ }
+ }
+
+ public Task<TRead> ClientStreamingCallAsync()
+ {
+ lock (myLock)
+ {
+ started = true;
+ readingDone = true;
+
+ unaryResponseTcs = new TaskCompletionSource<TRead>();
+ call.StartClientStreaming(unaryResponseHandler);
+
+ return unaryResponseTcs.Task;
+ }
+ }
+
+ public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
+ {
+ lock (myLock)
+ {
+ started = true;
+ halfcloseRequested = true;
+
+ this.readObserver = readObserver;
+
+ // TODO: handle serialization error...
+ byte[] payload = serializer(msg);
+
+ call.StartServerStreaming(payload, finishedHandler);
+
+ ReceiveMessageAsync();
+ }
+ }
+
+ public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
+ {
+ lock (myLock)
+ {
+ started = true;
+
+ this.readObserver = readObserver;
+
+ call.StartDuplexStreaming(finishedHandler);
+
+ ReceiveMessageAsync();
+ }
+ }
+
+ public Task ServerSideUnaryRequestCallAsync()
+ {
+ lock (myLock)
+ {
+ started = true;
+ call.StartServerSide(finishedServersideHandler);
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
+ {
+ lock (myLock)
+ {
+ started = true;
+ call.StartServerSide(finishedServersideHandler);
+
+ if (this.readObserver != null)
+ {
+ throw new InvalidOperationException("Already registered an observer.");
+ }
+ this.readObserver = readObserver;
+ ReceiveMessageAsync();
+
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ public Task SendMessageAsync(TWrite msg)
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ CheckNoError();
+
+ if (halfcloseRequested)
+ {
+ throw new InvalidOperationException("Already halfclosed.");
+ }
+
+ if (writeTcs != null)
+ {
+ throw new InvalidOperationException("Only one write can be pending at a time");
+ }
+
+ // TODO: wrap serialization...
+ byte[] payload = serializer(msg);
+
+ call.StartSendMessage(payload, writeFinishedHandler);
+ writeTcs = new TaskCompletionSource<object>();
+ return writeTcs.Task;
+ }
+ }
+
+ public Task SendCloseFromClientAsync()
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ CheckNoError();
+
+ if (halfcloseRequested)
+ {
+ throw new InvalidOperationException("Already halfclosed.");
+ }
+
+ call.StartSendCloseFromClient(halfclosedHandler);
+
+ halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
+ return halfcloseTcs.Task;
+ }
+ }
+
+ public Task SendStatusFromServerAsync(Status status)
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ CheckNoError();
+
+ if (halfcloseRequested)
+ {
+ throw new InvalidOperationException("Already halfclosed.");
+ }
+
+ call.StartSendStatusFromServer(status, halfclosedHandler);
+ halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
+ return halfcloseTcs.Task;
+ }
+ }
+
+ public Task<TRead> ReceiveMessageAsync()
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ CheckNoError();
+
+ if (readingDone)
+ {
+ throw new InvalidOperationException("Already read the last message.");
+ }
+
+ if (readTcs != null)
+ {
+ throw new InvalidOperationException("Only one read can be pending at a time");
+ }
+
+ call.StartReceiveMessage(readFinishedHandler);
+
+ readTcs = new TaskCompletionSource<TRead>();
+ return readTcs.Task;
+ }
+ }
+
+ public void Cancel()
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ cancelRequested = true;
+ }
+ // grpc_call_cancel is threadsafe
+ call.Cancel();
+ }
+
+ public void CancelWithStatus(Status status)
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
+ CheckStarted();
+ cancelRequested = true;
+ }
+ // grpc_call_cancel_with_status is threadsafe
+ call.CancelWithStatus(status);
+ }
+
+ private void InitializeInternal(CallSafeHandle call, bool server)
+ {
+ lock (myLock)
+ {
+ // Make sure this object and the delegated held by it will not be garbage collected
+ // before we release this handle.
+ gchandle = GCHandle.Alloc(this);
+ this.call = call;
+ this.server = server;
+ }
+ }
+
+ private void CheckStarted()
+ {
+ if (!started)
+ {
+ throw new InvalidOperationException("Call not started");
+ }
+ }
+
+ private void CheckNotDisposed()
+ {
+ if (disposed)
+ {
+ throw new InvalidOperationException("Call has already been disposed.");
+ }
+ }
+
+ private void CheckNoError()
+ {
+ if (errorOccured)
+ {
+ throw new InvalidOperationException("Error occured when processing call.");
+ }
+ }
+
+ private bool ReleaseResourcesIfPossible()
+ {
+ if (!disposed && call != null)
+ {
+ if (halfclosed && readingDone && finished)
+ {
+ ReleaseResources();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void ReleaseResources()
+ {
+ if (call != null) {
+ call.Dispose();
+ }
+ gchandle.Free();
+ disposed = true;
+ }
+
+ private void CompleteStreamObserver(Status status)
+ {
+ if (status.StatusCode != StatusCode.OK)
+ {
+ // TODO: wrap to handle exceptions;
+ readObserver.OnError(new RpcException(status));
+ } else {
+ // TODO: wrap to handle exceptions;
+ readObserver.OnCompleted();
+ }
+ }
+
+ /// <summary>
+ /// Handler for unary response completion.
+ /// </summary>
+ private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ TaskCompletionSource<TRead> tcs;
+ lock(myLock)
+ {
+ finished = true;
+ halfclosed = true;
+ tcs = unaryResponseTcs;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+
+ if (error != GRPCOpError.GRPC_OP_OK)
+ {
+ tcs.SetException(new RpcException(
+ new Status(StatusCode.Internal, "Internal error occured.")
+ ));
+ return;
+ }
+
+ var status = ctx.GetReceivedStatus();
+ if (status.StatusCode != StatusCode.OK)
+ {
+ tcs.SetException(new RpcException(status));
+ return;
+ }
+
+ // TODO: handle deserialize error...
+ var msg = deserializer(ctx.GetReceivedMessage());
+ tcs.SetResult(msg);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
+ private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ TaskCompletionSource<object> oldTcs = null;
+ lock (myLock)
+ {
+ oldTcs = writeTcs;
+ writeTcs = null;
+ }
+
+ if (errorOccured)
+ {
+ // TODO: use the right type of exception...
+ oldTcs.SetException(new Exception("Write failed"));
+ }
+ else
+ {
+ // TODO: where does the continuation run?
+ oldTcs.SetResult(null);
+ }
+
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
+ private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ lock (myLock)
+ {
+ halfclosed = true;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (error != GRPCOpError.GRPC_OP_OK)
+ {
+ halfcloseTcs.SetException(new Exception("Halfclose failed"));
+
+ }
+ else
+ {
+ halfcloseTcs.SetResult(null);
+ }
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
+ private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ var payload = ctx.GetReceivedMessage();
+
+ TaskCompletionSource<TRead> oldTcs = null;
+ IObserver<TRead> observer = null;
+
+ Nullable<Status> status = null;
+
+ lock (myLock)
+ {
+ oldTcs = readTcs;
+ readTcs = null;
+ if (payload == null)
+ {
+ readingDone = true;
+ }
+ observer = readObserver;
+ status = finishedStatus;
+ }
+
+ // TODO: wrap deserialization...
+ TRead msg = payload != null ? deserializer(payload) : default(TRead);
+
+ oldTcs.SetResult(msg);
+
+ // TODO: make sure we deliver reads in the right order.
+
+ if (observer != null)
+ {
+ if (payload != null)
+ {
+ // TODO: wrap to handle exceptions
+ observer.OnNext(msg);
+
+ // start a new read
+ ReceiveMessageAsync();
+ }
+ else
+ {
+ if (!server)
+ {
+ if (status.HasValue)
+ {
+ CompleteStreamObserver(status.Value);
+ }
+ }
+ else
+ {
+ // TODO: wrap to handle exceptions..
+ observer.OnCompleted();
+ }
+ // TODO: completeStreamObserver serverside...
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
+ private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ var status = ctx.GetReceivedStatus();
+
+ bool wasReadingDone;
+
+ lock (myLock)
+ {
+ finished = true;
+ finishedStatus = status;
+
+ wasReadingDone = readingDone;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasReadingDone) {
+ CompleteStreamObserver(status);
+ }
+
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
+ private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+
+ lock(myLock)
+ {
+ finished = true;
+
+ // TODO: because of the way server calls are implemented, we need to set
+ // reading done to true here. Should be fixed in the future.
+ readingDone = true;
+
+ ReleaseResourcesIfPossible();
+ }
+ // TODO: handle error ...
+
+ finishedServersideTcs.SetResult(null);
+
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+ }
+} \ No newline at end of file