using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
///
/// Listener for call events that can be delivered from a completion queue.
///
internal interface ICallEventListener {
void OnClientMetadata();
void OnRead(byte[] payload);
void OnWriteAccepted(GRPCOpError error);
void OnFinishAccepted(GRPCOpError error);
// ignore the status on server
void OnFinished(Status status);
}
///
/// Handle native call lifecycle and provides convenience methods.
///
internal class AsyncCall: ICallEventListener, IDisposable
{
readonly Func serializer;
readonly Func deserializer;
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate callbackHandler;
object myLock = new object();
bool disposed;
CallSafeHandle call;
bool started;
bool errorOccured;
bool cancelRequested;
bool halfcloseRequested;
bool halfclosed;
bool doneWithReading;
Nullable finishedStatus;
TaskCompletionSource writeTcs;
TaskCompletionSource readTcs;
TaskCompletionSource halfcloseTcs = new TaskCompletionSource();
TaskCompletionSource finishedTcs = new TaskCompletionSource();
IObserver readObserver;
public AsyncCall(Func serializer, Func deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.callbackHandler = HandleEvent;
}
public Task WriteAsync(TWrite msg)
{
return StartWrite(msg, false).Task;
}
public Task WritesCompletedAsync()
{
WritesDone();
return halfcloseTcs.Task;
}
public Task WriteStatusAsync(Status status)
{
WriteStatus(status);
return halfcloseTcs.Task;
}
public Task ReadAsync()
{
return StartRead().Task;
}
public Task Halfclosed
{
get
{
return halfcloseTcs.Task;
}
}
public Task Finished
{
get
{
return finishedTcs.Task;
}
}
///
/// Initiates reading to given observer.
///
public void StartReadingToStream(IObserver readObserver) {
lock (myLock)
{
CheckStarted();
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
StartRead();
}
}
public void Initialize(Channel channel, String methodName) {
lock (myLock)
{
this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture);
}
}
public void InitializeServer(CallSafeHandle call)
{
lock(myLock)
{
this.call = call;
}
}
// Client only
public void Start(bool buffered, CompletionQueueSafeHandle cq)
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.Invoke(cq, buffered, callbackHandler, callbackHandler);
started = true;
}
}
// Server only
public void Accept(CompletionQueueSafeHandle cq)
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.ServerAccept(cq, callbackHandler);
call.ServerEndInitialMetadata(0);
started = true;
}
}
public TaskCompletionSource StartWrite(TWrite msg, bool buffered)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
if (writeTcs != null)
{
throw new InvalidOperationException("Only one write can be pending at a time");
}
// TODO: wrap serialization...
byte[] payload = serializer(msg);
call.StartWrite(payload, buffered, callbackHandler);
writeTcs = new TaskCompletionSource();
return writeTcs;
}
}
// client only
public void WritesDone()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.WritesDone(callbackHandler);
halfcloseRequested = true;
}
}
// server only
public void WriteStatus(Status status)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartWriteStatus(status, callbackHandler);
halfcloseRequested = true;
}
}
public TaskCompletionSource StartRead()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
CheckNoError();
// TODO: add check for not cancelled?
if (doneWithReading)
{
throw new InvalidOperationException("Already read the last message.");
}
if (readTcs != null)
{
throw new InvalidOperationException("Only one read can be pending at a time");
}
call.StartRead(callbackHandler);
readTcs = new TaskCompletionSource();
return readTcs;
}
}
public void Cancel()
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
call.Cancel();
}
public void CancelWithStatus(Status status)
{
lock (myLock)
{
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
public void OnClientMetadata()
{
// TODO: implement....
}
public void OnRead(byte[] payload)
{
TaskCompletionSource oldTcs = null;
IObserver observer = null;
lock (myLock)
{
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
doneWithReading = true;
}
observer = readObserver;
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
oldTcs.SetResult(msg);
// TODO: make sure we deliver reads in the right order.
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
// start a new read
StartRead();
}
else
{
// TODO: wrap to handle exceptions;
observer.OnCompleted();
}
}
}
public void OnWriteAccepted(GRPCOpError error)
{
TaskCompletionSource oldTcs = null;
lock (myLock)
{
UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
}
if (errorOccured)
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
}
}
public void OnFinishAccepted(GRPCOpError error)
{
lock (myLock)
{
UpdateErrorOccured(error);
halfclosed = true;
}
if (errorOccured)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
}
else
{
halfcloseTcs.SetResult(null);
}
}
public void OnFinished(Status status)
{
lock (myLock)
{
finishedStatus = status;
DisposeResourcesIfNeeded();
}
finishedTcs.SetResult(status);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (call != null)
{
call.Dispose();
}
}
disposed = true;
}
}
private void UpdateErrorOccured(GRPCOpError error)
{
if (error == GRPCOpError.GRPC_OP_ERROR)
{
errorOccured = true;
}
}
private void CheckStarted()
{
if (!started)
{
throw new InvalidOperationException("Call not started");
}
}
private void CheckNoError()
{
if (errorOccured)
{
throw new InvalidOperationException("Error occured when processing call.");
}
}
private void CheckNotFinished()
{
if (finishedStatus.HasValue)
{
throw new InvalidOperationException("Already finished.");
}
}
private void CheckCancelNotRequested()
{
if (cancelRequested)
{
throw new InvalidOperationException("Cancel has been requested.");
}
}
private void DisposeResourcesIfNeeded()
{
if (call != null && started && finishedStatus.HasValue)
{
// TODO: should we also wait for all the pending events to finish?
call.Dispose();
}
}
private void HandleEvent(IntPtr eventPtr) {
try {
var ev = new EventSafeHandleNotOwned(eventPtr);
switch (ev.GetCompletionType())
{
case GRPCCompletionType.GRPC_CLIENT_METADATA_READ:
OnClientMetadata();
break;
case GRPCCompletionType.GRPC_READ:
byte[] payload = ev.GetReadData();
OnRead(payload);
break;
case GRPCCompletionType.GRPC_WRITE_ACCEPTED:
OnWriteAccepted(ev.GetWriteAccepted());
break;
case GRPCCompletionType.GRPC_FINISH_ACCEPTED:
OnFinishAccepted(ev.GetFinishAccepted());
break;
case GRPCCompletionType.GRPC_FINISHED:
OnFinished(ev.GetFinished());
break;
default:
throw new ArgumentException("Unexpected completion type");
}
} catch(Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
}
}