aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-18 14:28:22 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-20 10:55:54 -0700
commit7ca6179c666273c850e09f5ecbfc757d653c29ef (patch)
tree4e05e45b188d55415a6bf3f8b8dbf9197df291f5 /src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
parent04ae923a52e8b2f61866a7e18e240fbb258969e9 (diff)
Make IAsyncReadStream use IAsyncEnumerator from Ix-Async
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerRequestStream.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs29
1 files changed, 27 insertions, 2 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index d9ee0c815b..3fccb88abb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TRequest> ReadNext()
+ public TRequest Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}