diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-05-24 15:58:14 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-05-24 15:58:14 -0700 |
commit | 8b5276c02ec7aebdf749bfd95d140406e6ea2226 (patch) | |
tree | 8389493193029a3cb5c80b11f595bc9d4d5035d7 /src/csharp/Grpc.Core | |
parent | 24d115654056aa2be3d548d80ea97937eb15b2a1 (diff) | |
parent | 031dea1df4b6213b9f9779a824fccc6d348b8648 (diff) |
Merge github.com:grpc/grpc into we-dont-need-no-backup
Conflicts:
src/core/surface/call.c
test/core/end2end/dualstack_socket_test.c
test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
test/core/end2end/tests/early_server_shutdown_finishes_tags.c
test/core/end2end/tests/graceful_server_shutdown.c
test/core/end2end/tests/invoke_large_request.c
test/core/end2end/tests/max_concurrent_streams.c
test/core/end2end/tests/max_message_length.c
test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
test/core/end2end/tests/request_response_with_metadata_and_payload.c
test/core/end2end/tests/request_response_with_payload.c
test/core/end2end/tests/request_response_with_payload_and_call_creds.c
test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
test/core/end2end/tests/request_with_large_metadata.c
test/core/end2end/tests/request_with_payload.c
test/core/httpcli/httpcli_test.c
tools/run_tests/run_tests.py
Diffstat (limited to 'src/csharp/Grpc.Core')
20 files changed, 147 insertions, 126 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index b95776f66d..d66b0d4974 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -40,33 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for client streaming calls. /// </summary> - public sealed class AsyncClientStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> result; + readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction) { this.requestStream = requestStream; this.result = result; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +83,16 @@ namespace Grpc.Core { return result.GetAwaiter(); } + + /// <summary> + /// Provides means to provide after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ee05437416..4c0d5936ac 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -40,42 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for bidirectional streaming calls. /// </summary> - public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); - } - - /// <summary> - /// Reads a response from ResponseStream. - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +74,16 @@ namespace Grpc.Core return requestStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 73b9614985..7a479b9a23 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -40,23 +40,15 @@ namespace Grpc.Core /// <summary> /// Return type for server streaming calls. /// </summary> - public sealed class AsyncServerStreamingCall<TResponse> - where TResponse : class + public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.responseStream = responseStream; - } - - /// <summary> - /// Reads the next response from ResponseStream - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -69,5 +61,16 @@ namespace Grpc.Core return responseStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index d1ee59ff0a..37b452f020 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Abstraction of a call to be invoked on a client. /// </summary> public class Call<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly string name; readonly Marshaller<TRequest> requestMarshaller; diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ba42a2d4f8..9f8baac684 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel); } public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel); } public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel); } private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index f5f2cf5f22..fe2d446a35 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -13,6 +13,7 @@ <AssemblyName>Grpc.Core</AssemblyName> <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <NuGetPackageImportStamp>8bb563fb</NuGetPackageImportStamp> + <DocumentationFile>bin\$(Configuration)\Grpc.Core.Xml</DocumentationFile> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> @@ -37,6 +38,9 @@ <Reference Include="System.Collections.Immutable"> <HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index e54908cb8b..69e8497bb7 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -16,10 +16,14 @@ <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> <dependency id="Microsoft.Bcl.Immutable" version="1.0.34" /> - <dependency id="grpc.native.csharp_ext" version="0.8.0.0" /> + <dependency id="Ix-Async" version="1.2.3" /> + <dependency id="grpc.native.csharp_ext" version="0.9.0.0" /> </dependencies> </metadata> <files> <file src="bin/Release/Grpc.Core.dll" target="lib/net45" /> + <file src="bin/Release/Grpc.Core.pdb" target="lib/net45" /> + <file src="bin/Release/Grpc.Core.xml" target="lib/net45" /> + <file src="**\*.cs" target="src" /> </files> </package> diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 699741cd05..371fbf27ce 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,13 +43,8 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<T> - where T : class + public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> { - /// <summary> - /// Reads a single message. Returns null if the last message was already read. - /// A following read can only be started when the previous one finishes. - /// </summary> - Task<T> ReadNext(); + // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 4bd8bfb8df..2000210252 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -44,12 +44,11 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Writes a single message. Only one write can be pending at a time. + /// Writes a single asynchronously. Only one write can be pending at a time. /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> - Task Write(T message); + Task WriteAsync(T message); } } diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs index 0847a928e6..a3028bc374 100644 --- a/src/csharp/Grpc.Core/IClientStreamWriter.cs +++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs @@ -44,11 +44,10 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IClientStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this. + /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this. /// </summary> - Task Close(); + Task CompleteAsync(); } } diff --git a/src/csharp/Grpc.Core/IServerStreamWriter.cs b/src/csharp/Grpc.Core/IServerStreamWriter.cs index 199a585a3f..9f3af59109 100644 --- a/src/csharp/Grpc.Core/IServerStreamWriter.cs +++ b/src/csharp/Grpc.Core/IServerStreamWriter.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A writable stream of messages that is used in server-side handlers. /// </summary> public interface IServerStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { + // TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface. } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 1697058732..58f493463b 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// </summary> internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> - where TRequest : class - where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; @@ -48,14 +46,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TRequest message) + public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task Close() + public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade6..6c44521038 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; + TResponse current; public ClientResponseStream(AsyncCall<TRequest, TResponse> call) { this.call = call; } - public Task<TResponse> ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TResponse>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e97869..f494d9e0ff 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,12 +72,13 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (Exception e) { @@ -85,7 +87,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); @@ -137,7 +140,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -178,7 +181,7 @@ namespace Grpc.Core.Internal var result = await handler(context, requestStream); try { - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (OperationCanceledException) { @@ -193,7 +196,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -240,7 +243,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -263,7 +266,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. await requestStream.ToList(); await finishedTask; diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b..3fccb88abb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index da688d504f..a2d77dd5b7 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -49,14 +49,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TResponse message) + public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task WriteStatus(Status status) + public Task WriteStatusAsync(Status status) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81..7a1c016ae2 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// <summary> /// grpc_server from grpc/grpc.h /// </summary> diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index e873b3e88a..bc9a499c51 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -42,7 +42,6 @@ namespace Grpc.Core /// </summary> public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken // TODO(jtattermusch): add deadline info diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index f915155f8a..8a748b45a8 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -49,14 +49,9 @@ namespace Grpc.Core.Utils public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) where T : class { - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - await asyncAction(elem); + await asyncAction(streamReader.Current); } } @@ -67,32 +62,27 @@ namespace Grpc.Core.Utils where T : class { var result = new List<T>(); - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - result.Add(elem); + result.Add(streamReader.Current); } return result; } /// <summary> /// Writes all elements from given enumerable to the stream. - /// Closes the stream afterwards unless close = false. + /// Completes the stream afterwards unless close = false. /// </summary> - public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true) + public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) where T : class { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } - if (close) + if (complete) { - await streamWriter.Close(); + await streamWriter.CompleteAsync(); } } @@ -104,7 +94,7 @@ namespace Grpc.Core.Utils { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } } } diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 71967de56e..fb7eaaeeda 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -2,5 +2,6 @@ <packages> <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" /> </packages>
\ No newline at end of file |