aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-18 11:05:45 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-18 12:51:30 -0800
commit607307d0beca6b3742ba446390603b42f5a57c19 (patch)
tree29ae22f6962985beabc9a7d27e851de1dd694206 /src
parenta96afb013babf5afd8d47b195d616cd03b93d677 (diff)
Cleanup of AsyncCall.cs
Diffstat (limited to 'src')
-rw-r--r--src/csharp/GrpcApiTests/MathClientServerTests.cs18
-rw-r--r--src/csharp/GrpcCore/GrpcEnvironment.cs2
-rw-r--r--src/csharp/GrpcCore/Internal/AsyncCall.cs325
-rw-r--r--src/csharp/GrpcCore/ServerCallHandler.cs11
-rw-r--r--src/csharp/GrpcCoreTests/ClientServerTest.cs23
5 files changed, 203 insertions, 176 deletions
diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs
index bd298b0932..9056142097 100644
--- a/src/csharp/GrpcApiTests/MathClientServerTests.cs
+++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs
@@ -64,6 +64,15 @@ namespace math.Tests
client = MathGrpc.NewStub(channel);
}
+ [TestFixtureTearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+
+ server.ShutdownAsync().Wait();
+ GrpcEnvironment.Shutdown();
+ }
+
[Test]
public void Div1()
{
@@ -136,15 +145,6 @@ namespace math.Tests
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
-
- [TestFixtureTearDown]
- public void Cleanup()
- {
- channel.Dispose();
-
- server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
- }
}
}
diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs
index c4f030267d..55a6cac8f6 100644
--- a/src/csharp/GrpcCore/GrpcEnvironment.cs
+++ b/src/csharp/GrpcCore/GrpcEnvironment.cs
@@ -42,7 +42,7 @@ namespace Google.GRPC.Core
/// </summary>
public class GrpcEnvironment
{
- const int THREAD_POOL_SIZE = 1;
+ const int THREAD_POOL_SIZE = 4;
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_init();
diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs
index ae7428978e..ce0ba30d53 100644
--- a/src/csharp/GrpcCore/Internal/AsyncCall.cs
+++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs
@@ -42,15 +42,13 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
- /// Handle native call lifecycle and provides convenience methods.
+ /// Handles native call lifecycle and provides convenience methods.
/// </summary>
- internal class AsyncCall<TWrite, TRead> : IDisposable
+ internal class AsyncCall<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- // TODO: make sure the delegate doesn't get garbage collected while
- // native callbacks are in the completion queue.
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
readonly CompletionCallbackDelegate writeFinishedHandler;
@@ -59,35 +57,44 @@ namespace Google.GRPC.Core.Internal
readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
- bool disposed;
+ GCHandle gchandle;
CallSafeHandle call;
+ bool disposed;
bool server;
+
bool started;
bool errorOccured;
-
bool cancelRequested;
+ bool readingDone;
bool halfcloseRequested;
bool halfclosed;
- bool doneWithReading;
- Nullable<Status> finishedStatus;
+ bool finished;
+ // Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
+
+ // Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
- TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
- TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
- TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
+ // Completion of a pending halfclose if not null.
+ TaskCompletionSource<object> halfcloseTcs;
+ // Completion of a pending unary response if not null.
TaskCompletionSource<TRead> unaryResponseTcs;
+ // Set after status is received on client. Only used for server streaming and duplex streaming calls.
+ Nullable<Status> finishedStatus;
+ TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+
+ // For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
- this.unaryResponseHandler = HandleUnaryResponseCompletion;
+ this.unaryResponseHandler = HandleUnaryResponse;
this.finishedHandler = HandleFinished;
this.writeFinishedHandler = HandleWriteFinished;
this.readFinishedHandler = HandleReadFinished;
@@ -95,46 +102,23 @@ namespace Google.GRPC.Core.Internal
this.finishedServersideHandler = HandleFinishedServerside;
}
- /// <summary>
- /// Initiates reading to given observer.
- /// </summary>
- public void StartReadingToStream(IObserver<TRead> readObserver) {
- lock (myLock)
- {
- CheckStarted();
- if (this.readObserver != null)
- {
- throw new InvalidOperationException("Already registered an observer.");
- }
- this.readObserver = readObserver;
- ReceiveMessageAsync();
- }
- }
-
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) {
- lock (myLock)
- {
- this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
- }
+ public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
+ {
+ InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
}
public void InitializeServer(CallSafeHandle call)
{
- lock(myLock)
- {
- this.call = call;
- started = true;
- server = true;
- }
+ InitializeInternal(call, true);
}
-
public Task<TRead> UnaryCallAsync(TWrite msg)
{
lock (myLock)
{
started = true;
halfcloseRequested = true;
+ readingDone = true;
// TODO: handle serialization error...
byte[] payload = serializer(msg);
@@ -151,6 +135,7 @@ namespace Google.GRPC.Core.Internal
lock (myLock)
{
started = true;
+ readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartClientStreaming(unaryResponseHandler);
@@ -191,15 +176,43 @@ namespace Google.GRPC.Core.Internal
}
}
- public Task SendMessageAsync(TWrite msg) {
+ public Task ServerSideUnaryRequestCallAsync()
+ {
lock (myLock)
{
+ started = true;
+ call.StartServerSide(finishedServersideHandler);
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
+ {
+ lock (myLock)
+ {
+ started = true;
+ call.StartServerSide(finishedServersideHandler);
+
+ if (this.readObserver != null)
+ {
+ throw new InvalidOperationException("Already registered an observer.");
+ }
+ this.readObserver = readObserver;
+ ReceiveMessageAsync();
+
+ return finishedServersideTcs.Task;
+ }
+ }
+
+ public Task SendMessageAsync(TWrite msg)
+ {
+ lock (myLock)
+ {
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
@@ -222,18 +235,19 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartSendCloseFromClient(halfclosedHandler);
+
halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
@@ -242,18 +256,18 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- CheckCancelNotRequested();
- if (halfcloseRequested || halfclosed)
+ if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
+ halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
@@ -262,13 +276,11 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
CheckNoError();
- // TODO: add check for not cancelled?
-
- if (doneWithReading)
+ if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
@@ -285,22 +297,12 @@ namespace Google.GRPC.Core.Internal
}
}
- internal Task StartServerSide()
- {
- lock (myLock)
- {
- call.StartServerSide(finishedServersideHandler);
- return finishedServersideTcs.Task;
- }
- }
-
public void Cancel()
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
-
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
@@ -311,41 +313,23 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
+ CheckNotDisposed();
CheckStarted();
- CheckNotFinished();
-
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected virtual void Dispose(bool disposing)
- {
- if (!disposed)
- {
- if (disposing)
- {
- if (call != null)
- {
- call.Dispose();
- }
- }
- disposed = true;
- }
- }
- private void UpdateErrorOccured(GRPCOpError error)
+ private void InitializeInternal(CallSafeHandle call, bool server)
{
- if (error == GRPCOpError.GRPC_OP_ERROR)
+ lock (myLock)
{
- errorOccured = true;
+ // Make sure this object and the delegated held by it will not be garbage collected
+ // before we release this handle.
+ gchandle = GCHandle.Alloc(this);
+ this.call = call;
+ this.server = server;
}
}
@@ -357,41 +341,46 @@ namespace Google.GRPC.Core.Internal
}
}
- private void CheckNoError()
+ private void CheckNotDisposed()
{
- if (errorOccured)
+ if (disposed)
{
- throw new InvalidOperationException("Error occured when processing call.");
+ throw new InvalidOperationException("Call has already been disposed.");
}
}
- private void CheckNotFinished()
+ private void CheckNoError()
{
- if (finishedStatus.HasValue)
+ if (errorOccured)
{
- throw new InvalidOperationException("Already finished.");
+ throw new InvalidOperationException("Error occured when processing call.");
}
}
- private void CheckCancelNotRequested()
+ private bool ReleaseResourcesIfPossible()
{
- if (cancelRequested)
+ if (!disposed && call != null)
{
- throw new InvalidOperationException("Cancel has been requested.");
+ if (halfclosed && readingDone && finished)
+ {
+ ReleaseResources();
+ return true;
+ }
}
+ return false;
}
- private void DisposeResourcesIfNeeded()
+ private void ReleaseResources()
{
- if (call != null && started && finishedStatus.HasValue)
- {
- // TODO: should we also wait for all the pending events to finish?
-
+ if (call != null) {
call.Dispose();
}
+ gchandle.Free();
+ disposed = true;
}
- private void CompleteStreamObserver(Status status) {
+ private void CompleteStreamObserver(Status status)
+ {
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
// TODO: wrap to handle exceptions;
@@ -402,20 +391,27 @@ namespace Google.GRPC.Core.Internal
}
}
- private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) {
- try {
-
+ /// <summary>
+ /// Handler for unary response completion.
+ /// </summary>
+ private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
TaskCompletionSource<TRead> tcs;
- lock(myLock) {
+ lock(myLock)
+ {
+ finished = true;
+ halfclosed = true;
tcs = unaryResponseTcs;
- }
- // we're done with this call, get rid of the native object.
- call.Dispose();
+ ReleaseResourcesIfPossible();
+ }
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- if (error != GRPCOpError.GRPC_OP_OK) {
+ if (error != GRPCOpError.GRPC_OP_OK)
+ {
tcs.SetException(new RpcException(
new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
));
@@ -423,7 +419,8 @@ namespace Google.GRPC.Core.Internal
}
var status = ctx.GetReceivedStatus();
- if (status.StatusCode != StatusCode.GRPC_STATUS_OK) {
+ if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
+ {
tcs.SetException(new RpcException(status));
return;
}
@@ -431,18 +428,20 @@ namespace Google.GRPC.Core.Internal
// TODO: handle deserialize error...
var msg = deserializer(ctx.GetReceivedMessage());
tcs.SetResult(msg);
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) {
- try {
-
+ private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
- UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
}
@@ -458,20 +457,25 @@ namespace Google.GRPC.Core.Internal
oldTcs.SetResult(null);
}
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) {
- try {
+ private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
lock (myLock)
{
- UpdateErrorOccured(error);
halfclosed = true;
+
+ ReleaseResourcesIfPossible();
}
- if (errorOccured)
+ if (error != GRPCOpError.GRPC_OP_OK)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
@@ -480,14 +484,17 @@ namespace Google.GRPC.Core.Internal
{
halfcloseTcs.SetResult(null);
}
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) {
- try {
-
+ private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var payload = ctx.GetReceivedMessage();
@@ -502,7 +509,7 @@ namespace Google.GRPC.Core.Internal
readTcs = null;
if (payload == null)
{
- doneWithReading = true;
+ readingDone = true;
}
observer = readObserver;
status = finishedStatus;
@@ -515,7 +522,8 @@ namespace Google.GRPC.Core.Internal
// TODO: make sure we deliver reads in the right order.
- if (observer != null) {
+ if (observer != null)
+ {
if (payload != null)
{
// TODO: wrap to handle exceptions
@@ -526,58 +534,81 @@ namespace Google.GRPC.Core.Internal
}
else
{
- if (!server) {
- if (status.HasValue) {
+ if (!server)
+ {
+ if (status.HasValue)
+ {
CompleteStreamObserver(status.Value);
}
- } else {
+ }
+ else
+ {
// TODO: wrap to handle exceptions..
observer.OnCompleted();
}
// TODO: completeStreamObserver serverside...
}
}
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) {
- try {
+ private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var status = ctx.GetReceivedStatus();
- bool wasDoneWithReading;
+ bool wasReadingDone;
lock (myLock)
{
+ finished = true;
finishedStatus = status;
- DisposeResourcesIfNeeded();
+ wasReadingDone = readingDone;
- wasDoneWithReading = doneWithReading;
+ ReleaseResourcesIfPossible();
}
- if (wasDoneWithReading) {
+ if (wasReadingDone) {
CompleteStreamObserver(status);
}
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
- private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) {
- try {
+ private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
+ {
+ try
+ {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
+ lock(myLock)
+ {
+ finished = true;
+
+ // TODO: because of the way server calls are implemented, we need to set
+ // reading done to true here. Should be fixed in the future.
+ readingDone = true;
+
+ ReleaseResourcesIfPossible();
+ }
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
- call.Dispose();
-
- } catch(Exception e) {
+ }
+ catch(Exception e)
+ {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs
index 3bc3b15396..73dfa52def 100644
--- a/src/csharp/GrpcCore/ServerCallHandler.cs
+++ b/src/csharp/GrpcCore/ServerCallHandler.cs
@@ -60,7 +60,7 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
- var finishedTask = asyncCall.StartServerSide();
+ var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
var request = asyncCall.ReceiveMessageAsync().Result;
@@ -91,14 +91,9 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
- var finishedTask = asyncCall.StartServerSide();
-
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
-
- // feed the requests
- asyncCall.StartReadingToStream(requestObserver);
-
+ var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
finishedTask.Wait();
}
}
@@ -114,7 +109,7 @@ namespace Google.GRPC.Core
asyncCall.InitializeServer(call);
- var finishedTask = asyncCall.StartServerSide();
+ var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs
index dd3fc7038e..37d770e0c0 100644
--- a/src/csharp/GrpcCoreTests/ClientServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs
@@ -52,11 +52,21 @@ namespace Google.GRPC.Core.Tests
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
- [Test]
- public void UnaryCall()
+ [TestFixtureSetUp]
+ public void Init()
+ {
+ GrpcEnvironment.Initialize();
+ }
+
+ [TestFixtureTearDown]
+ public void Cleanup()
{
GrpcEnvironment.Initialize();
+ }
+ [Test]
+ public void UnaryCall()
+ {
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@@ -82,8 +92,6 @@ namespace Google.GRPC.Core.Tests
[Test]
public void UnaryCallPerformance()
{
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@@ -107,16 +115,11 @@ namespace Google.GRPC.Core.Tests
}
server.ShutdownAsync().Wait();
-
- GrpcEnvironment.Shutdown();
}
-
[Test]
public void UnknownMethodHandler()
{
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build());
@@ -137,8 +140,6 @@ namespace Google.GRPC.Core.Tests
}
server.ShutdownAsync().Wait();
-
- GrpcEnvironment.Shutdown();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {