From 69274c2a0d316ccac66a5fa726255de0c8197834 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 2 May 2016 15:34:40 -0700 Subject: add more features --- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index ccd047f469..877b997aba 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -346,6 +346,10 @@ namespace Grpc.Core.Internal /// protected void HandleReadFinished(bool success, byte[] receivedMessage) { + // if success == false, received message will be null. It that case we will + // treat this completion as the last read an rely on C core to handle the failed + // read (e.g. deliver approriate statusCode on the clientside). + TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; @@ -370,8 +374,6 @@ namespace Grpc.Core.Internal ReleaseResourcesIfPossible(); } - // TODO: handle the case when success==false - if (deserializeException != null && !IsClient) { FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); -- cgit v1.2.3 From 96f21a27cb7975d52aa92197536eef0ad8dca455 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 2 May 2016 17:17:18 -0700 Subject: make end-of-stream idempotent --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 39 +++++++++++++++------- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 5 ++- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 39 ++++++++++++++-------- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 5 ++- .../Grpc.Core/Internal/ClientResponseStream.cs | 4 +-- .../Grpc.Core/Internal/ServerRequestStream.cs | 4 +-- 6 files changed, 58 insertions(+), 38 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 324b682510..96749cda14 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_StreamingOperationsNotAllowed() { asyncCall.UnaryCallAsync("request1"); - Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartReadMessage((x,y) => {})); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); } @@ -123,8 +123,8 @@ namespace Grpc.Core.Internal.Tests public void ClientStreaming_StreamingReadNotAllowed() { asyncCall.ClientStreamingCallAsync(); - Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartReadMessage((x,y) => {})); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); } [Test] @@ -182,9 +182,6 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); - Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); - // try alternative order of completions fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); fakeCall.ReceivedMessageHandler(true, null); @@ -199,15 +196,35 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ClientResponseStream(asyncCall); var readTask = responseStream.MoveNext(); - fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); - Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); - fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code. fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal)); AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); } + [Test] + public void ServerStreaming_MoreResponses_Success() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream(asyncCall); + + var readTask1 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask2.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask3 = responseStream.MoveNext(); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageHandler(true, null); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); + } + ClientSideStatus CreateClientSideStatus(StatusCode statusCode) { return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); @@ -236,7 +253,6 @@ namespace Grpc.Core.Internal.Tests Assert.IsFalse(moveNextTask.Result); Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); - Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); Assert.AreEqual(0, asyncCall.GetTrailers().Count); } @@ -259,7 +275,6 @@ namespace Grpc.Core.Internal.Tests var ex = Assert.ThrowsAsync(async () => await moveNextTask); Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); - Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); Assert.AreEqual(0, asyncCall.GetTrailers().Count); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 50ba617cdb..f522174bd0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -241,11 +241,10 @@ namespace Grpc.Core.Internal /// /// Receives a streaming response. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartReadMessage(AsyncCompletionDelegate completionDelegate) + public Task ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 877b997aba..abacfabadb 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,7 +68,7 @@ namespace Grpc.Core.Internal protected bool cancelRequested; protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected AsyncCompletionDelegate readCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -150,15 +150,25 @@ namespace Grpc.Core.Internal /// Initiates reading a message. Only one read operation can be active at a time. /// completionDelegate is invoked upon completion. /// - protected void StartReadMessageInternal(AsyncCompletionDelegate completionDelegate) + protected Task ReadMessageInternalAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckReadingAllowed(); + if (readingDone) + { + // the last read that returns null or throws an exception is idempotent + // and maintain its state. + GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); + return streamingReadTcs.Task; + } + + GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); call.StartReceiveMessage(HandleReadFinished); - readCompletionDelegate = completionDelegate; + streamingReadTcs = new TaskCompletionSource(); + return streamingReadTcs.Task; } } @@ -216,10 +226,6 @@ namespace Grpc.Core.Internal protected virtual void CheckReadingAllowed() { GrpcPreconditions.CheckState(started); - GrpcPreconditions.CheckState(!disposed); - - GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); - GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); } protected void CheckNotCancelled() @@ -353,12 +359,10 @@ namespace Grpc.Core.Internal TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; - AsyncCompletionDelegate origCompletionDelegate = null; + TaskCompletionSource origTcs = null; lock (myLock) { - origCompletionDelegate = readCompletionDelegate; - readCompletionDelegate = null; - + origTcs = streamingReadTcs; if (receivedMessage == null) { // This was the last read. @@ -368,18 +372,25 @@ namespace Grpc.Core.Internal if (deserializeException != null && IsClient) { readingDone = true; + + // TODO(jtattermusch): it might be too late to set the status CancelWithStatus(DeserializeResponseFailureStatus); } + if (!readingDone) + { + streamingReadTcs = null; + } + ReleaseResourcesIfPossible(); } if (deserializeException != null && !IsClient) { - FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); + origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException)); return; } - FireCompletion(origCompletionDelegate, msg, null); + origTcs.SetResult(msg); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index bea2b3660c..cce480b2c4 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -91,11 +91,10 @@ namespace Grpc.Core.Internal /// /// Receives a streaming request. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartReadMessage(AsyncCompletionDelegate completionDelegate) + public Task ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index d6e34a0f04..ad9423ff58 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; if (result == null) diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index e7be82c318..d76030d1ad 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; return result != null; } -- cgit v1.2.3 From b6a9016fc234714632a20cb25dc2e822a72243f8 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 3 May 2016 09:41:15 -0700 Subject: add tests for AsyncCallServer --- src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj | 2 + .../Internal/AsyncCallServerTest.cs | 134 ++++++++++++++++ .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 134 +--------------- .../Grpc.Core.Tests/Internal/FakeNativeCall.cs | 177 +++++++++++++++++++++ src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 9 ++ 5 files changed, 324 insertions(+), 132 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs create mode 100644 src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 0cd059c232..47131fc454 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -84,6 +84,8 @@ + + diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs new file mode 100644 index 0000000000..8c178657a1 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -0,0 +1,134 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// + /// Uses fake native call to test interaction of AsyncCallServer wrapping code with C core in different situations. + /// + public class AsyncCallServerTest + { + Server server; + FakeNativeCall fakeCall; + AsyncCallServer asyncCallServer; + + [SetUp] + public void Init() + { + var environment = GrpcEnvironment.AddRef(); + server = new Server(); + + fakeCall = new FakeNativeCall(); + asyncCallServer = new AsyncCallServer( + Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, + environment, + server); + asyncCallServer.InitializeForTesting(fakeCall); + } + + [TearDown] + public void Cleanup() + { + GrpcEnvironment.Release(); + } + + [Test] + public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + var responseStream = new ServerResponseStream(asyncCallServer); + + // Finishing requestStream is needed for dispose to happen. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertDisposed(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void ReadAfterCancelNotificationCanSucceed() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + var responseStream = new ServerResponseStream(asyncCallServer); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + // Check that startin a read after cancel notification has been processed is legal. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + AssertDisposed(asyncCallServer, fakeCall, finishedTask); + } + + + // TODO: read completion failure ... + + // TODO: + + + + // TODO: write fails... + + // TODO: write completion fails... + + // TODO: cancellation delivered... + + // TODO: cancel notification in the middle of a read... + + // TODO: cancel notification in the middle of a write... + + // TODO: cancellation delivered... + + // TODO: what does writing status do to reads? + + static void AssertDisposed(AsyncCallServer asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) + { + Assert.IsTrue(fakeCall.IsDisposed); + Assert.IsTrue(finishedTask.IsCompleted); + Assert.DoesNotThrow(() => finishedTask.Wait()); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index ed2d22815b..abe9d4a2e6 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -42,7 +42,7 @@ using NUnit.Framework; namespace Grpc.Core.Internal.Tests { /// - /// Uses fake native call to test interaction of wrapping code with C core in different situations. + /// Uses fake native call to test interaction of AsyncCall wrapping code with C core in different situations. /// public class AsyncCallTest { @@ -480,139 +480,9 @@ namespace Grpc.Core.Internal.Tests Assert.IsTrue(fakeCall.IsDisposed); var ex = Assert.ThrowsAsync(async () => await moveNextTask); + Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode); Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); Assert.AreEqual(0, asyncCall.GetTrailers().Count); } - - internal class FakeNativeCall : INativeCall - { - public UnaryResponseClientHandler UnaryResponseClientHandler - { - get; - set; - } - - public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler - { - get; - set; - } - - public ReceivedMessageHandler ReceivedMessageHandler - { - get; - set; - } - - public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler - { - get; - set; - } - - public SendCompletionHandler SendCompletionHandler - { - get; - set; - } - - public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler - { - get; - set; - } - - public bool IsCancelled - { - get; - set; - } - - public bool IsDisposed - { - get; - set; - } - - public void Cancel() - { - IsCancelled = true; - } - - public void CancelWithStatus(Status status) - { - IsCancelled = true; - } - - public string GetPeer() - { - return "PEER"; - } - - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - UnaryResponseClientHandler = callback; - } - - public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - throw new NotImplementedException(); - } - - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) - { - UnaryResponseClientHandler = callback; - } - - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartReceiveMessage(ReceivedMessageHandler callback) - { - ReceivedMessageHandler = callback; - } - - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) - { - ReceivedResponseHeadersHandler = callback; - } - - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) - { - SendCompletionHandler = callback; - } - - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartSendCloseFromClient(SendCompletionHandler callback) - { - SendCompletionHandler = callback; - } - - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartServerSide(ReceivedCloseOnServerHandler callback) - { - ReceivedCloseOnServerHandler = callback; - } - - public void Dispose() - { - IsDisposed = true; - } - } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs new file mode 100644 index 0000000000..441bf9660b --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -0,0 +1,177 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// + /// For testing purposes. + /// + internal class FakeNativeCall : INativeCall + { + public UnaryResponseClientHandler UnaryResponseClientHandler + { + get; + set; + } + + public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + { + get; + set; + } + + public ReceivedMessageHandler ReceivedMessageHandler + { + get; + set; + } + + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + + public SendCompletionHandler SendCompletionHandler + { + get; + set; + } + + public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + { + get; + set; + } + + public bool IsCancelled + { + get; + set; + } + + public bool IsDisposed + { + get; + set; + } + + public void Cancel() + { + IsCancelled = true; + } + + public void CancelWithStatus(Status status) + { + IsCancelled = true; + } + + public string GetPeer() + { + return "PEER"; + } + + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + UnaryResponseClientHandler = callback; + } + + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + throw new NotImplementedException(); + } + + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) + { + UnaryResponseClientHandler = callback; + } + + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartReceiveMessage(ReceivedMessageHandler callback) + { + ReceivedMessageHandler = callback; + } + + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + SendCompletionHandler = callback; + } + + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartSendCloseFromClient(SendCompletionHandler callback) + { + SendCompletionHandler = callback; + } + + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + ReceivedCloseOnServerHandler = callback; + } + + public void Dispose() + { + IsDisposed = true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index cce480b2c4..efcf4ea7fe 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -64,6 +64,15 @@ namespace Grpc.Core.Internal InitializeInternal(call); } + /// + /// Only for testing purposes. + /// + public void InitializeForTesting(INativeCall call) + { + server.AddCallReference(this); + InitializeInternal(call); + } + /// /// Starts a server side call. /// -- cgit v1.2.3 From b32e29f0a2f2cbe858e040190f2fd69237007f9a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 3 May 2016 12:55:42 -0700 Subject: make SendStatusFromServer independent on WriteAsync --- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 9 +++------ src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 19 ++++++++++--------- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 12 +++++------- src/csharp/Grpc.Core/Internal/ServerResponseStream.cs | 7 ------- 4 files changed, 18 insertions(+), 29 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index abacfabadb..18dbe87734 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -69,6 +69,7 @@ namespace Grpc.Core.Internal protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -328,22 +329,18 @@ namespace Grpc.Core.Internal /// protected void HandleSendStatusFromServerFinished(bool success) { - AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); } else { - FireCompletion(origCompletionDelegate, null, null); + sendStatusFromServerTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index efcf4ea7fe..94f49bd8f2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -136,24 +136,24 @@ namespace Grpc.Core.Internal } /// - /// Sends call result status, also indicating server is done with streaming responses. - /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. + /// Sends call result status, indicating we are done with writes. + /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// - public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate completionDelegate) + public Task SendStatusFromServerAsync(Status status, Metadata trailers) { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once."); using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; - readingDone = true; - sendCompletionDelegate = completionDelegate; + sendStatusFromServerTcs = new TaskCompletionSource(); + return sendStatusFromServerTcs.Task; } } @@ -198,12 +198,13 @@ namespace Grpc.Core.Internal /// private void HandleFinishedServerside(bool success, bool cancelled) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, + // success will be always set to true. lock (myLock) { finished = true; ReleaseResourcesIfPossible(); } - // TODO(jtattermusch): handle error if (cancelled) { diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548..bf9df9f783 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -93,7 +93,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -149,7 +149,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -209,7 +209,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -260,7 +260,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -282,9 +282,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 03e39efc02..ecfee0bfdd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -57,13 +57,6 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status, Metadata trailers) - { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); - return taskSource.Task; - } - public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource(); -- cgit v1.2.3 From 2624cfb511d54b7d41f9eeab4073d2e6c9def9ad Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 3 May 2016 13:04:33 -0700 Subject: fixup --- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 94f49bd8f2..44f2988e21 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -152,6 +152,7 @@ namespace Grpc.Core.Internal call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; + initialMetadataSent = true; sendStatusFromServerTcs = new TaskCompletionSource(); return sendStatusFromServerTcs.Task; } -- cgit v1.2.3 From 739e86c394040031ca6ac116b84e2975fb5fe83a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 3 May 2016 17:24:33 -0700 Subject: finishing serverside request stream should not be required for disposal --- .../Internal/AsyncCallServerTest.cs | 34 +++++++--------------- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 7 +---- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 14 +++++---- 3 files changed, 20 insertions(+), 35 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0b6981f871..058371521d 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -80,16 +80,24 @@ namespace Grpc.Core.Internal.Tests [Test] public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes() { var finishedTask = asyncCallServer.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCallServer); - // Finishing requestStream is needed for dispose to happen. var moveNextTask = requestStream.MoveNext(); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -101,9 +109,8 @@ namespace Grpc.Core.Internal.Tests fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - // Check that startin a read after cancel notification has been processed is legal. + // Check that starting a read after cancel notification has been processed is legal. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -136,12 +143,6 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); - - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -149,7 +150,6 @@ namespace Grpc.Core.Internal.Tests public void WriteCompletionFailureThrows() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream(asyncCallServer); var responseStream = new ServerResponseStream(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -157,13 +157,7 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -171,7 +165,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAndWriteStatusCanRunConcurrently() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream(asyncCallServer); var responseStream = new ServerResponseStream(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -183,11 +176,6 @@ namespace Grpc.Core.Internal.Tests Assert.DoesNotThrowAsync(async () => await writeTask); Assert.DoesNotThrowAsync(async () => await writeStatusTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 18dbe87734..42234dcac2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -155,7 +155,7 @@ namespace Grpc.Core.Internal { lock (myLock) { - CheckReadingAllowed(); + GrpcPreconditions.CheckState(started); if (readingDone) { // the last read that returns null or throws an exception is idempotent @@ -224,11 +224,6 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected virtual void CheckReadingAllowed() - { - GrpcPreconditions.CheckState(started); - } - protected void CheckNotCancelled() { if (cancelRequested) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 44f2988e21..eafe2ccab8 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -183,12 +183,6 @@ namespace Grpc.Core.Internal get { return false; } } - protected override void CheckReadingAllowed() - { - base.CheckReadingAllowed(); - GrpcPreconditions.CheckArgument(!cancelRequested); - } - protected override void OnAfterReleaseResources() { server.RemoveCallReference(this); @@ -204,6 +198,14 @@ namespace Grpc.Core.Internal lock (myLock) { finished = true; + if (streamingReadTcs == null) + { + // if there's no pending read, readingDone=true will dispose now. + // if there is a pending read, we will dispose once that read finishes. + readingDone = true; + streamingReadTcs = new TaskCompletionSource(); + streamingReadTcs.SetResult(default(TRequest)); + } ReleaseResourcesIfPossible(); } -- cgit v1.2.3 From 6504c02568da17113c8ebce408245054ca19f0a3 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 3 May 2016 17:46:49 -0700 Subject: remove useless code and todos --- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 5 ----- 1 file changed, 5 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index bf9df9f783..00d82d51e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -80,8 +80,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); var result = await handler(request, context).ConfigureAwait(false); status = context.Status; await responseStream.WriteAsync(result).ConfigureAwait(false); @@ -136,8 +134,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); await handler(request, responseStream, context).ConfigureAwait(false); status = context.Status; } @@ -298,7 +294,6 @@ namespace Grpc.Core.Internal return rpcException.Status; } - // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } -- cgit v1.2.3 From 305ffd4847617c7206b0b4ccec0fcd8977b2e095 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 4 May 2016 10:26:24 -0700 Subject: make SendStatusFromServer optionally send a message as well --- .../Grpc.Core.Tests/Internal/FakeNativeCall.cs | 3 ++- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 3 ++- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 7 +++-- src/csharp/Grpc.Core/Internal/INativeCall.cs | 2 +- src/csharp/Grpc.Core/Internal/NativeMethods.cs | 18 +++++++------ src/csharp/ext/grpc_csharp_ext.c | 30 +++++++++++++++------- 6 files changed, 41 insertions(+), 22 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs index 1bec258ca2..909112a47c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -165,7 +165,8 @@ namespace Grpc.Core.Internal.Tests SendCompletionHandler = callback; } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { SendStatusFromServerHandler = callback; } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index eafe2ccab8..c5d900b134 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -149,7 +149,8 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); + call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + null, new WriteFlags()); } halfcloseRequested = true; initialMetadataSent = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 500653ba5d..244b97d4a4 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -135,13 +135,16 @@ namespace Grpc.Core.Internal } } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); + var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, + optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index cbef599139..cd3719cb50 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -78,7 +78,7 @@ namespace Grpc.Core.Internal void StartSendCloseFromClient(SendCompletionHandler callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags); void StartServerSide(ReceivedCloseOnServerHandler callback); } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 9ee0ba3bc0..c277c73ef0 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -421,20 +421,21 @@ namespace Grpc.Core.Internal public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call); public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, @@ -593,7 +594,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -601,7 +602,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] @@ -610,7 +611,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -618,7 +619,8 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index aeef8a79e9..5b8ff9b819 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, const char *status_details, grpc_metadata_array *trailing_metadata, - int32_t send_empty_initial_metadata) { + int32_t send_empty_initial_metadata, const char* optional_send_buffer, + size_t optional_send_buffer_len, uint32_t write_flags) { /* TODO: don't use magic number */ - grpc_op ops[2]; - size_t nops = send_empty_initial_metadata ? 2 : 1; + grpc_op ops[3]; + size_t nops = 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = @@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = 0; ops[0].reserved = NULL; - ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[1].data.send_initial_metadata.count = 0; - ops[1].data.send_initial_metadata.metadata = NULL; - ops[1].flags = 0; - ops[1].reserved = NULL; - + if (optional_send_buffer) { + ops[nops].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(optional_send_buffer, + optional_send_buffer_len); + ops[nops].data.send_message = ctx->send_message; + ops[nops].flags = write_flags; + ops[nops].reserved = NULL; + nops ++; + } + if (send_empty_initial_metadata) { + ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[nops].data.send_initial_metadata.count = 0; + ops[nops].data.send_initial_metadata.metadata = NULL; + ops[nops].flags = 0; + ops[nops].reserved = NULL; + nops++; + } return grpc_call_start_batch(call, ops, nops, ctx, NULL); } -- cgit v1.2.3 From 14e8dee2dd1fc2db7c2dfbf092fc7d99a296062a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 4 May 2016 12:59:23 -0700 Subject: use just one response batch for unary response serverside calls --- .../Internal/AsyncCallServerTest.cs | 2 +- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 11 ++++++-- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 32 +++++++++++----------- 3 files changed, 26 insertions(+), 19 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 058371521d..0e204761f6 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -168,7 +168,7 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ServerResponseStream(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); - var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata()); + var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); fakeCall.SendCompletionHandler(true); fakeCall.SendStatusFromServerHandler(true); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index c5d900b134..b1566b44a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -139,8 +139,11 @@ namespace Grpc.Core.Internal /// Sends call result status, indicating we are done with writes. /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// - public Task SendStatusFromServerAsync(Status status, Metadata trailers) + public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple optionalWrite) { + byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null; + var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags); + lock (myLock) { GrpcPreconditions.CheckState(started); @@ -150,11 +153,15 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, - null, new WriteFlags()); + payload, writeFlags); } halfcloseRequested = true; initialMetadataSent = true; sendStatusFromServerTcs = new TaskCompletionSource(); + if (optionalWrite != null) + { + streamingWritesCounter++; + } return sendStatusFromServerTcs.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 00d82d51e8..bbbefd0699 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,14 +75,15 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream(asyncCall); Status status; + Tuple responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - var result = await handler(request, context).ConfigureAwait(false); + var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - await responseStream.WriteAsync(result).ConfigureAwait(false); + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { @@ -91,7 +92,7 @@ namespace Grpc.Core.Internal } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -145,7 +146,7 @@ namespace Grpc.Core.Internal try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -183,19 +184,13 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream(asyncCall); Status status; + Tuple responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { - var result = await handler(requestStream, context).ConfigureAwait(false); + var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - try - { - await responseStream.WriteAsync(result).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - status = Status.DefaultCancelled; - } + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { @@ -205,7 +200,7 @@ namespace Grpc.Core.Internal try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -256,7 +251,7 @@ namespace Grpc.Core.Internal } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -278,7 +273,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -297,6 +292,11 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + public static WriteFlags GetWriteFlags(WriteOptions writeOptions) + { + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, ServerResponseStream serverResponseStream, CancellationToken cancellationToken) where TRequest : class where TResponse : class -- cgit v1.2.3 From 2901ea55ed17b16107bda30091a8ca0b84f6a926 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 4 May 2016 13:42:04 -0700 Subject: improve serverside handlers --- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 40 +++++++++++++++------- 1 file changed, 28 insertions(+), 12 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index bbbefd0699..85b7a4b01e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -87,16 +87,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -140,7 +144,10 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } @@ -148,9 +155,10 @@ namespace Grpc.Core.Internal { await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -194,7 +202,10 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } @@ -202,9 +213,10 @@ namespace Grpc.Core.Internal { await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -246,16 +258,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } -- cgit v1.2.3 From 16caa50aae7d1669550be35e205039f65cc4c363 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Fri, 6 May 2016 03:02:51 +0200 Subject: Master is now 0.15.0-dev. --- Makefile | 2 +- build.yaml | 2 +- composer.json | 2 +- package.json | 2 +- package.xml | 8 ++++---- src/core/lib/surface/version.c | 2 +- src/csharp/Grpc.Core/VersionInfo.cs | 4 ++-- src/csharp/build_packages.bat | 2 +- src/node/tools/package.json | 2 +- src/python/grpcio/grpc_version.py | 2 +- src/ruby/lib/grpc/version.rb | 2 +- src/ruby/tools/version.rb | 2 +- tools/distrib/python/grpcio_tools/grpc_version.py | 2 +- tools/doxygen/Doxyfile.c++ | 2 +- tools/doxygen/Doxyfile.c++.internal | 2 +- tools/doxygen/Doxyfile.core | 2 +- tools/doxygen/Doxyfile.core.internal | 2 +- 17 files changed, 21 insertions(+), 21 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/Makefile b/Makefile index e77aa2dd16..a684ea8611 100644 --- a/Makefile +++ b/Makefile @@ -407,7 +407,7 @@ E = @echo Q = @ endif -VERSION = 0.14.0-dev +VERSION = 0.15.0-dev CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) diff --git a/build.yaml b/build.yaml index 1a0888bdc3..13916830c1 100644 --- a/build.yaml +++ b/build.yaml @@ -7,7 +7,7 @@ settings: '#3': Use "-preN" suffixes to identify pre-release versions '#4': Per-language overrides are possible with (eg) ruby_version tag here '#5': See the expand_version.py for all the quirks here - version: 0.14.0-dev + version: 0.15.0-dev filegroups: - name: census public_headers: diff --git a/composer.json b/composer.json index 97b1a5cb49..b77a59e351 100644 --- a/composer.json +++ b/composer.json @@ -2,7 +2,7 @@ "name": "grpc/grpc", "type": "library", "description": "gRPC library for PHP", - "version": "0.14.0", + "version": "0.15.0", "keywords": ["rpc"], "homepage": "http://grpc.io", "license": "BSD-3-Clause", diff --git a/package.json b/package.json index 5ed7f363d3..54a44ca551 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.14.0-dev", + "version": "0.15.0-dev", "author": "Google Inc.", "description": "gRPC Library for Node", "homepage": "http://www.grpc.io/", diff --git a/package.xml b/package.xml index 716d6ed289..152d5d6190 100644 --- a/package.xml +++ b/package.xml @@ -13,8 +13,8 @@ 2016-04-19 - 0.14.0 - 0.14.0 + 0.15.0 + 0.15.0 beta @@ -1054,8 +1054,8 @@ Update to wrap gRPC C Core version 0.10.0 - 0.14.0 - 0.14.0 + 0.15.0 + 0.15.0 beta diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index fe954cbefb..aca76d2bb7 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -36,4 +36,4 @@ #include -const char *grpc_version_string(void) { return "0.14.0-dev"; } +const char *grpc_version_string(void) { return "0.15.0-dev"; } diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index f7a9cb9c1c..e1609341d9 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -48,11 +48,11 @@ namespace Grpc.Core /// /// Current AssemblyFileVersion of gRPC C# assemblies /// - public const string CurrentAssemblyFileVersion = "0.14.0.0"; + public const string CurrentAssemblyFileVersion = "0.15.0.0"; /// /// Current version of gRPC C# /// - public const string CurrentVersion = "0.14.0-dev"; + public const string CurrentVersion = "0.15.0-dev"; } } diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index 9a60be26b6..7520b0f81a 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -1,7 +1,7 @@ @rem Builds gRPC NuGet packages @rem Current package versions -set VERSION=0.14.0-dev +set VERSION=0.15.0-dev set PROTOBUF_VERSION=3.0.0-beta2 @rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well. diff --git a/src/node/tools/package.json b/src/node/tools/package.json index d98ed0b1fc..efdfa81124 100644 --- a/src/node/tools/package.json +++ b/src/node/tools/package.json @@ -1,6 +1,6 @@ { "name": "grpc-tools", - "version": "0.14.0-dev", + "version": "0.15.0-dev", "author": "Google Inc.", "description": "Tools for developing with gRPC on Node.js", "homepage": "http://www.grpc.io/", diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 873b4e2a91..0c13104d9d 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='0.14.0.dev0' +VERSION='0.15.0.dev0' diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 67c6a5d5a1..01c8c5ac8f 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.14.0.dev' + VERSION = '0.15.0.dev' end diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index 12ad21b80e..dca7fd7e72 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -29,6 +29,6 @@ module GRPC module Tools - VERSION = '0.14.0.dev' + VERSION = '0.15.0.dev' end end diff --git a/tools/distrib/python/grpcio_tools/grpc_version.py b/tools/distrib/python/grpcio_tools/grpc_version.py index b8ae8e20b8..1267d0e45d 100644 --- a/tools/distrib/python/grpcio_tools/grpc_version.py +++ b/tools/distrib/python/grpcio_tools/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/tools/distrib/python/grpcio_tools/grpc_version.py.template`!!! -VERSION='0.14.0.dev0' +VERSION='0.15.0.dev0' diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 664ca03d97..2a319db979 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.14.0-dev +PROJECT_NUMBER = 0.15.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 5188ef1e8d..5fdfafbf3e 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.14.0-dev +PROJECT_NUMBER = 0.15.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core index 84b5c2a8ef..aabca410da 100644 --- a/tools/doxygen/Doxyfile.core +++ b/tools/doxygen/Doxyfile.core @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.14.0-dev +PROJECT_NUMBER = 0.15.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 228c1d98d8..3ffc6174ed 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.14.0-dev +PROJECT_NUMBER = 0.15.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a -- cgit v1.2.3 From 6220033e7df811e7b38afb7c9bd39887dd549e23 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 9 May 2016 12:45:27 -0700 Subject: change typo in the comment --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index f522174bd0..da1e6592d1 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -57,7 +57,7 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource unaryResponseTcs; - // Indicates that steaming call has finished. + // Indicates that response streaming call has finished. TaskCompletionSource streamingCallFinishedTcs = new TaskCompletionSource(); // Response headers set here once received. -- cgit v1.2.3 From 98f2430d2d02ceff46b42f4e7e88786e04deb2d6 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 9 May 2016 13:04:30 -0700 Subject: throw RpcException from writes after finishing --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 13 +++++++++++++ src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index da1e6592d1..55351869b5 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -443,6 +443,19 @@ namespace Grpc.Core.Internal } } + protected override void CheckSendingAllowed(bool allowFinished) + { + base.CheckSendingAllowed(true); + + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + if (!allowFinished && finishedStatus.HasValue) + { + throw new RpcException(finishedStatus.Value.Status); + } + } + /// /// Handles receive status completion for calls with streaming response. /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 42234dcac2..4de23706b2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -213,7 +213,7 @@ namespace Grpc.Core.Internal { } - protected void CheckSendingAllowed(bool allowFinished) + protected virtual void CheckSendingAllowed(bool allowFinished) { GrpcPreconditions.CheckState(started); CheckNotCancelled(); -- cgit v1.2.3 From 65ca9dcabc464a1ea9ea74b8891c77c27008dacd Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 4 May 2016 14:23:48 -0700 Subject: eliminate a thread switch when invoking server-side handler --- src/csharp/Grpc.Core/Server.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 5b61b7f060..9b0895d855 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -283,6 +283,8 @@ namespace Grpc.Core /// private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) { + Task.Run(AllowOneRpc); + if (success) { ServerRpcNew newRpc = ctx.GetServerRpcNew(this); @@ -290,11 +292,9 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false); + HandleCallAsync(newRpc); // we don't need to await. } } - - AllowOneRpc(); } /// -- cgit v1.2.3 From 26cc1427e93e5f65a5a1a07edd275fdf713e7309 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 4 May 2016 17:21:17 -0700 Subject: start server with more than one allowed RPCs --- src/csharp/Grpc.Core/Server.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 9b0895d855..fea76d557a 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -48,6 +48,7 @@ namespace Grpc.Core /// public class Server { + const int InitialAllowRpcTokenCount = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -129,7 +130,13 @@ namespace Grpc.Core startRequested = true; handle.Start(); - AllowOneRpc(); + + // Starting with more than one AllowOneRpc tokens can significantly increase + // unary RPC throughput. + for (int i = 0; i < InitialAllowRpcTokenCount; i++) + { + AllowOneRpc(); + } } } -- cgit v1.2.3 From df0872f2a609c9fc448923ad6a466f9b9033a63a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 9 May 2016 15:03:05 -0700 Subject: expose experimental API to set GrpcThreadPool size. --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 34 +++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index a5c78cc9d7..bee0ef1d62 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -45,11 +45,12 @@ namespace Grpc.Core /// public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 4; + const int MinDefaultThreadPoolSize = 4; static object staticLock = new object(); static GrpcEnvironment instance; static int refCount; + static int? customThreadPoolSize; static ILogger logger = new ConsoleLogger(); @@ -122,6 +123,23 @@ namespace Grpc.Core logger = customLogger; } + /// + /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the GrpcEnviroment is started and cannot be changed afterwards. + /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// + public static void SetThreadPoolSize(int threadCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number"); + customThreadPoolSize = threadCount; + } + } + /// /// Creates gRPC environment. /// @@ -129,7 +147,7 @@ namespace Grpc.Core { GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); threadPool.Start(); } @@ -200,5 +218,17 @@ namespace Grpc.Core debugStats.CheckOK(); } + + private int GetThreadPoolSizeOrDefault() + { + if (customThreadPoolSize.HasValue) + { + return customThreadPoolSize.Value; + } + // In systems with many cores, use half of the cores for GrpcThreadPool + // and the other half for .NET thread pool. This heuristic definitely needs + // more work, but seems to work reasonably well for a start. + return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); + } } } -- cgit v1.2.3 From 1e1fa0870f60b1c0d23da17b2aa7db9a3ca7f1ae Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 9 May 2016 15:18:11 -0700 Subject: dont lock to run server_request_call --- src/csharp/Grpc.Core/Server.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index fea76d557a..3a337ba831 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -66,7 +66,7 @@ namespace Grpc.Core readonly TaskCompletionSource shutdownTcs = new TaskCompletionSource(); bool startRequested; - bool shutdownRequested; + volatile bool shutdownRequested; /// /// Create a new server. @@ -246,12 +246,9 @@ namespace Grpc.Core /// private void AllowOneRpc() { - lock (myLock) + if (!shutdownRequested) { - if (!shutdownRequested) - { - handle.RequestCall(HandleNewServerRpc, environment); - } + handle.RequestCall(HandleNewServerRpc, environment); } } -- cgit v1.2.3 From dae51b0fe5fcc85d2ab698d417d711e39286b380 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 9 May 2016 19:30:04 -0700 Subject: fix compilation on windows --- src/csharp/Grpc.Core/Server.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 3a337ba831..d538a4671f 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -287,7 +287,7 @@ namespace Grpc.Core /// private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) { - Task.Run(AllowOneRpc); + Task.Run(() => AllowOneRpc()); if (success) { -- cgit v1.2.3 From 528fb6651cf6c19032e30dacdf437a3b7caf05e3 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 12 May 2016 08:38:41 -0700 Subject: improve channel behavior in shutdown situations --- src/csharp/Grpc.Core.Tests/ChannelTest.cs | 39 +++++++++++++++++++++++++++++++ src/csharp/Grpc.Core/Channel.cs | 34 ++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 3 deletions(-) (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 6330f50fae..850d70ce92 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -89,5 +90,43 @@ namespace Grpc.Core.Tests channel.ShutdownAsync().Wait(); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync()); } + + [Test] + public async Task ShutdownTokenCancelledAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested); + var shutdownTask = channel.ShutdownAsync(); + Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested); + await shutdownTask; + } + + [Test] + public async Task StateIsFatalFailureAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.AreEqual(ChannelState.FatalFailure, channel.State); + } + + [Test] + public async Task ShutdownFinishesWaitForStateChangedAsync() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle); + var shutdownTask = channel.ShutdownAsync(); + await stateChangedTask; + await shutdownTask; + } + + [Test] + public async Task OperationsThrowAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle)); + Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; }); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync()); + } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 89981b1849..93a6e6a3d9 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -51,6 +52,7 @@ namespace Grpc.Core readonly object myLock = new object(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); readonly string target; readonly GrpcEnvironment environment; @@ -101,12 +103,13 @@ namespace Grpc.Core /// /// Gets current connectivity state of this channel. + /// After channel is has been shutdown, ChannelState.FatalFailure will be returned. /// public ChannelState State { get { - return handle.CheckConnectivityState(false); + return GetConnectivityState(false); } } @@ -154,6 +157,17 @@ namespace Grpc.Core } } + /// + /// Returns a token that gets cancelled once ShutdownAsync is invoked. + /// + public CancellationToken ShutdownToken + { + get + { + return this.shutdownTokenSource.Token; + } + } + /// /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, @@ -164,7 +178,7 @@ namespace Grpc.Core /// The deadline. null indicates no deadline. public async Task ConnectAsync(DateTime? deadline = null) { - var currentState = handle.CheckConnectivityState(true); + var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { if (currentState == ChannelState.FatalFailure) @@ -172,7 +186,7 @@ namespace Grpc.Core throw new OperationCanceledException("Channel has reached FatalFailure state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); - currentState = handle.CheckConnectivityState(false); + currentState = GetConnectivityState(false); } } @@ -188,6 +202,8 @@ namespace Grpc.Core shutdownRequested = true; } + shutdownTokenSource.Cancel(); + var activeCallCount = activeCallCounter.Count; if (activeCallCount > 0) { @@ -231,6 +247,18 @@ namespace Grpc.Core activeCallCounter.Decrement(); } + private ChannelState GetConnectivityState(bool tryToConnect) + { + try + { + return handle.CheckConnectivityState(tryToConnect); + } + catch (ObjectDisposedException) + { + return ChannelState.FatalFailure; + } + } + private static void EnsureUserAgentChannelOption(Dictionary options) { var key = ChannelOptions.PrimaryUserAgentString; -- cgit v1.2.3