path: root/src
author murgatroid99 <mlumish@google.com> 2017-05-25 15:13:27 -0700
committer murgatroid99 <mlumish@google.com> 2017-05-25 15:13:27 -0700
commit c18b9069cd37083f34e8e3c1601eb20944914ee5 (patch)
tree 43943d620ecb7a984694766a273823fd34eebad5 /src
parent 0a94f3c8ab55dfd12c14058d57f33121c8d6c411 (diff)
parent 3410c8d7dc86fe7778d6795340d9792acb1b912b (diff)
Merge remote-tracking branch 'upstream/v1.3.x' into master_1.3.x_upmerge
Diffstat (limited to 'src')
-rw-r--r--  src/core/ext/filters/client_channel/channel_connectivity.c |  77
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.c | 115
-rw-r--r--  src/core/ext/filters/client_channel/client_channel.h |   6
-rwxr-xr-x  src/csharp/Grpc.Auth/Grpc.Auth.csproj |   6
-rwxr-xr-x  src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj |   8
-rw-r--r--  src/csharp/Grpc.Core/Channel.cs |  41
-rwxr-xr-x  src/csharp/Grpc.Core/Grpc.Core.csproj |   4
-rwxr-xr-x  src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj |   6
-rwxr-xr-x  src/csharp/Grpc.Reflection/Grpc.Reflection.csproj |   6
-rwxr-xr-x  src/csharp/build_packages_dotnetcli.bat |  10
-rwxr-xr-x  src/csharp/build_packages_dotnetcli.sh |  10
-rw-r--r--  src/node/ext/call.cc |  37
-rw-r--r--  src/node/ext/call.h |   1
-rw-r--r--  src/node/src/client.js |  12
-rw-r--r--  src/node/test/common_test.js |   1
-rw-r--r--  src/node/test/surface_test.js |  25
-rw-r--r--  src/python/grpcio/grpc/_channel.py |   7
-rw-r--r--  src/python/grpcio_tests/tests/tests.json |   1
-rw-r--r--  src/python/grpcio_tests/tests/unit/_reconnect_test.py |  70
-rwxr-xr-x  src/ruby/end2end/channel_closing_driver.rb |   5
-rwxr-xr-x  src/ruby/end2end/channel_state_driver.rb |   3
-rwxr-xr-x  src/ruby/end2end/grpc_class_init_client.rb |  96
-rwxr-xr-x  src/ruby/end2end/grpc_class_init_driver.rb |  53
-rwxr-xr-x  src/ruby/end2end/multiple_killed_watching_threads_driver.rb |  63
-rwxr-xr-x  src/ruby/end2end/sig_int_during_channel_watch_client.rb |   2
-rwxr-xr-x  src/ruby/end2end/sig_int_during_channel_watch_driver.rb |   5
-rw-r--r--  src/ruby/ext/grpc/rb_channel.c | 474
-rw-r--r--  src/ruby/ext/grpc/rb_event_thread.c |  12
-rw-r--r--  src/ruby/ext/grpc/rb_grpc.c |  20
-rw-r--r--  src/ruby/ext/grpc/rb_grpc_imports.generated.c |   2
-rw-r--r--  src/ruby/ext/grpc/rb_grpc_imports.generated.h |   3
-rw-r--r--  src/ruby/spec/channel_connection_spec.rb |  34
32 files changed, 958 insertions(+), 257 deletions(-)
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index f83670db82..04666edbec 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -67,9 +67,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
typedef enum {
WAITING,
- CALLING_BACK,
+ READY_TO_CALL_BACK,
CALLING_BACK_AND_FINISHED,
- CALLED_BACK
} callback_phase;
typedef struct {
@@ -77,11 +76,13 @@ typedef struct {
callback_phase phase;
grpc_closure on_complete;
grpc_closure on_timeout;
+ grpc_closure watcher_timer_init;
grpc_timer alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
grpc_channel *channel;
+ grpc_error *error;
void *tag;
} state_watcher;
@@ -105,11 +106,8 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
- case CALLED_BACK:
+ case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
- case CALLING_BACK:
- w->phase = CALLED_BACK;
- break;
case CALLING_BACK_AND_FINISHED:
delete = 1;
break;
@@ -123,10 +121,14 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
bool due_to_completion, grpc_error *error) {
- int delete = 0;
-
if (due_to_completion) {
grpc_timer_cancel(exec_ctx, &w->alarm);
+ } else {
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_watch_connectivity_state(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq), NULL,
+ &w->on_complete, NULL);
}
gpr_mu_lock(&w->mu);
@@ -147,25 +149,27 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
switch (w->phase) {
case WAITING:
- w->phase = CALLING_BACK;
- grpc_cq_end_op(exec_ctx, w->cq, w->tag, GRPC_ERROR_REF(error),
- finished_completion, w, &w->completion_storage);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ w->phase = READY_TO_CALL_BACK;
break;
- case CALLING_BACK:
+ case READY_TO_CALL_BACK:
+ if (error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(!due_to_completion);
+ GRPC_ERROR_UNREF(w->error);
+ GRPC_ERROR_REF(error);
+ w->error = error;
+ }
w->phase = CALLING_BACK_AND_FINISHED;
+ grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w,
+ &w->completion_storage);
break;
case CALLING_BACK_AND_FINISHED:
GPR_UNREACHABLE_CODE(return );
- case CALLED_BACK:
- delete = 1;
break;
}
gpr_mu_unlock(&w->mu);
- if (delete) {
- delete_state_watcher(exec_ctx, w);
- }
-
GRPC_ERROR_UNREF(error);
}
@@ -179,6 +183,28 @@ static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
}
+int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
+ grpc_channel_element *client_channel_elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ return grpc_client_channel_num_external_connectivity_watchers(
+ client_channel_elem);
+}
+
+typedef struct watcher_timer_init_arg {
+ state_watcher *w;
+ gpr_timespec deadline;
+} watcher_timer_init_arg;
+
+static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg;
+
+ grpc_timer_init(exec_ctx, &wa->w->alarm,
+ gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC),
+ &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_free(wa);
+}
+
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
@@ -208,16 +234,19 @@ void grpc_channel_watch_connectivity_state(
w->cq = cq;
w->tag = tag;
w->channel = channel;
+ w->error = NULL;
- grpc_timer_init(&exec_ctx, &w->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- &w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC));
+ watcher_timer_init_arg *wa = gpr_malloc(sizeof(watcher_timer_init_arg));
+ wa->w = w;
+ wa->deadline = deadline;
+ grpc_closure_init(&w->watcher_timer_init, watcher_timer_init, wa,
+ grpc_schedule_on_exec_ctx);
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq), &w->state,
- &w->on_complete);
+ grpc_client_channel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &w->on_complete, &w->watcher_timer_init);
} else {
abort();
}
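
The rewritten state machine above is driven entirely through the public connectivity API. As a rough sketch of how that API is typically exercised from a completion queue (illustrative only, not part of this commit; the helper name wait_for_state_change is hypothetical):

#include <grpc/grpc.h>
#include <grpc/support/time.h>

/* Hypothetical helper: block until `channel` leaves `last_state` or the
 * deadline passes; returns 1 if the state actually changed. The
 * grpc_cq_end_op() call in partly_done() above is what enqueues the
 * GRPC_OP_COMPLETE event consumed here. */
static int wait_for_state_change(grpc_channel *channel,
                                 grpc_connectivity_state last_state,
                                 int deadline_seconds) {
  grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
  gpr_timespec deadline =
      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                   gpr_time_from_seconds(deadline_seconds, GPR_TIMESPAN));
  grpc_channel_watch_connectivity_state(channel, last_state, deadline, cq,
                                        (void *)0x1);
  grpc_event ev =
      grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
  int changed = ev.type == GRPC_OP_COMPLETE &&
                grpc_channel_check_connectivity_state(channel, 0) != last_state;
  grpc_completion_queue_shutdown(cq);
  /* drain the queue before destroying it */
  while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                    NULL)
             .type != GRPC_QUEUE_SHUTDOWN) {
  }
  grpc_completion_queue_destroy(cq);
  return changed;
}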
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index f2f27b9175..8cebbe9eca 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -167,6 +167,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
return value;
}
+struct external_connectivity_watcher;
+
/*************************************************************************
* CHANNEL-WIDE FUNCTIONS
*/
@@ -204,6 +206,11 @@ typedef struct client_channel_channel_data {
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+ /* external_connectivity_watcher_list head is guarded by its own mutex, since
+ * counts need to be grabbed immediately without polling on a cq */
+ gpr_mu external_connectivity_watcher_list_mu;
+ struct external_connectivity_watcher *external_connectivity_watcher_list_head;
+
/* the following properties are guarded by a mutex since APIs require them
to be instantaneously available */
gpr_mu info_mu;
@@ -661,6 +668,12 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
// Initialize data members.
chand->combiner = grpc_combiner_create(NULL);
gpr_mu_init(&chand->info_mu);
+ gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ chand->external_connectivity_watcher_list_head = NULL;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
chand->owning_stack = args->channel_stack;
grpc_closure_init(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
@@ -749,6 +762,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
+ gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
/*************************************************************************
@@ -1431,14 +1445,79 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
-typedef struct {
+typedef struct external_connectivity_watcher {
channel_data *chand;
grpc_pollset *pollset;
grpc_closure *on_complete;
+ grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
grpc_closure my_closure;
+ struct external_connectivity_watcher *next;
} external_connectivity_watcher;
+static external_connectivity_watcher *lookup_external_connectivity_watcher(
+ channel_data *chand, grpc_closure *on_complete) {
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL && w->on_complete != on_complete) {
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return w;
+}
+
+static void external_connectivity_watcher_list_append(
+ channel_data *chand, external_connectivity_watcher *w) {
+ GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
+
+ gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
+ GPR_ASSERT(!w->next);
+ w->next = chand->external_connectivity_watcher_list_head;
+ chand->external_connectivity_watcher_list_head = w;
+ gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
+}
+
+static void external_connectivity_watcher_list_remove(
+ channel_data *chand, external_connectivity_watcher *too_remove) {
+ GPR_ASSERT(
+ lookup_external_connectivity_watcher(chand, too_remove->on_complete));
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ if (too_remove == chand->external_connectivity_watcher_list_head) {
+ chand->external_connectivity_watcher_list_head = too_remove->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ if (w->next == too_remove) {
+ w->next = w->next->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ w = w->next;
+ }
+ GPR_UNREACHABLE_CODE(return );
+}
+
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ int count = 0;
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ count++;
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
+ return count;
+}
+
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
@@ -1447,6 +1526,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
+ external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
@@ -1454,21 +1534,42 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
external_connectivity_watcher *w = arg;
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ external_connectivity_watcher *found = NULL;
+ if (w->state != NULL) {
+ external_connectivity_watcher_list_append(w->chand, w);
+ grpc_closure_run(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ } else {
+ GPR_ASSERT(w->watcher_timer_init == NULL);
+ found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
+ if (found) {
+ GPR_ASSERT(found->on_complete == w->on_complete);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
+ }
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
+ w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ }
}
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure) {
+ grpc_connectivity_state *state, grpc_closure *closure,
+ grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
- external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
w->pollset = pollset;
w->on_complete = closure;
w->state = state;
+ w->watcher_timer_init = watcher_timer_init;
+
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h
index 8d2490ea55..356a7ab0c1 100644
--- a/src/core/ext/filters/client_channel/client_channel.h
+++ b/src/core/ext/filters/client_channel/client_channel.h
@@ -53,9 +53,13 @@ extern const grpc_channel_filter grpc_client_channel_filter;
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem);
+
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *on_complete);
+ grpc_connectivity_state *state, grpc_closure *on_complete,
+ grpc_closure *watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
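
The counting hook exists precisely so tests can observe watcher cleanup without polling a completion queue (the list has its own mutex for this reason). A sketch of the kind of assertion it enables, assuming the surface-level wrapper grpc_channel_num_external_connectivity_watchers() added in channel_connectivity.c above is declared alongside the other connectivity entry points in <grpc/grpc.h>:

#include <grpc/grpc.h>
#include <grpc/support/log.h>

/* Hypothetical test helper: once every watch registered against `channel`
 * has completed or timed out, the external watcher list must be empty. */
static void assert_watchers_drained(grpc_channel *channel) {
  GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(channel) == 0);
}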
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
index 6ac25aa1f0..188ddb95b9 100755
--- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
@@ -16,6 +16,8 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
</PropertyGroup>
<ItemGroup>
@@ -23,7 +25,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
index f4dd5105fc..45ec874322 100755
--- a/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
+++ b/src/csharp/Grpc.Core.Testing/Grpc.Core.Testing.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\Grpc.Core\Version.csproj.include" />
<Import Project="..\Grpc.Core\Common.csproj.include" />
@@ -16,6 +16,8 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
</PropertyGroup>
<ItemGroup>
@@ -23,7 +25,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 4f29c35b32..51ae11fbde 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -59,6 +59,8 @@ namespace Grpc.Core
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
+ readonly Task connectivityWatcherTask;
+
bool shutdownRequested;
/// <summary>
@@ -99,6 +101,9 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
+ // TODO(jtattermusch): Workaround for https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822.
+ // Remove once retries are supported in C core
+ this.connectivityWatcherTask = RunConnectivityWatcherAsync();
GrpcEnvironment.RegisterChannel(this);
}
@@ -244,7 +249,7 @@ namespace Grpc.Core
handle.Dispose();
- await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
+ await Task.WhenAll(GrpcEnvironment.ReleaseAsync(), connectivityWatcherTask).ConfigureAwait(false);
}
internal ChannelSafeHandle Handle
@@ -299,6 +304,40 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Constantly watches channel connectivity status to work around https://github.com/GoogleCloudPlatform/google-cloud-dotnet/issues/822
+ /// </summary>
+ private async Task RunConnectivityWatcherAsync()
+ {
+ try
+ {
+ var lastState = State;
+ while (lastState != ChannelState.Shutdown)
+ {
+ lock (myLock)
+ {
+ if (shutdownRequested)
+ {
+ break;
+ }
+ }
+
+ try
+ {
+ await WaitForStateChangedAsync(lastState, DateTime.UtcNow.AddSeconds(1)).ConfigureAwait(false);
+ }
+ catch (TaskCanceledException)
+ {
+ // ignore timeout
+ }
+ lastState = State;
+ }
+ }
+ catch (ObjectDisposedException) {
+ // during shutdown, channel is going to be disposed.
+ }
+ }
+
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index c0865001a8..ae0d8b2c8d 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
<Import Project="Version.csproj.include" />
<Import Project="Common.csproj.include" />
@@ -15,6 +15,8 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
</PropertyGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
index eac6e1fc95..c3791a4e6b 100755
--- a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
+++ b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj
@@ -15,6 +15,8 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
</PropertyGroup>
<ItemGroup>
@@ -22,7 +24,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
index 70bfcc89c5..3a07555248 100755
--- a/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
+++ b/src/csharp/Grpc.Reflection/Grpc.Reflection.csproj
@@ -15,6 +15,8 @@
<PackageProjectUrl>https://github.com/grpc/grpc</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</PackageLicenseUrl>
<NetStandardImplicitPackageVersion Condition=" '$(TargetFramework)' == 'netstandard1.5' ">1.6.0</NetStandardImplicitPackageVersion>
+ <IncludeSymbols>true</IncludeSymbols>
+ <IncludeSource>true</IncludeSource>
</PropertyGroup>
<ItemGroup>
@@ -22,7 +24,9 @@
</ItemGroup>
<ItemGroup>
- <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
+ <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj">
+ <PrivateAssets>None</PrivateAssets>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index d823942be5..aa8a8d3b17 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -51,11 +51,11 @@ powershell -Command "cp -r ..\..\platform=*\artifacts\protoc_* protoc_plugins"
@rem To be able to build, we also need to put grpc_csharp_ext to its normal location
xcopy /Y /I nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Auth --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ..\..\..\artifacts || goto :error
-%DOTNET% pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Core --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Core.Testing --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Auth --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.HealthCheck --output ..\..\..\artifacts || goto :error
+%DOTNET% pack --configuration Release Grpc.Reflection --output ..\..\..\artifacts || goto :error
%NUGET% pack Grpc.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts || goto :error
%NUGET% pack Grpc.Tools.nuspec -Version %VERSION% -OutputDirectory ..\..\artifacts
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index f79c97fbbc..d33923845c 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -48,11 +48,11 @@ dotnet restore Grpc.sln
mkdir -p ../../libs/opt
cp nativelibs/csharp_ext_linux_x64/libgrpc_csharp_ext.so ../../libs/opt
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Core --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Core.Testing --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Auth --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.HealthCheck --output ../../../artifacts
-dotnet pack --configuration Release --include-symbols --include-source Grpc.Reflection --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Core --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Core.Testing --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts
+dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts
+dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
nuget pack Grpc.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts
nuget pack Grpc.Tools.nuspec -Version "1.4.0-dev" -OutputDirectory ../../artifacts
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 49179ab359..9453000ad3 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -508,9 +508,14 @@ void Call::DestroyCall() {
}
Call::Call(grpc_call *call)
- : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {}
+ : wrapped_call(call), pending_batches(0), has_final_op_completed(false) {
+ peer = grpc_call_get_peer(call);
+}
-Call::~Call() { DestroyCall(); }
+Call::~Call() {
+ DestroyCall();
+ gpr_free(peer);
+}
void Call::Init(Local<Object> exports) {
HandleScope scope;
@@ -662,6 +667,16 @@ NAN_METHOD(Call::StartBatch) {
}
Local<Function> callback_func = info[1].As<Function>();
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* This implies that the call has completed and has been destroyed. To
+ * emulate previous behavior, we should call the callback immediately with
+ * an error, as though the batch had failed in core */
+ Local<Value> argv[] = {
+ Nan::Error("The async function failed because the call has completed")};
+ Nan::Call(callback_func, Nan::New<Object>(), 1, argv);
+ return;
+ }
Local<Object> obj = Nan::To<Object>(info[0]).ToLocalChecked();
Local<Array> keys = Nan::GetOwnPropertyNames(obj).ToLocalChecked();
size_t nops = keys->Length();
@@ -727,6 +742,11 @@ NAN_METHOD(Call::Cancel) {
return Nan::ThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* Cancel is supposed to be idempotent. If the call has already finished,
+ * cancel should just complete silently */
+ return;
+ }
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
return Nan::ThrowError(nanErrorWithCode("cancel failed", error));
@@ -747,6 +767,11 @@ NAN_METHOD(Call::CancelWithStatus) {
"cancelWithStatus's second argument must be a string");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ /* Cancel is supposed to be idempotent. If the call has already finished,
+ * cancel should just complete silently */
+ return;
+ }
grpc_status_code code =
static_cast<grpc_status_code>(Nan::To<uint32_t>(info[0]).FromJust());
if (code == GRPC_STATUS_OK) {
@@ -763,9 +788,7 @@ NAN_METHOD(Call::GetPeer) {
return Nan::ThrowTypeError("getPeer can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
- char *peer = grpc_call_get_peer(call->wrapped_call);
- Local<Value> peer_value = Nan::New(peer).ToLocalChecked();
- gpr_free(peer);
+ Local<Value> peer_value = Nan::New(call->peer).ToLocalChecked();
info.GetReturnValue().Set(peer_value);
}
@@ -780,6 +803,10 @@ NAN_METHOD(Call::SetCredentials) {
"setCredentials' first argument must be a CallCredentials");
}
Call *call = ObjectWrap::Unwrap<Call>(info.This());
+ if (call->wrapped_call == NULL) {
+ return Nan::ThrowError(
+ "Cannot set credentials on a call that has already started");
+ }
CallCredentials *creds_object = ObjectWrap::Unwrap<CallCredentials>(
Nan::To<Object>(info[0]).ToLocalChecked());
grpc_call_credentials *creds = creds_object->GetWrappedCredentials();
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 0bd24f56a9..8f751279e4 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -96,6 +96,7 @@ class Call : public Nan::ObjectWrap {
call, this is GRPC_OP_RECV_STATUS_ON_CLIENT and for a server call, this
is GRPC_OP_SEND_STATUS_FROM_SERVER */
bool has_final_op_completed;
+ char *peer;
};
class Op {
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 16fe06a54d..b79f058573 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -121,6 +121,12 @@ function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
var message;
+ var self = this;
+ if (this.writeFailed) {
+ /* Once a write fails, just call the callback immediately to let the caller
+ flush any pending writes. */
+ setImmediate(callback);
+ }
try {
message = this.serialize(chunk);
} catch (e) {
@@ -141,8 +147,10 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
- // Something has gone wrong. Stop writing by failing to call callback
- return;
+ /* Assume that the call is complete and that writing failed because a
+ status was received. In that case, set a flag to discard all future
+ writes */
+ self.writeFailed = true;
}
callback();
});
diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js
index b7c2c6a8d6..db80207e22 100644
--- a/src/node/test/common_test.js
+++ b/src/node/test/common_test.js
@@ -100,7 +100,6 @@ describe('Proto message long int serialize and deserialize', function() {
var longNumDeserialize = deserializeCls(messages_proto.LongValues,
num_options);
var serialized = longSerialize({int_64: pos_value});
- console.log(longDeserialize(serialized));
assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string');
/* With the longsAsStrings option disabled, long values are represented as
* objects with 3 keys: low, high, and unsigned */
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index d2f0511af2..0696e7ae19 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1110,6 +1110,18 @@ describe('Other conditions', function() {
done();
});
});
+ it('after the call has fully completed', function(done) {
+ var peer;
+ var call = client.unary({error: false}, function(err, data) {
+ assert.ifError(err);
+ setImmediate(function() {
+ assert.strictEqual(peer, call.getPeer());
+ done();
+ });
+ });
+ peer = call.getPeer();
+ assert.strictEqual(typeof peer, 'string');
+ });
});
});
describe('Call propagation', function() {
@@ -1352,4 +1364,17 @@ describe('Cancelling surface client', function() {
});
call.cancel();
});
+ it('Should be idempotent', function(done) {
+ var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ // Call asynchronously to try cancelling after call is fully completed
+ setImmediate(function() {
+ assert.doesNotThrow(function() {
+ call.cancel();
+ });
+ done();
+ });
+ });
+ call.cancel();
+ });
});
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 4316449ac6..012ed8ec81 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -786,7 +786,7 @@ def _channel_managed_call_management(state):
class _ChannelConnectivityState(object):
def __init__(self, channel):
- self.lock = threading.Lock()
+ self.lock = threading.RLock()
self.channel = channel
self.polling = False
self.connectivity = None
@@ -926,6 +926,11 @@ class Channel(grpc.Channel):
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
+ # TODO(https://github.com/grpc/grpc/issues/9884)
+ # Temporary workaround for UNAVAILABLE issues
+ # Remove this once c-core has retry support
+ _subscribe(self._connectivity_state, lambda *args: None, None)
+
def subscribe(self, callback, try_to_connect=None):
_subscribe(self._connectivity_state, callback, try_to_connect)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 9f7587faa6..126e8ac60d 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -32,6 +32,7 @@
"unit._invocation_defects_test.InvocationDefectsTest",
"unit._metadata_code_details_test.MetadataCodeDetailsTest",
"unit._metadata_test.MetadataTest",
+ "unit._reconnect_test.ReconnectTest",
"unit._resource_exhausted_test.ResourceExhaustedTest",
"unit._rpc_test.RPCTest",
"unit._sanity._sanity_test.Sanity",
diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
new file mode 100644
index 0000000000..6c316476b3
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
@@ -0,0 +1,70 @@
+# Copyright 2017, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""Tests that a channel will reconnect if a connection is dropped"""
+
+import unittest
+
+import grpc
+from grpc.framework.foundation import logging_pool
+
+from tests.unit.framework.common import test_constants
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x00\x00\x01'
+
+_UNARY_UNARY = '/test/UnaryUnary'
+
+
+def _handle_unary_unary(unused_request, unused_servicer_context):
+ return _RESPONSE
+
+
+class ReconnectTest(unittest.TestCase):
+
+ def test_reconnect(self):
+ server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ handler = grpc.method_handlers_generic_handler('test', {
+ 'UnaryUnary':
+ grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
+ })
+ server = grpc.server(server_pool, (handler,))
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ channel = grpc.insecure_channel('localhost:%d' % port)
+ multi_callable = channel.unary_unary(_UNARY_UNARY)
+ self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
+ server.stop(None)
+ server = grpc.server(server_pool, (handler,))
+ server.add_insecure_port('[::]:{}'.format(port))
+ server.start()
+ self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb
index d3e5373b0b..bed8c43405 100755
--- a/src/ruby/end2end/channel_closing_driver.rb
+++ b/src/ruby/end2end/channel_closing_driver.rb
@@ -61,6 +61,11 @@ def main
'channel is closed while connectivity is watched'
end
+ client_exit_code = $CHILD_STATUS
+ if client_exit_code != 0
+ fail "channel closing client failed, exit code #{client_exit_code}"
+ end
+
server_runner.stop
end
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index 80fb62899e..9910076dba 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -58,6 +58,9 @@ def main
'It likely hangs when ended abruptly'
end
+ # The interrupt in the child process should cause it to
+ # exit with a non-zero status, so don't check it here.
+ # This test mainly tries to catch deadlocks.
server_runner.stop
end
diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb
index ee79292119..e73ca76850 100755
--- a/src/ruby/end2end/grpc_class_init_client.rb
+++ b/src/ruby/end2end/grpc_class_init_client.rb
@@ -34,44 +34,110 @@
require_relative './end2end_common'
-def main
- grpc_class = ''
- OptionParser.new do |opts|
- opts.on('--grpc_class=P', String) do |p|
- grpc_class = p
+def construct_many(test_proc)
+ thds = []
+ 4.times do
+ thds << Thread.new do
+ 20.times do
+ test_proc.call
+ end
end
- end.parse!
+ end
+ 20.times do
+ test_proc.call
+ end
+ thds.each(&:join)
+end
+
+def run_gc_stress_test(test_proc)
+ GC.disable
+ construct_many(test_proc)
- test_proc = nil
+ GC.enable
+ construct_many(test_proc)
+
+ GC.start(full_mark: true, immediate_sweep: true)
+ construct_many(test_proc)
+end
+
+def run_concurrency_stress_test(test_proc)
+ 100.times do
+ Thread.new do
+ test_proc.call
+ end
+ end
+
+ test_proc.call
+
+ fail 'exception thrown while child thread initing class'
+end
+# default (no gc_stress and no concurrency_stress)
+def run_default_test(test_proc)
+ thd = Thread.new do
+ test_proc.call
+ end
+ test_proc.call
+ thd.join
+end
+
+def get_test_proc(grpc_class)
case grpc_class
when 'channel'
- test_proc = proc do
+ return proc do
GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure)
end
when 'server'
- test_proc = proc do
+ return proc do
GRPC::Core::Server.new({})
end
when 'channel_credentials'
- test_proc = proc do
+ return proc do
GRPC::Core::ChannelCredentials.new
end
when 'call_credentials'
- test_proc = proc do
+ return proc do
GRPC::Core::CallCredentials.new(proc { |noop| noop })
end
when 'compression_options'
- test_proc = proc do
+ return proc do
GRPC::Core::CompressionOptions.new
end
else
fail "bad --grpc_class=#{grpc_class} param"
end
+end
- th = Thread.new { test_proc.call }
- test_proc.call
- th.join
+def main
+ grpc_class = ''
+ stress_test = ''
+ OptionParser.new do |opts|
+ opts.on('--grpc_class=P', String) do |p|
+ grpc_class = p
+ end
+ opts.on('--stress_test=P') do |p|
+ stress_test = p
+ end
+ end.parse!
+
+ test_proc = get_test_proc(grpc_class)
+
+ # the different test configs need to be run
+ # in separate processes, since each one tests
+ # clean shutdown in a different way
+ case stress_test
+ when 'gc'
+ p 'run gc stress'
+ run_gc_stress_test(test_proc)
+ when 'concurrency'
+ p 'run concurrency stress'
+ run_concurrency_stress_test(test_proc)
+ when ''
+ p 'run default'
+ run_default_test(test_proc)
+ else
+ fail "bad --stress_test=#{stress_test} param"
+ end
end
main
diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb
index 764d029f14..c65ed547c5 100755
--- a/src/ruby/end2end/grpc_class_init_driver.rb
+++ b/src/ruby/end2end/grpc_class_init_driver.rb
@@ -38,29 +38,40 @@ def main
call_credentials
compression_options )
- native_grpc_classes.each do |grpc_class|
- STDERR.puts 'start client'
- this_dir = File.expand_path(File.dirname(__FILE__))
- client_path = File.join(this_dir, 'grpc_class_init_client.rb')
- client_pid = Process.spawn(RbConfig.ruby,
- client_path,
- "--grpc_class=#{grpc_class}")
- begin
- Timeout.timeout(10) do
- Process.wait(client_pid)
+ # there is room for false positives in this test,
+ # so do a few runs for each config
+ 4.times do
+ native_grpc_classes.each do |grpc_class|
+ ['', 'gc', 'concurrency'].each do |stress_test_type|
+ STDERR.puts 'start client'
+ this_dir = File.expand_path(File.dirname(__FILE__))
+ client_path = File.join(this_dir, 'grpc_class_init_client.rb')
+ client_pid = Process.spawn(RbConfig.ruby,
+ client_path,
+ "--grpc_class=#{grpc_class}",
+ "--stress_test=#{stress_test_type}")
+ begin
+ Timeout.timeout(10) do
+ Process.wait(client_pid)
+ end
+ rescue Timeout::Error
+ STDERR.puts "timeout waiting for client pid #{client_pid}"
+ Process.kill('SIGKILL', client_pid)
+ Process.wait(client_pid)
+ STDERR.puts 'killed client child'
+ raise 'Timed out waiting for client process. ' \
+ 'It likely hangs when the first constructed gRPC object has ' \
+ "type: #{grpc_class}"
+ end
+
+ client_exit_code = $CHILD_STATUS
+ # concurrency stress test type is expected to exit with a
+ # non-zero status due to an exception being raised
+ if client_exit_code != 0 && stress_test_type != 'concurrency'
+ fail "client failed, exit code #{client_exit_code}"
+ end
end
- rescue Timeout::Error
- STDERR.puts "timeout waiting for client pid #{client_pid}"
- Process.kill('SIGKILL', client_pid)
- Process.wait(client_pid)
- STDERR.puts 'killed client child'
- raise 'Timed out waiting for client process. ' \
- 'It likely hangs when the first constructed gRPC object has ' \
- "type: #{grpc_class}"
end
-
- client_exit_code = $CHILD_STATUS
- fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0
end
end
diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
new file mode 100755
index 0000000000..206ec8e801
--- /dev/null
+++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
@@ -0,0 +1,63 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require_relative './end2end_common'
+
+Thread.abort_on_exception = true
+
+include GRPC::Core::ConnectivityStates
+
+def watch_state(ch)
+ thd = Thread.new do
+ state = ch.connectivity_state(false)
+ fail "non-idle state: #{state}" unless state == IDLE
+ ch.watch_connectivity_state(IDLE, Time.now + 360)
+ end
+ sleep 0.1
+ thd.kill
+end
+
+def main
+ channels = []
+ 10.times do
+ ch = GRPC::Core::Channel.new('dummy_host',
+ nil, :this_channel_is_insecure)
+ watch_state(ch)
+ channels << ch
+ end
+
+ # checking the connectivity state should still be safe
+ channels.each do |c|
+ fail unless c.connectivity_state(false) == FATAL_FAILURE
+ end
+end
+
+main
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_client.rb b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
index 389fc5ba33..0c6a374925 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_client.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_client.rb
@@ -46,6 +46,8 @@ def main
end
end.parse!
+ trap('SIGINT') { exit 0 }
+
thd = Thread.new do
child_thread_channel = GRPC::Core::Channel.new("localhost:#{server_port}",
{},
diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
index 670cda0919..79a8c133fa 100755
--- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
+++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb
@@ -63,6 +63,11 @@ def main
'SIGINT is sent while there is an active connectivity_state call'
end
+ client_exit_code = $CHILD_STATUS
+ if client_exit_code != 0
+ fail "sig_int_during_channel_watch_client failed: #{client_exit_code}"
+ end
+
server_runner.stop
end
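
The rb_channel.c rewrite below replaces the per-channel mutex/condvar pair with a single global polling state, and funnels every blocking operation through Ruby's rb_thread_call_without_gvl with an unblocking function. A minimal sketch of that pattern (all names hypothetical except rb_thread_call_without_gvl, and the "work" is a stand-in for blocking core calls):

#include <ruby/ruby.h>
#include <ruby/thread.h>

typedef struct example_stack {
  int input;
  int output;
} example_stack;

/* Runs without the GVL; must not call Ruby APIs. In rb_channel.c the
 * analogous functions block on global_connection_polling_mu/cv. */
static void *blocking_work_no_gil(void *arg) {
  example_stack *stack = (example_stack *)arg;
  stack->output = stack->input + 1; /* stand-in for blocking core work */
  return NULL;
}

/* Unblocking function: Ruby invokes this if the thread is interrupted
 * (e.g. Thread#kill or SIGINT), so the worker can be woken up early. */
static void blocking_work_unblock(void *arg) { (void)arg; }

static VALUE example_method(VALUE self) {
  example_stack stack = {41, 0};
  (void)self;
  rb_thread_call_without_gvl(blocking_work_no_gil, &stack,
                             blocking_work_unblock, &stack);
  return INT2NUM(stack.output);
}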
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index a802183726..f65388448a 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -68,29 +68,53 @@ static VALUE grpc_rb_cChannel = Qnil;
/* Used during the conversion of a hash to channel args during channel setup */
static VALUE grpc_rb_cChannelArgs;
+typedef struct bg_watched_channel {
+ grpc_channel *channel;
+ // these fields must only be accessed under global_connection_polling_mu
+ struct bg_watched_channel *next;
+ int channel_destroyed;
+ int refcount;
+} bg_watched_channel;
+
/* grpc_rb_channel wraps a grpc_channel. */
typedef struct grpc_rb_channel {
VALUE credentials;
- /* The actual channel */
- grpc_channel *wrapped;
- int request_safe_destroy;
- int safe_to_destroy;
- grpc_connectivity_state current_connectivity_state;
-
- int mu_init_done;
- int abort_watch_connectivity_state;
- gpr_mu channel_mu;
- gpr_cv channel_cv;
+ /* The actual channel (protected in a wrapper to tell when it's safe to
+ * destroy) */
+ bg_watched_channel *bg_wrapped;
} grpc_rb_channel;
-/* Forward declarations of functions involved in temporary fix to
- * https://github.com/grpc/grpc/issues/9941 */
+typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type;
+
+typedef struct watch_state_op {
+ watch_state_op_type op_type;
+ // from event.success
+ union {
+ struct {
+ int success;
+ // has been called back due to a cq next call
+ int called_back;
+ } api_callback_args;
+ struct {
+ bg_watched_channel *bg;
+ } continuous_watch_callback_args;
+ } op;
+} watch_state_op;
+
+static bg_watched_channel *bg_watched_channel_list_head = NULL;
+
static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper);
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
+ bg_watched_channel *bg);
static void *wait_until_channel_polling_thread_started_no_gil(void *);
static void wait_until_channel_polling_thread_started_unblocking_func(void *);
+static void *channel_init_try_register_connection_polling_without_gil(
+ void *arg);
+
+typedef struct channel_init_try_register_stack {
+ grpc_channel *channel;
+ grpc_rb_channel *wrapper;
+} channel_init_try_register_stack;
static grpc_completion_queue *channel_polling_cq;
static gpr_mu global_connection_polling_mu;
@@ -98,6 +122,42 @@ static gpr_cv global_connection_polling_cv;
static int abort_channel_polling = 0;
static int channel_polling_thread_started = 0;
+static int bg_watched_channel_list_lookup(bg_watched_channel *bg);
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+ grpc_channel *channel);
+static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
+static void run_poll_channels_loop_unblocking_func(void *arg);
+
+// Needs to be called under global_connection_polling_mu
+static void grpc_rb_channel_watch_connection_state_op_complete(
+ watch_state_op *op, int success) {
+ GPR_ASSERT(!op->op.api_callback_args.called_back);
+ op->op.api_callback_args.called_back = 1;
+ op->op.api_callback_args.success = success;
+ // wake up the watch API call that's waiting on this op
+ gpr_cv_broadcast(&global_connection_polling_cv);
+}
+
+/* Avoids destroying a channel twice. */
+static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
+ gpr_mu_lock(&global_connection_polling_mu);
+ GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ bg->refcount--;
+ if (bg->refcount == 0) {
+ bg_watched_channel_list_free_and_remove(bg);
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+static void *channel_safe_destroy_without_gil(void *arg) {
+ grpc_rb_channel_safe_destroy((bg_watched_channel *)arg);
+ return NULL;
+}
+
/* Destroys Channel instances. */
static void grpc_rb_channel_free(void *p) {
grpc_rb_channel *ch = NULL;
@@ -106,14 +166,13 @@ static void grpc_rb_channel_free(void *p) {
};
ch = (grpc_rb_channel *)p;
- if (ch->wrapped != NULL) {
- grpc_rb_channel_safe_destroy(ch);
- ch->wrapped = NULL;
- }
-
- if (ch->mu_init_done) {
- gpr_mu_destroy(&ch->channel_mu);
- gpr_cv_destroy(&ch->channel_cv);
+ if (ch->bg_wrapped != NULL) {
+ /* assumption made here: it's ok to directly gpr_mu_lock the global
+ * connection polling mutex because we're in a finalizer,
+ * and we can count on this thread to not be interrupted or
+ * yield the gil. */
+ grpc_rb_channel_safe_destroy(ch->bg_wrapped);
+ ch->bg_wrapped = NULL;
}
xfree(p);
@@ -146,7 +205,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
/* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc(VALUE cls) {
grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
- wrapper->wrapped = NULL;
+ wrapper->bg_wrapped = NULL;
wrapper->credentials = Qnil;
return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
}
@@ -168,18 +227,21 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
grpc_channel_credentials *creds = NULL;
char *target_chars = NULL;
grpc_channel_args args;
+ channel_init_try_register_stack stack;
+ int stop_waiting_for_thread_start = 0;
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
rb_thread_call_without_gvl(
- wait_until_channel_polling_thread_started_no_gil, NULL,
- wait_until_channel_polling_thread_started_unblocking_func, NULL);
+ wait_until_channel_polling_thread_started_no_gil,
+ &stop_waiting_for_thread_start,
+ wait_until_channel_polling_thread_started_unblocking_func,
+ &stop_waiting_for_thread_start);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@@ -196,24 +258,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
GPR_ASSERT(ch);
-
- wrapper->wrapped = ch;
-
- gpr_mu_init(&wrapper->channel_mu);
- gpr_cv_init(&wrapper->channel_cv);
- wrapper->mu_init_done = 1;
-
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->abort_watch_connectivity_state = 0;
- wrapper->current_connectivity_state =
- grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
- wrapper->safe_to_destroy = 0;
- wrapper->request_safe_destroy = 0;
-
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
-
- grpc_rb_channel_try_register_connection_polling(wrapper);
+ stack.channel = ch;
+ stack.wrapper = wrapper;
+ rb_thread_call_without_gvl(
+ channel_init_try_register_connection_polling_without_gil, &stack, NULL,
+ NULL);
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -224,10 +273,31 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
return Qnil;
}
rb_ivar_set(self, id_target, target);
- wrapper->wrapped = ch;
return self;
}
+typedef struct get_state_stack {
+ bg_watched_channel *bg;
+ int try_to_connect;
+ int out;
+} get_state_stack;
+
+static void *get_state_without_gil(void *arg) {
+ get_state_stack *stack = (get_state_stack *)arg;
+
+ gpr_mu_lock(&global_connection_polling_mu);
+ GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
+ if (stack->bg->channel_destroyed) {
+ stack->out = GRPC_CHANNEL_SHUTDOWN;
+ } else {
+ stack->out = grpc_channel_check_connectivity_state(stack->bg->channel,
+ stack->try_to_connect);
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+
+ return NULL;
+}
+
/*
call-seq:
ch.connectivity_state -> state
@@ -240,59 +310,69 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) {
VALUE try_to_connect_param = Qfalse;
- int grpc_try_to_connect = 0;
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
+ get_state_stack stack;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", &try_to_connect_param);
- grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped,
- grpc_try_to_connect));
+
+ stack.bg = wrapper->bg_wrapped;
+ stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
+ rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
+
+ return LONG2NUM(stack.out);
}
typedef struct watch_state_stack {
- grpc_rb_channel *wrapper;
+ grpc_channel *channel;
gpr_timespec deadline;
int last_state;
} watch_state_stack;
-static void *watch_channel_state_without_gvl(void *arg) {
+static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
watch_state_stack *stack = (watch_state_stack *)arg;
- gpr_timespec deadline = stack->deadline;
- grpc_rb_channel *wrapper = stack->wrapper;
- int last_state = stack->last_state;
- void *return_value = (void *)0;
+ watch_state_op *op = NULL;
+ void *success = (void *)0;
- gpr_mu_lock(&wrapper->channel_mu);
- while (wrapper->current_connectivity_state == last_state &&
- !wrapper->request_safe_destroy && !wrapper->safe_to_destroy &&
- !wrapper->abort_watch_connectivity_state &&
- gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+ gpr_mu_lock(&global_connection_polling_mu);
+ // its unsafe to do a "watch" after "channel polling abort" because the cq has
+ // been shut down.
+ if (abort_channel_polling) {
+ gpr_mu_unlock(&global_connection_polling_mu);
+ return (void *)0;
}
- if (wrapper->current_connectivity_state != last_state) {
- return_value = (void *)1;
+ op = gpr_zalloc(sizeof(watch_state_op));
+ op->op_type = WATCH_STATE_API;
+ grpc_channel_watch_connectivity_state(stack->channel, stack->last_state,
+ stack->deadline, channel_polling_cq,
+ op);
+
+ while (!op->op.api_callback_args.called_back) {
+ gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
- gpr_mu_unlock(&wrapper->channel_mu);
+ if (op->op.api_callback_args.success) {
+ success = (void *)1;
+ }
+ gpr_free(op);
+ gpr_mu_unlock(&global_connection_polling_mu);
- return return_value;
+ return success;
}
-
-static void watch_channel_state_unblocking_func(void *arg) {
- grpc_rb_channel *wrapper = (grpc_rb_channel *)arg;
- gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->abort_watch_connectivity_state = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
+static void wait_for_watch_state_op_complete_unblocking_func(void *arg) {
+ bg_watched_channel *bg = (bg_watched_channel *)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
}
/* Wait until the channel's connectivity state becomes different from
@@ -307,11 +387,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
- void *out;
+ void *op_success = 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- if (wrapper->wrapped == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
@@ -323,16 +403,15 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
- stack.wrapper = wrapper;
- stack.deadline = grpc_rb_time_timeval(deadline, 0);
+ stack.channel = wrapper->bg_wrapped->channel;
+  stack.deadline = grpc_rb_time_timeval(deadline, 0);
stack.last_state = NUM2LONG(last_state);
- out =
- rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack,
- watch_channel_state_unblocking_func, wrapper);
- if (out) {
- return Qtrue;
- }
- return Qfalse;
+
+ op_success = rb_thread_call_without_gvl(
+ wait_for_watch_state_op_complete_without_gvl, &stack,
+ wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped);
+
+ return op_success ? Qtrue : Qfalse;
}
/* Create a call given a grpc_channel, in order to call method. The request
@@ -344,7 +423,6 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
grpc_call *parent_call = NULL;
- grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
int flags = GRPC_PROPAGATE_DEFAULTS;
grpc_slice method_slice;
@@ -366,8 +444,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch == NULL) {
+ if (wrapper->bg_wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
@@ -375,8 +452,8 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
method_slice =
grpc_slice_from_copied_buffer(RSTRING_PTR(method), RSTRING_LEN(method));
- call = grpc_channel_create_call(ch, parent_call, flags, cq, method_slice,
- host_slice_ptr,
+ call = grpc_channel_create_call(wrapper->bg_wrapped->channel, parent_call,
+ flags, cq, method_slice, host_slice_ptr,
grpc_rb_time_timeval(deadline,
/* absolute time */ 0),
NULL);
@@ -401,15 +478,16 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
}
/* Closes the channel, calling its destroy method */
+/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
+ * this */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- if (ch != NULL) {
- grpc_rb_channel_safe_destroy(wrapper);
- wrapper->wrapped = NULL;
+ if (wrapper->bg_wrapped != NULL) {
+ rb_thread_call_without_gvl(channel_safe_destroy_without_gil,
+ wrapper->bg_wrapped, NULL, NULL);
+ wrapper->bg_wrapped = NULL;
}
return Qnil;
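
[editor's note] channel_safe_destroy_without_gil is referenced above but defined outside
these hunks. A sketch of the shape it plausibly has, given the invariants
asserted elsewhere in this diff (a bg entry is only freed once its channel is
destroyed and its refcount reaches zero; a ref still held by the background
watcher is released later, in grpc_rb_channel_try_register_connection_polling):

    static void *channel_safe_destroy_without_gil(void *arg) {
      bg_watched_channel *bg = (bg_watched_channel *)arg;
      gpr_mu_lock(&global_connection_polling_mu);
      if (!bg->channel_destroyed) {
        grpc_channel_destroy(bg->channel);
        bg->channel_destroyed = 1;
      }
      /* drop the API-level ref; the list entry itself is reclaimed once the
       * background watcher's ref (if any) is also gone */
      bg->refcount--;
      if (bg->refcount == 0) {
        bg_watched_channel_list_free_and_remove(bg);
      }
      gpr_mu_unlock(&global_connection_polling_mu);
      return NULL;
    }
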
@@ -422,64 +500,110 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
char *target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- target = grpc_channel_get_target(wrapper->wrapped);
+ target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
res = rb_str_new2(target);
gpr_free(target);
return res;
}
-// Either start polling channel connection state or signal that it's free to
-// destroy.
-// Not safe to call while a channel's connection state is polled.
-static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper) {
- grpc_connectivity_state conn_state;
- gpr_timespec sleep_time = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
-
- GPR_ASSERT(wrapper);
- GPR_ASSERT(wrapper->wrapped);
- gpr_mu_lock(&wrapper->channel_mu);
- if (wrapper->request_safe_destroy) {
- wrapper->safe_to_destroy = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
- return;
+/* Needs to be called under global_connection_polling_mu */
+static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
+ bg_watched_channel *cur = bg_watched_channel_list_head;
+
+ while (cur != NULL) {
+ if (cur == target) {
+ return 1;
+ }
+ cur = cur->next;
}
- gpr_mu_lock(&global_connection_polling_mu);
- GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
- conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
- if (conn_state != wrapper->current_connectivity_state) {
- wrapper->current_connectivity_state = conn_state;
- gpr_cv_broadcast(&wrapper->channel_cv);
- }
- // avoid posting work to the channel polling cq if it's been shutdown
- if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_channel_watch_connectivity_state(
- wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
- } else {
- wrapper->safe_to_destroy = 1;
- gpr_cv_broadcast(&wrapper->channel_cv);
+ return 0;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+static bg_watched_channel *bg_watched_channel_list_create_and_add(
+ grpc_channel *channel) {
+ bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
+
+ watched->channel = channel;
+ watched->next = bg_watched_channel_list_head;
+ watched->refcount = 1;
+ bg_watched_channel_list_head = watched;
+ return watched;
+}
+
+/* Needs to be called under global_connection_polling_mu */
+static void bg_watched_channel_list_free_and_remove(
+ bg_watched_channel *target) {
+ bg_watched_channel *bg = NULL;
+
+ GPR_ASSERT(bg_watched_channel_list_lookup(target));
+ GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
+ if (bg_watched_channel_list_head == target) {
+ bg_watched_channel_list_head = target->next;
+ gpr_free(target);
+ return;
+ }
+ bg = bg_watched_channel_list_head;
+ while (bg != NULL && bg->next != NULL) {
+ if (bg->next == target) {
+ bg->next = bg->next->next;
+ gpr_free(target);
+ return;
+ }
+ bg = bg->next;
}
+ GPR_ASSERT(0);
+}
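
[editor's note] The list helpers above operate on a bg_watched_channel node whose definition
lives earlier in rb_channel.c, outside the hunks shown. Reconstructed from the
fields used in this diff (the authoritative definition may differ in detail):

    typedef struct bg_watched_channel {
      grpc_channel *channel;
      /* singly-linked list of every channel watched by the background thread */
      struct bg_watched_channel *next;
      int channel_destroyed;
      int refcount;
    } bg_watched_channel;
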
+
+/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
+ * it onto the background thread for constant watches. */
+static void *channel_init_try_register_connection_polling_without_gil(
+ void *arg) {
+ channel_init_try_register_stack *stack =
+ (channel_init_try_register_stack *)arg;
+
+ gpr_mu_lock(&global_connection_polling_mu);
+ stack->wrapper->bg_wrapped =
+ bg_watched_channel_list_create_and_add(stack->channel);
+ grpc_rb_channel_try_register_connection_polling(stack->wrapper->bg_wrapped);
gpr_mu_unlock(&global_connection_polling_mu);
- gpr_mu_unlock(&wrapper->channel_mu);
+ return NULL;
}
-// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
-static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
- gpr_mu_lock(&wrapper->channel_mu);
- wrapper->request_safe_destroy = 1;
+// Needs to be called under global_connection_polling_mu
+static void grpc_rb_channel_try_register_connection_polling(
+ bg_watched_channel *bg) {
+ grpc_connectivity_state conn_state;
+ watch_state_op *op = NULL;
- while (!wrapper->safe_to_destroy) {
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
+
+ if (bg->refcount == 0) {
+ GPR_ASSERT(bg->channel_destroyed);
+ bg_watched_channel_list_free_and_remove(bg);
+ return;
+ }
+ GPR_ASSERT(bg->refcount == 1);
+ if (bg->channel_destroyed || abort_channel_polling) {
+ return;
}
- GPR_ASSERT(wrapper->safe_to_destroy);
- gpr_mu_unlock(&wrapper->channel_mu);
- grpc_channel_destroy(wrapper->wrapped);
+ conn_state = grpc_channel_check_connectivity_state(bg->channel, 0);
+ if (conn_state == GRPC_CHANNEL_SHUTDOWN) {
+ return;
+ }
+ GPR_ASSERT(bg_watched_channel_list_lookup(bg));
+  // prevent bg from being freed while the background thread is still watching it
+ bg->refcount++;
+
+ op = gpr_zalloc(sizeof(watch_state_op));
+ op->op_type = CONTINUOUS_WATCH;
+ op->op.continuous_watch_callback_args.bg = bg;
+ grpc_channel_watch_connectivity_state(bg->channel, conn_state,
+ gpr_inf_future(GPR_CLOCK_REALTIME),
+ channel_polling_cq, op);
}
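
[editor's note] Both kinds of completion-queue tag flow through a single watch_state_op
struct, whose definition is also outside these hunks. Reconstructed from usage
(the enum values and field names below all appear in this diff; the exact
layout is an assumption):

    typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type;

    typedef struct watch_state_op {
      watch_state_op_type op_type;
      union {
        struct {
          int called_back;
          int success;
        } api_callback_args;
        struct {
          bg_watched_channel *bg;
        } continuous_watch_callback_args;
      } op;
    } watch_state_op;
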
// Note this loop breaks out with a single call of
@@ -490,6 +614,8 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
// early and falls back to current behavior.
static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event;
+ watch_state_op *op = NULL;
+ bg_watched_channel *bg = NULL;
(void)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
@@ -505,10 +631,22 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
if (event.type == GRPC_QUEUE_SHUTDOWN) {
break;
}
+ gpr_mu_lock(&global_connection_polling_mu);
if (event.type == GRPC_OP_COMPLETE) {
- grpc_rb_channel_try_register_connection_polling(
- (grpc_rb_channel *)event.tag);
+ op = (watch_state_op *)event.tag;
+ if (op->op_type == CONTINUOUS_WATCH) {
+ bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg;
+ bg->refcount--;
+ grpc_rb_channel_try_register_connection_polling(bg);
+ gpr_free(op);
+ } else if (op->op_type == WATCH_STATE_API) {
+ grpc_rb_channel_watch_connection_state_op_complete(
+ (watch_state_op *)event.tag, event.success);
+ } else {
+ GPR_ASSERT(0);
+ }
}
+ gpr_mu_unlock(&global_connection_polling_mu);
}
grpc_completion_queue_destroy(channel_polling_cq);
gpr_log(GPR_DEBUG,
@@ -519,14 +657,36 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
// Notify the channel polling loop to cleanup and shutdown.
static void run_poll_channels_loop_unblocking_func(void *arg) {
+ bg_watched_channel *bg = NULL;
(void)arg;
+
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
- "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting "
+ "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting "
"connection polling");
+ // early out after first time through
+ if (abort_channel_polling) {
+ gpr_mu_unlock(&global_connection_polling_mu);
+ return;
+ }
abort_channel_polling = 1;
+
+ // force pending watches to end by switching to shutdown state
+ bg = bg_watched_channel_list_head;
+ while (bg != NULL) {
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ bg = bg->next;
+ }
+
grpc_completion_queue_shutdown(channel_polling_cq);
+ gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: run_poll_channels_loop_unblocking_func - end aborting "
+ "connection polling");
}
// Poll channel connectivity states in background thread without the GIL.
@@ -542,10 +702,11 @@ static VALUE run_poll_channels_loop(VALUE arg) {
}
static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
- (void)arg;
+ int *stop_waiting = (int *)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu);
- while (!channel_polling_thread_started && !abort_channel_polling) {
+ while (!channel_polling_thread_started && !abort_channel_polling &&
+ !*stop_waiting) {
gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
@@ -556,15 +717,22 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
static void wait_until_channel_polling_thread_started_unblocking_func(
void *arg) {
- (void)arg;
+ int *stop_waiting = (int *)arg;
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
- "GRPC_RUBY: "
- "wait_until_channel_polling_thread_started_unblocking_func - begin "
- "aborting connection polling");
+ "GRPC_RUBY: interrupt wait for channel polling thread to start");
+ *stop_waiting = 1;
+ gpr_cv_broadcast(&global_connection_polling_cv);
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+static void *set_abort_channel_polling_without_gil(void *arg) {
+ (void)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
abort_channel_polling = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
+ return NULL;
}
/* Temporary fix for
@@ -592,10 +760,8 @@ void grpc_rb_channel_polling_thread_start() {
if (!RTEST(background_thread)) {
gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread");
- gpr_mu_lock(&global_connection_polling_mu);
- abort_channel_polling = 1;
- gpr_cv_broadcast(&global_connection_polling_cv);
- gpr_mu_unlock(&global_connection_polling_mu);
+ rb_thread_call_without_gvl(set_abort_channel_polling_without_gil, NULL,
+ NULL, NULL);
}
}
@@ -674,5 +840,5 @@ void Init_grpc_channel() {
grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
grpc_rb_channel *wrapper = NULL;
TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- return wrapper->wrapped;
+ return wrapper->bg_wrapped->channel;
}
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 9a3b56ddfb..71138265c8 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -105,16 +105,16 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL;
(void)param;
gpr_mu_lock(&event_queue.mu);
- while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
- gpr_cv_wait(&event_queue.cv, &event_queue.mu,
- gpr_inf_future(GPR_CLOCK_REALTIME));
- if (event_queue.abort) {
+ while (!event_queue.abort) {
+ if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
gpr_mu_unlock(&event_queue.mu);
- return NULL;
+ return event;
}
+ gpr_cv_wait(&event_queue.cv, &event_queue.mu,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
}
gpr_mu_unlock(&event_queue.mu);
- return event;
+ return NULL;
}
static void grpc_rb_event_unblocking_func(void *arg) {
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 5be8861e0c..c319cd1391 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -292,11 +292,12 @@ static gpr_once g_once_init = GPR_ONCE_INIT;
static void grpc_ruby_once_init_internal() {
grpc_init();
- grpc_rb_event_queue_thread_start();
- grpc_rb_channel_polling_thread_start();
atexit(grpc_rb_shutdown);
}
+static VALUE bg_thread_init_rb_mu = Qundef;
+static int bg_thread_init_done = 0;
+
void grpc_ruby_once_init() {
/* ruby_vm_at_exit doesn't seem to be working. It would crash once every
* blue moon, and some users are getting it repeatedly. See the discussions
@@ -309,6 +310,18 @@ void grpc_ruby_once_init() {
* schedule our initialization and destruction only once.
*/
gpr_once_init(&g_once_init, grpc_ruby_once_init_internal);
+
+  // Avoid calling into the ruby library (when creating threads here)
+  // in gpr_once_init. In general, it appears to be unsafe to call
+  // into the ruby library while holding a non-ruby mutex, because a GIL
+  // yield could end up trying to lock onto that same mutex and deadlock.
+ rb_mutex_lock(bg_thread_init_rb_mu);
+ if (!bg_thread_init_done) {
+ grpc_rb_event_queue_thread_start();
+ grpc_rb_channel_polling_thread_start();
+ bg_thread_init_done = 1;
+ }
+ rb_mutex_unlock(bg_thread_init_rb_mu);
}
void Init_grpc_c() {
@@ -317,6 +330,9 @@ void Init_grpc_c() {
return;
}
+ bg_thread_init_rb_mu = rb_mutex_new();
+ rb_global_variable(&bg_thread_init_rb_mu);
+
grpc_rb_mGRPC = rb_define_module("GRPC");
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
grpc_rb_sNewServerRpc = rb_struct_define(
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 8f76420af6..f6a4ff6795 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -103,6 +103,7 @@ grpc_alarm_create_type grpc_alarm_create_import;
grpc_alarm_cancel_type grpc_alarm_cancel_import;
grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
+grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
grpc_channel_create_call_type grpc_channel_create_call_import;
grpc_channel_ping_type grpc_channel_ping_import;
@@ -405,6 +406,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");
grpc_alarm_destroy_import = (grpc_alarm_destroy_type) GetProcAddress(library, "grpc_alarm_destroy");
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
+ grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers");
grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state");
grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call");
grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 58467f97e8..0d64290b55 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -260,6 +260,9 @@ extern grpc_alarm_destroy_type grpc_alarm_destroy_import;
typedef grpc_connectivity_state(*grpc_channel_check_connectivity_state_type)(grpc_channel *channel, int try_to_connect);
extern grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
#define grpc_channel_check_connectivity_state grpc_channel_check_connectivity_state_import
+typedef int(*grpc_channel_num_external_connectivity_watchers_type)(grpc_channel *channel);
+extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
+#define grpc_channel_num_external_connectivity_watchers grpc_channel_num_external_connectivity_watchers_import
typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
#define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import
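
[editor's note] The generated header follows a fixed shim pattern for Windows dynamic
loading: each core symbol becomes a typedef'd function-pointer type, an extern
pointer variable bound at load time, and a #define that redirects call sites
to the pointer. A minimal illustration with a hypothetical symbol foo (not
part of the gRPC API):

    #include <windows.h>

    typedef int (*foo_type)(int x);
    extern foo_type foo_import; /* defined in the generated .c file */
    #define foo foo_import      /* so foo(1) compiles to foo_import(1) */

    /* load-time binding, mirroring grpc_rb_load_imports above: */
    static void load_imports(HMODULE library) {
      foo_import = (foo_type)GetProcAddress(library, "foo");
    }
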
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 940d68b9b0..c8a7856a09 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -28,6 +28,10 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc'
+require 'timeout'
+
+include Timeout
+include GRPC::Core
# A test message
class EchoMsg
@@ -62,7 +66,7 @@ end
EchoStub = EchoService.rpc_stub_class
def start_server(port = 0)
- @srv = GRPC::RpcServer.new
+ @srv = GRPC::RpcServer.new(pool_size: 1)
server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
@srv.handle(EchoService)
@server_thd = Thread.new { @srv.run }
@@ -138,4 +142,32 @@ describe 'channel connection behavior' do
stop_server
end
+
+ it 'concurrent watches on the same channel' do
+ timeout(180) do
+ port = start_server
+ ch = GRPC::Core::Channel.new("localhost:#{port}", {},
+ :this_channel_is_insecure)
+ stop_server
+
+ thds = []
+ 50.times do
+ thds << Thread.new do
+ while ch.connectivity_state(true) != ConnectivityStates::READY
+ ch.watch_connectivity_state(
+ ConnectivityStates::READY, Time.now + 60)
+ break
+ end
+ end
+ end
+
+ sleep 0.01
+
+ start_server(port)
+
+ thds.each(&:join)
+
+ stop_server
+ end
+ end
end