diff options
author | 2017-10-04 16:59:49 +0200 | |
---|---|---|
committer | 2017-10-04 17:07:02 +0200 | |
commit | bb872c02d9c988f80ddb5add8e9291a9766b2c65 (patch) | |
tree | a037aa630c1543cb418c22ad43bd074ef474bef7 /src | |
parent | 8e2e9a6d91b45069651f5331dee9bff757126be8 (diff) |
allow cancelling MoveNext operations
Diffstat (limited to 'src')
-rw-r--r-- | src/csharp/Grpc.Core/IAsyncStreamReader.cs | 5 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ClientResponseStream.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerRequestStream.cs | 11 |
3 files changed, 21 insertions, 15 deletions
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 42bfbb87e0..4a169fcca5 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -41,6 +41,11 @@ namespace Grpc.Core /// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c> /// associated with the call will be cancelled to signal the failure. /// </para> + /// <para> + /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling + /// an individual read operation has the same effect as cancelling the entire call + /// (which will also result in the read operation returning prematurely). + /// </para> /// </summary> /// <typeparam name="T">The message type.</typeparam> public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 851b6ca213..ab649ee766 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -49,19 +49,19 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); - } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; - if (result == null) - { - await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); - return false; + if (result == null) + { + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); + return false; + } + return true; } - return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index c65b960afb..058dddb7eb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -49,13 +49,14 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; + return result != null; } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; - return result != null; } public void Dispose() |