diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-08-05 02:25:33 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-08-05 14:32:10 -0700 |
commit | 542e21cbe08191f6709d0dc6e44367c231fb3072 (patch) | |
tree | 330ff56575e41ae6485cfce0998637625ac57489 | |
parent | 5e10f18376e07fd43c12bc2a14ccf3a0e0682660 (diff) |
refactoring AsyncCall
-rw-r--r-- | src/csharp/Grpc.Core.Tests/TimeoutsTest.cs | 3 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Call.cs | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Calls.cs | 29 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 61 |
4 files changed, 52 insertions, 42 deletions
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index 2dea8d06e1..f90a46368c 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -126,8 +126,7 @@ namespace Grpc.Core.Tests [Test] public void DeadlineInThePast() { - var deadline = DateTime.MinValue; - var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext()); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, new CallContext(deadline: DateTime.MinValue)); try { diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index f9d1fde548..577c17b931 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -53,6 +53,7 @@ namespace Grpc.Core this.name = method.GetFullName(serviceName); this.requestMarshaller = method.RequestMarshaller; this.responseMarshaller = method.ResponseMarshaller; + this.channel = channel; this.context = context; } diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index f3c363bda2..ef6636587e 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -47,19 +47,20 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); + var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context, + call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts. RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); - return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Context.Headers, call.Context.Deadline); + return asyncCall.UnaryCall(req); } public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); - var asyncResult = asyncCall.UnaryCallAsync(req, call.Context.Headers, call.Context.Deadline); + var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context, + call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); + var asyncResult = asyncCall.UnaryCallAsync(req); RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } @@ -68,9 +69,9 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); - asyncCall.StartServerStreamingCall(req, call.Context.Headers, call.Context.Deadline); + var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context, + call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); + asyncCall.StartServerStreamingCall(req); RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); @@ -80,9 +81,9 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); - var resultTask = asyncCall.ClientStreamingCallAsync(call.Context.Headers, call.Context.Deadline); + var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context, + call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); + var resultTask = asyncCall.ClientStreamingCallAsync(); RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); @@ -92,9 +93,9 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.Environment.CompletionQueue, call.Name, Timespec.FromDateTime(call.Context.Deadline)); - asyncCall.StartDuplexStreamingCall(call.Context.Headers, call.Context.Deadline); + var asyncCall = new AsyncCall<TRequest, TResponse>(call.Channel, call.Name, null, call.Context, + call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); + asyncCall.StartDuplexStreamingCall(); RegisterCancellationCallback(asyncCall, call.Context.CancellationToken); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index f84c4b4633..ff3e99d30d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -50,7 +50,10 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); - Channel channel; + readonly Channel channel; + readonly string method; + readonly string host; + readonly CallContext context; // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -60,26 +63,20 @@ namespace Grpc.Core.Internal bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) - { - } - - public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline) + public AsyncCall(Channel channel, string method, string host, CallContext context, Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) { this.channel = channel; - var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, methodName, null, deadline); - channel.Environment.DebugStats.ActiveClientCalls.Increment(); - InitializeInternal(call); + this.method = Preconditions.CheckNotNull(method); + this.host = host; // null host means default host will be used by C-core. + this.context = context; } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but // it is reusing fair amount of code in this class, so we are leaving it here. - // TODO: for other calls, you need to call Initialize, this methods calls initialize - // on its own, so there's a usage inconsistency. /// <summary> /// Blocking unary request - unary response call. /// </summary> - public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline) + public TResponse UnaryCall(TRequest msg) { using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { @@ -89,13 +86,14 @@ namespace Grpc.Core.Internal lock (myLock) { - Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline)); + Preconditions.CheckState(!started); + Initialize(cq); started = true; halfcloseRequested = true; readingDone = true; } - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) { using (var ctx = BatchContextSafeHandle.Create()) { @@ -129,11 +127,12 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - unary response call. /// </summary> - public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline) + public Task<TResponse> UnaryCallAsync(TRequest msg) { lock (myLock) { - Preconditions.CheckNotNull(call); + Preconditions.CheckState(!started); + Initialize(channel.Environment.CompletionQueue); started = true; halfcloseRequested = true; @@ -142,7 +141,7 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) { call.StartUnary(payload, HandleUnaryResponse, metadataArray); } @@ -154,17 +153,18 @@ namespace Grpc.Core.Internal /// Starts a streamed request - unary response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline) + public Task<TResponse> ClientStreamingCallAsync() { lock (myLock) { - Preconditions.CheckNotNull(call); + Preconditions.CheckState(!started); + Initialize(channel.Environment.CompletionQueue); started = true; readingDone = true; unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) { call.StartClientStreaming(HandleUnaryResponse, metadataArray); } @@ -176,11 +176,12 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - streamed response call. /// </summary> - public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline) + public void StartServerStreamingCall(TRequest msg) { lock (myLock) { - Preconditions.CheckNotNull(call); + Preconditions.CheckState(!started); + Initialize(channel.Environment.CompletionQueue); started = true; halfcloseRequested = true; @@ -188,7 +189,7 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) { call.StartServerStreaming(payload, HandleFinished, metadataArray); } @@ -199,15 +200,16 @@ namespace Grpc.Core.Internal /// Starts a streaming request - streaming response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public void StartDuplexStreamingCall(Metadata headers, DateTime deadline) + public void StartDuplexStreamingCall() { lock (myLock) { - Preconditions.CheckNotNull(call); + Preconditions.CheckState(!started); + Initialize(channel.Environment.CompletionQueue); started = true; - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(context.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } @@ -312,6 +314,13 @@ namespace Grpc.Core.Internal channel.Environment.DebugStats.ActiveClientCalls.Decrement(); } + private void Initialize(CompletionQueueSafeHandle cq) + { + var call = channel.Handle.CreateCall(channel.Environment.CompletionRegistry, cq, method, host, Timespec.FromDateTime(context.Deadline)); + channel.Environment.DebugStats.ActiveClientCalls.Increment(); + InitializeInternal(call); + } + /// <summary> /// Handler for unary response completion. /// </summary> |