aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs577
1 files changed, 116 insertions, 461 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 6f37b059f7..5ae036298b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -43,84 +43,47 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Handles native call lifecycle and provides convenience methods.
+ /// Handles client side native call lifecycle.
/// </summary>
- internal class AsyncCall<TWrite, TRead>
+ internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly Func<TWrite, byte[]> serializer;
- readonly Func<byte[], TRead> deserializer;
-
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
- readonly CompletionCallbackDelegate writeFinishedHandler;
- readonly CompletionCallbackDelegate readFinishedHandler;
- readonly CompletionCallbackDelegate halfclosedHandler;
- readonly CompletionCallbackDelegate finishedServersideHandler;
-
- object myLock = new object();
- GCHandle gchandle;
- CallSafeHandle call;
- bool disposed;
-
- bool server;
-
- bool started;
- bool errorOccured;
- bool cancelRequested;
- bool readingDone;
- bool halfcloseRequested;
- bool halfclosed;
- bool finished;
-
- // Completion of a pending write if not null.
- TaskCompletionSource<object> writeTcs;
-
- // Completion of a pending read if not null.
- TaskCompletionSource<TRead> readTcs;
-
- // Completion of a pending halfclose if not null.
- TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
- TaskCompletionSource<TRead> unaryResponseTcs;
+ TaskCompletionSource<TResponse> unaryResponseTcs;
- // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ // Set after status is received. Only used for streaming response calls.
Nullable<Status> finishedStatus;
- TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
- // For streaming, the reads will be delivered to this observer.
- IObserver<TRead> readObserver;
+ bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.serializer = serializer;
- this.deserializer = deserializer;
- this.unaryResponseHandler = HandleUnaryResponse;
- this.finishedHandler = HandleFinished;
- this.writeFinishedHandler = HandleWriteFinished;
- this.readFinishedHandler = HandleReadFinished;
- this.halfclosedHandler = HandleHalfclosed;
- this.finishedServersideHandler = HandleFinishedServerside;
+ this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
+ this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
- InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
+ var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
+ InitializeInternal(call);
}
- public void InitializeServer(CallSafeHandle call)
- {
- InitializeInternal(call, true);
- }
-
- public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
+ // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
+ // it is reusing fair amount of code in this class, so we are leaving it here.
+ // TODO: for other calls, you need to call Initialize, this methods calls initialize
+ // on its own, so there's a usage inconsistency.
+ /// <summary>
+ /// Blocking unary request - unary response call.
+ /// </summary>
+ public TResponse UnaryCall(Channel channel, String methodName, TRequest msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
lock (myLock)
{
@@ -143,508 +106,200 @@ namespace Grpc.Core.Internal
}
}
- public Task<TRead> UnaryCallAsync(TWrite msg)
+ /// <summary>
+ /// Starts a unary request - unary response call.
+ /// </summary>
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
readingDone = true;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public Task<TRead> ClientStreamingCallAsync()
+ /// <summary>
+ /// Starts a streamed request - unary response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
readingDone = true;
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a unary request - streamed response call.
+ /// </summary>
+ public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
this.readObserver = readObserver;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a streaming request - streaming response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public void StartDuplexStreamingCall(IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public Task ServerSideUnaryRequestCallAsync()
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
- return finishedServersideTcs.Task;
- }
- }
-
- public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
-
- if (this.readObserver != null)
- {
- throw new InvalidOperationException("Already registered an observer.");
- }
- this.readObserver = readObserver;
- ReceiveMessageAsync();
-
- return finishedServersideTcs.Task;
- }
- }
-
- public Task SendMessageAsync(TWrite msg)
+ /// <summary>
+ /// Sends a streaming request. Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
{
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- if (writeTcs != null)
- {
- throw new InvalidOperationException("Only one write can be pending at a time");
- }
-
- // TODO: wrap serialization...
- byte[] payload = serializer(msg);
-
- call.StartSendMessage(payload, writeFinishedHandler);
- writeTcs = new TaskCompletionSource<object>();
- return writeTcs.Task;
- }
+ StartSendMessageInternal(msg, completionDelegate);
}
- public Task SendCloseFromClientAsync()
+ /// <summary>
+ /// Sends halfclose, indicating client is done with streaming requests.
+ /// Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
- }
- }
-
- public Task SendStatusFromServerAsync(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- call.StartSendStatusFromServer(status, halfclosedHandler);
- halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
+ sendCompletionDelegate = completionDelegate;
}
}
- public Task<TRead> ReceiveMessageAsync()
+ /// <summary>
+ /// On client-side, we only fire readObserver.OnCompleted once all messages have been read
+ /// and status has been received.
+ /// </summary>
+ protected override void CompleteReadObserver()
{
- lock (myLock)
+ if (readingDone && finishedStatus.HasValue)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (readingDone)
- {
- throw new InvalidOperationException("Already read the last message.");
- }
-
- if (readTcs != null)
+ bool shouldComplete;
+ lock (myLock)
{
- throw new InvalidOperationException("Only one read can be pending at a time");
+ shouldComplete = !readObserverCompleted;
+ readObserverCompleted = true;
}
- call.StartReceiveMessage(readFinishedHandler);
-
- readTcs = new TaskCompletionSource<TRead>();
- return readTcs.Task;
- }
- }
-
- public void Cancel()
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel is threadsafe
- call.Cancel();
- }
-
- public void CancelWithStatus(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel_with_status is threadsafe
- call.CancelWithStatus(status);
- }
-
- private void InitializeInternal(CallSafeHandle call, bool server)
- {
- lock (myLock)
- {
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
- this.call = call;
- this.server = server;
- }
- }
-
- private void CheckStarted()
- {
- if (!started)
- {
- throw new InvalidOperationException("Call not started");
- }
- }
-
- private void CheckNotDisposed()
- {
- if (disposed)
- {
- throw new InvalidOperationException("Call has already been disposed.");
- }
- }
-
- private void CheckNoError()
- {
- if (errorOccured)
- {
- throw new InvalidOperationException("Error occured when processing call.");
- }
- }
-
- private bool ReleaseResourcesIfPossible()
- {
- if (!disposed && call != null)
- {
- if (halfclosed && readingDone && finished)
+ if (shouldComplete)
{
- ReleaseResources();
- return true;
+ var status = finishedStatus.Value;
+ if (status.StatusCode != StatusCode.OK)
+ {
+ FireReadObserverOnError(new RpcException(status));
+ }
+ else
+ {
+ FireReadObserverOnCompleted();
+ }
}
}
- return false;
- }
-
- private void ReleaseResources()
- {
- if (call != null) {
- call.Dispose();
- }
- gchandle.Free();
- disposed = true;
- }
-
- private void CompleteStreamObserver(Status status)
- {
- if (status.StatusCode != StatusCode.OK)
- {
- // TODO: wrap to handle exceptions;
- readObserver.OnError(new RpcException(status));
- } else {
- // TODO: wrap to handle exceptions;
- readObserver.OnCompleted();
- }
}
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
+ lock(myLock)
{
- TaskCompletionSource<TRead> tcs;
- lock(myLock)
- {
- finished = true;
- halfclosed = true;
- tcs = unaryResponseTcs;
-
- ReleaseResourcesIfPossible();
- }
-
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ finished = true;
+ halfclosed = true;
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- tcs.SetException(new RpcException(
- new Status(StatusCode.Internal, "Internal error occured.")
- ));
- return;
- }
-
- var status = ctx.GetReceivedStatus();
- if (status.StatusCode != StatusCode.OK)
- {
- tcs.SetException(new RpcException(status));
- return;
- }
-
- // TODO: handle deserialize error...
- var msg = deserializer(ctx.GetReceivedMessage());
- tcs.SetResult(msg);
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
+ ReleaseResourcesIfPossible();
}
- }
-
- private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- TaskCompletionSource<object> oldTcs = null;
- lock (myLock)
- {
- oldTcs = writeTcs;
- writeTcs = null;
- }
-
- if (errorOccured)
- {
- // TODO: use the right type of exception...
- oldTcs.SetException(new Exception("Write failed"));
- }
- else
- {
- // TODO: where does the continuation run?
- oldTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ if (wasError)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(
+ new Status(StatusCode.Internal, "Internal error occured.")
+ ));
+ return;
}
- }
-
- private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- lock (myLock)
- {
- halfclosed = true;
- ReleaseResourcesIfPossible();
- }
-
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- halfcloseTcs.SetException(new Exception("Halfclose failed"));
-
- }
- else
- {
- halfcloseTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ var status = ctx.GetReceivedStatus();
+ if (status.StatusCode != StatusCode.OK)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(status));
+ return;
}
- }
-
- private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var payload = ctx.GetReceivedMessage();
-
- TaskCompletionSource<TRead> oldTcs = null;
- IObserver<TRead> observer = null;
-
- Nullable<Status> status = null;
-
- lock (myLock)
- {
- oldTcs = readTcs;
- readTcs = null;
- if (payload == null)
- {
- readingDone = true;
- }
- observer = readObserver;
- status = finishedStatus;
-
- ReleaseResourcesIfPossible();
- }
-
- // TODO: wrap deserialization...
- TRead msg = payload != null ? deserializer(payload) : default(TRead);
- oldTcs.SetResult(msg);
+ // TODO: handle deserialization error
+ TResponse msg;
+ TryDeserialize(ctx.GetReceivedMessage(), out msg);
- // TODO: make sure we deliver reads in the right order.
-
- if (observer != null)
- {
- if (payload != null)
- {
- // TODO: wrap to handle exceptions
- observer.OnNext(msg);
-
- // start a new read
- ReceiveMessageAsync();
- }
- else
- {
- if (!server)
- {
- if (status.HasValue)
- {
- CompleteStreamObserver(status.Value);
- }
- }
- else
- {
- // TODO: wrap to handle exceptions..
- observer.OnCompleted();
- }
- // TODO: completeStreamObserver serverside...
- }
- }
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+ unaryResponseTcs.SetResult(msg);
}
- private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
+ /// <summary>
+ /// Handles receive status completion for calls with streaming response.
+ /// </summary>
+ private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var status = ctx.GetReceivedStatus();
-
- bool wasReadingDone;
-
- lock (myLock)
- {
- finished = true;
- finishedStatus = status;
-
- wasReadingDone = readingDone;
-
- ReleaseResourcesIfPossible();
- }
-
- if (wasReadingDone) {
- CompleteStreamObserver(status);
- }
-
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- }
+ var status = ctx.GetReceivedStatus();
- private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
+ lock (myLock)
{
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
-
- lock(myLock)
- {
- finished = true;
-
- // TODO: because of the way server calls are implemented, we need to set
- // reading done to true here. Should be fixed in the future.
- readingDone = true;
-
- ReleaseResourcesIfPossible();
- }
- // TODO: handle error ...
-
- finishedServersideTcs.SetResult(null);
+ finished = true;
+ finishedStatus = status;
+ ReleaseResourcesIfPossible();
}
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+
+ CompleteReadObserver();
}
}
} \ No newline at end of file