aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-04 09:20:43 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-04 14:56:51 -0700
commit8c2dd9d864cb874f8fbe577faf8c3f72e6a077e4 (patch)
treefc4141f5561aa05f2d0bcce70f9f98296731a1b4 /src/csharp/Grpc.Core
parent1b54fcf31b32ae8c7f07ae733e781c184791a7c2 (diff)
Fixes for C# cancellation support
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs51
-rw-r--r--src/csharp/Grpc.Core/Status.cs5
5 files changed, 79 insertions, 13 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b911cdcc87..7cf0f6ff84 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -180,7 +180,8 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- if (halfclosed && readingDone && finished)
+ bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+ if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
return true;
@@ -207,8 +208,9 @@ namespace Grpc.Core.Internal
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured);
+ CheckNotCancelled();
+ Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
@@ -221,7 +223,14 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone, "Stream has already been closed.");
- Preconditions.CheckState(readCompletionDelegate == null, "Only one write can be pending at a time");
+ Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
+ }
+
+ protected void CheckNotCancelled() {
+ if (cancelRequested)
+ {
+ throw new OperationCanceledException("Remote call has been cancelled.");
+ }
}
protected byte[] UnsafeSerialize(TWrite msg)
@@ -292,6 +301,8 @@ namespace Grpc.Core.Internal
});
}
+
+
/// <summary>
/// Handles send completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 4775f2d07b..3c66c67dcc 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -123,18 +123,23 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
+ bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
+
lock (myLock)
{
finished = true;
- if (readCompletionDelegate == null)
+ if (cancelled)
{
- // allow disposal of native call
- readingDone = true;
+ // Once we cancel, we don't have to care that much
+ // about reads and writes.
+ Cancel();
}
ReleaseResourcesIfPossible();
}
+ // TODO(jtattermusch): check if call was cancelled.
+
// TODO: handle error ...
finishedServersideTcs.SetResult(null);
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
index 3c54753756..b562abaa7a 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
@@ -61,6 +61,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
+
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
@@ -94,5 +97,10 @@ namespace Grpc.Core.Internal
{
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
}
+
+ public bool GetReceivedCloseOnServerCancelled()
+ {
+ return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
+ }
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 0416eada34..01b2a11369 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -80,7 +80,14 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -121,7 +128,15 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -151,15 +166,30 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var result = await handler(requestStream);
- await responseStream.Write(result);
- }
+ var result = await handler(requestStream);
+ try
+ {
+ await responseStream.Write(result);
+ }
+ catch (OperationCanceledException)
+ {
+ status = Status.DefaultCancelled;
+ }
+ }
catch (Exception e)
{
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
@@ -196,7 +226,14 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e);
}
- await responseStream.WriteStatus(status);
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
await finishedTask;
}
}
diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs
index b588170694..754f6cb3ca 100644
--- a/src/csharp/Grpc.Core/Status.cs
+++ b/src/csharp/Grpc.Core/Status.cs
@@ -44,6 +44,11 @@ namespace Grpc.Core
/// </summary>
public static readonly Status DefaultSuccess = new Status(StatusCode.OK, "");
+ /// <summary>
+ /// Default result of a cancelled RPC. StatusCode=Cancelled, empty details message.
+ /// </summary>
+ public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, "");
+
readonly StatusCode statusCode;
readonly string detail;