aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_config/client_channel.c44
-rw-r--r--src/core/ext/client_config/lb_policy.h27
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c63
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c14
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c80
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c33
-rw-r--r--src/core/lib/channel/message_size_filter.c14
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/surface/byte_buffer.c5
-rw-r--r--src/core/lib/surface/call.c15
-rw-r--r--src/cpp/client/client_context.cc3
-rw-r--r--src/cpp/common/channel_filter.h6
-rw-r--r--src/csharp/Grpc.Tools.nuspec24
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.h20
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m25
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.h4
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.m12
-rw-r--r--src/objective-c/tests/GRPCClientTests.m33
-rw-r--r--src/php/composer.json1
-rw-r--r--src/python/grpcio/grpc/_server.py1
-rw-r--r--src/ruby/README.md2
22 files changed, 278 insertions, 166 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index feb4cbde7b..a6056c3e8d 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -111,10 +111,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_SHUTDOWN) &&
chand->lb_policy != NULL) {
- /* cancel fail-fast picks */
+ /* cancel picks with wait_for_ready=false */
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
- /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+ /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
/* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
@@ -185,10 +185,35 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
lb_policy_args.additional_args =
grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory;
- lb_policy = grpc_lb_policy_create(
- exec_ctx,
- grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
- &lb_policy_args);
+
+ // Special case: If all of the addresses are balancer addresses,
+ // assume that we should use the grpclb policy, regardless of what the
+ // resolver actually specified.
+ const char *lb_policy_name =
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result);
+ bool found_backend_address = false;
+ for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) {
+ if (!lb_policy_args.addresses->addresses[i].is_balancer) {
+ found_backend_address = true;
+ break;
+ }
+ }
+ if (!found_backend_address) {
+ if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided only balancer "
+ "addresses, no backend addresses -- forcing use of grpclb LB "
+ "policy",
+ (lb_policy_name == NULL ? "(none)" : lb_policy_name));
+ }
+ lb_policy_name = "grpclb";
+ }
+ // Use pick_first if nothing was specified and we didn't select grpclb
+ // above.
+ if (lb_policy_name == NULL) lb_policy_name = "pick_first";
+
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
@@ -602,9 +627,10 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
- initial_metadata_flags,
- &calld->lb_token_mdelem};
+ // TODO(dgq): make this deadline configurable somehow.
+ const grpc_lb_policy_pick_args inputs = {
+ calld->pollent, initial_metadata, initial_metadata_flags,
+ &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)};
r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 7fb3e08cb3..110d08fcac 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -59,10 +59,14 @@ typedef struct grpc_lb_policy_pick_args {
grpc_polling_entity *pollent;
/** Initial metadata associated with the picking call. */
grpc_metadata_batch *initial_metadata;
- /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ /** Bitmask used for selective cancelling. See \a
+ * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+ * grpc_types.h */
uint32_t initial_metadata_flags;
/** Storage for LB token in \a initial_metadata, or NULL if not used */
grpc_linked_mdelem *lb_token_mdelem_storage;
+ /** Deadline for the call to the LB server */
+ gpr_timespec deadline;
} grpc_lb_policy_pick_args;
struct grpc_lb_policy_vtable {
@@ -138,15 +142,18 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Find an appropriate target for this call, based on \a pick_args.
- Picking can be synchronous or asynchronous. In the synchronous case, when a
- pick is readily available, it'll be returned in \a target and a non-zero
- value will be returned.
- In the asynchronous case, zero is returned and \a on_complete will be called
- once \a target and \a user_data have been set. Any IO should be done under
- \a pick_args->pollent. The opaque \a user_data output argument corresponds
- to information that may need be propagated from the LB policy. It may be
- NULL. Errors are signaled by receiving a NULL \a *target. */
+/** Finds an appropriate subchannel for a call, based on \a pick_args.
+
+ \a target will be set to the selected subchannel, or NULL on failure.
+ Upon success, \a user_data will be set to whatever opaque information
+ may need to be propagated from the LB policy, or NULL if not needed.
+
+ If the pick succeeds and a result is known immediately, a non-zero
+ value will be returned. Otherwise, \a on_complete will be invoked
+ once the pick is complete with its error argument set to indicate
+ success or failure.
+
+ Any I/O should be done under \a pick_args->pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index bdfe65a62a..ae1f2a3b4c 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -105,6 +105,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy_factory.h"
@@ -199,18 +200,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
typedef struct pending_pick {
struct pending_pick *next;
- /* polling entity for the pick()'s async notification */
- grpc_polling_entity *pollent;
-
- /* the initial metadata for the pick. See grpc_lb_policy_pick() */
- grpc_metadata_batch *initial_metadata;
-
- /* storage for the lb token initial metadata mdelem */
- grpc_linked_mdelem *lb_token_mdelem_storage;
-
- /* bitmask passed to pick() and used for selective cancelling. See
- * grpc_lb_policy_cancel_picks() */
- uint32_t initial_metadata_flags;
+ /* original pick()'s arguments */
+ grpc_lb_policy_pick_args pick_args;
/* output argument where to store the pick()ed connected subchannel, or NULL
* upon error. */
@@ -232,11 +223,8 @@ static void add_pending_pick(pending_pick **root,
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pick_args->pollent;
+ pp->pick_args = *pick_args;
pp->target = target;
- pp->initial_metadata = pick_args->initial_metadata;
- pp->initial_metadata_flags = pick_args->initial_metadata_flags;
- pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
pp->wrapped_on_complete_arg.target = target;
pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
@@ -283,9 +271,13 @@ typedef struct glb_lb_policy {
/** mutex protecting remaining members */
gpr_mu mu;
+ /** who the client is trying to communicate with */
const char *server_name;
grpc_client_channel_factory *cc_factory;
+ /** deadline for the LB's call */
+ gpr_timespec deadline;
+
/** for communicating with the LB server */
grpc_channel *lb_channel;
@@ -486,10 +478,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- const grpc_lb_policy_pick_args pick_args = {
- pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
- pp->lb_token_mdelem_storage};
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
+ pp->target,
(void **)&pp->wrapped_on_complete_arg.lb_token,
&pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
@@ -589,7 +579,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
&addr_strs[addr_index++],
(const struct sockaddr *)&args->addresses->addresses[i]
.address.addr,
- true) == 0);
+ true) > 0);
}
}
}
@@ -660,7 +650,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
NULL);
- gpr_free(pp);
pp = next;
}
@@ -698,12 +687,11 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if (pp->target == target) {
grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -729,14 +717,13 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
glb_policy->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
- if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+ if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
- exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+ exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -767,8 +754,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
- glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-
if (pick_args->lb_token_mdelem_storage == NULL) {
*target = NULL;
grpc_exec_ctx_sched(
@@ -776,11 +761,13 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
"won't work without it. Failing"),
NULL);
- return 1;
+ return 0;
}
+ glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
- int r;
+ glb_policy->deadline = pick_args->deadline;
+ bool pick_done;
if (glb_policy->rr_policy != NULL) {
if (grpc_lb_glb_trace) {
@@ -799,10 +786,11 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+ pick_done =
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
(void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
- if (r != 0) {
+ if (pick_done) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
@@ -816,6 +804,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
+ /* else, the pending pick will be registered and taken care of by the
+ * pending pick list inside the RR policy (glb_policy->rr_policy) */
grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
add_pending_pick(&glb_policy->pending_picks, pick_args, target,
@@ -824,10 +814,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
}
- r = 0;
+ pick_done = false;
}
gpr_mu_unlock(&glb_policy->mu);
- return r;
+ return pick_done;
}
static grpc_connectivity_state glb_check_connectivity(
@@ -937,8 +927,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
- /* TODO(dgq): get the deadline from the parent channel. */
- lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ lb_client->deadline = glb_policy->deadline;
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index e8ac1b12ae..fa33ffd7bd 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -53,16 +53,12 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** refcount */
- gpr_refcount refs;
/** target name */
char *target_name;
/** name to resolve (usually the same as target_name) */
char *name_to_resolve;
/** default port to use */
char *default_port;
- /** load balancing policy name */
- char *lb_policy_name;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -181,7 +177,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_resolved_addresses_destroy(r->addresses);
result = grpc_resolver_result_create(r->target_name, addresses,
- r->lb_policy_name, NULL);
+ NULL /* lb_policy_name */, NULL);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -245,13 +241,11 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
gpr_free(r->target_name);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
- gpr_free(r->lb_policy_name);
gpr_free(r);
}
static grpc_resolver *dns_create(grpc_resolver_args *args,
- const char *default_port,
- const char *lb_policy_name) {
+ const char *default_port) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based dns uri's not supported");
return NULL;
@@ -264,7 +258,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
// Create resolver.
dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
memset(r, 0, sizeof(*r));
- gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->target_name = gpr_strdup(path);
@@ -272,7 +265,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
r->default_port = gpr_strdup(default_port);
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
@@ -286,7 +278,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
grpc_resolver_factory *factory, grpc_resolver_args *args) {
- return dns_create(args, "https", "pick_first");
+ return dns_create(args, "https");
}
static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index f232e0460b..5a7a32d7cb 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -49,10 +49,6 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** refcount */
- gpr_refcount refs;
- /** load balancing policy name */
- char *lb_policy_name;
/** the path component of the uri passed in */
char *target_name;
/** the addresses that we've 'resolved' */
@@ -123,7 +119,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = grpc_resolver_result_create(
r->target_name,
grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
- r->lb_policy_name, NULL);
+ NULL /* lb_policy_name */, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -133,7 +129,6 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
- gpr_free(r->lb_policy_name);
gpr_free(r->target_name);
gpr_free(r);
}
@@ -163,80 +158,49 @@ char *unix_get_default_authority(grpc_resolver_factory *factory,
static void do_nothing(void *ignored) {}
-static grpc_resolver *sockaddr_create(
- grpc_resolver_args *args, const char *default_lb_policy_name,
- int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- bool errors_found = false;
- sockaddr_resolver *r;
- gpr_slice path_slice;
- gpr_slice_buffer path_parts;
-
+static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
+ int parse(grpc_uri *uri,
+ struct sockaddr_storage *dst,
+ size_t *len)) {
if (0 != strcmp(args->uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
args->uri->scheme);
return NULL;
}
-
- r = gpr_malloc(sizeof(sockaddr_resolver));
- memset(r, 0, sizeof(*r));
-
- r->lb_policy_name =
- gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
- const char *lb_enabled_qpart =
- grpc_uri_get_query_arg(args->uri, "lb_enabled");
- /* anything other than "0" is interpreted as true */
- const bool lb_enabled =
- (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
-
- if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
- !lb_enabled) {
- /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
- * out, as this is meant mostly for tests. */
- gpr_log(GPR_ERROR,
- "Requested 'grpclb' LB policy but resolved addresses don't "
- "support load balancing.");
- abort();
- }
-
- if (r->lb_policy_name == NULL) {
- r->lb_policy_name = gpr_strdup(default_lb_policy_name);
- }
-
- path_slice =
+ /* Construct addresses. */
+ gpr_slice path_slice =
gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
+ gpr_slice_buffer path_parts;
gpr_slice_buffer_init(&path_parts);
-
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = grpc_lb_addresses_create(path_parts.count);
- for (size_t i = 0; i < r->addresses->num_addresses; i++) {
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count);
+ bool errors_found = false;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i]
- .address.addr),
- &r->addresses->addresses[i].address.len)) {
+ if (!parse(
+ &ith_uri,
+ (struct sockaddr_storage *)(&addresses->addresses[i].address.addr),
+ &addresses->addresses[i].address.len)) {
errors_found = true;
}
gpr_free(part_str);
- r->addresses->addresses[i].is_balancer = lb_enabled;
if (errors_found) break;
}
-
- r->target_name = gpr_strdup(args->uri->path);
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
- gpr_free(r->lb_policy_name);
- gpr_free(r->target_name);
- grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
- gpr_free(r);
+ grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
return NULL;
}
-
- gpr_ref_init(&r->refs, 1);
+ /* Instantiate resolver. */
+ sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver));
+ memset(r, 0, sizeof(*r));
+ r->target_name = gpr_strdup(args->uri->path);
+ r->addresses = addresses;
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
-
return &r->base;
}
@@ -251,7 +215,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_resolver_args *args) { \
- return sockaddr_create(args, "pick_first", parse_##name); \
+ return sockaddr_create(args, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 366690acf2..25ad40b935 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -239,6 +239,14 @@ static const char *op_id_string(enum e_op_id i) {
return "UNKNOWN";
}
+static void free_read_buffer(stream_obj *s) {
+ if (s->state.rs.read_buffer &&
+ s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
+ gpr_free(s->state.rs.read_buffer);
+ s->state.rs.read_buffer = NULL;
+ }
+}
+
/*
Add a new stream op to op storage.
*/
@@ -341,6 +349,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -363,6 +372,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -377,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
+ free_read_buffer(s);
gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -531,7 +542,8 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
*/
static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head, const char *host, char **pp_url,
- cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
+ cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers,
+ const char **method) {
grpc_linked_mdelem *curr = head;
/* Walk the linked list and get number of header fields */
size_t num_headers_available = 0;
@@ -558,11 +570,20 @@ static void convert_metadata_to_cronet_headers(
curr = curr->next;
const char *key = grpc_mdstr_as_c_string(mdelem->key);
const char *value = grpc_mdstr_as_c_string(mdelem->value);
- if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
+ if (mdelem->key == GRPC_MDSTR_SCHEME ||
mdelem->key == GRPC_MDSTR_AUTHORITY) {
/* Cronet populates these fields on its own */
continue;
}
+ if (mdelem->key == GRPC_MDSTR_METHOD) {
+ if (mdelem->value == GRPC_MDSTR_PUT) {
+ *method = "PUT";
+ } else {
+ /* POST method in default*/
+ *method = "POST";
+ }
+ continue;
+ }
if (mdelem->key == GRPC_MDSTR_PATH) {
/* Create URL by appending :path value to the hostname */
gpr_asprintf(pp_url, "https://%s%s", host, value);
@@ -759,15 +780,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
- char *url;
+ char *url = NULL;
+ const char *method = "POST";
s->header_array.headers = NULL;
convert_metadata_to_cronet_headers(
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
- &s->header_array.headers, &s->header_array.count);
+ &s->header_array.headers, &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
url);
- cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+ cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
@@ -901,6 +923,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
memcpy(dst_p, stream_state->rs.read_buffer,
(size_t)stream_state->rs.length_field);
+ free_read_buffer(s);
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 02fc68fc3a..f067a3a51c 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, chand->max_recv_size);
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ grpc_error* new_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_INVALID_ARGUMENT);
+ if (error == GRPC_ERROR_NONE) {
+ error = new_error;
+ } else {
+ error = grpc_error_add_child(error, new_error);
+ GRPC_ERROR_UNREF(new_error);
+ }
gpr_free(message_string);
- grpc_call_element_send_close_with_message(
- exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
}
-// Start transport op.
+// Start transport stream op.
static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 2d715500d1..ae6a6cb35e 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -40,6 +40,10 @@
#include <grpc/status.h>
#include <grpc/support/time.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/// Opaque representation of an error.
/// Errors are refcounted objects that represent the result of an operation.
/// Ownership laws:
@@ -208,4 +212,8 @@ bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
#define GRPC_LOG_IF_ERROR(what, error) \
grpc_log_if_error((what), (error), __FILE__, __LINE__)
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 48032412a2..edf7b133e9 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -38,7 +38,6 @@
#include <grpc/support/port_platform.h>
-#ifdef GRPC_NEED_UDP
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/udp_server.h"
@@ -171,6 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
@@ -197,6 +198,12 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
}
gpr_mu_unlock(&s->mu);
@@ -439,4 +446,3 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
#endif
-#endif
diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c
index a093a37af3..054a6e6c58 100644
--- a/src/core/lib/surface/byte_buffer.c
+++ b/src/core/lib/surface/byte_buffer.c
@@ -72,8 +72,9 @@ grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_RAW:
- return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
- bb->data.raw.slice_buffer.count);
+ return grpc_raw_compressed_byte_buffer_create(
+ bb->data.raw.slice_buffer.slices, bb->data.raw.slice_buffer.count,
+ bb->data.raw.compression);
}
GPR_UNREACHABLE_CODE(return NULL);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index d723711c55..c4effa9a05 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1108,8 +1108,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- bool success) {
+static void process_data_after_md(grpc_exec_ctx *exec_ctx,
+ batch_control *bctl) {
grpc_call *call = bctl->call;
if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL;
@@ -1129,8 +1129,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
bctl);
continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
}
}
@@ -1138,12 +1136,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
-
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ close_with_status(exec_ctx, call, status, msg);
+ }
gpr_mu_lock(&bctl->call->mu);
if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
- process_data_after_md(exec_ctx, bctlp, error);
+ process_data_after_md(exec_ctx, bctlp);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 5b6aaa777b..b6008f47b1 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -59,7 +59,8 @@ static ClientContext::GlobalCallbacks* g_client_callbacks =
ClientContext::ClientContext()
: initial_metadata_received_(false),
- fail_fast_(true),
+ wait_for_ready_(false),
+ wait_for_ready_explicitly_set_(false),
idempotent_(false),
cacheable_(false),
call_(nullptr),
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index 6f5af3dec3..ae32e02f69 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -42,6 +42,7 @@
#include <vector>
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/metadata_batch.h"
@@ -54,11 +55,6 @@
/// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
/// \endcode
-/// Forward declaration to avoid including the file
-/// "src/core/lib/security/context/security_context.h"
-struct grpc_client_security_context;
-struct grpc_server_security_context;
-
namespace grpc {
/// A C++ wrapper for the \c grpc_metadata_batch struct.
diff --git a/src/csharp/Grpc.Tools.nuspec b/src/csharp/Grpc.Tools.nuspec
index 0c937ab9cb..ba4e1d674c 100644
--- a/src/csharp/Grpc.Tools.nuspec
+++ b/src/csharp/Grpc.Tools.nuspec
@@ -17,17 +17,17 @@
</metadata>
<files>
<!-- forward slashes in src path enable building on Linux -->
- <file src="protoc_plugins/windows_x86/protoc.exe" target="tools\windows_x86\protoc.exe" />
- <file src="protoc_plugins/windows_x86/grpc_csharp_plugin.exe" target="tools\windows_x86\grpc_csharp_plugin.exe" />
- <file src="protoc_plugins/windows_x64/protoc.exe" target="tools\windows_x64\protoc.exe" />
- <file src="protoc_plugins/windows_x64/grpc_csharp_plugin.exe" target="tools\windows_x64\grpc_csharp_plugin.exe" />
- <file src="protoc_plugins/linux_x86/protoc" target="tools\linux_x86\protoc" />
- <file src="protoc_plugins/linux_x86/grpc_csharp_plugin" target="tools\linux_x86\grpc_csharp_plugin" />
- <file src="protoc_plugins/linux_x64/protoc" target="tools\linux_x64\protoc" />
- <file src="protoc_plugins/linux_x64/grpc_csharp_plugin" target="tools\linux_x64\grpc_csharp_plugin" />
- <file src="protoc_plugins/macosx_x86/protoc" target="tools\macosx_x86\protoc" />
- <file src="protoc_plugins/macosx_x86/grpc_csharp_plugin" target="tools\macosx_x86\grpc_csharp_plugin" />
- <file src="protoc_plugins/macosx_x64/protoc" target="tools\macosx_x64\protoc" />
- <file src="protoc_plugins/macosx_x64/grpc_csharp_plugin" target="tools\macosx_x64\grpc_csharp_plugin" />
+ <file src="protoc_plugins/windows_x86/protoc.exe" target="tools/windows_x86/protoc.exe" />
+ <file src="protoc_plugins/windows_x86/grpc_csharp_plugin.exe" target="tools/windows_x86/grpc_csharp_plugin.exe" />
+ <file src="protoc_plugins/windows_x64/protoc.exe" target="tools/windows_x64/protoc.exe" />
+ <file src="protoc_plugins/windows_x64/grpc_csharp_plugin.exe" target="tools/windows_x64/grpc_csharp_plugin.exe" />
+ <file src="protoc_plugins/linux_x86/protoc" target="tools/linux_x86/protoc" />
+ <file src="protoc_plugins/linux_x86/grpc_csharp_plugin" target="tools/linux_x86/grpc_csharp_plugin" />
+ <file src="protoc_plugins/linux_x64/protoc" target="tools/linux_x64/protoc" />
+ <file src="protoc_plugins/linux_x64/grpc_csharp_plugin" target="tools/linux_x64/grpc_csharp_plugin" />
+ <file src="protoc_plugins/macosx_x86/protoc" target="tools/macosx_x86/protoc" />
+ <file src="protoc_plugins/macosx_x86/grpc_csharp_plugin" target="tools/macosx_x86/grpc_csharp_plugin" />
+ <file src="protoc_plugins/macosx_x64/protoc" target="tools/macosx_x64/protoc" />
+ <file src="protoc_plugins/macosx_x64/grpc_csharp_plugin" target="tools/macosx_x64/grpc_csharp_plugin" />
</files>
</package>
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h
index b9e741dfa8..7645bb1d34 100644
--- a/src/objective-c/GRPCClient/GRPCCall.h
+++ b/src/objective-c/GRPCClient/GRPCCall.h
@@ -155,6 +155,18 @@ typedef NS_ENUM(NSUInteger, GRPCErrorCode) {
};
/**
+ * Safety remark of a gRPC method as defined in RFC 2616 Section 9.1
+ */
+typedef NS_ENUM(NSUInteger, GRPCCallSafety) {
+ /** Signal that there is no guarantees on how the call affects the server state. */
+ GRPCCallSafetyDefault = 0,
+ /** Signal that the call is idempotent. gRPC is free to use PUT verb. */
+ GRPCCallSafetyIdempotentRequest = 1,
+ /** Signal that the call is cacheable and will not affect server state. gRPC is free to use GET verb. */
+ GRPCCallSafetyCacheableRequest = 2,
+};
+
+/**
* Keys used in |NSError|'s |userInfo| dictionary to store the response headers and trailers sent by
* the server.
*/
@@ -233,6 +245,14 @@ extern id const kGRPCTrailersKey;
*/
- (void)cancel;
+/**
+ * Set the call flag for a specific host path.
+ *
+ * Host parameter should not contain the scheme (http:// or https://), only the name or IP addr
+ * and the port number, for example @"localhost:5050".
+ */
++ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path;
+
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
@end
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index eecda4c03a..43204345f5 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -47,6 +47,7 @@
NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
+static NSMutableDictionary *callFlags;
@interface GRPCCall () <GRXWriteable>
// Make them read-write.
@@ -106,6 +107,29 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
// TODO(jcanizales): If grpc_init is idempotent, this should be changed from load to initialize.
+ (void)load {
grpc_init();
+ callFlags = [NSMutableDictionary dictionary];
+}
+
++ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
+ NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
+ switch (callSafety) {
+ case GRPCCallSafetyDefault:
+ callFlags[hostAndPath] = @0;
+ break;
+ case GRPCCallSafetyIdempotentRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
+ break;
+ case GRPCCallSafetyCacheableRequest:
+ callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
+ break;
+ default:
+ break;
+ }
+}
+
++ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
+ NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
+ return [callFlags[hostAndPath] intValue];
}
- (instancetype)init {
@@ -231,6 +255,7 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers
+ flags:[GRPCCall callFlagsForHost:_host path:_path]
handler:nil]]];
}
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
index e37ed1b59f..52233c8242 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
@@ -45,6 +45,10 @@
@interface GRPCOpSendMetadata : GRPCOperation
- (instancetype)initWithMetadata:(NSDictionary *)metadata
+ handler:(void(^)())handler;
+
+- (instancetype)initWithMetadata:(NSDictionary *)metadata
+ flags:(uint32_t)flags
handler:(void(^)())handler NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
index 1339429660..627b6aa86d 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
@@ -64,16 +64,24 @@
@implementation GRPCOpSendMetadata
- (instancetype)init {
- return [self initWithMetadata:nil handler:nil];
+ return [self initWithMetadata:nil flags:0 handler:nil];
}
-- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)())handler {
+- (instancetype)initWithMetadata:(NSDictionary *)metadata
+ handler:(void (^)())handler {
+ return [self initWithMetadata:metadata flags:0 handler:handler];
+}
+
+- (instancetype)initWithMetadata:(NSDictionary *)metadata
+ flags:(uint32_t)flags
+ handler:(void (^)())handler {
if (self = [super init]) {
_op.op = GRPC_OP_SEND_INITIAL_METADATA;
_op.data.send_initial_metadata.count = metadata.count;
_op.data.send_initial_metadata.metadata = metadata.grpc_metadataArray;
_op.data.send_initial_metadata.maybe_compression_level.is_set = false;
_op.data.send_initial_metadata.maybe_compression_level.level = 0;
+ _op.flags = flags;
_handler = handler;
}
return self;
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 916a335802..77640525d5 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -317,4 +317,37 @@ static GRPCProtoMethod *kUnaryCallMethod;
}
+- (void)testIdempotentProtoRPC {
+ __weak XCTestExpectation *response = [self expectationWithDescription:@"Expected response."];
+ __weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."];
+
+ RMTSimpleRequest *request = [RMTSimpleRequest message];
+ request.responseSize = 100;
+ request.fillUsername = YES;
+ request.fillOauthScope = YES;
+ GRXWriter *requestsWriter = [GRXWriter writerWithValue:[request data]];
+
+ GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kUnaryCallMethod.HTTPPath
+ requestsWriter:requestsWriter];
+ [GRPCCall setCallSafety:GRPCCallSafetyIdempotentRequest host:kHostAddress path:kUnaryCallMethod.HTTPPath];
+
+ id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ XCTAssertNotNil(value, @"nil value received as response.");
+ XCTAssertGreaterThan(value.length, 0, @"Empty response received.");
+ RMTSimpleResponse *responseProto = [RMTSimpleResponse parseFromData:value error:NULL];
+ // We expect empty strings, not nil:
+ XCTAssertNotNil(responseProto.username, @"Response's username is nil.");
+ XCTAssertNotNil(responseProto.oauthScope, @"Response's OAuth scope is nil.");
+ [response fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil);
+ [completion fulfill];
+ }];
+
+ [call startWithWriteable:responsesWriteable];
+
+ [self waitForExpectationsWithTimeout:8 handler:nil];
+}
+
@end
diff --git a/src/php/composer.json b/src/php/composer.json
index 6042094032..2d5d555bc2 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -8,6 +8,7 @@
"version": "1.1.0",
"require": {
"php": ">=5.5.0",
+ "ext-grpc": "*",
"google/protobuf": "v3.1.0-alpha-1"
},
"require-dev": {
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 5f846ce773..5223712dfa 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -462,7 +462,6 @@ def _unary_response_in_pool(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_status(rpc_event, state, serialized_response)
- return
def _stream_response_in_pool(
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 3179575486..67e94dd354 100644
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -73,5 +73,5 @@ Directory structure is the layout for [ruby extensions][]
[ruby extensions]:http://guides.rubygems.org/gems-with-extensions/
[rubydoc]: http://www.rubydoc.info/gems/grpc
-[grpc.io]: http://www.grpc.io/docs/installation/ruby.html
+[grpc.io]: http://www.grpc.io/docs/quickstart/ruby.html
[Debian jessie-backports]:http://backports.debian.org/Instructions/