diff options
author | 2015-05-18 14:28:22 -0700 | |
---|---|---|
committer | 2015-05-20 10:55:54 -0700 | |
commit | 7ca6179c666273c850e09f5ecbfc757d653c29ef (patch) | |
tree | 4e05e45b188d55415a6bf3f8b8dbf9197df291f5 /src/csharp/Grpc.Core/Internal/ServerRequestStream.cs | |
parent | 04ae923a52e8b2f61866a7e18e240fbb258969e9 (diff) |
Make IAsyncReadStream use IAsyncEnumerator from Ix-Async
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerRequestStream.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerRequestStream.cs | 29 |
1 files changed, 27 insertions, 2 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b..3fccb88abb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } |