aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs116
1 files changed, 79 insertions, 37 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bfcb9366a1..0db9d2a515 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- Channel channel;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,26 +60,19 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- }
-
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
- {
- this.channel = channel;
- var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
- channel.Environment.DebugStats.ActiveClientCalls.Increment();
- InitializeInternal(call);
+ this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
- // TODO: for other calls, you need to call Initialize, this methods calls initialize
- // on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
- public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
+ public TResponse UnaryCall(TRequest msg)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@@ -89,17 +82,19 @@ namespace Grpc.Core.Internal
lock (myLock)
{
- Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
+ Preconditions.CheckState(!started);
started = true;
+ Initialize(cq);
+
halfcloseRequested = true;
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -129,22 +124,24 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
- public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
readingDone = true;
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -154,17 +151,19 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -176,21 +175,23 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
+ public void StartServerStreamingCall(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -199,15 +200,16 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
+ public void StartDuplexStreamingCall()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ Initialize(details.Channel.Environment.CompletionQueue);
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -218,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -277,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -309,7 +319,39 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ }
+
+ private void Initialize(CompletionQueueSafeHandle cq)
+ {
+ var propagationToken = details.Options.PropagationToken;
+ var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ InitializeInternal(call);
+ RegisterCancellationCallback();
+ }
+
+ // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
+ private void RegisterCancellationCallback()
+ {
+ var token = details.Options.CancellationToken;
+ if (token.CanBeCanceled)
+ {
+ token.Register(() => this.Cancel());
+ }
+ }
+
+ /// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>