diff options
Diffstat (limited to 'src')
20 files changed, 932 insertions, 401 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 59137e1c92..8e76e6dce6 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -182,18 +182,40 @@ bool GetModuleAndMessagePath(const Descriptor* type, return true; } +// Get all comments (leading, leading_detached, trailing) and print them as a +// docstring. Any leading space of a line will be removed, but the line wrapping +// will not be changed. +template <typename DescriptorType> +static void PrintAllComments(const DescriptorType* desc, Printer* printer) { + std::vector<grpc::string> comments; + grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED, + &comments); + grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING, + &comments); + grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING, + &comments); + if (comments.empty()) { + return; + } + printer->Print("\"\"\""); + for (auto it = comments.begin(); it != comments.end(); ++it) { + size_t start_pos = it->find_first_not_of(' '); + if (start_pos != grpc::string::npos) { + printer->Print(it->c_str() + start_pos); + } + printer->Print("\n"); + } + printer->Print("\"\"\"\n"); +} + bool PrintBetaServicer(const ServiceDescriptor* service, Printer* out) { - grpc::string doc = "<fill me in later!>"; - map<grpc::string, grpc::string> dict = ListToDict({ - "Service", service->name(), - "Documentation", doc, - }); out->Print("\n"); - out->Print(dict, "class Beta$Service$Servicer(object):\n"); + out->Print("class Beta$Service$Servicer(object):\n", "Service", + service->name()); { IndentScope raii_class_indent(out); - out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); + PrintAllComments(service, out); for (int i = 0; i < service->method_count(); ++i) { auto meth = service->method(i); grpc::string arg_name = meth->client_streaming() ? @@ -202,6 +224,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service, "Method", meth->name(), "ArgName", arg_name); { IndentScope raii_method_indent(out); + PrintAllComments(meth, out); out->Print("context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)\n"); } } @@ -211,16 +234,11 @@ bool PrintBetaServicer(const ServiceDescriptor* service, bool PrintBetaStub(const ServiceDescriptor* service, Printer* out) { - grpc::string doc = "The interface to which stubs will conform."; - map<grpc::string, grpc::string> dict = ListToDict({ - "Service", service->name(), - "Documentation", doc, - }); out->Print("\n"); - out->Print(dict, "class Beta$Service$Stub(object):\n"); + out->Print("class Beta$Service$Stub(object):\n", "Service", service->name()); { IndentScope raii_class_indent(out); - out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); + PrintAllComments(service, out); for (int i = 0; i < service->method_count(); ++i) { const MethodDescriptor* meth = service->method(i); grpc::string arg_name = meth->client_streaming() ? @@ -229,6 +247,7 @@ bool PrintBetaStub(const ServiceDescriptor* service, out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n"); { IndentScope raii_method_indent(out); + PrintAllComments(meth, out); out->Print("raise NotImplementedError()\n"); } if (!meth->server_streaming()) { diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 0a51780a14..23c7b7b897 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -56,10 +56,6 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, /** Destroy arguments created by \a grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); -/** Reads census_enabled settings from channel args. Returns 1 if census_enabled - * is specified in channel args, otherwise returns 0. */ -int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); - /** Returns the compression algorithm set in \a a. */ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( const grpc_channel_args *a); diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 33a8f755e6..8e8d42eb29 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -48,124 +48,6 @@ #include "src/core/lib/profiling/timers.h" -namespace { - -const int kGrpcBufferWriterMaxBufferLength = 8192; - -class GrpcBufferWriter GRPC_FINAL - : public ::grpc::protobuf::io::ZeroCopyOutputStream { - public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) - : block_size_(block_size), byte_count_(0), have_backup_(false) { - *bp = grpc_raw_byte_buffer_create(NULL, 0); - slice_buffer_ = &(*bp)->data.raw.slice_buffer; - } - - ~GrpcBufferWriter() GRPC_OVERRIDE { - if (have_backup_) { - gpr_slice_unref(backup_slice_); - } - } - - bool Next(void** data, int* size) GRPC_OVERRIDE { - if (have_backup_) { - slice_ = backup_slice_; - have_backup_ = false; - } else { - slice_ = gpr_slice_malloc(block_size_); - } - *data = GPR_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); - gpr_slice_buffer_add(slice_buffer_, slice_); - return true; - } - - void BackUp(int count) GRPC_OVERRIDE { - gpr_slice_buffer_pop(slice_buffer_); - if (count == block_size_) { - backup_slice_ = slice_; - } else { - backup_slice_ = - gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); - gpr_slice_buffer_add(slice_buffer_, slice_); - } - have_backup_ = true; - byte_count_ -= count; - } - - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } - - private: - const int block_size_; - int64_t byte_count_; - gpr_slice_buffer* slice_buffer_; - bool have_backup_; - gpr_slice backup_slice_; - gpr_slice slice_; -}; - -class GrpcBufferReader GRPC_FINAL - : public ::grpc::protobuf::io::ZeroCopyInputStream { - public: - explicit GrpcBufferReader(grpc_byte_buffer* buffer) - : byte_count_(0), backup_count_(0) { - grpc_byte_buffer_reader_init(&reader_, buffer); - } - ~GrpcBufferReader() GRPC_OVERRIDE { - grpc_byte_buffer_reader_destroy(&reader_); - } - - bool Next(const void** data, int* size) GRPC_OVERRIDE { - if (backup_count_ > 0) { - *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - - backup_count_; - GPR_ASSERT(backup_count_ <= INT_MAX); - *size = (int)backup_count_; - backup_count_ = 0; - return true; - } - if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) { - return false; - } - gpr_slice_unref(slice_); - *data = GPR_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); - return true; - } - - void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } - - bool Skip(int count) GRPC_OVERRIDE { - const void* data; - int size; - while (Next(&data, &size)) { - if (size >= count) { - BackUp(size - count); - return true; - } - // size < count; - count -= size; - } - // error or we have too large count; - return false; - } - - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { - return byte_count_ - backup_count_; - } - - private: - int64_t byte_count_; - int64_t backup_count_; - grpc_byte_buffer_reader reader_; - gpr_slice slice_; -}; -} // namespace - namespace grpc { grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( @@ -192,6 +74,44 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) { + ::grpc_byte_buffer_reader_init(reader, buffer); +} + +void CoreCodegen::grpc_byte_buffer_reader_destroy( + grpc_byte_buffer_reader* reader) { + ::grpc_byte_buffer_reader_destroy(reader); +} + +int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, + gpr_slice* slice) { + return ::grpc_byte_buffer_reader_next(reader, slice); +} + +grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(gpr_slice* slice, + size_t nslices) { + return ::grpc_raw_byte_buffer_create(slice, nslices); +} + +gpr_slice CoreCodegen::gpr_slice_malloc(size_t length) { + return ::gpr_slice_malloc(length); +} + +void CoreCodegen::gpr_slice_unref(gpr_slice slice) { ::gpr_slice_unref(slice); } + +gpr_slice CoreCodegen::gpr_slice_split_tail(gpr_slice* s, size_t split) { + return ::gpr_slice_split_tail(s, split); +} + +void CoreCodegen::gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) { + ::gpr_slice_buffer_add(sb, slice); +} + +void CoreCodegen::gpr_slice_buffer_pop(gpr_slice_buffer* sb) { + ::gpr_slice_buffer_pop(sb); +} + void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) { ::grpc_metadata_array_init(array); } @@ -200,6 +120,10 @@ void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) { ::grpc_metadata_array_destroy(array); } +const Status& CoreCodegen::ok() { return grpc::Status::OK; } + +const Status& CoreCodegen::cancelled() { return grpc::Status::CANCELLED; } + gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) { return ::gpr_inf_future(type); } @@ -209,48 +133,4 @@ void CoreCodegen::assert_fail(const char* failed_assertion) { abort(); } -Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) { - GPR_TIMER_SCOPE("SerializeProto", 0); - int byte_size = msg.ByteSize(); - if (byte_size <= kGrpcBufferWriterMaxBufferLength) { - gpr_slice slice = gpr_slice_malloc(byte_size); - GPR_ASSERT(GPR_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); - *bp = grpc_raw_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); - return Status::OK; - } else { - GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? Status::OK - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } -} - -Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, - int max_message_size) { - GPR_TIMER_SCOPE("DeserializeProto", 0); - if (buffer == nullptr) { - return Status(StatusCode::INTERNAL, "No payload"); - } - Status result = Status::OK; - { - GrpcBufferReader reader(buffer); - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - if (max_message_size > 0) { - decoder.SetTotalBytesLimit(max_message_size, max_message_size); - } - if (!msg->ParseFromCodedStream(&decoder)) { - result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - result = Status(StatusCode::INTERNAL, "Did not read entire message"); - } - } - grpc_byte_buffer_destroy(buffer); - return result; -} - } // namespace grpc diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h index e15cb4c34a..656b11e7e7 100644 --- a/src/cpp/common/core_codegen.h +++ b/src/cpp/common/core_codegen.h @@ -42,13 +42,6 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) override; - - Status DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, - int max_message_size) override; - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, @@ -60,11 +53,30 @@ class CoreCodegen : public CoreCodegenInterface { void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; + void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) override; + void grpc_byte_buffer_reader_destroy( + grpc_byte_buffer_reader* reader) override; + int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, + gpr_slice* slice) override; + + grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice, + size_t nslices) override; + + gpr_slice gpr_slice_malloc(size_t length) override; + void gpr_slice_unref(gpr_slice slice) override; + gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) override; + void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) override; + void gpr_slice_buffer_pop(gpr_slice_buffer* sb) override; + void grpc_metadata_array_init(grpc_metadata_array* array) override; void grpc_metadata_array_destroy(grpc_metadata_array* array) override; gpr_timespec gpr_inf_future(gpr_clock_type type) override; + virtual const Status& ok() override; + virtual const Status& cancelled() override; + void assert_fail(const char* failed_assertion) override; }; diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 6c13a4fa48..d92addbf54 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -167,6 +167,37 @@ namespace Grpc.Core.Tests } [Test] + public async Task ServerStreamingCall_EndOfStreamIsIdempotent() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + + Assert.IsFalse(await call.ResponseStream.MoveNext()); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + + [Test] + public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + context.Status = new Status(StatusCode.InvalidArgument, ""); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + + var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode); + + // attempting MoveNext again should result in throwing the same exception. + var ex2 = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode); + } + + [Test] public async Task DuplexStreamingCall() { helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => @@ -209,6 +240,38 @@ namespace Grpc.Core.Tests } [Test] + public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() + { + var handlerStartedBarrier = new TaskCompletionSource<object>(); + var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); + var successTcs = new TaskCompletionSource<string>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + handlerStartedBarrier.SetResult(null); + + // wait for cancellation to be delivered. + context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); + await cancelNotificationReceivedBarrier.Task; + + var moveNextResult = await requestStream.MoveNext(); + successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await handlerStartedBarrier.Task; + cts.Cancel(); + + var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync); + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + + Assert.AreEqual("SUCCESS", await successTcs.Task); + } + + [Test] public async Task AsyncUnaryCall_EchoMetadata() { helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 0cd059c232..47131fc454 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -84,6 +84,8 @@ <Compile Include="SanityTest.cs" /> <Compile Include="HalfcloseTest.cs" /> <Compile Include="NUnitMain.cs" /> + <Compile Include="Internal\FakeNativeCall.cs" /> + <Compile Include="Internal\AsyncCallServerTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs new file mode 100644 index 0000000000..058371521d --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -0,0 +1,191 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// <summary> + /// Uses fake native call to test interaction of <c>AsyncCallServer</c> wrapping code with C core in different situations. + /// </summary> + public class AsyncCallServerTest + { + Server server; + FakeNativeCall fakeCall; + AsyncCallServer<string, string> asyncCallServer; + + [SetUp] + public void Init() + { + var environment = GrpcEnvironment.AddRef(); + + // Create a fake server just so we have an instance to refer to. + // The server won't actually be used at all. + server = new Server() + { + Ports = { { "localhost", 0, ServerCredentials.Insecure } } + }; + server.Start(); + + fakeCall = new FakeNativeCall(); + asyncCallServer = new AsyncCallServer<string, string>( + Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, + environment, + server); + asyncCallServer.InitializeForTesting(fakeCall); + } + + [TearDown] + public void Cleanup() + { + server.ShutdownAsync().Wait(); + GrpcEnvironment.Release(); + } + + [Test] + public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<string, string>(asyncCallServer); + + var moveNextTask = requestStream.MoveNext(); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void ReadAfterCancelNotificationCanSucceed() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<string, string>(asyncCallServer); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + // Check that starting a read after cancel notification has been processed is legal. + var moveNextTask = requestStream.MoveNext(); + Assert.IsFalse(moveNextTask.Result); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void ReadCompletionFailureClosesRequestStream() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<string, string>(asyncCallServer); + + // if a read completion's success==false, the request stream will silently finish + // and we rely on C core cancelling the call. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(false, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteAfterCancelNotificationFails() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<string, string>(asyncCallServer); + var responseStream = new ServerResponseStream<string, string>(asyncCallServer); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + // TODO(jtattermusch): should we throw a different exception type instead? + Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteCompletionFailureThrows() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var responseStream = new ServerResponseStream<string, string>(asyncCallServer); + + var writeTask = responseStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(false); + // TODO(jtattermusch): should we throw a different exception type instead? + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteAndWriteStatusCanRunConcurrently() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var responseStream = new ServerResponseStream<string, string>(asyncCallServer); + + var writeTask = responseStream.WriteAsync("request1"); + var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata()); + + fakeCall.SendCompletionHandler(true); + fakeCall.SendStatusFromServerHandler(true); + + Assert.DoesNotThrowAsync(async () => await writeTask); + Assert.DoesNotThrowAsync(async () => await writeStatusTask); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) + { + Assert.IsTrue(fakeCall.IsDisposed); + Assert.IsTrue(finishedTask.IsCompleted); + Assert.DoesNotThrow(() => finishedTask.Wait()); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index a678e4dafe..abe9d4a2e6 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading.Tasks; @@ -40,6 +41,9 @@ using NUnit.Framework; namespace Grpc.Core.Internal.Tests { + /// <summary> + /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations. + /// </summary> public class AsyncCallTest { Channel channel; @@ -75,8 +79,8 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_StreamingOperationsNotAllowed() { asyncCall.UnaryCallAsync("request1"); - Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartReadMessage((x,y) => {})); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); } @@ -119,6 +123,14 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void ClientStreaming_StreamingReadNotAllowed() + { + asyncCall.ClientStreamingCallAsync(); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); + } + + [Test] public void ClientStreaming_NoRequest_Success() { var resultTask = asyncCall.ClientStreamingCallAsync(); @@ -142,6 +154,283 @@ namespace Grpc.Core.Internal.Tests AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); } + [Test] + public void ClientStreaming_MoreRequests_Success() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + var writeTask = requestStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(true); + writeTask.Wait(); + + var writeTask2 = requestStream.WriteAsync("request2"); + fakeCall.SendCompletionHandler(true); + writeTask2.Wait(); + + var completeTask = requestStream.CompleteAsync(); + fakeCall.SendCompletionHandler(true); + completeTask.Wait(); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + } + + [Test] + public void ClientStreaming_WriteFailure() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + var writeTask = requestStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(false); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); + + fakeCall.UnaryResponseClientHandler(true, + CreateClientSideStatus(StatusCode.Internal), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); + } + + [Test] + public void ClientStreaming_WriteAfterReceivingStatusFails() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1")); + } + + [Test] + public void ClientStreaming_CompleteAfterReceivingStatusSucceeds() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); + } + + [Test] + public void ClientStreaming_WriteAfterCancellationRequestFails() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + fakeCall.UnaryResponseClientHandler(true, + CreateClientSideStatus(StatusCode.Cancelled), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); + } + + [Test] + public void ServerStreaming_StreamingSendNotAllowed() + { + asyncCall.StartServerStreamingCall("request1"); + Assert.Throws(typeof(InvalidOperationException), + () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + } + + [Test] + public void ServerStreaming_NoResponse_Success1() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + var readTask = responseStream.MoveNext(); + + fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); + Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); + + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void ServerStreaming_NoResponse_Success2() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + var readTask = responseStream.MoveNext(); + + // try alternative order of completions + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageHandler(true, null); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void ServerStreaming_NoResponse_ReadFailure() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + var readTask = responseStream.MoveNext(); + + fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code. + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); + } + + [Test] + public void ServerStreaming_MoreResponses_Success() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + var readTask1 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask2.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask3 = responseStream.MoveNext(); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageHandler(true, null); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); + } + + [Test] + public void DuplexStreaming_NoRequestNoResponse_Success() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + var writeTask1 = requestStream.CompleteAsync(); + fakeCall.SendCompletionHandler(true); + Assert.DoesNotThrowAsync(async () => await writeTask1); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void DuplexStreaming_WriteAfterReceivingStatusFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1")); + } + + [Test] + public void DuplexStreaming_CompleteAfterReceivingStatusFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + + Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); + } + + [Test] + public void DuplexStreaming_WriteAfterCancellationRequestFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream<string, string>(asyncCall); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); + } + + [Test] + public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed() + { + asyncCall.StartDuplexStreamingCall(); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + var readTask1 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); + } + + [Test] + public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed() + { + asyncCall.StartDuplexStreamingCall(); + var responseStream = new ClientResponseStream<string, string>(asyncCall); + + var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); + } + ClientSideStatus CreateClientSideStatus(StatusCode statusCode) { return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); @@ -163,6 +452,16 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual("response1", resultTask.Result); } + static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask) + { + Assert.IsTrue(moveNextTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + Assert.IsFalse(moveNextTask.Result); + Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); + Assert.AreEqual(0, asyncCall.GetTrailers().Count); + } + static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode) { Assert.IsTrue(resultTask.IsCompleted); @@ -175,135 +474,15 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual(0, asyncCall.GetTrailers().Count); } - internal class FakeNativeCall : INativeCall - { - public UnaryResponseClientHandler UnaryResponseClientHandler - { - get; - set; - } - - public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler - { - get; - set; - } - - public ReceivedMessageHandler ReceivedMessageHandler - { - get; - set; - } - - public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler - { - get; - set; - } - - public SendCompletionHandler SendCompletionHandler - { - get; - set; - } - - public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler - { - get; - set; - } - - public bool IsCancelled - { - get; - set; - } - - public bool IsDisposed - { - get; - set; - } - - public void Cancel() - { - IsCancelled = true; - } - - public void CancelWithStatus(Status status) - { - IsCancelled = true; - } - - public string GetPeer() - { - return "PEER"; - } - - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - UnaryResponseClientHandler = callback; - } - - public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - throw new NotImplementedException(); - } - - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) - { - UnaryResponseClientHandler = callback; - } - - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartReceiveMessage(ReceivedMessageHandler callback) - { - ReceivedMessageHandler = callback; - } - - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) - { - ReceivedResponseHeadersHandler = callback; - } - - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) - { - SendCompletionHandler = callback; - } - - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartSendCloseFromClient(SendCompletionHandler callback) - { - SendCompletionHandler = callback; - } - - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartServerSide(ReceivedCloseOnServerHandler callback) - { - ReceivedCloseOnServerHandler = callback; - } - - public void Dispose() - { - IsDisposed = true; - } + static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode) + { + Assert.IsTrue(moveNextTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask); + Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode); + Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); + Assert.AreEqual(0, asyncCall.GetTrailers().Count); } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs new file mode 100644 index 0000000000..1bec258ca2 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -0,0 +1,183 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// <summary> + /// For testing purposes. + /// </summary> + internal class FakeNativeCall : INativeCall + { + public UnaryResponseClientHandler UnaryResponseClientHandler + { + get; + set; + } + + public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + { + get; + set; + } + + public ReceivedMessageHandler ReceivedMessageHandler + { + get; + set; + } + + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + + public SendCompletionHandler SendCompletionHandler + { + get; + set; + } + + public SendCompletionHandler SendStatusFromServerHandler + { + get; + set; + } + + public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + { + get; + set; + } + + public bool IsCancelled + { + get; + set; + } + + public bool IsDisposed + { + get; + set; + } + + public void Cancel() + { + IsCancelled = true; + } + + public void CancelWithStatus(Status status) + { + IsCancelled = true; + } + + public string GetPeer() + { + return "PEER"; + } + + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + UnaryResponseClientHandler = callback; + } + + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + throw new NotImplementedException(); + } + + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) + { + UnaryResponseClientHandler = callback; + } + + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartReceiveMessage(ReceivedMessageHandler callback) + { + ReceivedMessageHandler = callback; + } + + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + SendCompletionHandler = callback; + } + + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartSendCloseFromClient(SendCompletionHandler callback) + { + SendCompletionHandler = callback; + } + + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + { + SendStatusFromServerHandler = callback; + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + ReceivedCloseOnServerHandler = callback; + } + + public void Dispose() + { + IsDisposed = true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 50ba617cdb..f522174bd0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -241,11 +241,10 @@ namespace Grpc.Core.Internal /// <summary> /// Receives a streaming response. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate) + public Task<TResponse> ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index ccd047f469..42234dcac2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,7 +68,8 @@ namespace Grpc.Core.Internal protected bool cancelRequested; protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource<object> sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -150,15 +151,25 @@ namespace Grpc.Core.Internal /// Initiates reading a message. Only one read operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate) + protected Task<TRead> ReadMessageInternalAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckReadingAllowed(); + GrpcPreconditions.CheckState(started); + if (readingDone) + { + // the last read that returns null or throws an exception is idempotent + // and maintain its state. + GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); + return streamingReadTcs.Task; + } + + GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); call.StartReceiveMessage(HandleReadFinished); - readCompletionDelegate = completionDelegate; + streamingReadTcs = new TaskCompletionSource<TRead>(); + return streamingReadTcs.Task; } } @@ -213,15 +224,6 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected virtual void CheckReadingAllowed() - { - GrpcPreconditions.CheckState(started); - GrpcPreconditions.CheckState(!disposed); - - GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); - GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); - } - protected void CheckNotCancelled() { if (cancelRequested) @@ -322,22 +324,18 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleSendStatusFromServerFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); } else { - FireCompletion(origCompletionDelegate, null, null); + sendStatusFromServerTcs.SetResult(null); } } @@ -346,15 +344,17 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleReadFinished(bool success, byte[] receivedMessage) { + // if success == false, received message will be null. It that case we will + // treat this completion as the last read an rely on C core to handle the failed + // read (e.g. deliver approriate statusCode on the clientside). + TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; - AsyncCompletionDelegate<TRead> origCompletionDelegate = null; + TaskCompletionSource<TRead> origTcs = null; lock (myLock) { - origCompletionDelegate = readCompletionDelegate; - readCompletionDelegate = null; - + origTcs = streamingReadTcs; if (receivedMessage == null) { // This was the last read. @@ -364,20 +364,25 @@ namespace Grpc.Core.Internal if (deserializeException != null && IsClient) { readingDone = true; + + // TODO(jtattermusch): it might be too late to set the status CancelWithStatus(DeserializeResponseFailureStatus); } + if (!readingDone) + { + streamingReadTcs = null; + } + ReleaseResourcesIfPossible(); } - // TODO: handle the case when success==false - if (deserializeException != null && !IsClient) { - FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); + origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException)); return; } - FireCompletion(origCompletionDelegate, msg, null); + origTcs.SetResult(msg); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index bea2b3660c..eafe2ccab8 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -65,6 +65,15 @@ namespace Grpc.Core.Internal } /// <summary> + /// Only for testing purposes. + /// </summary> + public void InitializeForTesting(INativeCall call) + { + server.AddCallReference(this); + InitializeInternal(call); + } + + /// <summary> /// Starts a server side call. /// </summary> public Task ServerSideCallAsync() @@ -91,11 +100,10 @@ namespace Grpc.Core.Internal /// <summary> /// Receives a streaming request. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate) + public Task<TRequest> ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// <summary> @@ -128,24 +136,25 @@ namespace Grpc.Core.Internal } /// <summary> - /// Sends call result status, also indicating server is done with streaming responses. - /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. + /// Sends call result status, indicating we are done with writes. + /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// </summary> - public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate) + public Task SendStatusFromServerAsync(Status status, Metadata trailers) { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once."); using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; - readingDone = true; - sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + sendStatusFromServerTcs = new TaskCompletionSource<object>(); + return sendStatusFromServerTcs.Task; } } @@ -174,12 +183,6 @@ namespace Grpc.Core.Internal get { return false; } } - protected override void CheckReadingAllowed() - { - base.CheckReadingAllowed(); - GrpcPreconditions.CheckArgument(!cancelRequested); - } - protected override void OnAfterReleaseResources() { server.RemoveCallReference(this); @@ -190,12 +193,21 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinishedServerside(bool success, bool cancelled) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, + // success will be always set to true. lock (myLock) { finished = true; + if (streamingReadTcs == null) + { + // if there's no pending read, readingDone=true will dispose now. + // if there is a pending read, we will dispose once that read finishes. + readingDone = true; + streamingReadTcs = new TaskCompletionSource<TRequest>(); + streamingReadTcs.SetResult(default(TRequest)); + } ReleaseResourcesIfPossible(); } - // TODO(jtattermusch): handle error if (cancelled) { diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index d6e34a0f04..ad9423ff58 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource<TResponse>(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; if (result == null) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548..00d82d51e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -80,8 +80,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); var result = await handler(request, context).ConfigureAwait(false); status = context.Status; await responseStream.WriteAsync(result).ConfigureAwait(false); @@ -93,7 +91,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -136,8 +134,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); await handler(request, responseStream, context).ConfigureAwait(false); status = context.Status; } @@ -149,7 +145,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -209,7 +205,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -260,7 +256,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -282,9 +278,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -300,7 +294,6 @@ namespace Grpc.Core.Internal return rpcException.Status; } - // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index e7be82c318..d76030d1ad 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource<TRequest>(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; return result != null; } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 03e39efc02..ecfee0bfdd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -57,13 +57,6 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status, Metadata trailers) - { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); - return taskSource.Task; - } - public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource<object>(); diff --git a/src/csharp/tests.json b/src/csharp/tests.json index f733352a31..f6af3408d5 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -1,5 +1,6 @@ { "Grpc.Core.Tests": [ + "Grpc.Core.Internal.Tests.AsyncCallServerTest", "Grpc.Core.Internal.Tests.AsyncCallTest", "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", "Grpc.Core.Internal.Tests.CompletionQueueEventTest", diff --git a/src/proto/grpc/testing/echo.proto b/src/proto/grpc/testing/echo.proto index 0eef53a92a..c596aabfcc 100644 --- a/src/proto/grpc/testing/echo.proto +++ b/src/proto/grpc/testing/echo.proto @@ -45,3 +45,7 @@ service EchoTestService { service UnimplementedService { rpc Unimplemented(EchoRequest) returns (EchoResponse); } + +// A service without any rpc defined to test coverage. +service NoRpcService { +} diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index d13ce42655..34bb477543 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -11,10 +11,10 @@ AllCops: - 'pb/test/**/*' Metrics/CyclomaticComplexity: - Max: 8 + Max: 9 Metrics/PerceivedComplexity: - Max: 8 + Max: 9 Metrics/ClassLength: Max: 250 diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 7f3a38a9f4..a0f4071adc 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -332,15 +332,13 @@ module GRPC # the current thread to terminate it. def run_till_terminated GRPC.trap_signals - stopped = false t = Thread.new do run - stopped = true end + t.abort_on_exception = true wait_till_running - loop do + until running_state == :stopped sleep SIGNAL_CHECK_PERIOD - break if stopped break unless GRPC.handle_signals end stop @@ -416,7 +414,7 @@ module GRPC GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) - c.send_status(StatusCodes::RESOURCE_EXHAUSTED, '') + c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '') nil end @@ -427,7 +425,7 @@ module GRPC GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) - c.send_status(StatusCodes::UNIMPLEMENTED, '') + c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') nil end @@ -443,7 +441,12 @@ module GRPC unless active_call.nil? @pool.schedule(active_call) do |ac| c, mth = ac - rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) + begin + rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) + rescue StandardError + c.send_status(GRPC::Core::StatusCodes::INTERNAL, + 'Server handler failed') + end end end rescue Core::CallError, RuntimeError => e |