aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2017-10-04 16:59:49 +0200
committerGravatar Jan Tattermusch <jtattermusch@google.com>2017-10-04 17:07:02 +0200
commitbb872c02d9c988f80ddb5add8e9291a9766b2c65 (patch)
treea037aa630c1543cb418c22ad43bd074ef474bef7 /src
parent8e2e9a6d91b45069651f5331dee9bff757126be8 (diff)
allow cancelling MoveNext operations
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs11
3 files changed, 21 insertions, 15 deletions
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 42bfbb87e0..4a169fcca5 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -41,6 +41,11 @@ namespace Grpc.Core
/// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c>
/// associated with the call will be cancelled to signal the failure.
/// </para>
+ /// <para>
+ /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling
+ /// an individual read operation has the same effect as cancelling the entire call
+ /// (which will also result in the read operation returning prematurely).
+ /// </para>
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 851b6ca213..ab649ee766 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -49,19 +49,19 @@ namespace Grpc.Core.Internal
public async Task<bool> MoveNext(CancellationToken token)
{
- if (token != CancellationToken.None)
+ var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null;
+ using (cancellationTokenRegistration)
{
- throw new InvalidOperationException("Cancellation of individual reads is not supported.");
- }
- var result = await call.ReadMessageAsync().ConfigureAwait(false);
- this.current = result;
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
+ this.current = result;
- if (result == null)
- {
- await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
- return false;
+ if (result == null)
+ {
+ await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
+ return false;
+ }
+ return true;
}
- return true;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index c65b960afb..058dddb7eb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -49,13 +49,14 @@ namespace Grpc.Core.Internal
public async Task<bool> MoveNext(CancellationToken token)
{
- if (token != CancellationToken.None)
+
+ var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null;
+ using (cancellationTokenRegistration)
{
- throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
+ this.current = result;
+ return result != null;
}
- var result = await call.ReadMessageAsync().ConfigureAwait(false);
- this.current = result;
- return result != null;
}
public void Dispose()