using System; using System.Threading; using System.Threading.Tasks; using Google.GRPC.Core.Internal; namespace Google.GRPC.Core { // NOTE: this class is work-in-progress /// /// Helper methods for generated stubs to make RPC calls. /// public static class Calls { public static TResponse BlockingUnaryCall(Call call, TRequest req, CancellationToken token) { //TODO: implement this in real synchronous style once new GRPC C core API is available. return AsyncUnaryCall(call, req, token).Result; } public static async Task AsyncUnaryCall(Call call, TRequest req, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); asyncCall.Initialize(call.Channel, call.MethodName); asyncCall.Start(false, GetCompletionQueue()); await asyncCall.WriteAsync(req); await asyncCall.WritesCompletedAsync(); TResponse response = await asyncCall.ReadAsync(); Status status = await asyncCall.Finished; if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { throw new RpcException(status); } return response; } public static async Task AsyncServerStreamingCall(Call call, TRequest req, IObserver outputs, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); asyncCall.Initialize(call.Channel, call.MethodName); asyncCall.Start(false, GetCompletionQueue()); asyncCall.StartReadingToStream(outputs); await asyncCall.WriteAsync(req); await asyncCall.WritesCompletedAsync(); } public static ClientStreamingAsyncResult AsyncClientStreamingCall(Call call, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); asyncCall.Initialize(call.Channel, call.MethodName); asyncCall.Start(false, GetCompletionQueue()); var task = asyncCall.ReadAsync(); var inputs = new StreamingInputObserver(asyncCall); return new ClientStreamingAsyncResult(task, inputs); } public static TResponse BlockingClientStreamingCall(Call call, IObservable inputs, CancellationToken token) { throw new NotImplementedException(); } public static IObserver DuplexStreamingCall(Call call, IObserver outputs, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); asyncCall.Initialize(call.Channel, call.MethodName); asyncCall.Start(false, GetCompletionQueue()); asyncCall.StartReadingToStream(outputs); var inputs = new StreamingInputObserver(asyncCall); return inputs; } private static CompletionQueueSafeHandle GetCompletionQueue() { return GrpcEnvironment.ThreadPool.CompletionQueue; } } }