aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-05-07 17:20:24 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-05-07 17:20:24 -0700
commit4251aad3eb7bbc9220c8b4a8064da82416409ab3 (patch)
treecfbf2db24e0f40ab79fb535923873130b944c4b3 /src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
parentd4f10c03209e731cf5d1793a23d3823ea2b05493 (diff)
parent442918f225797e28e1bfee023046af4f441a50ae (diff)
Merge branch 'master' of github.com:grpc/grpc into standalone_benchmarks
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallBase.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs135
1 files changed, 62 insertions, 73 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 15b0cfe249..fc5bee40e2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
/// <summary>
/// Base for handling both client side and server side calls.
- /// Handles native call lifecycle and provides convenience methods.
+ /// Manages native call lifecycle and provides convenience methods.
/// </summary>
internal abstract class AsyncCallBase<TWrite, TRead>
{
@@ -65,16 +65,14 @@ namespace Grpc.Core.Internal
protected bool errorOccured;
protected bool cancelRequested;
- protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected bool readPending; // True if there is a read in progress.
+ protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
+
protected bool readingDone;
protected bool halfcloseRequested;
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
- // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null.
- protected IObserver<TRead> readObserver;
-
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -131,10 +129,10 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Initiates sending a message. Only once send operation can be active at a time.
+ /// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -149,31 +147,29 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Requests receiving a next message.
+ /// Initiates reading a message. Only one read operation can be active at a time.
+ /// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartReceiveMessage()
+ protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
{
lock (myLock)
{
- Preconditions.CheckState(started);
- Preconditions.CheckState(!disposed);
- Preconditions.CheckState(!errorOccured);
-
- Preconditions.CheckState(!readingDone);
- Preconditions.CheckState(!readPending);
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ CheckReadingAllowed();
call.StartReceiveMessage(readFinishedHandler);
- readPending = true;
+ readCompletionDelegate = completionDelegate;
}
}
+ // TODO(jtattermusch): find more fitting name for this method.
/// <summary>
/// Default behavior just completes the read observer, but more sofisticated behavior might be required
/// by subclasses.
/// </summary>
- protected virtual void CompleteReadObserver()
+ protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
{
- FireReadObserverOnCompleted();
+ FireCompletion(completionDelegate, default(TRead), null);
}
/// <summary>
@@ -184,7 +180,8 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- if (halfclosed && readingDone && finished)
+ bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+ if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
return true;
@@ -195,6 +192,7 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
+ OnReleaseResources();
if (call != null)
{
call.Dispose();
@@ -203,16 +201,39 @@ namespace Grpc.Core.Internal
disposed = true;
}
+ protected virtual void OnReleaseResources()
+ {
+ }
+
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured);
+ CheckNotCancelled();
+ Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
+ protected void CheckReadingAllowed()
+ {
+ Preconditions.CheckState(started);
+ Preconditions.CheckState(!disposed);
+ Preconditions.CheckState(!errorOccured);
+
+ Preconditions.CheckState(!readingDone, "Stream has already been closed.");
+ Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
+ }
+
+ protected void CheckNotCancelled()
+ {
+ if (cancelRequested)
+ {
+ throw new OperationCanceledException("Remote call has been cancelled.");
+ }
+ }
+
protected byte[] UnsafeSerialize(TWrite msg)
{
return serializer(msg);
@@ -248,47 +269,11 @@ namespace Grpc.Core.Internal
}
}
- protected void FireReadObserverOnNext(TRead value)
+ protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
{
try
{
- readObserver.OnNext(value);
- }
- catch (Exception e)
- {
- Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e);
- }
- }
-
- protected void FireReadObserverOnCompleted()
- {
- try
- {
- readObserver.OnCompleted();
- }
- catch (Exception e)
- {
- Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e);
- }
- }
-
- protected void FireReadObserverOnError(Exception error)
- {
- try
- {
- readObserver.OnError(error);
- }
- catch (Exception e)
- {
- Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e);
- }
- }
-
- protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error)
- {
- try
- {
- completionDelegate(error);
+ completionDelegate(value, error);
}
catch (Exception e)
{
@@ -322,7 +307,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- AsyncCompletionDelegate origCompletionDelegate = null;
+ AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
@@ -333,11 +318,11 @@ namespace Grpc.Core.Internal
if (wasError)
{
- FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed"));
+ FireCompletion(origCompletionDelegate, null, new OperationFailedException("Send failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null);
+ FireCompletion(origCompletionDelegate, null, null);
}
}
@@ -346,7 +331,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
- AsyncCompletionDelegate origCompletionDelegate = null;
+ AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
halfclosed = true;
@@ -358,11 +343,11 @@ namespace Grpc.Core.Internal
if (wasError)
{
- FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed"));
+ FireCompletion(origCompletionDelegate, null, new OperationFailedException("Halfclose failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null);
+ FireCompletion(origCompletionDelegate, null, null);
}
}
@@ -373,11 +358,19 @@ namespace Grpc.Core.Internal
{
var payload = ctx.GetReceivedMessage();
+ AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
lock (myLock)
{
- readPending = false;
- if (payload == null)
+ origCompletionDelegate = readCompletionDelegate;
+ if (payload != null)
{
+ readCompletionDelegate = null;
+ }
+ else
+ {
+ // This was the last read. Keeping the readCompletionDelegate
+ // to be either fired by this handler or by client-side finished
+ // handler.
readingDone = true;
}
@@ -392,15 +385,11 @@ namespace Grpc.Core.Internal
TRead msg;
TryDeserialize(payload, out msg);
- FireReadObserverOnNext(msg);
-
- // Start a new read. The current one has already been delivered,
- // so correct ordering of reads is assured.
- StartReceiveMessage();
+ FireCompletion(origCompletionDelegate, msg, null);
}
else
{
- CompleteReadObserver();
+ ProcessLastRead(origCompletionDelegate);
}
}
}