diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
6 files changed, 72 insertions, 24 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 1697058732..58f493463b 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// </summary> internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> - where TRequest : class - where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; @@ -48,14 +46,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TRequest message) + public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task Close() + public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade6..6c44521038 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; + TResponse current; public ClientResponseStream(AsyncCall<TRequest, TResponse> call) { this.call = call; } - public Task<TResponse> ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TResponse>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e97869..f494d9e0ff 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,12 +72,13 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (Exception e) { @@ -85,7 +87,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); @@ -137,7 +140,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -178,7 +181,7 @@ namespace Grpc.Core.Internal var result = await handler(context, requestStream); try { - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (OperationCanceledException) { @@ -193,7 +196,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -240,7 +243,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -263,7 +266,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. await requestStream.ToList(); await finishedTask; diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b..3fccb88abb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index da688d504f..a2d77dd5b7 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -49,14 +49,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TResponse message) + public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task WriteStatus(Status status) + public Task WriteStatusAsync(Status status) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81..7a1c016ae2 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// <summary> /// grpc_server from grpc/grpc.h /// </summary> |