using System;
using System.Threading;
using System.Threading.Tasks;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
{
// NOTE: this class is work-in-progress
/// <summary>
/// Helper methods for generated stubs to make RPC calls.
/// </summary>
public static class Calls
{
/// <summary>
/// Makes a unary RPC and blocks the calling thread until the response is available.
/// </summary>
/// <remarks>
/// Currently implemented by blocking on the task returned by <c>AsyncUnaryCall</c>.
/// Because the result is read through <c>Task.Result</c>, a failure of the underlying
/// call (for example the <c>RpcException</c> thrown for a non-OK status) reaches the
/// caller wrapped in an <c>AggregateException</c>.
/// </remarks>
/// <param name="call">Description of the call; forwarded unchanged to <c>AsyncUnaryCall</c>.</param>
/// <param name="req">The request message to send.</param>
/// <param name="token">Cancellation token; forwarded unchanged to <c>AsyncUnaryCall</c>.</param>
/// <returns>The response produced by the call.</returns>
public static TResponse BlockingUnaryCall(Call call, TRequest req, CancellationToken token)
{
    //TODO: implement this in real synchronous style once new GRPC C core API is available.
    var pendingResponse = AsyncUnaryCall(call, req, token);
    return pendingResponse.Result;
}
/// <summary>
/// Performs a unary RPC asynchronously: writes one request and reads one response.
/// </summary>
/// <param name="call">Supplies the request serializer, response deserializer, channel and method name.</param>
/// <param name="req">The request message to write.</param>
/// <param name="token">Cancellation token. Not referenced by the current implementation.</param>
/// <returns>The response read from the call.</returns>
/// <exception cref="RpcException">The call finished with a status code other than <c>GRPC_STATUS_OK</c>.</exception>
public static async Task AsyncUnaryCall(Call call, TRequest req, CancellationToken token)
{
    var rpc = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer);
    rpc.Initialize(call.Channel, call.MethodName);
    rpc.Start(false, GetCompletionQueue());

    // Write the single request, then mark the write side as completed.
    await rpc.WriteAsync(req);
    await rpc.WritesCompletedAsync();

    // The response is read first; the final status is awaited afterwards.
    TResponse reply = await rpc.ReadAsync();
    Status finalStatus = await rpc.Finished;

    if (finalStatus.StatusCode == StatusCode.GRPC_STATUS_OK)
    {
        return reply;
    }

    // Any non-OK status is reported to the caller as an exception carrying that status.
    throw new RpcException(finalStatus);
}
/// <summary>
/// Starts a server-streaming RPC: writes one request and directs the responses to an observer.
/// </summary>
/// <remarks>
/// The returned task completes once the request has been written and the writes have been
/// marked as completed; this method does not await the call's final status.
/// </remarks>
/// <param name="call">Supplies the request serializer, response deserializer, channel and method name.</param>
/// <param name="req">The request message to write.</param>
/// <param name="outputs">Observer handed to <c>StartReadingToStream</c> to receive what is read from the call.</param>
/// <param name="token">Cancellation token. Not referenced by the current implementation.</param>
public static async Task AsyncServerStreamingCall(Call call, TRequest req, IObserver outputs, CancellationToken token)
{
    var rpc = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer);
    rpc.Initialize(call.Channel, call.MethodName);
    rpc.Start(false, GetCompletionQueue());

    // Reading is set up before the request is written.
    rpc.StartReadingToStream(outputs);

    await rpc.WriteAsync(req);
    await rpc.WritesCompletedAsync();
}
/// <summary>
/// Starts a client-streaming RPC and returns a result object that pairs the pending
/// response read with an observer for supplying requests.
/// </summary>
/// <param name="call">Supplies the request serializer, response deserializer, channel and method name.</param>
/// <param name="token">Cancellation token. Not referenced by the current implementation.</param>
/// <returns>
/// A <c>ClientStreamingAsyncResult</c> built from the task returned by <c>ReadAsync</c>
/// and a <c>StreamingInputObserver</c> wrapping the started call.
/// </returns>
public static ClientStreamingAsyncResult AsyncClientStreamingCall(Call call, CancellationToken token)
{
    var rpc = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer);
    rpc.Initialize(call.Channel, call.MethodName);
    rpc.Start(false, GetCompletionQueue());

    // Arguments are evaluated left to right, so the read is issued before the
    // input observer is constructed.
    return new ClientStreamingAsyncResult(rpc.ReadAsync(), new StreamingInputObserver(rpc));
}
/// <summary>
/// Blocking variant of the client-streaming call. Not implemented yet.
/// </summary>
/// <param name="call">Unused; the method throws before reading any argument.</param>
/// <param name="inputs">Unused; the method throws before reading any argument.</param>
/// <param name="token">Unused; the method throws before reading any argument.</param>
/// <returns>Never returns normally.</returns>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public static TResponse BlockingClientStreamingCall(Call call, IObservable inputs, CancellationToken token)
{
throw new NotImplementedException();
}
/// <summary>
/// Starts a duplex-streaming RPC: responses are directed to the supplied observer and
/// an observer for supplying requests is returned.
/// </summary>
/// <param name="call">Supplies the request serializer, response deserializer, channel and method name.</param>
/// <param name="outputs">Observer handed to <c>StartReadingToStream</c> to receive what is read from the call.</param>
/// <param name="token">Cancellation token. Not referenced by the current implementation.</param>
/// <returns>A <c>StreamingInputObserver</c> wrapping the started call.</returns>
public static IObserver DuplexStreamingCall(Call call, IObserver outputs, CancellationToken token)
{
    var rpc = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer);
    rpc.Initialize(call.Channel, call.MethodName);
    rpc.Start(false, GetCompletionQueue());

    // Reading is set up before the input observer is handed back to the caller.
    rpc.StartReadingToStream(outputs);

    return new StreamingInputObserver(rpc);
}
/// <summary>
/// Returns the completion queue exposed by <c>GrpcEnvironment.ThreadPool</c>.
/// Every call started by this class is started on this queue.
/// </summary>
private static CompletionQueueSafeHandle GetCompletionQueue()
{
    CompletionQueueSafeHandle queue = GrpcEnvironment.ThreadPool.CompletionQueue;
    return queue;
}
}
}