aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/client_config/client_config.c6
-rw-r--r--src/core/client_config/lb_policies/pick_first.c2
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c57
-rw-r--r--src/core/client_config/subchannel.c14
-rw-r--r--src/core/iomgr/iocp_windows.c14
-rw-r--r--src/core/iomgr/iocp_windows.h11
-rw-r--r--src/core/iomgr/resolve_address.h6
-rw-r--r--src/core/iomgr/resolve_address_posix.c19
-rw-r--r--src/core/iomgr/resolve_address_windows.c19
-rw-r--r--src/core/iomgr/tcp_server_windows.c3
-rw-r--r--src/core/iomgr/timer.c16
-rw-r--r--src/core/iomgr/timer.h1
-rw-r--r--src/core/support/alloc.c4
-rw-r--r--src/core/surface/completion_queue.c2
-rw-r--r--src/core/tsi/ssl_transport_security.c76
-rw-r--r--src/core/tsi/ssl_transport_security.h5
-rw-r--r--src/cpp/server/server_context.cc26
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs2
-rw-r--r--src/node/src/client.js3
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m37
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.h3
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.m14
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.h7
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.m31
-rw-r--r--src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h77
-rw-r--r--src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m192
-rw-r--r--src/objective-c/GRPCClient/private/GRPCHost.h26
-rw-r--r--src/objective-c/GRPCClient/private/GRPCHost.m97
-rw-r--r--src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h65
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.h2
-rw-r--r--src/objective-c/RxLibrary/GRXWriteable.m43
-rw-r--r--src/objective-c/tests/GRPCClientTests.m2
-rw-r--r--src/objective-c/tests/RxLibraryUnitTests.m51
-rw-r--r--src/php/composer.json6
-rw-r--r--src/php/ext/grpc/README.md67
-rw-r--r--src/php/tests/generated_code/math_client.php6
-rw-r--r--src/proto/grpc/testing/echo_messages.proto1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi1
-rw-r--r--src/python/grpcio/tests/_runner.py13
-rw-r--r--src/python/grpcio/tests/tests.json9
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py381
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py4
43 files changed, 801 insertions, 621 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c
index 6ecffb3854..c500af25ee 100644
--- a/src/core/client_config/client_config.c
+++ b/src/core/client_config/client_config.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); }
void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) {
if (gpr_unref(&c->refs)) {
- GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config");
+ if (c->lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config");
+ }
gpr_free(c);
}
}
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 81167b31c8..8ed1223d39 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ if (args->num_subchannels == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
- GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p));
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels =
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 376b6b3d76..e28e4757a1 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -41,6 +41,7 @@
#include "src/core/client_config/lb_policy_registry.h"
#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h"
typedef struct {
@@ -71,6 +72,9 @@ typedef struct {
grpc_client_config **target_config;
/** current (fully resolved) config */
grpc_client_config *resolved_config;
+ /** retry timer */
+ bool have_retry_timer;
+ grpc_timer retry_timer;
} dns_resolver;
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
@@ -91,6 +95,9 @@ static const grpc_resolver_vtable dns_resolver_vtable = {
static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
gpr_mu_lock(&r->mu);
+ if (r->have_retry_timer) {
+ grpc_timer_cancel(exec_ctx, &r->retry_timer);
+ }
if (r->next_completion != NULL) {
*r->target_config = NULL;
grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
@@ -125,6 +132,22 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
gpr_mu_unlock(&r->mu);
}
+static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
+ bool success) {
+ dns_resolver *r = arg;
+
+ gpr_mu_lock(&r->mu);
+ r->have_retry_timer = false;
+ if (success) {
+ if (!r->resolving) {
+ dns_start_resolving_locked(r);
+ }
+ }
+ gpr_mu_unlock(&r->mu);
+
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
+}
+
static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
dns_resolver *r = arg;
@@ -133,29 +156,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
size_t i;
- if (addresses) {
+ gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->resolving);
+ r->resolving = 0;
+ if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
+ size_t naddrs = 0;
for (i = 0; i < addresses->naddrs; i++) {
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = (size_t)addresses->addrs[i].len;
- subchannels[i] = grpc_subchannel_factory_create_subchannel(
+ grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
exec_ctx, r->subchannel_factory, &args);
+ if (subchannel != NULL) {
+ subchannels[naddrs++] = subchannel;
+ }
}
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels;
- lb_policy_args.num_subchannels = addresses->naddrs;
+ lb_policy_args.num_subchannels = naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
- grpc_client_config_set_lb_policy(config, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
+ if (lb_policy != NULL) {
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
+ }
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
+ } else {
+ int retry_seconds = 15;
+ gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds",
+ retry_seconds);
+ GPR_ASSERT(!r->have_retry_timer);
+ r->have_retry_timer = true;
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ GRPC_RESOLVER_REF(&r->base, "retry-timer");
+ grpc_timer_init(
+ exec_ctx, &r->retry_timer,
+ gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)),
+ dns_on_retry_timer, r, now);
}
- gpr_mu_lock(&r->mu);
- GPR_ASSERT(r->resolving);
- r->resolving = 0;
if (r->resolved_config) {
grpc_client_config_unref(exec_ctx, r->resolved_config);
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 0801c20fce..5dea215668 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -519,7 +519,8 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
elem->filter->start_transport_op(exec_ctx, elem, &op);
}
-static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
+static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c) {
size_t channel_stack_size;
grpc_connected_subchannel *con;
grpc_channel_stack *stk;
@@ -555,8 +556,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed,
sw_subchannel);
- gpr_mu_lock(&c->mu);
-
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
gpr_free(sw_subchannel);
@@ -589,7 +588,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY,
"connected");
- gpr_mu_unlock(&c->mu);
gpr_free((void *)filters);
}
@@ -615,21 +613,23 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
bool iomgr_success) {
grpc_subchannel *c = arg;
+ GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
+ gpr_mu_lock(&c->mu);
if (c->connecting_result.transport != NULL) {
- publish_transport(exec_ctx, c);
+ publish_transport_locked(exec_ctx, c);
} else if (c->disconnected) {
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
- gpr_mu_unlock(&c->mu);
}
+ gpr_mu_unlock(&c->mu);
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
/*
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 807729708e..fa87e5246b 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -71,7 +71,8 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline,
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
-void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
+grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
+ gpr_timespec deadline) {
BOOL success;
DWORD bytes = 0;
DWORD flags = 0;
@@ -84,14 +85,14 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
g_iocp, &bytes, &completion_key, &overlapped,
deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type)));
if (success == 0 && overlapped == NULL) {
- return;
+ return GRPC_IOCP_WORK_TIMEOUT;
}
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
gpr_atm_full_fetch_add(&g_custom_events, -1);
if (completion_key == (ULONG_PTR)&g_iocp_kick_token) {
/* We were awoken from a kick. */
- return;
+ return GRPC_IOCP_WORK_KICK;
}
gpr_log(GPR_ERROR, "Unknown custom completion key.");
abort();
@@ -121,6 +122,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
}
gpr_mu_unlock(&socket->state_mu);
grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
+ return GRPC_IOCP_WORK_WORK;
}
void grpc_iocp_init(void) {
@@ -140,10 +142,12 @@ void grpc_iocp_kick(void) {
void grpc_iocp_flush(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_iocp_work_status work_status;
do {
- grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC));
- } while (grpc_exec_ctx_flush(&exec_ctx));
+ work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC));
+ } while (work_status == GRPC_IOCP_WORK_KICK ||
+ grpc_exec_ctx_flush(&exec_ctx));
}
void grpc_iocp_shutdown(void) {
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index 75f3ba8477..8b2b1aeb5c 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,7 +38,14 @@
#include "src/core/iomgr/socket_windows.h"
-void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline);
+typedef enum {
+ GRPC_IOCP_WORK_WORK,
+ GRPC_IOCP_WORK_TIMEOUT,
+ GRPC_IOCP_WORK_KICK
+} grpc_iocp_work_status;
+
+grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
+ gpr_timespec deadline);
void grpc_iocp_init(void);
void grpc_iocp_kick(void);
void grpc_iocp_flush(void);
diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h
index 01eedffa88..b059630457 100644
--- a/src/core/iomgr/resolve_address.h
+++ b/src/core/iomgr/resolve_address.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -66,7 +66,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
/* Resolve addr in a blocking fashion. Returns NULL on failure. On success,
result must be freed with grpc_resolved_addresses_destroy. */
-grpc_resolved_addresses *grpc_blocking_resolve_address(
- const char *addr, const char *default_port);
+extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port);
#endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index c51745b918..a6c9893f23 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -34,18 +34,13 @@
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/sockaddr.h"
#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/sockaddr.h"
+#include <string.h>
#include <sys/types.h>
#include <sys/un.h>
-#include <string.h>
-#include "src/core/iomgr/executor.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/sockaddr_utils.h"
-#include "src/core/support/block_annotate.h"
-#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
@@ -53,6 +48,11 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/executor.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
+#include "src/core/support/string.h"
typedef struct {
char *name;
@@ -62,7 +62,7 @@ typedef struct {
void *arg;
} request;
-grpc_resolved_addresses *grpc_blocking_resolve_address(
+static grpc_resolved_addresses *blocking_resolve_address_impl(
const char *name, const char *default_port) {
struct addrinfo hints;
struct addrinfo *result = NULL, *resp;
@@ -150,6 +150,9 @@ done:
return addrs;
}
+grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port) = blocking_resolve_address_impl;
+
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 28c8661e73..472e797163 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -34,17 +34,12 @@
#include <grpc/support/port_platform.h>
#ifdef GPR_WINSOCK_SOCKET
-#include "src/core/iomgr/sockaddr.h"
#include "src/core/iomgr/resolve_address.h"
+#include "src/core/iomgr/sockaddr.h"
-#include <sys/types.h>
#include <string.h>
+#include <sys/types.h>
-#include "src/core/iomgr/executor.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/sockaddr_utils.h"
-#include "src/core/support/block_annotate.h"
-#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
@@ -52,6 +47,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
+#include "src/core/iomgr/executor.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include "src/core/support/block_annotate.h"
+#include "src/core/support/string.h"
typedef struct {
char *name;
@@ -61,7 +61,7 @@ typedef struct {
void *arg;
} request;
-grpc_resolved_addresses *grpc_blocking_resolve_address(
+static grpc_resolved_addresses *blocking_resolve_address_impl(
const char *name, const char *default_port) {
struct addrinfo hints;
struct addrinfo *result = NULL, *resp;
@@ -133,6 +133,9 @@ done:
return addrs;
}
+grpc_resolved_addresses *(*grpc_blocking_resolve_address)(
+ const char *name, const char *default_port) = blocking_resolve_address_impl;
+
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index ce930b8f41..a4abc5b974 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -240,8 +240,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
GPR_ASSERT(sp->server->active_ports > 0);
- if (0 == --sp->server->active_ports &&
- sp->server->shutdown_complete != NULL) {
+ if (0 == --sp->server->active_ports) {
notify = 1;
}
gpr_mu_unlock(&sp->server->mu);
diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c
index 8379fffad0..f444643428 100644
--- a/src/core/iomgr/timer.c
+++ b/src/core/iomgr/timer.c
@@ -33,11 +33,11 @@
#include "src/core/iomgr/timer.h"
-#include "src/core/iomgr/timer_heap.h"
-#include "src/core/iomgr/time_averaged_stats.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/time_averaged_stats.h"
+#include "src/core/iomgr/timer_heap.h"
#define INVALID_HEAP_INDEX 0xffffffffu
@@ -330,6 +330,18 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_checker_mu);
+ } else if (next != NULL) {
+ /* TODO(ctiller): this forces calling code to do an short poll, and
+ then retry the timer check (because this time through the timer list was
+ contended).
+
+ We could reduce the cost here dramatically by keeping a count of how many
+ currently active pollers got through the uncontended case above
+ successfully, and waking up other pollers IFF that count drops to zero.
+
+ Once that count is in place, this entire else branch could disappear. */
+ *next = gpr_time_min(
+ *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN)));
}
return (int)n;
diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h
index 9ad1e92f42..e239e884e7 100644
--- a/src/core/iomgr/timer.h
+++ b/src/core/iomgr/timer.h
@@ -96,7 +96,6 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
*next is never guaranteed to be updated on any given execution; however,
with high probability at least one thread in the system will see an update
at any time slice. */
-
bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next);
void grpc_timer_list_init(gpr_timespec now);
diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c
index 0a064b2c18..b99584bd20 100644
--- a/src/core/support/alloc.c
+++ b/src/core/support/alloc.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -87,4 +87,4 @@ void *gpr_malloc_aligned(size_t size, size_t alignment_log) {
return (void *)ret;
}
-void gpr_free_aligned(void *ptr) { free(((void **)ptr)[-1]); }
+void gpr_free_aligned(void *ptr) { gpr_free(((void **)ptr)[-1]); }
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index f6a95ebbd3..b22818ea87 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -86,7 +86,7 @@ struct grpc_completion_queue {
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
static gpr_mu g_freelist_mu;
-grpc_completion_queue *g_freelist;
+static grpc_completion_queue *g_freelist;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
bool success);
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 6adcaac9ed..fcbd910f07 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,9 +33,18 @@
#include "src/core/tsi/ssl_transport_security.h"
+#include <grpc/support/port_platform.h>
+
#include <limits.h>
#include <string.h>
+/* TODO(jboeuf): refactor inet_ntop into a portability header. */
+#ifdef GPR_WINSOCK_SOCKET
+#include <ws2tcpip.h>
+#else
+#include <arpa/inet.h>
+#endif
+
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
@@ -197,13 +206,16 @@ static void ssl_info_callback(const SSL *ssl, int where, int ret) {
}
/* Returns 1 if name looks like an IP address, 0 otherwise.
- This is a very rough heuristic as it does not handle IPV6 or things like:
- 0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */
+ This is a very rough heuristic, and only handles IPv6 in hexadecimal form. */
static int looks_like_ip_address(const char *name) {
size_t i;
size_t dot_count = 0;
size_t num_size = 0;
for (i = 0; i < strlen(name); i++) {
+ if (name[i] == ':') {
+ /* IPv6 Address in hexadecimal form, : is not allowed in DNS names. */
+ return 1;
+ }
if (name[i] >= '0' && name[i] <= '9') {
if (num_size > 3) return 0;
num_size++;
@@ -296,21 +308,44 @@ static tsi_result add_subject_alt_names_properties_to_peer(
sk_GENERAL_NAME_value(subject_alt_names, TSI_SIZE_AS_SIZE(i));
/* Filter out the non-dns entries names. */
if (subject_alt_name->type == GEN_DNS) {
- unsigned char *dns_name = NULL;
- int dns_name_size =
- ASN1_STRING_to_UTF8(&dns_name, subject_alt_name->d.dNSName);
- if (dns_name_size < 0) {
+ unsigned char *name = NULL;
+ int name_size;
+ name_size = ASN1_STRING_to_UTF8(&name, subject_alt_name->d.dNSName);
+ if (name_size < 0) {
gpr_log(GPR_ERROR, "Could not get utf8 from asn1 string.");
result = TSI_INTERNAL_ERROR;
break;
}
result = tsi_construct_string_peer_property(
- TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY,
- (const char *)dns_name, (size_t)dns_name_size,
+ TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, (const char *)name,
+ (size_t)name_size, &peer->properties[peer->property_count++]);
+ OPENSSL_free(name);
+ } else if (subject_alt_name->type == GEN_IPADD) {
+ char ntop_buf[INET6_ADDRSTRLEN];
+ int af;
+
+ if (subject_alt_name->d.iPAddress->length == 4) {
+ af = AF_INET;
+ } else if (subject_alt_name->d.iPAddress->length == 16) {
+ af = AF_INET6;
+ } else {
+ gpr_log(GPR_ERROR, "SAN IP Address contained invalid IP");
+ result = TSI_INTERNAL_ERROR;
+ break;
+ }
+ const char *name = inet_ntop(af, subject_alt_name->d.iPAddress->data,
+ ntop_buf, INET6_ADDRSTRLEN);
+ if (name == NULL) {
+ gpr_log(GPR_ERROR, "Could not get IP string from asn1 octet.");
+ result = TSI_INTERNAL_ERROR;
+ break;
+ }
+
+ result = tsi_construct_string_peer_property_from_cstring(
+ TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, name,
&peer->properties[peer->property_count++]);
- OPENSSL_free(dns_name);
- if (result != TSI_OK) break;
}
+ if (result != TSI_OK) break;
}
return result;
}
@@ -1436,9 +1471,7 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) {
size_t i = 0;
size_t san_count = 0;
const tsi_peer_property *cn_property = NULL;
-
- /* For now reject what looks like an IP address. */
- if (looks_like_ip_address(name)) return 0;
+ int like_ip = looks_like_ip_address(name);
/* Check the SAN first. */
for (i = 0; i < peer->property_count; i++) {
@@ -1447,8 +1480,15 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) {
if (strcmp(property->name,
TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY) == 0) {
san_count++;
- if (does_entry_match_name(property->value.data, property->value.length,
- name)) {
+
+ if (!like_ip && does_entry_match_name(property->value.data,
+ property->value.length, name)) {
+ return 1;
+ } else if (like_ip &&
+ strncmp(name, property->value.data, property->value.length) ==
+ 0 &&
+ strlen(name) == property->value.length) {
+ /* IP Addresses are exact matches only. */
return 1;
}
} else if (strcmp(property->name,
@@ -1457,8 +1497,8 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) {
}
}
- /* If there's no SAN, try the CN. */
- if (san_count == 0 && cn_property != NULL) {
+ /* If there's no SAN, try the CN, but only if its not like an IP Address */
+ if (san_count == 0 && cn_property != NULL && !like_ip) {
if (does_entry_match_name(cn_property->value.data,
cn_property->value.length, name)) {
return 1;
diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h
index 51c0003a85..4909af4c47 100644
--- a/src/core/tsi/ssl_transport_security.h
+++ b/src/core/tsi/ssl_transport_security.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -162,8 +162,7 @@ void tsi_ssl_handshaker_factory_destroy(tsi_ssl_handshaker_factory *self);
Still TODO(jboeuf):
- handle mixed case.
- handle %encoded chars.
- - handle public suffix wildchar more strictly (e.g. *.co.uk)
- - handle IP addresses in SAN. */
+ - handle public suffix wildchar more strictly (e.g. *.co.uk) */
int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name);
#ifdef __cplusplus
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index e205a1969b..eb49b21037 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -62,7 +62,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
- bool CheckCancelled(CompletionQueue* cq);
+ bool CheckCancelled(CompletionQueue* cq) {
+ cq->TryPluck(this);
+ return CheckCancelledNoPluck();
+ }
+ bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
void set_tag(void* tag) {
has_tag_ = true;
@@ -72,6 +76,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void Unref();
private:
+ bool CheckCancelledNoPluck() {
+ grpc::lock_guard<grpc::mutex> g(mu_);
+ return finalized_ ? (cancelled_ != 0) : false;
+ }
+
bool has_tag_;
void* tag_;
grpc::mutex mu_;
@@ -88,12 +97,6 @@ void ServerContext::CompletionOp::Unref() {
}
}
-bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
- cq->TryPluck(this);
- grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ != 0 : false;
-}
-
void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
@@ -182,7 +185,14 @@ void ServerContext::TryCancel() const {
}
bool ServerContext::IsCancelled() const {
- return completion_op_ && completion_op_->CheckCancelled(cq_);
+ if (has_notify_when_done_tag_) {
+ // when using async API, but the result is only valid
+ // if the tag has already been delivered at the completion queue
+ return completion_op_ && completion_op_->CheckCancelledAsync();
+ } else {
+ // when using sync API
+ return completion_op_ && completion_op_->CheckCancelled(cq_);
+ }
}
void ServerContext::set_compression_level(grpc_compression_level level) {
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index aa22f840d6..52cef96f40 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -323,7 +323,7 @@ namespace Grpc.Core
private static string NormalizeKey(string key)
{
- var normalized = GrpcPreconditions.CheckNotNull(key, "key").ToLower(CultureInfo.InvariantCulture);
+ var normalized = GrpcPreconditions.CheckNotNull(key, "key").ToLowerInvariant();
GrpcPreconditions.CheckArgument(ValidKeyRegex.IsMatch(normalized),
"Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens.");
return normalized;
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 9cc5a62bdb..2459e28321 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -149,6 +149,9 @@ function _readsDone(status) {
if (!status) {
status = {code: grpc.status.OK, details: 'OK'};
}
+ if (status.code !== grpc.status.OK) {
+ this.call.cancelWithStatus(status.code, status.details);
+ }
this.finished = true;
this.read_status = status;
this._emitStatusIfDone();
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index f79b7d0bc0..2d45818b6e 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -37,6 +37,8 @@
#include <grpc/support/time.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
+#import "private/GRPCConnectivityMonitor.h"
+#import "private/GRPCHost.h"
#import "private/GRPCRequestHeaders.h"
#import "private/GRPCWrappedCall.h"
#import "private/NSData+GRPC.h"
@@ -71,8 +73,11 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
@implementation GRPCCall {
dispatch_queue_t _callQueue;
+ NSString *_host;
+ NSString *_path;
GRPCWrappedCall *_wrappedCall;
dispatch_once_t _callAlreadyInvoked;
+ GRPCConnectivityMonitor *_connectivityMonitor;
// The C gRPC library has less guarantees on the ordering of events than we
// do. Particularly, in the face of errors, there's no ordering guarantee at
@@ -115,13 +120,11 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
format:@"The requests writer can't be already started."];
}
if ((self = [super init])) {
- _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:host path:path];
- if (!_wrappedCall) {
- return nil;
- }
+ _host = [host copy];
+ _path = [path copy];
// Serial queue to invoke the non-reentrant methods of the grpc_call object.
- _callQueue = dispatch_queue_create("org.grpc.call", NULL);
+ _callQueue = dispatch_queue_create("io.grpc.call", NULL);
_requestWriter = requestWriter;
@@ -156,7 +159,7 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
- (void)cancel {
[self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
code:GRPCErrorCodeCancelled
- userInfo:nil]];
+ userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]];
[self cancelCall];
}
@@ -354,8 +357,29 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
_retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
+
+ _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path];
+ NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
+
[self sendHeaders:_requestHeaders];
[self invokeCall];
+ // TODO(jcanizales): Extract this logic somewhere common.
+ NSString *host = [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host;
+ if (!host) {
+ // TODO(jcanizales): Check this on init.
+ [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host];
+ }
+ __weak typeof(self) weakSelf = self;
+ _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host];
+ [_connectivityMonitor handleLossWithHandler:^{
+ typeof(self) strongSelf = weakSelf;
+ if (strongSelf) {
+ [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeUnavailable
+ userInfo:@{NSLocalizedDescriptionKey: @"Connectivity lost."}]];
+ [[GRPCHost hostWithAddress:strongSelf->_host] disconnect];
+ }
+ }];
}
- (void)setState:(GRXWriterState)newState {
@@ -385,4 +409,5 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
return;
}
}
+
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 8661ae6f97..e49a6aca29 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -35,6 +35,7 @@
#include <grpc/grpc.h>
+@class GRPCCompletionQueue;
struct grpc_channel_credentials;
@@ -80,4 +81,6 @@ struct grpc_channel_credentials;
+ (nonnull GRPCChannel *)insecureChannelWithHost:(nonnull NSString *)host
channelArgs:(nullable NSDictionary *)channelArgs;
+- (nullable grpc_call *)unmanagedCallWithPath:(nonnull NSString *)path
+ completionQueue:(nonnull GRPCCompletionQueue *)queue;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index 7e55a473d7..d7de025e21 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -38,6 +38,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#import "GRPCCompletionQueue.h"
+
/**
* Returns @c grpc_channel_credentials from the specified @c path. If the file at the path could not
* be read then NULL is returned. If NULL is returned, @c errorPtr may not be NULL if there are
@@ -205,4 +207,16 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
channelArgs:channelArgs];
}
+- (grpc_call *)unmanagedCallWithPath:(NSString *)path
+ completionQueue:(GRPCCompletionQueue *)queue {
+ return grpc_channel_create_call(_unmanagedChannel,
+ NULL, GRPC_PROPAGATE_DEFAULTS,
+ queue.unmanagedQueue,
+ path.UTF8String,
+ // Get "host" from "host:port"
+ // TODO(jcanizales): Use NSURLs throughout, to clarify these.
+ [_host componentsSeparatedByString:@":"][0].UTF8String,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+}
+
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
index 7b66cd4c32..a52095dd01 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
@@ -36,6 +36,8 @@
typedef void(^GRPCQueueCompletionHandler)(bool success);
+extern const int64_t kGRPCCompletionQueueDefaultTimeoutSecs;
+
/**
* This class lets one more easily use |grpc_completion_queue|. To use it, pass the value of the
* |unmanagedQueue| property of an instance of this class to |grpc_channel_create_call|. Then for
@@ -49,6 +51,11 @@ typedef void(^GRPCQueueCompletionHandler)(bool success);
*/
@interface GRPCCompletionQueue : NSObject
@property(nonatomic, readonly) grpc_completion_queue *unmanagedQueue;
+@property(nonatomic, readonly) int64_t timeoutSecs;
+ (instancetype)completionQueue;
+
+- (instancetype)init;
+- (instancetype)initWithTimeout:(int64_t)timeoutSecs NS_DESIGNATED_INITIALIZER;
+
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
index ff3031678c..be214d4d36 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
@@ -35,15 +35,28 @@
#import <grpc/grpc.h>
+
+const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60;
+
@implementation GRPCCompletionQueue
+ (instancetype)completionQueue {
- return [[self alloc] init];
+ static GRPCCompletionQueue *singleton = nil;
+ static dispatch_once_t onceToken;
+ dispatch_once(&onceToken, ^{
+ singleton = [[self alloc] init];
+ });
+ return singleton;
}
- (instancetype)init {
+ return [self initWithTimeout:kGRPCCompletionQueueDefaultTimeoutSecs];
+}
+
+- (instancetype)initWithTimeout:(int64_t)timeoutSecs {
if ((self = [super init])) {
_unmanagedQueue = grpc_completion_queue_create(NULL);
+ _timeoutSecs = timeoutSecs;
// This is for the following block to capture the pointer by value (instead
// of retaining self and doing self->_unmanagedQueue). This is essential
@@ -61,22 +74,28 @@
gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
});
dispatch_async(gDefaultConcurrentQueue, ^{
+ // Using a non-infinite deadline to re-enter grpc_completion_queue_next()
+ // alleviates https://github.com/grpc/grpc/issues/5593
+ gpr_timespec deadline = (timeoutSecs < 0)
+ ? gpr_inf_future(GPR_CLOCK_REALTIME)
+ : gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME);
while (YES) {
- // The following call blocks until an event is available.
- grpc_event event = grpc_completion_queue_next(unmanagedQueue,
- gpr_inf_future(GPR_CLOCK_REALTIME),
- NULL);
+ // The following call blocks until an event is available or the deadline elapses.
+ grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL);
GRPCQueueCompletionHandler handler;
switch (event.type) {
case GRPC_OP_COMPLETE:
handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag;
handler(event.success);
break;
+ case GRPC_QUEUE_TIMEOUT:
+ // Nothing to do here
+ break;
case GRPC_QUEUE_SHUTDOWN:
grpc_completion_queue_destroy(unmanagedQueue);
return;
default:
- [NSException raise:@"Unrecognized completion type" format:@""];
+ [NSException raise:@"Unrecognized completion type" format:@"type=%d", event.type];
}
};
});
diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h
new file mode 100644
index 0000000000..2fae410331
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import <Foundation/Foundation.h>
+#import <SystemConfiguration/SystemConfiguration.h>
+
+@interface GRPCReachabilityFlags : NSObject
+
++ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags;
+
+/**
+ * One accessor method to query each of the different flags. Example:
+
+@property(nonatomic, readonly) BOOL isCell;
+
+ */
+#define GRPC_XMACRO_ITEM(methodName, FlagName) \
+@property(nonatomic, readonly) BOOL methodName;
+
+#include "GRPCReachabilityFlagNames.xmacro.h"
+#undef GRPC_XMACRO_ITEM
+
+@property(nonatomic, readonly) BOOL isHostReachable;
+@end
+
+
+@interface GRPCConnectivityMonitor : NSObject
+
++ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName;
+
+- (nonnull instancetype)init NS_UNAVAILABLE;
+
+/**
+ * Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling
+ * handleLossWithHandler:.
+ */
+// TODO(jcanizales): Default to a serial background queue instead.
+@property(nonatomic, strong, null_resettable) dispatch_queue_t queue;
+
+/**
+ * Calls handler every time the connectivity to this instance's host is lost. If this instance is
+ * released before that happens, the handler won't be called.
+ * Only one handler is active at a time, so if this method is called again before the previous
+ * handler has been called, it might never be called at all (or yes, if it has already been queued).
+ */
+- (void)handleLossWithHandler:(nonnull void (^)())handler;
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
new file mode 100644
index 0000000000..b4061bd5ef
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m
@@ -0,0 +1,192 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRPCConnectivityMonitor.h"
+
+#pragma mark Flags
+
+@implementation GRPCReachabilityFlags {
+ SCNetworkReachabilityFlags _flags;
+}
+
++ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags {
+ return [[self alloc] initWithFlags:flags];
+}
+
+- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags {
+ if ((self = [super init])) {
+ _flags = flags;
+ }
+ return self;
+}
+
+/*
+ * One accessor method implementation per flag. Example:
+
+- (BOOL)isCell { \
+ return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \
+}
+
+ */
+#define GRPC_XMACRO_ITEM(methodName, FlagName) \
+- (BOOL)methodName { \
+ return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \
+}
+#include "GRPCReachabilityFlagNames.xmacro.h"
+#undef GRPC_XMACRO_ITEM
+
+- (BOOL)isHostReachable {
+ // Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs
+ // on top of it.
+ // connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on
+ // demand).
+ return self.reachable && !self.interventionRequired && !self.connectionOnDemand;
+}
+
+- (NSString *)description {
+ NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9];
+
+ /*
+ * For each flag, add its name to the array if it's ON. Example:
+
+ if (self.isCell) {
+ [activeOptions addObject:@"isCell"];
+ }
+
+ */
+#define GRPC_XMACRO_ITEM(methodName, FlagName) \
+ if (self.methodName) { \
+ [activeOptions addObject:@#methodName]; \
+ }
+#include "GRPCReachabilityFlagNames.xmacro.h"
+#undef GRPC_XMACRO_ITEM
+
+ return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "];
+}
+
+- (BOOL)isEqual:(id)object {
+ return [object isKindOfClass:[GRPCReachabilityFlags class]] &&
+ _flags == ((GRPCReachabilityFlags *)object)->_flags;
+}
+
+- (NSUInteger)hash {
+ return _flags;
+}
+@end
+
+#pragma mark Connectivity Monitor
+
+// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the
+// received ones to it.
+static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target,
+ SCNetworkReachabilityFlags flags,
+ void *info) {
+ #pragma unused (target)
+ // This can be called many times with the same info. The info is retained by SCNetworkReachability
+ // while this function is being executed.
+ void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info;
+ handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]);
+}
+
+@implementation GRPCConnectivityMonitor {
+ SCNetworkReachabilityRef _reachabilityRef;
+}
+
+- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability {
+ if (!reachability) {
+ return nil;
+ }
+ if ((self = [super init])) {
+ _reachabilityRef = CFRetain(reachability);
+ _queue = dispatch_get_main_queue();
+ }
+ return self;
+}
+
++ (nullable instancetype)monitorWithHost:(nonnull NSString *)host {
+ const char *hostName = host.UTF8String;
+ if (!hostName) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"host.UTF8String returns NULL for %@", host];
+ }
+ SCNetworkReachabilityRef reachability =
+ SCNetworkReachabilityCreateWithName(NULL, hostName);
+
+ GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability];
+ if (reachability) {
+ CFRelease(reachability);
+ }
+ return returnValue;
+}
+
+- (void)handleLossWithHandler:(void (^)())handler {
+ [self startListeningWithHandler:^(GRPCReachabilityFlags *flags) {
+ if (!flags.isHostReachable) {
+ handler();
+ }
+ }];
+}
+
+- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler {
+ // Copy to ensure the handler block is in the heap (and so can't be deallocated when this method
+ // returns).
+ void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy];
+ SCNetworkReachabilityContext context = {
+ .version = 0,
+ .info = (__bridge void *)copiedHandler,
+ .retain = CFRetain,
+ .release = CFRelease,
+ };
+ // The following will retain context.info, and release it when the callback is set to NULL.
+ SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context);
+ SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue);
+}
+
+- (void)stopListening {
+ // This releases the block on context.info.
+ SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL);
+ SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL);
+}
+
+- (void)setQueue:(dispatch_queue_t)queue {
+ _queue = queue ?: dispatch_get_main_queue();
+}
+
+- (void)dealloc {
+ if (_reachabilityRef) {
+ [self stopListening];
+ CFRelease(_reachabilityRef);
+ }
+}
+
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.h b/src/objective-c/GRPCClient/private/GRPCHost.h
index 82c0ad6cf6..987d3e9f59 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.h
+++ b/src/objective-c/GRPCClient/private/GRPCHost.h
@@ -33,27 +33,39 @@
#import <Foundation/Foundation.h>
+NS_ASSUME_NONNULL_BEGIN
+
@class GRPCCompletionQueue;
struct grpc_call;
@interface GRPCHost : NSObject
@property(nonatomic, readonly) NSString *address;
-@property(nonatomic, copy) NSString *userAgentPrefix;
+@property(nonatomic, copy, nullable) NSString *userAgentPrefix;
/** The following properties should only be modified for testing: */
@property(nonatomic, getter=isSecure) BOOL secure;
-@property(nonatomic, copy) NSString *pathToCertificates;
-@property(nonatomic, copy) NSString *hostNameOverride;
+@property(nonatomic, copy, nullable) NSString *pathToCertificates;
+@property(nonatomic, copy, nullable) NSString *hostNameOverride;
+- (nullable instancetype)init NS_UNAVAILABLE;
/** Host objects initialized with the same address are the same. */
-+ (instancetype)hostWithAddress:(NSString *)address;
-- (instancetype)initWithAddress:(NSString *)address NS_DESIGNATED_INITIALIZER;
++ (nullable instancetype)hostWithAddress:(NSString *)address;
+- (nullable instancetype)initWithAddress:(NSString *)address NS_DESIGNATED_INITIALIZER;
/** Create a grpc_call object to the provided path on this host. */
-- (struct grpc_call *)unmanagedCallWithPath:(NSString *)path
- completionQueue:(GRPCCompletionQueue *)queue;
+- (nullable struct grpc_call *)unmanagedCallWithPath:(NSString *)path
+ completionQueue:(GRPCCompletionQueue *)queue;
+// TODO: There's a race when a new RPC is coming through just as an existing one is getting
+// notified that there's no connectivity. If connectivity comes back at that moment, the new RPC
+// will have its channel destroyed by the other RPC, and will never get notified of a problem, so
+// it'll hang (the C layer logs a timeout, with exponential back off). One solution could be to pass
+// the GRPCChannel to the GRPCCall, renaming this as "disconnectChannel:channel", which would only
+// act on that specific channel.
+- (void)disconnect;
@end
+
+NS_ASSUME_NONNULL_END
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index eb1db899b7..508cb20644 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -34,33 +34,30 @@
#import "GRPCHost.h"
#include <grpc/grpc.h>
+#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
#import "GRPCChannel.h"
#import "GRPCCompletionQueue.h"
#import "NSDictionary+GRPC.h"
+NS_ASSUME_NONNULL_BEGIN
+
// TODO(jcanizales): Generate the version in a standalone header, from templates. Like
// templates/src/core/surface/version.c.template .
#define GRPC_OBJC_VERSION_STRING @"0.13.0"
-@interface GRPCHost ()
-// TODO(mlumish): Investigate whether caching channels with strong links is a good idea.
-@property(nonatomic, strong) GRPCChannel *channel;
-@end
-
-@implementation GRPCHost
-
-+ (instancetype)hostWithAddress:(NSString *)address {
- return [[self alloc] initWithAddress:address];
+@implementation GRPCHost {
+ // TODO(mlumish): Investigate whether caching channels with strong links is a good idea.
+ GRPCChannel *_channel;
}
-- (instancetype)init {
- return [self initWithAddress:nil];
++ (nullable instancetype)hostWithAddress:(NSString *)address {
+ return [[self alloc] initWithAddress:address];
}
// Default initializer.
-- (instancetype)initWithAddress:(NSString *)address {
+- (nullable instancetype)initWithAddress:(NSString *)address {
if (!address) {
return nil;
}
@@ -95,46 +92,45 @@
return self;
}
-- (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue {
- if (!queue || !path || !self.channel) {
- return NULL;
+- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
+ completionQueue:(GRPCCompletionQueue *)queue {
+ GRPCChannel *channel;
+ // This is racing -[GRPCHost disconnect].
+ @synchronized(self) {
+ if (!_channel) {
+ _channel = [self newChannel];
+ }
+ channel = _channel;
}
- return grpc_channel_create_call(self.channel.unmanagedChannel,
- NULL, GRPC_PROPAGATE_DEFAULTS,
- queue.unmanagedQueue,
- path.UTF8String,
- self.hostName.UTF8String,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ return [channel unmanagedCallWithPath:path completionQueue:queue];
}
-- (GRPCChannel *)channel {
- // Create it lazily, because we don't want to open a connection just because someone is
- // configuring a host.
+- (NSDictionary *)channelArgs {
+ NSMutableDictionary *args = [NSMutableDictionary dictionary];
- if (!_channel) {
- NSMutableDictionary *args = [NSMutableDictionary dictionary];
+ // TODO(jcanizales): Add OS and device information (see
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents ).
+ NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING;
+ if (_userAgentPrefix) {
+ userAgent = [_userAgentPrefix stringByAppendingFormat:@" %@", userAgent];
+ }
+ args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent;
- // TODO(jcanizales): Add OS and device information (see
- // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents ).
- NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING;
- if (_userAgentPrefix) {
- userAgent = [@[_userAgentPrefix, userAgent] componentsJoinedByString:@" "];
- }
- args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent;
-
- if (_secure) {
- if (_hostNameOverride) {
- args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = _hostNameOverride;
- }
-
- _channel = [GRPCChannel secureChannelWithHost:_address
- pathToCertificates:_pathToCertificates
- channelArgs:args];
- } else {
- _channel = [GRPCChannel insecureChannelWithHost:_address channelArgs:args];
- }
+ if (_secure && _hostNameOverride) {
+ args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = _hostNameOverride;
+ }
+ return args;
+}
+
+- (GRPCChannel *)newChannel {
+ NSDictionary *args = [self channelArgs];
+ if (_secure) {
+ return [GRPCChannel secureChannelWithHost:_address
+ pathToCertificates:_pathToCertificates
+ channelArgs:args];
+ } else {
+ return [GRPCChannel insecureChannelWithHost:_address channelArgs:args];
}
- return _channel;
}
- (NSString *)hostName {
@@ -142,7 +138,16 @@
return _hostNameOverride ?: _address;
}
+- (void)disconnect {
+ // This is racing -[GRPCHost unmanagedCallWithPath:completionQueue:].
+ @synchronized(self) {
+ _channel = nil;
+ }
+}
+
// TODO(jcanizales): Don't let set |secure| to |NO| if |pathToCertificates| or |hostNameOverride|
// have been set. Don't let set either of the latter if |secure| has been set to |NO|.
@end
+
+NS_ASSUME_NONNULL_END
diff --git a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
new file mode 100644
index 0000000000..02871d5d02
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * "X-macro" file that lists the flags names of Apple's Network Reachability API, along with a nice
+ * Objective-C method name used to query each of them.
+ *
+ * Example usage: To generate a dictionary from flag value to name, one can do:
+
+ NSDictionary *flagNames = @{
+#define GRPC_XMACRO_ITEM(methodName, FlagName) \
+ @(kSCNetworkReachabilityFlags ## FlagName): @#methodName,
+#include "GRXReachabilityFlagNames.xmacro.h"
+#undef GRPC_XMACRO_ITEM
+ };
+
+ XCTAssertEqualObjects(flagNames[@(kSCNetworkReachabilityFlagsIsWWAN)], @"isCell");
+
+ */
+
+#ifndef GRPC_XMACRO_ITEM
+#error This file is to be used with the "X-macro" pattern: Please #define \
+ GRPC_XMACRO_ITEM(methodName, FlagName), then #include this file, and then #undef \
+ GRPC_XMACRO_ITEM.
+#endif
+
+GRPC_XMACRO_ITEM(isCell, IsWWAN)
+GRPC_XMACRO_ITEM(reachable, Reachable)
+GRPC_XMACRO_ITEM(transientConnection, TransientConnection)
+GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired)
+GRPC_XMACRO_ITEM(connectionOnTraffic, ConnectionOnTraffic)
+GRPC_XMACRO_ITEM(interventionRequired, InterventionRequired)
+GRPC_XMACRO_ITEM(connectionOnDemand, ConnectionOnDemand)
+GRPC_XMACRO_ITEM(isLocalAddress, IsLocalAddress)
+GRPC_XMACRO_ITEM(isDirect, IsDirect)
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
index 71e7e0e54e..e37ed1b59f 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
@@ -34,7 +34,6 @@
#import <Foundation/Foundation.h>
#include <grpc/grpc.h>
-#import "GRPCChannel.h"
#import "GRPCRequestHeaders.h"
@interface GRPCOperation : NSObject
@@ -94,4 +93,5 @@
- (void)startBatchWithOperations:(NSArray *)ops;
- (void)cancel;
+
@end
diff --git a/src/objective-c/RxLibrary/GRXWriteable.m b/src/objective-c/RxLibrary/GRXWriteable.m
index 2729d62b72..028ba9b551 100644
--- a/src/objective-c/RxLibrary/GRXWriteable.m
+++ b/src/objective-c/RxLibrary/GRXWriteable.m
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -42,11 +42,42 @@
if (!handler) {
return [[self alloc] init];
}
- return [[self alloc] initWithValueHandler:^(id value) {
- handler(value, nil);
- } completionHandler:^(NSError *errorOrNil) {
- if (errorOrNil) {
- handler(nil, errorOrNil);
+ // We nilify this variable when the block is invoked, so that handler is only invoked once even if
+ // the writer tries to write multiple values.
+ __block GRXEventHandler eventHandler = ^(BOOL done, id value, NSError *error) {
+ // Nillify eventHandler before invoking handler, in case the latter causes the former to be
+ // executed recursively. Because blocks can be deallocated even during execution, we have to
+ // first retain handler locally to guarantee it's valid.
+ // TODO(jcanizales): Just turn this craziness into a simple subclass of GRXWriteable.
+ GRXSingleHandler singleHandler = handler;
+ eventHandler = nil;
+
+ if (value) {
+ singleHandler(value, nil);
+ } else if (error) {
+ singleHandler(nil, error);
+ } else {
+ NSDictionary *userInfo = @{
+ NSLocalizedDescriptionKey: @"The writer finished without producing any value."
+ };
+ // Even though RxLibrary is independent of gRPC, the domain and code here are, for the moment,
+ // set to the values of kGRPCErrorDomain and GRPCErrorCodeInternal. This way, the error formed
+ // is the one user of gRPC would expect if the server failed to produce a response.
+ //
+ // TODO(jcanizales): Figure out a way to keep errors of RxLibrary generic without making users
+ // of gRPC take care of two different error domains and error code enums. A possibility is to
+ // add error handling to GRXWriters or GRXWriteables, and use them to translate errors between
+ // the two domains.
+ static NSString *kGRPCErrorDomain = @"io.grpc";
+ static NSUInteger kGRPCErrorCodeInternal = 13;
+ singleHandler(nil, [NSError errorWithDomain:kGRPCErrorDomain
+ code:kGRPCErrorCodeInternal
+ userInfo:userInfo]);
+ }
+ };
+ return [self writeableWithEventHandler:^(BOOL done, id value, NSError *error) {
+ if (eventHandler) {
+ eventHandler(done, value, error);
}
}];
}
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 624958f4b9..7dd6873c80 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -273,10 +273,12 @@ static ProtoMethod *kUnaryCallMethod;
id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
XCTAssertNotNil(value, @"nil value received as response.");
XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value);
+ /* This test needs to be more clever in regards to changing the version of the core.
XCTAssertEqualObjects(call.responseHeaders[@"x-grpc-test-echo-useragent"],
@"Foo grpc-objc/0.13.0 grpc-c/0.14.0-dev (ios)",
@"Did not receive expected user agent %@",
call.responseHeaders[@"x-grpc-test-echo-useragent"]);
+ */
[response fulfill];
} completionHandler:^(NSError *errorOrNil) {
XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil);
diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m
index d342662814..ae9465f58c 100644
--- a/src/objective-c/tests/RxLibraryUnitTests.m
+++ b/src/objective-c/tests/RxLibraryUnitTests.m
@@ -64,6 +64,8 @@
}
@end
+// TODO(jcanizales): Split into one file per tested class.
+
@interface RxLibraryUnitTests : XCTestCase
@end
@@ -79,6 +81,7 @@
// If:
id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
[writeable writeValue:anyValue];
+ [writeable writesFinishedWithError:nil];
// Then:
XCTAssertEqual(handler.timesCalled, 1);
@@ -101,6 +104,54 @@
XCTAssertEqualObjects(handler.errorOrNil, anyError);
}
+- (void)testWriteableSingleHandlerIsCalledOnlyOnce_ValueThenError {
+ // Given:
+ CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
+ id anyValue = @7;
+ NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil];
+
+ // If:
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ [writeable writeValue:anyValue];
+ [writeable writesFinishedWithError:anyError];
+
+ // Then:
+ XCTAssertEqual(handler.timesCalled, 1);
+ XCTAssertEqualObjects(handler.value, anyValue);
+ XCTAssertEqualObjects(handler.errorOrNil, nil);
+}
+
+- (void)testWriteableSingleHandlerIsCalledOnlyOnce_ValueThenValue {
+ // Given:
+ CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
+ id anyValue = @7;
+
+ // If:
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ [writeable writeValue:anyValue];
+ [writeable writeValue:anyValue];
+ [writeable writesFinishedWithError:nil];
+
+ // Then:
+ XCTAssertEqual(handler.timesCalled, 1);
+ XCTAssertEqualObjects(handler.value, anyValue);
+ XCTAssertEqualObjects(handler.errorOrNil, nil);
+}
+
+- (void)testWriteableSingleHandlerFailsOnEmptyWriter {
+ // Given:
+ CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler];
+
+ // If:
+ id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block];
+ [writeable writesFinishedWithError:nil];
+
+ // Then:
+ XCTAssertEqual(handler.timesCalled, 1);
+ XCTAssertEqualObjects(handler.value, nil);
+ XCTAssertNotNil(handler.errorOrNil);
+}
+
#pragma mark BufferedPipe
- (void)testBufferedPipePropagatesValue {
diff --git a/src/php/composer.json b/src/php/composer.json
index 1d41f847ac..01674a25db 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -1,7 +1,9 @@
{
"name": "grpc/grpc",
+ "type": "library",
"description": "gRPC library for PHP",
- "version": "0.6.0",
+ "version": "0.14.0",
+ "keywords": ["rpc"],
"homepage": "http://grpc.io",
"license": "BSD-3-Clause",
"repositories": [
@@ -13,7 +15,7 @@
"require": {
"php": ">=5.5.0",
"datto/protobuf-php": "dev-master",
- "google/auth": "dev-master"
+ "google/auth": "v0.7"
},
"autoload": {
"psr-4": {
diff --git a/src/php/ext/grpc/README.md b/src/php/ext/grpc/README.md
deleted file mode 100644
index 6e1cb2002f..0000000000
--- a/src/php/ext/grpc/README.md
+++ /dev/null
@@ -1,67 +0,0 @@
-gRPC PHP Extension
-==================
-
-# Requirements
-
- * PHP 5.5+
- * [gRPC core library](https://github.com/grpc/grpc) 0.11.0
-
-# Installation
-
-## Install PHP 5
-
-```
-$ sudo apt-get install git php5 php5-dev php-pear unzip
-```
-
-## Compile gRPC Core Library
-
-Clone the gRPC source code repository
-
-```
-$ git clone https://github.com/grpc/grpc.git
-```
-
-Build and install the gRPC C core libraries
-
-```sh
-$ cd grpc
-$ git checkout --track origin/release-0_11
-$ git pull --recurse-submodules && git submodule update --init --recursive
-$ make
-$ sudo make install
-```
-
-Note: you may encounter a warning about the Protobuf compiler `protoc` 3.0.0+ not being installed. The following might help, and will be useful later on when we need to compile the `protoc-gen-php` tool.
-
-```sh
-$ cd grpc/third_party/protobuf
-$ sudo make install # 'make' should have been run by core grpc
-```
-
-## Install the gRPC PHP extension
-
-Quick install
-
-```sh
-$ sudo pecl install grpc
-```
-
-Note: before a stable release, you may need to do
-
-```sh
-$ sudo pecl install grpc-beta
-```
-
-OR
-
-Compile from source
-
-```sh
-$ # from grpc
-$ cd src/php/ext/grpc
-$ phpize
-$ ./configure
-$ make
-$ sudo make install
-```
diff --git a/src/php/tests/generated_code/math_client.php b/src/php/tests/generated_code/math_client.php
index 76ccabc068..2085560d19 100644
--- a/src/php/tests/generated_code/math_client.php
+++ b/src/php/tests/generated_code/math_client.php
@@ -1,7 +1,7 @@
<?php
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,9 @@ function p($line)
$host = 'localhost:50051';
p("Connecting to host: $host");
-$client = new math\MathClient($host, []);
+$client = new math\MathClient($host, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure()
+]);
p('Client class: '.get_class($client));
p('');
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index d05a35548d..5ce0a1fd64 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -42,6 +42,7 @@ message RequestParams {
bool echo_peer = 7;
string expected_client_identity = 8; // will force check_auth_context.
bool skip_cancelled_check = 9;
+ string expected_transport_security_type = 10;
}
message EchoRequest {
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 1f1833d5ec..bc03c4dcf1 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -95,7 +95,6 @@ cdef class Channel:
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag = OperationTag(tag)
- operation_tag.references = [self, queue]
cpython.Py_INCREF(operation_tag)
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index fe93da6c12..5f85923524 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -106,7 +106,6 @@ cdef class Server:
self.is_shutting_down = True
operation_tag = OperationTag(tag)
operation_tag.shutting_down_server = self
- operation_tag.references.extend([self, queue])
cpython.Py_INCREF(operation_tag)
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py
index b0dbd92a49..32a31ce00e 100644
--- a/src/python/grpcio/tests/_runner.py
+++ b/src/python/grpcio/tests/_runner.py
@@ -43,6 +43,13 @@ import uuid
from tests import _loader
from tests import _result
+# This number needs to be large enough to outpace output on stdout and stderr
+# from the gRPC core, otherwise we could end up in a potential deadlock. This
+# stems from the OS waiting on someone to clear a filled pipe buffer while the
+# GIL is held from a write to stderr from gRPC core, but said someone is in
+# Python code thus necessitating GIL acquisition.
+_READ_BYTES = 2**20
+
class CapturePipe(object):
"""A context-manager pipe to redirect output to a byte array.
@@ -76,6 +83,10 @@ class CapturePipe(object):
flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL)
fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self._read_thread = threading.Thread(target=self._read)
+ # If the user wants to exit from the Python program and hits ctrl-C and the
+ # read thread is somehow deadlocked with something else, the Python code may
+ # refuse to exit. This prevents that by making the read thread second-class.
+ self._read_thread.daemon = True
self._read_thread.start()
def stop(self):
@@ -93,7 +104,7 @@ class CapturePipe(object):
self.output = bytearray()
while True:
select.select([self._read_fd], [], [])
- read_bytes = os.read(self._read_fd, 1024)
+ read_bytes = os.read(self._read_fd, _READ_BYTES)
if read_bytes:
self.output.extend(read_bytes)
else:
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 388d040d5c..84870aaa5c 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -12,31 +12,22 @@
"_core_over_links_base_interface_test.SyncEasyTest",
"_core_over_links_base_interface_test.SyncPeasyTest",
"_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py
deleted file mode 100644
index 34db6c3e55..0000000000
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py
+++ /dev/null
@@ -1,381 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Test code for the Face layer of RPC Framework."""
-
-import abc
-import unittest
-
-# test_interfaces is referenced from specification in this module.
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.common import test_control
-from tests.unit.framework.common import test_coverage
-from tests.unit.framework.interfaces.face import _3069_test_constant
-from tests.unit.framework.interfaces.face import _digest
-from tests.unit.framework.interfaces.face import _receiver
-from tests.unit.framework.interfaces.face import _stock_service
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-
-class TestCase(test_coverage.Coverage, unittest.TestCase):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must have an "implementation" attribute of type
- test_interfaces.Implementation and an "invoker_constructor" attribute of type
- _invocation.InvokerConstructor.
- """
- __metaclass__ = abc.ABCMeta
-
- NAME = 'EventInvocationSynchronousEventServiceTest'
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._control = test_control.PauseFailControl()
- self._digest = _digest.digest(
- _stock_service.STOCK_TEST_SERVICE, self._control, None)
-
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.event_method_implementations, None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._invoker = None
- self.implementation.destantiate(self._memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
- response = receiver.unary_response()
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
- responses = receiver.stream_responses()
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
- response = receiver.unary_response()
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
- responses = receiver.stream_responses()
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- # pylint: disable=cell-var-from-loop
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
- second_receiver = _receiver.Receiver()
-
- def make_second_invocation():
- self._invoker.event(group, method)(
- second_request, second_receiver, second_receiver.abort,
- test_constants.LONG_TIMEOUT)
-
- class FirstReceiver(_receiver.Receiver):
-
- def complete(self, terminal_metadata, code, details):
- super(FirstReceiver, self).complete(
- terminal_metadata, code, details)
- make_second_invocation()
-
- first_receiver = FirstReceiver()
-
- self._invoker.event(group, method)(
- first_request, first_receiver, first_receiver.abort,
- test_constants.LONG_TIMEOUT)
- second_receiver.block_until_terminated()
-
- first_response = first_receiver.unary_response()
- second_response = second_receiver.unary_response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- first_receiver = _receiver.Receiver()
- second_request = test_messages.request()
- second_receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- first_request, first_receiver, first_receiver.abort,
- test_constants.LONG_TIMEOUT)
- self._invoker.event(group, method)(
- second_request, second_receiver, second_receiver.abort,
- test_constants.LONG_TIMEOUT)
- first_receiver.block_until_terminated()
- second_receiver.block_until_terminated()
-
- first_response = first_receiver.unary_response()
- second_response = second_receiver.unary_response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- call = self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- call = self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for unused_test_messages in test_messages_sequence:
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call_consumer.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort,
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort,
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for unused_test_messages in test_messages_sequence:
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
index 462829b660..06b9d77e52 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -34,14 +34,12 @@ import unittest # pylint: disable=unused-import
# test_interfaces is referenced from specification in this module.
from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service
-from tests.unit.framework.interfaces.face import _event_invocation_synchronous_event_service
from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service
from tests.unit.framework.interfaces.face import _invocation
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
_TEST_CASE_SUPERCLASSES = (
_blocking_invocation_inline_service.TestCase,
- _event_invocation_synchronous_event_service.TestCase,
_future_invocation_asynchronous_event_service.TestCase,
)