diff options
author | ncteisen <ncteisen@gmail.com> | 2017-11-29 16:46:07 -0800 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2017-11-29 16:46:07 -0800 |
commit | 43db24953aa621dc0960b905cb186ed4eb3ccbe2 (patch) | |
tree | 3bae8f2355eb6427ee310feb03af38993494d3fc /src | |
parent | 14b27b847d8aecdc34c71d788670a4517313d651 (diff) | |
parent | 17a1431a37108bb55c24a2022c1db9ea9632dcc9 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into no-more-extern-c
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/objective_c_generator.cc | 1 | ||||
-rw-r--r-- | src/compiler/objective_c_generator.h | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 88 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_windows.cc | 39 | ||||
-rw-r--r-- | src/core/lib/iomgr/port.h | 4 | ||||
-rw-r--r-- | src/core/lib/support/fork.cc | 62 | ||||
-rw-r--r-- | src/core/lib/support/fork.h | 35 | ||||
-rw-r--r-- | src/core/lib/support/thd_internal.h | 30 | ||||
-rw-r--r-- | src/core/lib/support/thd_posix.cc | 56 | ||||
-rw-r--r-- | src/core/lib/support/thd_windows.cc | 2 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 6 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/CallCancellationTest.cs | 182 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/ClientServerTest.cs | 68 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/IAsyncStreamReader.cs | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ClientResponseStream.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerRequestStream.cs | 11 | ||||
-rw-r--r-- | src/csharp/tests.json | 1 | ||||
-rw-r--r-- | src/node/health_check/package.json | 29 | ||||
-rw-r--r-- | src/node/tools/package.json | 41 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 3 |
20 files changed, 603 insertions, 83 deletions
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 8fe30e97e9..ab7d869758 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -28,6 +28,7 @@ using ::google::protobuf::compiler::objectivec::ClassName; using ::grpc::protobuf::FileDescriptor; +using ::grpc::protobuf::FileDescriptor; using ::grpc::protobuf::MethodDescriptor; using ::grpc::protobuf::ServiceDescriptor; using ::grpc::protobuf::io::Printer; diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h index 2337abaf6a..d3aed76c4f 100644 --- a/src/compiler/objective_c_generator.h +++ b/src/compiler/objective_c_generator.h @@ -24,6 +24,7 @@ namespace grpc_objective_c_generator { using ::grpc::protobuf::FileDescriptor; +using ::grpc::protobuf::FileDescriptor; using ::grpc::protobuf::ServiceDescriptor; using ::grpc::string; diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc new file mode 100644 index 0000000000..a55b3a349a --- /dev/null +++ b/src/core/lib/iomgr/fork_posix.cc @@ -0,0 +1,88 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_FORK + +#include <string.h> + +#include <grpc/fork.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/support/env.h" +#include "src/core/lib/support/fork.h" +#include "src/core/lib/support/thd_internal.h" +#include "src/core/lib/surface/init.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { + if (!grpc_fork_support_enabled()) { + gpr_log(GPR_ERROR, + "Fork support not enabled; try running with the " + "environment variable GRPC_ENABLE_FORK_SUPPORT=1"); + return; + } + if (grpc_is_initialized()) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_timer_manager_set_threading(false); + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); + if (!gpr_await_threads( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(3, GPR_TIMESPAN)))) { + gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); + } + } +} + +void grpc_postfork_parent() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_postfork_child() { + if (grpc_is_initialized()) { + grpc_timer_manager_set_threading(true); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_executor_set_threading(&exec_ctx, true); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void grpc_fork_handlers_auto_register() { + if (grpc_fork_support_enabled()) { + pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child); + } +} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/fork_windows.cc b/src/core/lib/iomgr/fork_windows.cc new file mode 100644 index 0000000000..f9986f33c7 --- /dev/null +++ b/src/core/lib/iomgr/fork_windows.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifndef GRPC_POSIX_FORK + +#include <grpc/fork.h> +#include <grpc/support/log.h> + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_prefork() { gpr_log(GPR_ERROR, "Forking not supported on Windows"); } + +void grpc_postfork_parent() {} + +void grpc_postfork_child() {} + +void grpc_fork_handlers_auto_register() {} + +#endif // GRPC_POSIX_FORK diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 1cc6d98491..9fae8c0052 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -30,6 +30,7 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -59,6 +60,7 @@ #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -90,6 +92,7 @@ #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 @@ -103,6 +106,7 @@ #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GRPC_POSIX_SOCKET 1 #define GRPC_POSIX_SOCKETADDR 1 diff --git a/src/core/lib/support/fork.cc b/src/core/lib/support/fork.cc new file mode 100644 index 0000000000..d59ca5584c --- /dev/null +++ b/src/core/lib/support/fork.cc @@ -0,0 +1,62 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/fork.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/support/env.h" + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +static int override_fork_support_enabled = -1; +static int fork_support_enabled; + +void grpc_fork_support_init() { +#ifdef GRPC_ENABLE_FORK_SUPPORT + fork_support_enabled = 1; +#else + fork_support_enabled = 0; + char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); + if (env != NULL) { + static const char* truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) { + fork_support_enabled = 1; + } + } + gpr_free(env); + } +#endif + if (override_fork_support_enabled != -1) { + fork_support_enabled = override_fork_support_enabled; + } +} + +int grpc_fork_support_enabled() { return fork_support_enabled; } + +void grpc_enable_fork_support(int enable) { + override_fork_support_enabled = enable; +} diff --git a/src/core/lib/support/fork.h b/src/core/lib/support/fork.h new file mode 100644 index 0000000000..215d4214a6 --- /dev/null +++ b/src/core/lib/support/fork.h @@ -0,0 +1,35 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_FORK_H +#define GRPC_CORE_LIB_SUPPORT_FORK_H + +/* + * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK + * AROUND VERY SPECIFIC USE CASES. + */ + +void grpc_fork_support_init(void); + +int grpc_fork_support_enabled(void); + +// Test only: Must be called before grpc_init(), and overrides +// environment variables/compile flags +void grpc_enable_fork_support(int enable); + +#endif /* GRPC_CORE_LIB_SUPPORT_FORK_H */ diff --git a/src/core/lib/support/thd_internal.h b/src/core/lib/support/thd_internal.h new file mode 100644 index 0000000000..38bffc847d --- /dev/null +++ b/src/core/lib/support/thd_internal.h @@ -0,0 +1,30 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H +#define GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H + +#include <grpc/support/time.h> + +/* Internal interfaces between modules within the gpr support library. */ +void gpr_thd_init(); + +/* Wait for all outstanding threads to finish, up to deadline */ +int gpr_await_threads(gpr_timespec deadline); + +#endif /* GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H */ diff --git a/src/core/lib/support/thd_posix.cc b/src/core/lib/support/thd_posix.cc index 02e3846be1..c2a4f4198f 100644 --- a/src/core/lib/support/thd_posix.cc +++ b/src/core/lib/support/thd_posix.cc @@ -24,22 +24,34 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include <grpc/support/thd.h> #include <grpc/support/useful.h> #include <pthread.h> #include <stdlib.h> #include <string.h> +#include "src/core/lib/support/fork.h" + +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_thread_count; +static int g_awaiting_threads; + struct thd_arg { void (*body)(void* arg); /* body of a thread */ void* arg; /* argument to a thread */ }; +static void inc_thd_count(); +static void dec_thd_count(); + /* Body of every thread started via gpr_thd_new. */ static void* thread_body(void* v) { struct thd_arg a = *(struct thd_arg*)v; free(v); (*a.body)(a.arg); + dec_thd_count(); return nullptr; } @@ -54,6 +66,7 @@ int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg, GPR_ASSERT(a != nullptr); a->body = thd_body; a->arg = arg; + inc_thd_count(); GPR_ASSERT(pthread_attr_init(&attr) == 0); if (gpr_thd_options_is_detached(options)) { @@ -68,6 +81,7 @@ int gpr_thd_new(gpr_thd_id* t, void (*thd_body)(void* arg), void* arg, if (!thread_started) { /* don't use gpr_free, as this was allocated using malloc (see above) */ free(a); + dec_thd_count(); } *t = (gpr_thd_id)p; return thread_started; @@ -77,4 +91,46 @@ gpr_thd_id gpr_thd_currentid(void) { return (gpr_thd_id)pthread_self(); } void gpr_thd_join(gpr_thd_id t) { pthread_join((pthread_t)t, nullptr); } +/***************************************** + * Only used when fork support is enabled + */ + +static void inc_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count++; + gpr_mu_unlock(&g_mu); + } +} + +static void dec_thd_count() { + if (grpc_fork_support_enabled()) { + gpr_mu_lock(&g_mu); + g_thread_count--; + if (g_awaiting_threads && g_thread_count == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); + } +} + +void gpr_thd_init() { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + g_thread_count = 0; + g_awaiting_threads = 0; +} + +int gpr_await_threads(gpr_timespec deadline) { + gpr_mu_lock(&g_mu); + g_awaiting_threads = 1; + int res = 0; + if (g_thread_count > 0) { + res = gpr_cv_wait(&g_cv, &g_mu, deadline); + } + g_awaiting_threads = 0; + gpr_mu_unlock(&g_mu); + return res == 0; +} + #endif /* GPR_POSIX_SYNC */ diff --git a/src/core/lib/support/thd_windows.cc b/src/core/lib/support/thd_windows.cc index 5bda7f440c..0875c2f03e 100644 --- a/src/core/lib/support/thd_windows.cc +++ b/src/core/lib/support/thd_windows.cc @@ -50,6 +50,8 @@ static void destroy_thread(struct thd_info* t) { gpr_free(t); } +void gpr_thd_init(void) {} + /* Body of every thread started via gpr_thd_new. */ static DWORD WINAPI thread_body(void* v) { g_thd_info = (struct thd_info*)v; diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index c6d2f0a192..8ee1383fb8 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -21,6 +21,7 @@ #include <limits.h> #include <memory.h> +#include <grpc/fork.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -39,6 +40,8 @@ #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/fork.h" +#include "src/core/lib/support/thd_internal.h" #include "src/core/lib/surface/alarm_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,10 +65,12 @@ static int g_initializations; static void do_basic_init(void) { gpr_log_verbosity_init(); + grpc_fork_support_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); grpc_cq_global_init(); g_initializations = 0; + grpc_fork_handlers_auto_register(); } static bool append_filter(grpc_exec_ctx* exec_ctx, @@ -122,6 +127,7 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { gpr_time_init(); + gpr_thd_init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); diff --git a/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs new file mode 100644 index 0000000000..e040f52380 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs @@ -0,0 +1,182 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Profiling; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class CallCancellationTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public async Task ClientStreamingCall_CancelAfterBegin() + { + var barrier = new TaskCompletionSource<object>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + barrier.SetResult(null); + await requestStream.ToListAsync(); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await barrier.Task; // make sure the handler has started. + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() + { + var handlerStartedBarrier = new TaskCompletionSource<object>(); + var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); + var successTcs = new TaskCompletionSource<string>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + handlerStartedBarrier.SetResult(null); + + // wait for cancellation to be delivered. + context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); + await cancelNotificationReceivedBarrier.Task; + + var moveNextResult = await requestStream.MoveNext(); + successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await handlerStartedBarrier.Task; + cts.Cancel(); + + try + { + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + Assert.AreEqual("SUCCESS", await successTcs.Task); + } + + [Test] + public async Task ClientStreamingCall_CancelServerSideRead() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var cts = new CancellationTokenSource(); + var moveNextTask = requestStream.MoveNext(cts.Token); + cts.Cancel(); + await moveNextTask; + return ""; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ServerStreamingCall_CancelClientSideRead() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await responseStream.WriteAsync("abc"); + while (!context.CancellationToken.IsCancellationRequested) + { + await Task.Delay(10); + } + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + await call.ResponseStream.MoveNext(); + Assert.AreEqual("abc", call.ResponseStream.Current); + + var cts = new CancellationTokenSource(); + var moveNextTask = call.ResponseStream.MoveNext(cts.Token); + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await moveNextTask; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 72d9035a6f..90dd365b07 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -273,74 +273,6 @@ namespace Grpc.Core.Tests } [Test] - public async Task ClientStreamingCall_CancelAfterBegin() - { - var barrier = new TaskCompletionSource<object>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - barrier.SetResult(null); - await requestStream.ToListAsync(); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await barrier.Task; // make sure the handler has started. - cts.Cancel(); - - try - { - // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - } - - [Test] - public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() - { - var handlerStartedBarrier = new TaskCompletionSource<object>(); - var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); - var successTcs = new TaskCompletionSource<string>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - handlerStartedBarrier.SetResult(null); - - // wait for cancellation to be delivered. - context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); - await cancelNotificationReceivedBarrier.Task; - - var moveNextResult = await requestStream.MoveNext(); - successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await handlerStartedBarrier.Task; - cts.Cancel(); - - try - { - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - Assert.AreEqual("SUCCESS", await successTcs.Task); - } - - [Test] public async Task AsyncUnaryCall_EchoMetadata() { helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 42bfbb87e0..3751d549e3 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -41,6 +41,13 @@ namespace Grpc.Core /// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c> /// associated with the call will be cancelled to signal the failure. /// </para> + /// <para> + /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling + /// an individual read operation has the same effect as cancelling the entire call + /// (which will also result in the read operation returning prematurely), but the per-read cancellation + /// tokens passed to MoveNext() only result in cancelling the call if the read operation haven't finished + /// yet. + /// </para> /// </summary> /// <typeparam name="T">The message type.</typeparam> public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 851b6ca213..ab649ee766 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -49,19 +49,19 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); - } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; - if (result == null) - { - await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); - return false; + if (result == null) + { + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); + return false; + } + return true; } - return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index c65b960afb..058dddb7eb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -49,13 +49,14 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; + return result != null; } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; - return result != null; } public void Dispose() diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 7841051052..65a0ed293f 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -10,6 +10,7 @@ "Grpc.Core.Tests.AppDomainUnloadTest", "Grpc.Core.Tests.AuthContextTest", "Grpc.Core.Tests.AuthPropertyTest", + "Grpc.Core.Tests.CallCancellationTest", "Grpc.Core.Tests.CallCredentialsTest", "Grpc.Core.Tests.CallOptionsTest", "Grpc.Core.Tests.ChannelCredentialsTest", diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json new file mode 100644 index 0000000000..fca3a2a7a6 --- /dev/null +++ b/src/node/health_check/package.json @@ -0,0 +1,29 @@ +{ + "name": "grpc-health-check", + "version": "1.7.2", + "author": "Google Inc.", + "description": "Health check service for use with gRPC", + "repository": { + "type": "git", + "url": "https://github.com/grpc/grpc.git" + }, + "bugs": "https://github.com/grpc/grpc/issues", + "contributors": [ + { + "name": "Michael Lumish", + "email": "mlumish@google.com" + } + ], + "dependencies": { + "grpc": "^1.7.2", + "lodash": "^3.9.3", + "google-protobuf": "^3.0.0" + }, + "files": [ + "LICENSE", + "health.js", + "v1" + ], + "main": "src/node/index.js", + "license": "Apache-2.0" +} diff --git a/src/node/tools/package.json b/src/node/tools/package.json new file mode 100644 index 0000000000..99fd854067 --- /dev/null +++ b/src/node/tools/package.json @@ -0,0 +1,41 @@ +{ + "name": "grpc-tools", + "version": "1.7.2", + "author": "Google Inc.", + "description": "Tools for developing with gRPC on Node.js", + "homepage": "https://grpc.io/", + "repository": { + "type": "git", + "url": "https://github.com/grpc/grpc.git" + }, + "bugs": "https://github.com/grpc/grpc/issues", + "contributors": [ + { + "name": "Michael Lumish", + "email": "mlumish@google.com" + } + ], + "bin": { + "grpc_tools_node_protoc": "./bin/protoc.js", + "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js" + }, + "scripts": { + "install": "./node_modules/.bin/node-pre-gyp install" + }, + "bundledDependencies": ["node-pre-gyp"], + "binary": { + "module_name": "grpc_tools", + "host": "https://storage.googleapis.com/", + "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}", + "package_name": "{platform}-{arch}.tar.gz", + "module_path": "bin" + }, + "files": [ + "index.js", + "bin/protoc.js", + "bin/protoc_plugin.js", + "bin/google/protobuf", + "LICENSE" + ], + "main": "index.js" +} diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index efb4d8617d..56d6ebd842 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -29,6 +29,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/env_linux.cc', 'src/core/lib/support/env_posix.cc', 'src/core/lib/support/env_windows.cc', + 'src/core/lib/support/fork.cc', 'src/core/lib/support/histogram.cc', 'src/core/lib/support/host_port.cc', 'src/core/lib/support/log.cc', @@ -93,6 +94,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_windows.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', + 'src/core/lib/iomgr/fork_posix.cc', + 'src/core/lib/iomgr/fork_windows.cc', 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', |