diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 577 |
1 files changed, 116 insertions, 461 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 6f37b059f7..5ae036298b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -43,84 +43,47 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Handles native call lifecycle and provides convenience methods. + /// Handles client side native call lifecycle. /// </summary> - internal class AsyncCall<TWrite, TRead> + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { - readonly Func<TWrite, byte[]> serializer; - readonly Func<byte[], TRead> deserializer; - readonly CompletionCallbackDelegate unaryResponseHandler; readonly CompletionCallbackDelegate finishedHandler; - readonly CompletionCallbackDelegate writeFinishedHandler; - readonly CompletionCallbackDelegate readFinishedHandler; - readonly CompletionCallbackDelegate halfclosedHandler; - readonly CompletionCallbackDelegate finishedServersideHandler; - - object myLock = new object(); - GCHandle gchandle; - CallSafeHandle call; - bool disposed; - - bool server; - - bool started; - bool errorOccured; - bool cancelRequested; - bool readingDone; - bool halfcloseRequested; - bool halfclosed; - bool finished; - - // Completion of a pending write if not null. - TaskCompletionSource<object> writeTcs; - - // Completion of a pending read if not null. - TaskCompletionSource<TRead> readTcs; - - // Completion of a pending halfclose if not null. - TaskCompletionSource<object> halfcloseTcs; // Completion of a pending unary response if not null. - TaskCompletionSource<TRead> unaryResponseTcs; + TaskCompletionSource<TResponse> unaryResponseTcs; - // Set after status is received on client. Only used for server streaming and duplex streaming calls. + // Set after status is received. Only used for streaming response calls. Nullable<Status> finishedStatus; - TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); - // For streaming, the reads will be delivered to this observer. - IObserver<TRead> readObserver; + bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) { - this.serializer = serializer; - this.deserializer = deserializer; - this.unaryResponseHandler = HandleUnaryResponse; - this.finishedHandler = HandleFinished; - this.writeFinishedHandler = HandleWriteFinished; - this.readFinishedHandler = HandleReadFinished; - this.halfclosedHandler = HandleHalfclosed; - this.finishedServersideHandler = HandleFinishedServerside; + this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse); + this.finishedHandler = CreateBatchCompletionCallback(HandleFinished); } public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); + var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); + InitializeInternal(call); } - public void InitializeServer(CallSafeHandle call) - { - InitializeInternal(call, true); - } - - public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but + // it is reusing fair amount of code in this class, so we are leaving it here. + // TODO: for other calls, you need to call Initialize, this methods calls initialize + // on its own, so there's a usage inconsistency. + /// <summary> + /// Blocking unary request - unary response call. + /// </summary> + public TResponse UnaryCall(Channel channel, String methodName, TRequest msg) { using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); lock (myLock) { @@ -143,508 +106,200 @@ namespace Grpc.Core.Internal } } - public Task<TRead> UnaryCallAsync(TWrite msg) + /// <summary> + /// Starts a unary request - unary response call. + /// </summary> + public Task<TResponse> UnaryCallAsync(TRequest msg) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; readingDone = true; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartUnary(payload, unaryResponseHandler); return unaryResponseTcs.Task; } } - public Task<TRead> ClientStreamingCallAsync() + /// <summary> + /// Starts a streamed request - unary response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public Task<TResponse> ClientStreamingCallAsync() { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; readingDone = true; - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartClientStreaming(unaryResponseHandler); return unaryResponseTcs.Task; } } - public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + /// <summary> + /// Starts a unary request - streamed response call. + /// </summary> + public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. this.readObserver = readObserver; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); call.StartServerStreaming(payload, finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public void StartDuplexStreamingCall(IObserver<TRead> readObserver) + /// <summary> + /// Starts a streaming request - streaming response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public void StartDuplexStreamingCall(IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; this.readObserver = readObserver; call.StartDuplexStreaming(finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public Task ServerSideUnaryRequestCallAsync() - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - return finishedServersideTcs.Task; - } - } - - public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver) - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } - this.readObserver = readObserver; - ReceiveMessageAsync(); - - return finishedServersideTcs.Task; - } - } - - public Task SendMessageAsync(TWrite msg) + /// <summary> + /// Sends a streaming request. Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate) { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - if (writeTcs != null) - { - throw new InvalidOperationException("Only one write can be pending at a time"); - } - - // TODO: wrap serialization... - byte[] payload = serializer(msg); - - call.StartSendMessage(payload, writeFinishedHandler); - writeTcs = new TaskCompletionSource<object>(); - return writeTcs.Task; - } + StartSendMessageInternal(msg, completionDelegate); } - public Task SendCloseFromClientAsync() + /// <summary> + /// Sends halfclose, indicating client is done with streaming requests. + /// Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) { lock (myLock) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); call.StartSendCloseFromClient(halfclosedHandler); halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; - } - } - - public Task SendStatusFromServerAsync(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - call.StartSendStatusFromServer(status, halfclosedHandler); - halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; + sendCompletionDelegate = completionDelegate; } } - public Task<TRead> ReceiveMessageAsync() + /// <summary> + /// On client-side, we only fire readObserver.OnCompleted once all messages have been read + /// and status has been received. + /// </summary> + protected override void CompleteReadObserver() { - lock (myLock) + if (readingDone && finishedStatus.HasValue) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (readingDone) - { - throw new InvalidOperationException("Already read the last message."); - } - - if (readTcs != null) + bool shouldComplete; + lock (myLock) { - throw new InvalidOperationException("Only one read can be pending at a time"); + shouldComplete = !readObserverCompleted; + readObserverCompleted = true; } - call.StartReceiveMessage(readFinishedHandler); - - readTcs = new TaskCompletionSource<TRead>(); - return readTcs.Task; - } - } - - public void Cancel() - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel is threadsafe - call.Cancel(); - } - - public void CancelWithStatus(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel_with_status is threadsafe - call.CancelWithStatus(status); - } - - private void InitializeInternal(CallSafeHandle call, bool server) - { - lock (myLock) - { - // Make sure this object and the delegated held by it will not be garbage collected - // before we release this handle. - gchandle = GCHandle.Alloc(this); - this.call = call; - this.server = server; - } - } - - private void CheckStarted() - { - if (!started) - { - throw new InvalidOperationException("Call not started"); - } - } - - private void CheckNotDisposed() - { - if (disposed) - { - throw new InvalidOperationException("Call has already been disposed."); - } - } - - private void CheckNoError() - { - if (errorOccured) - { - throw new InvalidOperationException("Error occured when processing call."); - } - } - - private bool ReleaseResourcesIfPossible() - { - if (!disposed && call != null) - { - if (halfclosed && readingDone && finished) + if (shouldComplete) { - ReleaseResources(); - return true; + var status = finishedStatus.Value; + if (status.StatusCode != StatusCode.OK) + { + FireReadObserverOnError(new RpcException(status)); + } + else + { + FireReadObserverOnCompleted(); + } } } - return false; - } - - private void ReleaseResources() - { - if (call != null) { - call.Dispose(); - } - gchandle.Free(); - disposed = true; - } - - private void CompleteStreamObserver(Status status) - { - if (status.StatusCode != StatusCode.OK) - { - // TODO: wrap to handle exceptions; - readObserver.OnError(new RpcException(status)); - } else { - // TODO: wrap to handle exceptions; - readObserver.OnCompleted(); - } } /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try + lock(myLock) { - TaskCompletionSource<TRead> tcs; - lock(myLock) - { - finished = true; - halfclosed = true; - tcs = unaryResponseTcs; - - ReleaseResourcesIfPossible(); - } - - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + finished = true; + halfclosed = true; - if (error != GRPCOpError.GRPC_OP_OK) - { - tcs.SetException(new RpcException( - new Status(StatusCode.Internal, "Internal error occured.") - )); - return; - } - - var status = ctx.GetReceivedStatus(); - if (status.StatusCode != StatusCode.OK) - { - tcs.SetException(new RpcException(status)); - return; - } - - // TODO: handle deserialize error... - var msg = deserializer(ctx.GetReceivedMessage()); - tcs.SetResult(msg); - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); + ReleaseResourcesIfPossible(); } - } - - private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - TaskCompletionSource<object> oldTcs = null; - lock (myLock) - { - oldTcs = writeTcs; - writeTcs = null; - } - - if (errorOccured) - { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else - { - // TODO: where does the continuation run? - oldTcs.SetResult(null); - } - } - catch(Exception e) + if (wasError) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException( + new Status(StatusCode.Internal, "Internal error occured.") + )); + return; } - } - - private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - lock (myLock) - { - halfclosed = true; - ReleaseResourcesIfPossible(); - } - - if (error != GRPCOpError.GRPC_OP_OK) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); - - } - else - { - halfcloseTcs.SetResult(null); - } - } - catch(Exception e) + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.OK) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException(status)); + return; } - } - - private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var payload = ctx.GetReceivedMessage(); - - TaskCompletionSource<TRead> oldTcs = null; - IObserver<TRead> observer = null; - - Nullable<Status> status = null; - - lock (myLock) - { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - readingDone = true; - } - observer = readObserver; - status = finishedStatus; - - ReleaseResourcesIfPossible(); - } - - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - oldTcs.SetResult(msg); + // TODO: handle deserialization error + TResponse msg; + TryDeserialize(ctx.GetReceivedMessage(), out msg); - // TODO: make sure we deliver reads in the right order. - - if (observer != null) - { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); - - // start a new read - ReceiveMessageAsync(); - } - else - { - if (!server) - { - if (status.HasValue) - { - CompleteStreamObserver(status.Value); - } - } - else - { - // TODO: wrap to handle exceptions.. - observer.OnCompleted(); - } - // TODO: completeStreamObserver serverside... - } - } - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + unaryResponseTcs.SetResult(msg); } - private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + /// <summary> + /// Handles receive status completion for calls with streaming response. + /// </summary> + private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var status = ctx.GetReceivedStatus(); - - bool wasReadingDone; - - lock (myLock) - { - finished = true; - finishedStatus = status; - - wasReadingDone = readingDone; - - ReleaseResourcesIfPossible(); - } - - if (wasReadingDone) { - CompleteStreamObserver(status); - } - - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } + var status = ctx.GetReceivedStatus(); - private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) - { - try + lock (myLock) { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - - lock(myLock) - { - finished = true; - - // TODO: because of the way server calls are implemented, we need to set - // reading done to true here. Should be fixed in the future. - readingDone = true; - - ReleaseResourcesIfPossible(); - } - // TODO: handle error ... - - finishedServersideTcs.SetResult(null); + finished = true; + finishedStatus = status; + ReleaseResourcesIfPossible(); } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + + CompleteReadObserver(); } } }
\ No newline at end of file |