aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-07-23 17:02:12 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-07-23 18:27:16 -0700
commit0846b68eb7bc6f6f63770880a2d48e787274ef67 (patch)
treecb5dd64ed0d3d2c0c46ba6c1453f0eccc26fec98 /src/csharp/Grpc.Core
parent74529562e3227517f952d5b8d527f0e7b616db1c (diff)
add Timeout support and tests
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/Call.cs15
-rw-r--r--src/csharp/Grpc.Core/Calls.cs18
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs12
8 files changed, 71 insertions, 36 deletions
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs
index 37b452f020..94c5e26082 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/Call.cs
@@ -47,14 +47,21 @@ namespace Grpc.Core
readonly Marshaller<TResponse> responseMarshaller;
readonly Channel channel;
readonly Metadata headers;
+ readonly DateTime deadline;
public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers)
+ : this(serviceName, method, channel, headers, DateTime.MaxValue)
+ {
+ }
+
+ public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline)
{
this.name = method.GetFullName(serviceName);
this.requestMarshaller = method.RequestMarshaller;
this.responseMarshaller = method.ResponseMarshaller;
this.channel = Preconditions.CheckNotNull(channel);
this.headers = Preconditions.CheckNotNull(headers);
+ this.deadline = deadline;
}
public Channel Channel
@@ -87,6 +94,14 @@ namespace Grpc.Core
}
}
+ public DateTime Deadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
public Marshaller<TRequest> RequestMarshaller
{
get
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 359fe53741..054fc27491 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
// TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
RegisterCancellationCallback(asyncCall, token);
- return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers);
+ return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline);
}
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
@@ -58,8 +58,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
- var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
+ var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
@@ -69,8 +69,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
- asyncCall.StartServerStreamingCall(req, call.Headers);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
+ asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@@ -81,8 +81,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
- var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
+ var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@@ -93,8 +93,8 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
- asyncCall.StartDuplexStreamingCall(call.Headers);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline));
+ asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index a099f96aea..fd3473128a 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -76,7 +76,7 @@ namespace Grpc.Core
/// <summary>
/// Creates a new call to given method.
/// </summary>
- protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata)
+ protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline)
where TRequest : class
where TResponse : class
{
@@ -87,8 +87,8 @@ namespace Grpc.Core
interceptor(metadata);
metadata.Freeze();
}
- metadata = metadata ?? Metadata.Empty;
- return new Call<TRequest, TResponse>(serviceName, method, channel, metadata);
+ return new Call<TRequest, TResponse>(serviceName, method, channel,
+ metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f983dbb759..51022ac34f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -61,10 +61,10 @@ namespace Grpc.Core.Internal
{
}
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
+ public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
{
this.channel = channel;
- var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
+ var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
@@ -76,7 +76,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
- public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers)
+ public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@@ -86,7 +86,7 @@ namespace Grpc.Core.Internal
lock (myLock)
{
- Initialize(channel, cq, methodName);
+ Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
started = true;
halfcloseRequested = true;
readingDone = true;
@@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
- public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers)
+ public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
{
lock (myLock)
{
@@ -151,7 +151,7 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public Task<TResponse> ClientStreamingCallAsync(Metadata headers)
+ public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
{
lock (myLock)
{
@@ -173,7 +173,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, Metadata headers)
+ public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
{
lock (myLock)
{
@@ -196,7 +196,7 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(Metadata headers)
+ public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
{
lock (myLock)
{
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index f809f4a84c..702f1ce9e7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -48,6 +48,7 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
+ readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
@@ -118,6 +119,18 @@ namespace Grpc.Core.Internal
}
}
+ /// <summary>
+ /// Gets cancellation token that gets cancelled once close completion
+ /// is received and the cancelled flag is set.
+ /// </summary>
+ public CancellationToken CancellationToken
+ {
+ get
+ {
+ return cancellationTokenSource.Token;
+ }
+ }
+
protected override void OnReleaseResources()
{
environment.DebugStats.ActiveServerCalls.Decrement();
@@ -138,6 +151,8 @@ namespace Grpc.Core.Internal
{
// Once we cancel, we don't have to care that much
// about reads and writes.
+
+ // TODO(jtattermusch): is this still necessary?
Cancel();
}
@@ -145,6 +160,11 @@ namespace Grpc.Core.Internal
}
// TODO(jtattermusch): handle error
+ if (cancelled)
+ {
+ cancellationTokenSource.Cancel();
+ }
+
finishedServersideTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 19dbb83f24..04e35a5efd 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -46,9 +46,6 @@ namespace Grpc.Core.Internal
CompletionRegistry completionRegistry;
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
-
- [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -98,13 +95,6 @@ namespace Grpc.Core.Internal
{
}
- public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
- {
- var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline);
- result.SetCompletionRegistry(registry);
- return result;
- }
-
public void SetCompletionRegistry(CompletionRegistry completionRegistry)
{
this.completionRegistry = completionRegistry;
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index f046f4c6d0..53c506c872 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,6 +47,9 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_destroy(IntPtr channel);
private ChannelSafeHandle()
@@ -63,6 +66,13 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ {
+ var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ result.SetCompletionRegistry(registry);
+ return result;
+ }
+
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 3680b1e791..9bbcb17266 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -126,7 +126,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -180,7 +180,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -238,7 +238,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -295,11 +295,11 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc)
+ public static ServerCallContext NewContext(ServerRpcNew newRpc, CancellationToken cancellationToken)
{
return new ServerCallContext(
newRpc.Method, newRpc.Host, newRpc.Deadline.ToDateTime(),
- newRpc.RequestMetadata, CancellationToken.None);
+ newRpc.RequestMetadata, cancellationToken);
}
}
}