aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs67
1 files changed, 42 insertions, 25 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 414b5c4282..c26b0773ba 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- readonly CallInvocationDetails<TRequest, TResponse> callDetails;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- this.callDetails = callDetails;
+ this.details = callDetails;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@@ -89,11 +89,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -130,7 +130,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -138,9 +138,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -157,14 +157,14 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartClientStreaming(HandleUnaryResponse, metadataArray);
+ call.StartClientStreaming(HandleUnaryResponse, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
@@ -181,16 +181,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -206,11 +206,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartDuplexStreaming(HandleFinished, metadataArray);
+ call.StartDuplexStreaming(HandleFinished, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -219,9 +219,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -238,14 +238,14 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendCloseFromClient(WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendCloseFromClient(HandleHalfclosed);
+ call.StartSendCloseFromClient(HandleHalfclosed, writeFlags);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@@ -278,6 +278,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -310,14 +318,14 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
- callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@@ -325,7 +333,7 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
- var token = callDetails.Options.CancellationToken;
+ var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
@@ -333,6 +341,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
+ /// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)