diff options
Diffstat (limited to 'src/csharp')
35 files changed, 361 insertions, 308 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index eac8d16fb1..62cb443272 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -34,6 +34,9 @@ <HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath> </Reference> <Reference Include="System" /> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> @@ -57,7 +60,5 @@ <ItemGroup> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> </ItemGroup> - <ItemGroup> - <Folder Include="Internal\" /> - </ItemGroup> + <ItemGroup /> </Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config index c714ef3a23..28af8d78c6 100644 --- a/src/csharp/Grpc.Core.Tests/packages.config +++ b/src/csharp/Grpc.Core.Tests/packages.config @@ -1,4 +1,5 @@ <?xml version="1.0" encoding="utf-8"?> <packages> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index b95776f66d..d66b0d4974 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -40,33 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for client streaming calls. /// </summary> - public sealed class AsyncClientStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> result; + readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction) { this.requestStream = requestStream; this.result = result; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +83,16 @@ namespace Grpc.Core { return result.GetAwaiter(); } + + /// <summary> + /// Provides means to provide after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ee05437416..4c0d5936ac 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -40,42 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for bidirectional streaming calls. /// </summary> - public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); - } - - /// <summary> - /// Reads a response from ResponseStream. - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +74,16 @@ namespace Grpc.Core return requestStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 73b9614985..7a479b9a23 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -40,23 +40,15 @@ namespace Grpc.Core /// <summary> /// Return type for server streaming calls. /// </summary> - public sealed class AsyncServerStreamingCall<TResponse> - where TResponse : class + public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.responseStream = responseStream; - } - - /// <summary> - /// Reads the next response from ResponseStream - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -69,5 +61,16 @@ namespace Grpc.Core return responseStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index d1ee59ff0a..37b452f020 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Abstraction of a call to be invoked on a client. /// </summary> public class Call<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly string name; readonly Marshaller<TRequest> requestMarshaller; diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ba42a2d4f8..9f8baac684 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel); } public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel); } public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel); } private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index f5f2cf5f22..6b4345cbe1 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -37,6 +37,9 @@ <Reference Include="System.Collections.Immutable"> <HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index e54908cb8b..5269881afa 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -16,6 +16,7 @@ <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> <dependency id="Microsoft.Bcl.Immutable" version="1.0.34" /> + <dependency id="Ix-Async" version="1.2.3" /> <dependency id="grpc.native.csharp_ext" version="0.8.0.0" /> </dependencies> </metadata> diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 699741cd05..371fbf27ce 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,13 +43,8 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<T> - where T : class + public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> { - /// <summary> - /// Reads a single message. Returns null if the last message was already read. - /// A following read can only be started when the previous one finishes. - /// </summary> - Task<T> ReadNext(); + // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 4bd8bfb8df..2000210252 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -44,12 +44,11 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Writes a single message. Only one write can be pending at a time. + /// Writes a single asynchronously. Only one write can be pending at a time. /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> - Task Write(T message); + Task WriteAsync(T message); } } diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs index 0847a928e6..a3028bc374 100644 --- a/src/csharp/Grpc.Core/IClientStreamWriter.cs +++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs @@ -44,11 +44,10 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IClientStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this. + /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this. /// </summary> - Task Close(); + Task CompleteAsync(); } } diff --git a/src/csharp/Grpc.Core/IServerStreamWriter.cs b/src/csharp/Grpc.Core/IServerStreamWriter.cs index 199a585a3f..9f3af59109 100644 --- a/src/csharp/Grpc.Core/IServerStreamWriter.cs +++ b/src/csharp/Grpc.Core/IServerStreamWriter.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A writable stream of messages that is used in server-side handlers. /// </summary> public interface IServerStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { + // TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface. } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 1697058732..58f493463b 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// </summary> internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> - where TRequest : class - where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; @@ -48,14 +46,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TRequest message) + public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task Close() + public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade6..6c44521038 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; + TResponse current; public ClientResponseStream(AsyncCall<TRequest, TResponse> call) { this.call = call; } - public Task<TResponse> ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TResponse>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e97869..f494d9e0ff 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,12 +72,13 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (Exception e) { @@ -85,7 +87,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); @@ -137,7 +140,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -178,7 +181,7 @@ namespace Grpc.Core.Internal var result = await handler(context, requestStream); try { - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (OperationCanceledException) { @@ -193,7 +196,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -240,7 +243,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -263,7 +266,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. await requestStream.ToList(); await finishedTask; diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b..3fccb88abb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index da688d504f..a2d77dd5b7 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -49,14 +49,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TResponse message) + public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task WriteStatus(Status status) + public Task WriteStatusAsync(Status status) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81..7a1c016ae2 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// <summary> /// grpc_server from grpc/grpc.h /// </summary> diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index e873b3e88a..bc9a499c51 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -42,7 +42,6 @@ namespace Grpc.Core /// </summary> public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken // TODO(jtattermusch): add deadline info diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index f915155f8a..8a748b45a8 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -49,14 +49,9 @@ namespace Grpc.Core.Utils public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) where T : class { - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - await asyncAction(elem); + await asyncAction(streamReader.Current); } } @@ -67,32 +62,27 @@ namespace Grpc.Core.Utils where T : class { var result = new List<T>(); - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - result.Add(elem); + result.Add(streamReader.Current); } return result; } /// <summary> /// Writes all elements from given enumerable to the stream. - /// Closes the stream afterwards unless close = false. + /// Completes the stream afterwards unless close = false. /// </summary> - public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true) + public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) where T : class { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } - if (close) + if (complete) { - await streamWriter.Close(); + await streamWriter.CompleteAsync(); } } @@ -104,7 +94,7 @@ namespace Grpc.Core.Utils { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } } } diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 71967de56e..fb7eaaeeda 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -2,5 +2,6 @@ <packages> <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 87ccf07dd8..6e84add42b 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,6 +37,10 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 4997d3aa42..5aa6f4162d 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -96,7 +96,19 @@ namespace math.Tests Assert.AreEqual(0, response.Remainder); } - // TODO(jtattermusch): test division by zero + [Test] + public void DivByZero() + { + try + { + DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + } + } [Test] public void DivAsync() @@ -114,11 +126,12 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()); - - var responses = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 }, - responses.ConvertAll((n) => n.Num_)); + using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build())) + { + var responses = await call.ResponseStream.ToList(); + CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 }, + responses.ConvertAll((n) => n.Num_)); + } }).Wait(); } @@ -128,13 +141,15 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Sum(); - var numbers = new List<long> { 10, 20, 30 }.ConvertAll( - n => Num.CreateBuilder().SetNum_(n).Build()); + using (var call = client.Sum()) + { + var numbers = new List<long> { 10, 20, 30 }.ConvertAll( + n => Num.CreateBuilder().SetNum_(n).Build()); - await call.RequestStream.WriteAll(numbers); - var result = await call.Result; - Assert.AreEqual(60, result.Num_); + await call.RequestStream.WriteAll(numbers); + var result = await call.Result; + Assert.AreEqual(60, result.Num_); + } }).Wait(); } @@ -150,12 +165,14 @@ namespace math.Tests new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = client.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - var result = await call.ResponseStream.ToList(); + using (var call = client.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + var result = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); - CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); + CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + } }).Wait(); } } diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index 4d6ec63b3c..cc6e9af40f 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index 2c5019c214..5ce490f403 100644 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -35,6 +35,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index ab06a44c0d..d2cfbee18f 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -51,18 +51,13 @@ namespace math Console.WriteLine("DivAsync Result: " + result); } - public static async Task DivAsyncWithCancellationExample(Math.IMathClient stub) - { - Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = await resultTask; - Console.WriteLine(result); - } - public static async Task FibExample(Math.IMathClient stub) { - var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()); - List<Num> result = await call.ResponseStream.ToList(); - Console.WriteLine("Fib Result: " + string.Join("|", result)); + using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build())) + { + List<Num> result = await call.ResponseStream.ToList(); + Console.WriteLine("Fib Result: " + string.Join("|", result)); + } } public static async Task SumExample(Math.IMathClient stub) @@ -74,9 +69,11 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var call = stub.Sum(); - await call.RequestStream.WriteAll(numbers); - Console.WriteLine("Sum Result: " + await call.Result); + using (var call = stub.Sum()) + { + await call.RequestStream.WriteAll(numbers); + Console.WriteLine("Sum Result: " + await call.Result); + } } public static async Task DivManyExample(Math.IMathClient stub) @@ -87,9 +84,11 @@ namespace math new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = stub.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + using (var call = stub.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + } } public static async Task DependendRequestsExample(Math.IMathClient stub) @@ -101,9 +100,12 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var sumCall = stub.Sum(); - await sumCall.RequestStream.WriteAll(numbers); - Num sum = await sumCall.Result; + Num sum; + using (var sumCall = stub.Sum()) + { + await sumCall.RequestStream.WriteAll(numbers); + sum = await sumCall.Result; + } DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build()); Console.WriteLine("Avg Result: " + result); diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index 2546fd220d..b9efc44e8c 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -12,30 +12,30 @@ namespace math { { static readonly string __ServiceName = "math.Math"; - static readonly Marshaller<DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); - static readonly Marshaller<DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); - static readonly Marshaller<FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom); - static readonly Marshaller<Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); + static readonly Marshaller<global::math.DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom); + static readonly Marshaller<global::math.DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom); + static readonly Marshaller<global::math.FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom); + static readonly Marshaller<global::math.Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom); - static readonly Method<DivArgs, DivReply> __Method_Div = new Method<DivArgs, DivReply>( + static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_Div = new Method<global::math.DivArgs, global::math.DivReply>( MethodType.Unary, "Div", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method<DivArgs, DivReply> __Method_DivMany = new Method<DivArgs, DivReply>( + static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_DivMany = new Method<global::math.DivArgs, global::math.DivReply>( MethodType.DuplexStreaming, "DivMany", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method<FibArgs, Num> __Method_Fib = new Method<FibArgs, Num>( + static readonly Method<global::math.FibArgs, global::math.Num> __Method_Fib = new Method<global::math.FibArgs, global::math.Num>( MethodType.ServerStreaming, "Fib", __Marshaller_FibArgs, __Marshaller_Num); - static readonly Method<Num, Num> __Method_Sum = new Method<Num, Num>( + static readonly Method<global::math.Num, global::math.Num> __Method_Sum = new Method<global::math.Num, global::math.Num>( MethodType.ClientStreaming, "Sum", __Marshaller_Num, @@ -44,20 +44,20 @@ namespace math { // client-side stub interface public interface IMathClient { - DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)); - Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken)); + global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken)); } // server-side interface public interface IMath { - Task<DivReply> Div(ServerCallContext context, DivArgs request); - Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream); - Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream); - Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream); + Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request); + Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream); + Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream); + Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream); } // client stub @@ -69,27 +69,27 @@ namespace math { public MathClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) + public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.BlockingUnaryCall(call, request, token); } - public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) + public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_DivMany); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Fib); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Sum); return Calls.AsyncClientStreamingCall(call, token); diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 3b33b09bbd..e247ac9d73 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -62,7 +62,7 @@ namespace math { foreach (var num in FibInternal(request.Limit)) { - await responseStream.Write(num); + await responseStream.WriteAsync(num); } } } @@ -81,7 +81,7 @@ namespace math { await requestStream.ForEach(async divArgs => { - await responseStream.Write(DivInternal(divArgs)); + await responseStream.WriteAsync(DivInternal(divArgs)); }); } diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config index 51c17bcd5e..4c8d60fa62 100644 --- a/src/csharp/Grpc.Examples/packages.config +++ b/src/csharp/Grpc.Examples/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 1ca3dd24e1..b3a0a2917b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -54,6 +54,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> <Reference Include="System.Net" /> <Reference Include="System.Net.Http" /> <Reference Include="System.Net.Http.Extensions"> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 02f8a369de..dfaf18cae1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -213,11 +213,13 @@ namespace Grpc.IntegrationTesting var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); - var call = client.StreamingInputCall(); - await call.RequestStream.WriteAll(bodySizes); + using (var call = client.StreamingInputCall()) + { + await call.RequestStream.WriteAll(bodySizes); - var response = await call.Result; - Assert.AreEqual(74922, response.AggregatedPayloadSize); + var response = await call.Result; + Assert.AreEqual(74922, response.AggregatedPayloadSize); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -236,14 +238,15 @@ namespace Grpc.IntegrationTesting (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) .Build(); - var call = client.StreamingOutputCall(request); - - var responseList = await call.ResponseStream.ToList(); - foreach (var res in responseList) + using (var call = client.StreamingOutputCall(request)) { - Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + var responseList = await call.ResponseStream.ToList(); + foreach (var res in responseList) + { + Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + } + CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); } - CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); Console.WriteLine("Passed!"); }).Wait(); } @@ -254,51 +257,48 @@ namespace Grpc.IntegrationTesting { Console.WriteLine("running ping_pong"); - var call = client.FullDuplexCall(); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); - - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) - .SetPayload(CreateZerosPayload(8)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(9, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) + .SetPayload(CreateZerosPayload(8)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) - .SetPayload(CreateZerosPayload(1828)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(2653, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) + .SetPayload(CreateZerosPayload(1828)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) - .SetPayload(CreateZerosPayload(45904)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(58979, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) + .SetPayload(CreateZerosPayload(45904)).Build()); - await call.RequestStream.Close(); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(null, response); + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -308,12 +308,13 @@ namespace Grpc.IntegrationTesting Task.Run(async () => { Console.WriteLine("running empty_stream"); - var call = client.FullDuplexCall(); - await call.Close(); - - var responseList = await call.ResponseStream.ToList(); - Assert.AreEqual(0, responseList.Count); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.CompleteAsync(); + var responseList = await call.ResponseStream.ToList(); + Assert.AreEqual(0, responseList.Count); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -365,19 +366,21 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_begin"); var cts = new CancellationTokenSource(); - var call = client.StreamingInputCall(cts.Token); - // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. - await Task.Delay(1000); - cts.Cancel(); - - try + using (var call = client.StreamingInputCall(cts.Token)) { - var response = await call.Result; - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. + await Task.Delay(1000); + cts.Cancel(); + + try + { + var response = await call.Result; + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); @@ -390,29 +393,28 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_first_response"); var cts = new CancellationTokenSource(); - var call = client.FullDuplexCall(cts.Token); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); + using (var call = client.FullDuplexCall(cts.Token)) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - cts.Cancel(); + cts.Cancel(); - try - { - response = await call.ResponseStream.ReadNext(); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + try + { + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 679aafb57a..ee077f9f56 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -12,45 +12,45 @@ namespace grpc.testing { { static readonly string __ServiceName = "grpc.testing.TestService"; - static readonly Marshaller<Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom); - static readonly Marshaller<SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom); - static readonly Marshaller<SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom); - static readonly Marshaller<StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom); - static readonly Marshaller<StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom); - static readonly Marshaller<StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom); - static readonly Marshaller<StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom); + static readonly Marshaller<global::grpc.testing.SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom); - static readonly Method<Empty, Empty> __Method_EmptyCall = new Method<Empty, Empty>( + static readonly Method<global::grpc.testing.Empty, global::grpc.testing.Empty> __Method_EmptyCall = new Method<global::grpc.testing.Empty, global::grpc.testing.Empty>( MethodType.Unary, "EmptyCall", __Marshaller_Empty, __Marshaller_Empty); - static readonly Method<SimpleRequest, SimpleResponse> __Method_UnaryCall = new Method<SimpleRequest, SimpleResponse>( + static readonly Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse> __Method_UnaryCall = new Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse>( MethodType.Unary, "UnaryCall", __Marshaller_SimpleRequest, __Marshaller_SimpleResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.ServerStreaming, "StreamingOutputCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> __Method_StreamingInputCall = new Method<StreamingInputCallRequest, StreamingInputCallResponse>( + static readonly Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> __Method_StreamingInputCall = new Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse>( MethodType.ClientStreaming, "StreamingInputCall", __Marshaller_StreamingInputCallRequest, __Marshaller_StreamingInputCallResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.DuplexStreaming, "FullDuplexCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.DuplexStreaming, "HalfDuplexCall", __Marshaller_StreamingOutputCallRequest, @@ -59,25 +59,25 @@ namespace grpc.testing { // client-side stub interface public interface ITestServiceClient { - Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)); - Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)); - SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)); + global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)); } // server-side interface public interface ITestService { - Task<Empty> EmptyCall(ServerCallContext context, Empty request); - Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request); - Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream); - Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream); - Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream); - Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream); + Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request); + Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request); + Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); + Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream); + Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); + Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); } // client stub @@ -89,42 +89,42 @@ namespace grpc.testing { public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)) + public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.AsyncUnaryCall(call, request, token); } - public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingOutputCall); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingInputCall); return Calls.AsyncClientStreamingCall(call, token); } - public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_FullDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_HalfDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index d6ba61ef82..6bd997d1f4 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -64,7 +64,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } } @@ -86,7 +86,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } }); } diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config index e33b6e3e46..291b7b8599 100644 --- a/src/csharp/Grpc.IntegrationTesting/packages.config +++ b/src/csharp/Grpc.IntegrationTesting/packages.config @@ -3,6 +3,7 @@ <package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" /> <package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" /> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" /> <package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" /> <package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" /> |