aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj7
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs577
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs407
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs125
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCompletion.cs95
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs16
-rw-r--r--src/csharp/Grpc.Core/OperationFailedException.cs48
-rw-r--r--src/csharp/Grpc.Core/ServerCallHandler.cs34
-rw-r--r--src/csharp/Grpc.Core/Status.cs6
-rw-r--r--src/csharp/Grpc.Core/Utils/Preconditions.cs113
12 files changed, 949 insertions, 489 deletions
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 93d5430591..78b6cdde59 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -51,7 +51,6 @@
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
- <Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
@@ -69,6 +68,12 @@
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="ChannelArgs.cs" />
+ <Compile Include="Internal\AsyncCompletion.cs" />
+ <Compile Include="Internal\AsyncCallBase.cs" />
+ <Compile Include="Internal\AsyncCallServer.cs" />
+ <Compile Include="OperationFailedException.cs" />
+ <Compile Include="Internal\AsyncCall.cs" />
+ <Compile Include="Utils\Preconditions.cs" />
</ItemGroup>
<Choose>
<!-- Under older versions of Monodevelop, Choose is not supported and is just
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 6f37b059f7..5ae036298b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -43,84 +43,47 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Handles native call lifecycle and provides convenience methods.
+ /// Handles client side native call lifecycle.
/// </summary>
- internal class AsyncCall<TWrite, TRead>
+ internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly Func<TWrite, byte[]> serializer;
- readonly Func<byte[], TRead> deserializer;
-
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
- readonly CompletionCallbackDelegate writeFinishedHandler;
- readonly CompletionCallbackDelegate readFinishedHandler;
- readonly CompletionCallbackDelegate halfclosedHandler;
- readonly CompletionCallbackDelegate finishedServersideHandler;
-
- object myLock = new object();
- GCHandle gchandle;
- CallSafeHandle call;
- bool disposed;
-
- bool server;
-
- bool started;
- bool errorOccured;
- bool cancelRequested;
- bool readingDone;
- bool halfcloseRequested;
- bool halfclosed;
- bool finished;
-
- // Completion of a pending write if not null.
- TaskCompletionSource<object> writeTcs;
-
- // Completion of a pending read if not null.
- TaskCompletionSource<TRead> readTcs;
-
- // Completion of a pending halfclose if not null.
- TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
- TaskCompletionSource<TRead> unaryResponseTcs;
+ TaskCompletionSource<TResponse> unaryResponseTcs;
- // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ // Set after status is received. Only used for streaming response calls.
Nullable<Status> finishedStatus;
- TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
- // For streaming, the reads will be delivered to this observer.
- IObserver<TRead> readObserver;
+ bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.serializer = serializer;
- this.deserializer = deserializer;
- this.unaryResponseHandler = HandleUnaryResponse;
- this.finishedHandler = HandleFinished;
- this.writeFinishedHandler = HandleWriteFinished;
- this.readFinishedHandler = HandleReadFinished;
- this.halfclosedHandler = HandleHalfclosed;
- this.finishedServersideHandler = HandleFinishedServerside;
+ this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
+ this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
- InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
+ var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
+ InitializeInternal(call);
}
- public void InitializeServer(CallSafeHandle call)
- {
- InitializeInternal(call, true);
- }
-
- public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
+ // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
+ // it is reusing fair amount of code in this class, so we are leaving it here.
+ // TODO: for other calls, you need to call Initialize, this methods calls initialize
+ // on its own, so there's a usage inconsistency.
+ /// <summary>
+ /// Blocking unary request - unary response call.
+ /// </summary>
+ public TResponse UnaryCall(Channel channel, String methodName, TRequest msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
lock (myLock)
{
@@ -143,508 +106,200 @@ namespace Grpc.Core.Internal
}
}
- public Task<TRead> UnaryCallAsync(TWrite msg)
+ /// <summary>
+ /// Starts a unary request - unary response call.
+ /// </summary>
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
readingDone = true;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public Task<TRead> ClientStreamingCallAsync()
+ /// <summary>
+ /// Starts a streamed request - unary response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
readingDone = true;
- unaryResponseTcs = new TaskCompletionSource<TRead>();
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
- public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a unary request - streamed response call.
+ /// </summary>
+ public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
this.readObserver = readObserver;
- // TODO: handle serialization error...
- byte[] payload = serializer(msg);
+ byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
+ /// <summary>
+ /// Starts a streaming request - streaming response call.
+ /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
+ /// </summary>
+ public void StartDuplexStreamingCall(IObserver<TResponse> readObserver)
{
lock (myLock)
{
+ Preconditions.CheckNotNull(call);
+
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
- ReceiveMessageAsync();
+ StartReceiveMessage();
}
}
- public Task ServerSideUnaryRequestCallAsync()
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
- return finishedServersideTcs.Task;
- }
- }
-
- public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
- {
- lock (myLock)
- {
- started = true;
- call.StartServerSide(finishedServersideHandler);
-
- if (this.readObserver != null)
- {
- throw new InvalidOperationException("Already registered an observer.");
- }
- this.readObserver = readObserver;
- ReceiveMessageAsync();
-
- return finishedServersideTcs.Task;
- }
- }
-
- public Task SendMessageAsync(TWrite msg)
+ /// <summary>
+ /// Sends a streaming request. Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
{
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- if (writeTcs != null)
- {
- throw new InvalidOperationException("Only one write can be pending at a time");
- }
-
- // TODO: wrap serialization...
- byte[] payload = serializer(msg);
-
- call.StartSendMessage(payload, writeFinishedHandler);
- writeTcs = new TaskCompletionSource<object>();
- return writeTcs.Task;
- }
+ StartSendMessageInternal(msg, completionDelegate);
}
- public Task SendCloseFromClientAsync()
+ /// <summary>
+ /// Sends halfclose, indicating client is done with streaming requests.
+ /// Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
- }
- }
-
- public Task SendStatusFromServerAsync(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (halfcloseRequested)
- {
- throw new InvalidOperationException("Already halfclosed.");
- }
-
- call.StartSendStatusFromServer(status, halfclosedHandler);
- halfcloseRequested = true;
- halfcloseTcs = new TaskCompletionSource<object>();
- return halfcloseTcs.Task;
+ sendCompletionDelegate = completionDelegate;
}
}
- public Task<TRead> ReceiveMessageAsync()
+ /// <summary>
+ /// On client-side, we only fire readObserver.OnCompleted once all messages have been read
+ /// and status has been received.
+ /// </summary>
+ protected override void CompleteReadObserver()
{
- lock (myLock)
+ if (readingDone && finishedStatus.HasValue)
{
- CheckNotDisposed();
- CheckStarted();
- CheckNoError();
-
- if (readingDone)
- {
- throw new InvalidOperationException("Already read the last message.");
- }
-
- if (readTcs != null)
+ bool shouldComplete;
+ lock (myLock)
{
- throw new InvalidOperationException("Only one read can be pending at a time");
+ shouldComplete = !readObserverCompleted;
+ readObserverCompleted = true;
}
- call.StartReceiveMessage(readFinishedHandler);
-
- readTcs = new TaskCompletionSource<TRead>();
- return readTcs.Task;
- }
- }
-
- public void Cancel()
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel is threadsafe
- call.Cancel();
- }
-
- public void CancelWithStatus(Status status)
- {
- lock (myLock)
- {
- CheckNotDisposed();
- CheckStarted();
- cancelRequested = true;
- }
- // grpc_call_cancel_with_status is threadsafe
- call.CancelWithStatus(status);
- }
-
- private void InitializeInternal(CallSafeHandle call, bool server)
- {
- lock (myLock)
- {
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
- this.call = call;
- this.server = server;
- }
- }
-
- private void CheckStarted()
- {
- if (!started)
- {
- throw new InvalidOperationException("Call not started");
- }
- }
-
- private void CheckNotDisposed()
- {
- if (disposed)
- {
- throw new InvalidOperationException("Call has already been disposed.");
- }
- }
-
- private void CheckNoError()
- {
- if (errorOccured)
- {
- throw new InvalidOperationException("Error occured when processing call.");
- }
- }
-
- private bool ReleaseResourcesIfPossible()
- {
- if (!disposed && call != null)
- {
- if (halfclosed && readingDone && finished)
+ if (shouldComplete)
{
- ReleaseResources();
- return true;
+ var status = finishedStatus.Value;
+ if (status.StatusCode != StatusCode.OK)
+ {
+ FireReadObserverOnError(new RpcException(status));
+ }
+ else
+ {
+ FireReadObserverOnCompleted();
+ }
}
}
- return false;
- }
-
- private void ReleaseResources()
- {
- if (call != null) {
- call.Dispose();
- }
- gchandle.Free();
- disposed = true;
- }
-
- private void CompleteStreamObserver(Status status)
- {
- if (status.StatusCode != StatusCode.OK)
- {
- // TODO: wrap to handle exceptions;
- readObserver.OnError(new RpcException(status));
- } else {
- // TODO: wrap to handle exceptions;
- readObserver.OnCompleted();
- }
}
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
+ lock(myLock)
{
- TaskCompletionSource<TRead> tcs;
- lock(myLock)
- {
- finished = true;
- halfclosed = true;
- tcs = unaryResponseTcs;
-
- ReleaseResourcesIfPossible();
- }
-
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ finished = true;
+ halfclosed = true;
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- tcs.SetException(new RpcException(
- new Status(StatusCode.Internal, "Internal error occured.")
- ));
- return;
- }
-
- var status = ctx.GetReceivedStatus();
- if (status.StatusCode != StatusCode.OK)
- {
- tcs.SetException(new RpcException(status));
- return;
- }
-
- // TODO: handle deserialize error...
- var msg = deserializer(ctx.GetReceivedMessage());
- tcs.SetResult(msg);
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
+ ReleaseResourcesIfPossible();
}
- }
-
- private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- TaskCompletionSource<object> oldTcs = null;
- lock (myLock)
- {
- oldTcs = writeTcs;
- writeTcs = null;
- }
-
- if (errorOccured)
- {
- // TODO: use the right type of exception...
- oldTcs.SetException(new Exception("Write failed"));
- }
- else
- {
- // TODO: where does the continuation run?
- oldTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ if (wasError)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(
+ new Status(StatusCode.Internal, "Internal error occured.")
+ ));
+ return;
}
- }
-
- private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- lock (myLock)
- {
- halfclosed = true;
- ReleaseResourcesIfPossible();
- }
-
- if (error != GRPCOpError.GRPC_OP_OK)
- {
- halfcloseTcs.SetException(new Exception("Halfclose failed"));
-
- }
- else
- {
- halfcloseTcs.SetResult(null);
- }
- }
- catch(Exception e)
+ var status = ctx.GetReceivedStatus();
+ if (status.StatusCode != StatusCode.OK)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ unaryResponseTcs.SetException(new RpcException(status));
+ return;
}
- }
-
- private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var payload = ctx.GetReceivedMessage();
-
- TaskCompletionSource<TRead> oldTcs = null;
- IObserver<TRead> observer = null;
-
- Nullable<Status> status = null;
-
- lock (myLock)
- {
- oldTcs = readTcs;
- readTcs = null;
- if (payload == null)
- {
- readingDone = true;
- }
- observer = readObserver;
- status = finishedStatus;
-
- ReleaseResourcesIfPossible();
- }
-
- // TODO: wrap deserialization...
- TRead msg = payload != null ? deserializer(payload) : default(TRead);
- oldTcs.SetResult(msg);
+ // TODO: handle deserialization error
+ TResponse msg;
+ TryDeserialize(ctx.GetReceivedMessage(), out msg);
- // TODO: make sure we deliver reads in the right order.
-
- if (observer != null)
- {
- if (payload != null)
- {
- // TODO: wrap to handle exceptions
- observer.OnNext(msg);
-
- // start a new read
- ReceiveMessageAsync();
- }
- else
- {
- if (!server)
- {
- if (status.HasValue)
- {
- CompleteStreamObserver(status.Value);
- }
- }
- else
- {
- // TODO: wrap to handle exceptions..
- observer.OnCompleted();
- }
- // TODO: completeStreamObserver serverside...
- }
- }
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+ unaryResponseTcs.SetResult(msg);
}
- private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
+ /// <summary>
+ /// Handles receive status completion for calls with streaming response.
+ /// </summary>
+ private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- var status = ctx.GetReceivedStatus();
-
- bool wasReadingDone;
-
- lock (myLock)
- {
- finished = true;
- finishedStatus = status;
-
- wasReadingDone = readingDone;
-
- ReleaseResourcesIfPossible();
- }
-
- if (wasReadingDone) {
- CompleteStreamObserver(status);
- }
-
- }
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- }
+ var status = ctx.GetReceivedStatus();
- private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
- {
- try
+ lock (myLock)
{
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
-
- lock(myLock)
- {
- finished = true;
-
- // TODO: because of the way server calls are implemented, we need to set
- // reading done to true here. Should be fixed in the future.
- readingDone = true;
-
- ReleaseResourcesIfPossible();
- }
- // TODO: handle error ...
-
- finishedServersideTcs.SetResult(null);
+ finished = true;
+ finishedStatus = status;
+ ReleaseResourcesIfPossible();
}
- catch(Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+
+ CompleteReadObserver();
}
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
new file mode 100644
index 0000000000..44d66b394c
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -0,0 +1,407 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Base for handling both client side and server side calls.
+ /// Handles native call lifecycle and provides convenience methods.
+ /// </summary>
+ internal abstract class AsyncCallBase<TWrite, TRead>
+ {
+ readonly Func<TWrite, byte[]> serializer;
+ readonly Func<byte[], TRead> deserializer;
+
+ protected readonly CompletionCallbackDelegate sendFinishedHandler;
+ protected readonly CompletionCallbackDelegate readFinishedHandler;
+ protected readonly CompletionCallbackDelegate halfclosedHandler;
+
+ protected readonly object myLock = new object();
+
+ protected GCHandle gchandle;
+ protected CallSafeHandle call;
+ protected bool disposed;
+
+ protected bool started;
+ protected bool errorOccured;
+ protected bool cancelRequested;
+
+ protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected bool readPending; // True if there is a read in progress.
+ protected bool readingDone;
+ protected bool halfcloseRequested;
+ protected bool halfclosed;
+ protected bool finished; // True if close has been received from the peer.
+
+ // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null.
+ protected IObserver<TRead> readObserver;
+
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ {
+ this.serializer = Preconditions.CheckNotNull(serializer);
+ this.deserializer = Preconditions.CheckNotNull(deserializer);
+
+ this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
+ this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
+ this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
+ }
+
+ /// <summary>
+ /// Requests cancelling the call.
+ /// </summary>
+ public void Cancel()
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ cancelRequested = true;
+
+ if (!disposed)
+ {
+ call.Cancel();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Requests cancelling the call with given status.
+ /// </summary>
+ public void CancelWithStatus(Status status)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ cancelRequested = true;
+
+ if (!disposed)
+ {
+ call.CancelWithStatus(status);
+ }
+ }
+ }
+
+ protected void InitializeInternal(CallSafeHandle call)
+ {
+ lock (myLock)
+ {
+ // Make sure this object and the delegated held by it will not be garbage collected
+ // before we release this handle.
+ gchandle = GCHandle.Alloc(this);
+ this.call = call;
+ }
+ }
+
+ /// <summary>
+ /// Initiates sending a message. Only once send operation can be active at a time.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate)
+ {
+ byte[] payload = UnsafeSerialize(msg);
+
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
+
+ call.StartSendMessage(payload, sendFinishedHandler);
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
+ /// Requests receiving a next message.
+ /// </summary>
+ protected void StartReceiveMessage()
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckState(started);
+ Preconditions.CheckState(!disposed);
+ Preconditions.CheckState(!errorOccured);
+
+ Preconditions.CheckState(!readingDone);
+ Preconditions.CheckState(!readPending);
+
+ call.StartReceiveMessage(readFinishedHandler);
+ readPending = true;
+ }
+ }
+
+ /// <summary>
+ /// Default behavior just completes the read observer, but more sofisticated behavior might be required
+ /// by subclasses.
+ /// </summary>
+ protected virtual void CompleteReadObserver()
+ {
+ FireReadObserverOnCompleted();
+ }
+
+ /// <summary>
+ /// If there are no more pending actions and no new actions can be started, releases
+ /// the underlying native resources.
+ /// </summary>
+ protected bool ReleaseResourcesIfPossible()
+ {
+ if (!disposed && call != null)
+ {
+ if (halfclosed && readingDone && finished)
+ {
+ ReleaseResources();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void ReleaseResources()
+ {
+ if (call != null)
+ {
+ call.Dispose();
+ }
+ gchandle.Free();
+ disposed = true;
+ }
+
+ protected void CheckSendingAllowed()
+ {
+ Preconditions.CheckState(started);
+ Preconditions.CheckState(!disposed);
+ Preconditions.CheckState(!errorOccured);
+
+ Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
+ Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
+ }
+
+ protected byte[] UnsafeSerialize(TWrite msg)
+ {
+ return serializer(msg);
+ }
+
+ protected bool TrySerialize(TWrite msg, out byte[] payload)
+ {
+ try
+ {
+ payload = serializer(msg);
+ return true;
+ }
+ catch(Exception)
+ {
+ Console.WriteLine("Exception occured while trying to serialize message");
+ payload = null;
+ return false;
+ }
+ }
+
+ protected bool TryDeserialize(byte[] payload, out TRead msg)
+ {
+ try
+ {
+ msg = deserializer(payload);
+ return true;
+ }
+ catch(Exception)
+ {
+ Console.WriteLine("Exception occured while trying to deserialize message");
+ msg = default(TRead);
+ return false;
+ }
+ }
+
+ protected void FireReadObserverOnNext(TRead value)
+ {
+ try
+ {
+ readObserver.OnNext(value);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e);
+ }
+ }
+
+ protected void FireReadObserverOnCompleted()
+ {
+ try
+ {
+ readObserver.OnCompleted();
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e);
+ }
+ }
+
+ protected void FireReadObserverOnError(Exception error)
+ {
+ try
+ {
+ readObserver.OnError(error);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e);
+ }
+ }
+
+ protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error)
+ {
+ try
+ {
+ completionDelegate(error);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
+
+ /// <summary>
+ /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
+ /// prevent propagating exceptions accross managed/unmanaged boundary.
+ /// </summary>
+ protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
+ {
+ return new CompletionCallbackDelegate( (error, batchContextPtr) => {
+ try
+ {
+ var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ bool wasError = (error != GRPCOpError.GRPC_OP_OK);
+ handler(wasError, ctx);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ });
+ }
+
+ /// <summary>
+ /// Handles send completion.
+ /// </summary>
+ private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ AsyncCompletionDelegate origCompletionDelegate = null;
+ lock (myLock)
+ {
+ origCompletionDelegate = sendCompletionDelegate;
+ sendCompletionDelegate = null;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasError)
+ {
+ FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed"));
+ }
+ else
+ {
+ FireCompletion(origCompletionDelegate, null);
+ }
+ }
+
+ /// <summary>
+ /// Handles halfclose completion.
+ /// </summary>
+ private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ AsyncCompletionDelegate origCompletionDelegate = null;
+ lock (myLock)
+ {
+ halfclosed = true;
+ origCompletionDelegate = sendCompletionDelegate;
+ sendCompletionDelegate = null;
+
+ ReleaseResourcesIfPossible();
+ }
+
+ if (wasError)
+ {
+ FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed"));
+ }
+ else
+ {
+ FireCompletion(origCompletionDelegate, null);
+ }
+
+ }
+
+ /// <summary>
+ /// Handles streaming read completion.
+ /// </summary>
+ private void HandleReadFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ var payload = ctx.GetReceivedMessage();
+
+ lock (myLock)
+ {
+ readPending = false;
+ if (payload == null)
+ {
+ readingDone = true;
+ }
+
+ ReleaseResourcesIfPossible();
+ }
+
+ // TODO: handle the case when error occured...
+
+ if (payload != null)
+ {
+ // TODO: handle deserialization error
+ TRead msg;
+ TryDeserialize(payload, out msg);
+
+ FireReadObserverOnNext(msg);
+
+ // Start a new read. The current one has already been delivered,
+ // so correct ordering of reads is assured.
+ StartReceiveMessage();
+ }
+ else
+ {
+ CompleteReadObserver();
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
new file mode 100644
index 0000000000..d3a2be553f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -0,0 +1,125 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Handles server side native call lifecycle.
+ /// </summary>
+ internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
+ {
+ readonly CompletionCallbackDelegate finishedServersideHandler;
+ readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
+ {
+ this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
+ }
+
+ public void Initialize(CallSafeHandle call)
+ {
+ InitializeInternal(call);
+ }
+
+ /// <summary>
+ /// Starts a server side call. Currently, all server side calls are implemented as duplex
+ /// streaming call and they are adapted to the appropriate streaming arity.
+ /// </summary>
+ public Task ServerSideCallAsync(IObserver<TRequest> readObserver)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(call);
+
+ started = true;
+ this.readObserver = readObserver;
+
+ call.StartServerSide(finishedServersideHandler);
+ StartReceiveMessage();
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ /// <summary>
+ /// Sends a streaming response. Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate)
+ {
+ StartSendMessageInternal(msg, completionDelegate);
+ }
+
+ /// <summary>
+ /// Sends call result status, also indicating server is done with streaming responses.
+ /// Only one pending send action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckSendingAllowed();
+
+ call.StartSendStatusFromServer(status, halfclosedHandler);
+ halfcloseRequested = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
+ /// Handles the server side close completion.
+ /// </summary>
+ private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
+ {
+ lock (myLock)
+ {
+ finished = true;
+
+ ReleaseResourcesIfPossible();
+ }
+ // TODO: handle error ...
+
+ finishedServersideTcs.SetResult(null);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
new file mode 100644
index 0000000000..b78bb497fa
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
@@ -0,0 +1,95 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// If error != null, there's been an error or operation has been cancelled.
+ /// </summary>
+ internal delegate void AsyncCompletionDelegate(Exception error);
+
+ /// <summary>
+ /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
+ /// </summary>
+ internal class AsyncCompletionTaskSource
+ {
+ readonly TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
+ readonly AsyncCompletionDelegate completionDelegate;
+
+ public AsyncCompletionTaskSource()
+ {
+ completionDelegate = new AsyncCompletionDelegate(HandleCompletion);
+ }
+
+ public Task Task
+ {
+ get
+ {
+ return tcs.Task;
+ }
+ }
+
+ public AsyncCompletionDelegate CompletionDelegate
+ {
+ get
+ {
+ return completionDelegate;
+ }
+ }
+
+ private void HandleCompletion(Exception error)
+ {
+ if (error == null)
+ {
+ tcs.SetResult(null);
+ return;
+ }
+ if (error is OperationCanceledException)
+ {
+ tcs.SetCanceled();
+ return;
+ }
+ tcs.SetException(error);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 1c0bc98f06..f399c5d9b8 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -38,7 +38,6 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
- //TODO: rename the delegate
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
index fb59e86e2d..859f2ee027 100644
--- a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs
@@ -47,9 +47,10 @@ namespace Grpc.Core.Internal
public void OnCompleted()
{
-
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendCloseFromClient(taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendCloseFromClientAsync().Wait();
+ taskSource.Task.Wait();
}
public void OnError(Exception error)
@@ -59,8 +60,10 @@ namespace Grpc.Core.Internal
public void OnNext(TWrite value)
{
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendMessageAsync(value).Wait();
+ taskSource.Task.Wait();
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
index 08d9921475..707070f98a 100644
--- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
@@ -40,19 +40,21 @@ namespace Grpc.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
- internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
+ internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse>
{
- readonly AsyncCall<TWrite, TRead> call;
+ readonly AsyncCallServer<TRequest, TResponse> call;
- public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
+ public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
public void OnCompleted()
{
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait();
+ taskSource.Task.Wait();
}
public void OnError(Exception error)
@@ -61,10 +63,12 @@ namespace Grpc.Core.Internal
throw new InvalidOperationException("This should never be called.");
}
- public void OnNext(TWrite value)
+ public void OnNext(TResponse value)
{
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendMessageAsync(value).Wait();
+ taskSource.Task.Wait();
}
}
}
diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/OperationFailedException.cs
new file mode 100644
index 0000000000..34a8c95a85
--- /dev/null
+++ b/src/csharp/Grpc.Core/OperationFailedException.cs
@@ -0,0 +1,48 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Thrown when gRPC operation fails.
+ /// </summary>
+ public class OperationFailedException : Exception
+ {
+ public OperationFailedException(string message) : base(message)
+ {
+ }
+ }
+}
+
diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs
index 289f97aece..3eb8422f57 100644
--- a/src/csharp/Grpc.Core/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/ServerCallHandler.cs
@@ -32,7 +32,9 @@
#endregion
using System;
+using System.Linq;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -54,17 +56,17 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
- var asyncCall = new AsyncCall<TResponse, TRequest>(
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
- asyncCall.InitializeServer(call);
+ asyncCall.Initialize(call);
- var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
+ var requestObserver = new RecordingObserver<TRequest>();
+ var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
- var request = asyncCall.ReceiveMessageAsync().Result;
-
- var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
+ var request = requestObserver.ToList().Result.Single();
+ var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall);
handler(request, responseObserver);
finishedTask.Wait();
@@ -85,15 +87,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
- var asyncCall = new AsyncCall<TResponse, TRequest>(
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
- asyncCall.InitializeServer(call);
+ asyncCall.Initialize(call);
- var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
+ var responseObserver = new ServerStreamingOutputObserver<TRequest,TResponse>(asyncCall);
var requestObserver = handler(responseObserver);
- var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
+ var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
finishedTask.Wait();
}
}
@@ -103,17 +105,15 @@ namespace Grpc.Core
public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
- AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
+ var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload);
+ asyncCall.Initialize(call);
- asyncCall.InitializeServer(call);
-
- var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
+ var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>());
- // TODO: this makes the call finish before all reads can be done which causes trouble
- // in AsyncCall.HandleReadFinished callback. Revisit this.
- asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait();
+ // TODO: check result of the completion status.
+ asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => {}));
finishedTask.Wait();
}
diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs
index 5ea1df7b48..6ba5d992a6 100644
--- a/src/csharp/Grpc.Core/Status.cs
+++ b/src/csharp/Grpc.Core/Status.cs
@@ -50,6 +50,9 @@ namespace Grpc.Core
this.detail = detail;
}
+ /// <summary>
+ /// Gets the gRPC status code. OK indicates success, all other values indicate an error.
+ /// </summary>
public StatusCode StatusCode
{
get
@@ -58,6 +61,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the detail.
+ /// </summary>
public string Detail
{
get
diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs
new file mode 100644
index 0000000000..b17ce42117
--- /dev/null
+++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs
@@ -0,0 +1,113 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+
+namespace Grpc.Core.Utils
+{
+ public static class Preconditions
+ {
+ /// <summary>
+ /// Throws ArgumentException if condition is false.
+ /// </summary>
+ public static void CheckArgument(bool condition)
+ {
+ if (!condition)
+ {
+ throw new ArgumentException();
+ }
+ }
+
+ /// <summary>
+ /// Throws ArgumentException with given message if condition is false.
+ /// </summary>
+ public static void CheckArgument(bool condition, string errorMessage)
+ {
+ if (!condition)
+ {
+ throw new ArgumentException(errorMessage);
+ }
+ }
+
+ /// <summary>
+ /// Throws NullReferenceException if reference is null.
+ /// </summary>
+ public static T CheckNotNull<T> (T reference)
+ {
+ if (reference == null)
+ {
+ throw new NullReferenceException();
+ }
+ return reference;
+ }
+
+ /// <summary>
+ /// Throws NullReferenceException with given message if reference is null.
+ /// </summary>
+ public static T CheckNotNull<T> (T reference, string errorMessage)
+ {
+ if (reference == null)
+ {
+ throw new NullReferenceException(errorMessage);
+ }
+ return reference;
+ }
+
+ /// <summary>
+ /// Throws InvalidOperationException if condition is false.
+ /// </summary>
+ public static void CheckState(bool condition)
+ {
+ if (!condition)
+ {
+ throw new InvalidOperationException();
+ }
+ }
+
+ /// <summary>
+ /// Throws InvalidOperationException with given message if condition is false.
+ /// </summary>
+ public static void CheckState(bool condition, string errorMessage)
+ {
+ if (!condition)
+ {
+ throw new InvalidOperationException(errorMessage);
+ }
+ }
+ }
+}
+