aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs102
1 files changed, 64 insertions, 38 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 55351869b5..f549c52876 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+ // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
readingDone = true;
@@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: true);
+ GrpcPreconditions.CheckState(started);
- if (!disposed && !finished)
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
{
- call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+ return earlyResult;
}
- else
+
+ if (disposed || finished)
{
// In case the call has already been finished by the serverside,
- // the halfclose has already been done implicitly, so we only
- // emit the notification for the completion delegate.
- Task.Run(() => HandleSendCloseFromClientFinished(true));
+ // the halfclose has already been done implicitly, so just return
+ // completed task here.
+ halfcloseRequested = true;
+ return Task.FromResult<object>(null);
}
+ call.StartSendCloseFromClient(HandleSendFinished);
halfcloseRequested = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
+
+ if (finishedStatus.HasValue)
+ {
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error because server can close
+ // the call anytime, so don't throw directly, but let the write task finish with an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
+ private Task CheckSendPreconditionsClientSide()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+ if (cancelRequested)
+ {
+ // Return a cancelled task.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetCanceled();
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
- var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
@@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
+ // TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
- {
- base.CheckSendingAllowed(true);
-
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
- {
- throw new RpcException(finishedStatus.Value.Status);
- }
- }
-
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>