aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-05 02:25:33 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-05 14:32:10 -0700
commit542e21cbe08191f6709d0dc6e44367c231fb3072 (patch)
tree330ff56575e41ae6485cfce0998637625ac57489
parent5e10f18376e07fd43c12bc2a14ccf3a0e0682660 (diff)
refactoring AsyncCall
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs3
-rw-r--r--src/csharp/Grpc.Core/Call.cs1
-rw-r--r--src/csharp/Grpc.Core/Calls.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs61
4 files changed, 52 insertions, 42 deletions
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index 2dea8d06e1..f90a46368c 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -126,8 +126,7 @@ namespace Grpc.Core.Tests
[Test]
public void DeadlineInThePast()
{
- var deadline = DateTime.MinValue;
- var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext());
+ var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: DateTime.MinValue));
try
{
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs
index f9d1fde548..577c17b931 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/Call.cs
@@ -53,6 +53,7 @@ namespace Grpc.Core
this.name = method.GetFullName(serviceName);
this.requestMarshaller = method.RequestMarshaller;
this.responseMarshaller = method.ResponseMarshaller;
+ this.channel = channel;
this.context = context;
}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index f3c363bda2..ef6636587e 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -47,19 +47,20 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
+ call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
// TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts.
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
- return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Context.Headers, call.Context.Deadline);
+ return asyncCall.UnaryCall(req);
}
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
- var asyncResult = asyncCall.UnaryCallAsync(req, call.Context.Headers, call.Context.Deadline);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
+ call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
+ var asyncResult = asyncCall.UnaryCallAsync(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
@@ -68,9 +69,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
- asyncCall.StartServerStreamingCall(req, call.Context.Headers, call.Context.Deadline);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
+ call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
+ asyncCall.StartServerStreamingCall(req);
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@@ -80,9 +81,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
- var resultTask = asyncCall.ClientStreamingCallAsync(call.Context.Headers, call.Context.Deadline);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
+ call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
+ var resultTask = asyncCall.ClientStreamingCallAsync();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
@@ -92,9 +93,9 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline));
- asyncCall.StartDuplexStreamingCall(call.Context.Headers, call.Context.Deadline);
+ var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context,
+ call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
+ asyncCall.StartDuplexStreamingCall();
RegisterCancellationCallback(asyncCall, call.Context.CancellationToken);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f84c4b4633..ff3e99d30d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,10 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- Channel channel;
+ readonly Channel channel;
+ readonly string method;
+ readonly string host;
+ readonly CallContext context;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,26 +63,20 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
- {
- }
-
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
+ public AsyncCall(Channel channel, string method, string host, CallContext context, Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
this.channel = channel;
- var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, methodName, null, deadline);
- channel.Environment.DebugStats.ActiveClientCalls.Increment();
- InitializeInternal(call);
+ this.method = Preconditions.CheckNotNull(method);
+ this.host = host; // null host means default host will be used by C-core.
+ this.context = context;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
- // TODO: for other calls, you need to call Initialize, this methods calls initialize
- // on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
- public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
+ public TResponse UnaryCall(TRequest msg)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@@ -89,13 +86,14 @@ namespace Grpc.Core.Internal
lock (myLock)
{
- Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
+ Preconditions.CheckState(!started);
+ Initialize(cq);
started = true;
halfcloseRequested = true;
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
@@ -129,11 +127,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
- public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
+ Preconditions.CheckState(!started);
+ Initialize(channel.Environment.CompletionQueue);
started = true;
halfcloseRequested = true;
@@ -142,7 +141,7 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
@@ -154,17 +153,18 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
+ Preconditions.CheckState(!started);
+ Initialize(channel.Environment.CompletionQueue);
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -176,11 +176,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
+ public void StartServerStreamingCall(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
+ Preconditions.CheckState(!started);
+ Initialize(channel.Environment.CompletionQueue);
started = true;
halfcloseRequested = true;
@@ -188,7 +189,7 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
@@ -199,15 +200,16 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
+ public void StartDuplexStreamingCall()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
+ Preconditions.CheckState(!started);
+ Initialize(channel.Environment.CompletionQueue);
started = true;
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -312,6 +314,13 @@ namespace Grpc.Core.Internal
channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
+ private void Initialize(CompletionQueueSafeHandle cq)
+ {
+ var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, method, host, Timespec.FromDateTime(context.Deadline));
+ channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ InitializeInternal(call);
+ }
+
/// <summary>
/// Handler for unary response completion.
/// </summary>