author     Mark D. Roth <roth@google.com>    2016-10-07 14:26:37 -0700
committer  Mark D. Roth <roth@google.com>    2016-10-07 14:26:37 -0700
commit     90576b1b67f48a1581f6cf36626118e5035d16d1 (patch)
tree       edbb2878a1baee506a1ce0d21dd57bd17fb06a6a /src/core
parent     43b817ced14083de585111c2657bbe34040bdcc7 (diff)
parent     70c0b32c0fc5db15c764a2e22c2491d19d7cff7b (diff)
Merge remote-tracking branch 'upstream/master' into run_interop_tests_go
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/ext/client_config/client_channel.c                  44
-rw-r--r--  src/core/ext/client_config/lb_policy.h                       27
-rw-r--r--  src/core/ext/lb_policy/grpclb/grpclb.c                       74
-rw-r--r--  src/core/ext/resolver/dns/native/dns_resolver.c              14
-rw-r--r--  src/core/ext/resolver/sockaddr/sockaddr_resolver.c           86
-rw-r--r--  src/core/ext/transport/cronet/transport/cronet_transport.c   33
-rw-r--r--  src/core/lib/channel/message_size_filter.c                   14
-rw-r--r--  src/core/lib/iomgr/error.h                                    8
-rw-r--r--  src/core/lib/iomgr/udp_server.c                              10
-rw-r--r--  src/core/lib/surface/byte_buffer.c                            5
-rw-r--r--  src/core/lib/surface/call.c                                  15
11 files changed, 181 insertions, 149 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index feb4cbde7b..a6056c3e8d 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -111,10 +111,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
   if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
        state == GRPC_CHANNEL_SHUTDOWN) &&
      chand->lb_policy != NULL) {
-    /* cancel fail-fast picks */
+    /* cancel picks with wait_for_ready=false */
     grpc_lb_policy_cancel_picks(
         exec_ctx, chand->lb_policy,
-        /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
+        /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
         /* check= */ 0, GRPC_ERROR_REF(error));
   }
   grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
@@ -185,10 +185,35 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
     lb_policy_args.additional_args =
         grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
     lb_policy_args.client_channel_factory = chand->client_channel_factory;
-    lb_policy = grpc_lb_policy_create(
-        exec_ctx,
-        grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
-        &lb_policy_args);
+
+    // Special case: If all of the addresses are balancer addresses,
+    // assume that we should use the grpclb policy, regardless of what the
+    // resolver actually specified.
+    const char *lb_policy_name =
+        grpc_resolver_result_get_lb_policy_name(chand->resolver_result);
+    bool found_backend_address = false;
+    for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) {
+      if (!lb_policy_args.addresses->addresses[i].is_balancer) {
+        found_backend_address = true;
+        break;
+      }
+    }
+    if (!found_backend_address) {
+      if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
+        gpr_log(GPR_INFO,
+                "resolver requested LB policy %s but provided only balancer "
+                "addresses, no backend addresses -- forcing use of grpclb LB "
+                "policy",
+                (lb_policy_name == NULL ? "(none)" : lb_policy_name));
+      }
+      lb_policy_name = "grpclb";
+    }
+    // Use pick_first if nothing was specified and we didn't select grpclb
+    // above.
+    if (lb_policy_name == NULL) lb_policy_name = "pick_first";
+
+    lb_policy =
+        grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
     if (lb_policy != NULL) {
       GRPC_LB_POLICY_REF(lb_policy, "config_change");
       GRPC_ERROR_UNREF(state_error);
@@ -602,9 +627,10 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     int r;
     GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
     gpr_mu_unlock(&chand->mu);
-    const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
-                                             initial_metadata_flags,
-                                             &calld->lb_token_mdelem};
+    // TODO(dgq): make this deadline configurable somehow.
+    const grpc_lb_policy_pick_args inputs = {
+        calld->pollent, initial_metadata, initial_metadata_flags,
+        &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)};
     r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
                             NULL, on_ready);
     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
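The new selection logic above is a three-step cascade: force grpclb when the resolver returned only balancer addresses, otherwise honor the resolver's explicit choice, and fall back to pick_first. A minimal standalone sketch of that cascade (the types and names below are illustrative stand-ins, not the real gRPC ones):

    #include <stdbool.h>
    #include <stddef.h>

    /* Illustrative stand-in for a resolved address; not the real gRPC type. */
    typedef struct {
      bool is_balancer; /* true if the address points at a load balancer */
    } example_address;

    /* Same decision order as the hunk above: force grpclb when no backend
     * address exists, keep an explicit request otherwise, default to
     * pick_first. */
    static const char *choose_lb_policy(const char *requested,
                                        const example_address *addrs,
                                        size_t n) {
      bool found_backend = false;
      for (size_t i = 0; i < n; ++i) {
        if (!addrs[i].is_balancer) {
          found_backend = true;
          break;
        }
      }
      if (!found_backend) return "grpclb"; /* only balancer addresses */
      if (requested != NULL) return requested;
      return "pick_first"; /* nothing specified */
    }

    int main(void) {
      example_address addrs[] = {{true}, {true}};
      /* all balancers: grpclb wins even though nothing was requested */
      return choose_lb_policy(NULL, addrs, 2) != NULL ? 0 : 1;
    }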
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 7fb3e08cb3..110d08fcac 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -59,10 +59,14 @@ typedef struct grpc_lb_policy_pick_args {
   grpc_polling_entity *pollent;
   /** Initial metadata associated with the picking call. */
   grpc_metadata_batch *initial_metadata;
-  /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+  /** Bitmask used for selective cancelling. See \a
+   * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in
+   * grpc_types.h */
   uint32_t initial_metadata_flags;
   /** Storage for LB token in \a initial_metadata, or NULL if not used */
   grpc_linked_mdelem *lb_token_mdelem_storage;
+  /** Deadline for the call to the LB server */
+  gpr_timespec deadline;
 } grpc_lb_policy_pick_args;
 
 struct grpc_lb_policy_vtable {
@@ -138,15 +142,18 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
 void grpc_lb_policy_init(grpc_lb_policy *policy,
                          const grpc_lb_policy_vtable *vtable);
 
-/** Find an appropriate target for this call, based on \a pick_args.
-    Picking can be synchronous or asynchronous. In the synchronous case, when a
-    pick is readily available, it'll be returned in \a target and a non-zero
-    value will be returned.
-    In the asynchronous case, zero is returned and \a on_complete will be called
-    once \a target and \a user_data have been set. Any IO should be done under
-    \a pick_args->pollent. The opaque \a user_data output argument corresponds
-    to information that may need be propagated from the LB policy. It may be
-    NULL. Errors are signaled by receiving a NULL \a *target. */
+/** Finds an appropriate subchannel for a call, based on \a pick_args.
+
+    \a target will be set to the selected subchannel, or NULL on failure.
+    Upon success, \a user_data will be set to whatever opaque information
+    may need to be propagated from the LB policy, or NULL if not needed.
+
+    If the pick succeeds and a result is known immediately, a non-zero
+    value will be returned.  Otherwise, \a on_complete will be invoked
+    once the pick is complete with its error argument set to indicate
+    success or failure.
+
+    Any I/O should be done under \a pick_args->pollent. */
 int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                         const grpc_lb_policy_pick_args *pick_args,
                         grpc_connected_subchannel **target, void **user_data,
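The revised doc comment describes a dual synchronous/asynchronous contract. A toy caller showing how such an API is typically consumed (hypothetical names; only the return-value convention is taken from the comment above):

    #include <stdio.h>

    typedef struct subchannel subchannel; /* opaque stand-in */
    typedef void (*completion_cb)(void *arg);

    /* Toy pick with the same shape: non-zero return means the result is
     * already in *target; zero means cb will run later. Here we always
     * "know" the answer immediately and report a synchronous failure. */
    static int toy_pick(subchannel **target, completion_cb cb, void *cb_arg) {
      (void)cb;
      (void)cb_arg;
      *target = NULL; /* failure is signaled by a NULL *target */
      return 1;
    }

    static void on_pick_done(void *arg) {
      subchannel **target = arg;
      printf(*target == NULL ? "async pick failed\n" : "async pick done\n");
    }

    int main(void) {
      subchannel *target;
      if (toy_pick(&target, on_pick_done, &target)) {
        /* synchronous case: *target already holds the result */
        printf(target == NULL ? "sync pick failed\n" : "sync pick done\n");
      } /* otherwise on_pick_done fires once the pick resolves */
      return 0;
    }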
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 0050489425..ae1f2a3b4c 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -105,6 +105,7 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
 
 #include "src/core/ext/client_config/client_channel_factory.h"
 #include "src/core/ext/client_config/lb_policy_factory.h"
@@ -199,18 +200,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
 typedef struct pending_pick {
   struct pending_pick *next;
 
-  /* polling entity for the pick()'s async notification */
-  grpc_polling_entity *pollent;
-
-  /* the initial metadata for the pick. See grpc_lb_policy_pick() */
-  grpc_metadata_batch *initial_metadata;
-
-  /* storage for the lb token initial metadata mdelem */
-  grpc_linked_mdelem *lb_token_mdelem_storage;
-
-  /* bitmask passed to pick() and used for selective cancelling. See
-   * grpc_lb_policy_cancel_picks() */
-  uint32_t initial_metadata_flags;
+  /* original pick()'s arguments */
+  grpc_lb_policy_pick_args pick_args;
 
   /* output argument where to store the pick()ed connected subchannel, or NULL
    * upon error. */
@@ -232,11 +223,8 @@ static void add_pending_pick(pending_pick **root,
   memset(pp, 0, sizeof(pending_pick));
   memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
   pp->next = *root;
-  pp->pollent = pick_args->pollent;
+  pp->pick_args = *pick_args;
   pp->target = target;
-  pp->initial_metadata = pick_args->initial_metadata;
-  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
-  pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
   pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
   pp->wrapped_on_complete_arg.target = target;
   pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
@@ -283,9 +271,13 @@ typedef struct glb_lb_policy {
   /** mutex protecting remaining members */
   gpr_mu mu;
 
+  /** who the client is trying to communicate with */
   const char *server_name;
   grpc_client_channel_factory *cc_factory;
 
+  /** deadline for the LB's call */
+  gpr_timespec deadline;
+
   /** for communicating with the LB server */
   grpc_channel *lb_channel;
 
@@ -486,10 +478,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    const grpc_lb_policy_pick_args pick_args = {
-        pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
-        pp->lb_token_mdelem_storage};
-    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args,
+                        pp->target,
                         (void **)&pp->wrapped_on_complete_arg.lb_token,
                         &pp->wrapped_on_complete);
     pp->wrapped_on_complete_arg.owning_pending_node = pp;
@@ -589,7 +579,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                        &addr_strs[addr_index++],
                        (const struct sockaddr *)&args->addresses->addresses[i]
                            .address.addr,
-                       true) == 0);
+                       true) > 0);
     }
   }
 }
@@ -660,7 +650,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     *pp->target = NULL;
     grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
                         NULL);
-    gpr_free(pp);
     pp = next;
   }
 
@@ -698,12 +687,11 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     pending_pick *next = pp->next;
     if (pp->target == target) {
       grpc_polling_entity_del_from_pollset_set(
-          exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+          exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
       *target = NULL;
       grpc_exec_ctx_sched(
           exec_ctx, &pp->wrapped_on_complete,
           GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
-      gpr_free(pp);
     } else {
       pp->next = glb_policy->pending_picks;
       glb_policy->pending_picks = pp;
@@ -729,14 +717,13 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
   glb_policy->pending_picks = NULL;
   while (pp != NULL) {
     pending_pick *next = pp->next;
-    if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
+    if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
         initial_metadata_flags_eq) {
       grpc_polling_entity_del_from_pollset_set(
-          exec_ctx, pp->pollent, glb_policy->base.interested_parties);
+          exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties);
       grpc_exec_ctx_sched(
           exec_ctx, &pp->wrapped_on_complete,
           GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
-      gpr_free(pp);
     } else {
       pp->next = glb_policy->pending_picks;
       glb_policy->pending_picks = pp;
@@ -767,8 +754,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                     const grpc_lb_policy_pick_args *pick_args,
                     grpc_connected_subchannel **target, void **user_data,
                     grpc_closure *on_complete) {
-  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
-
   if (pick_args->lb_token_mdelem_storage == NULL) {
     *target = NULL;
     grpc_exec_ctx_sched(
@@ -776,11 +761,13 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
         GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
                           "won't work without it. Failing"),
         NULL);
-    return 1;
+    return 0;
   }
 
+  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
   gpr_mu_lock(&glb_policy->mu);
-  int r;
+  glb_policy->deadline = pick_args->deadline;
+  bool pick_done;
 
   if (glb_policy->rr_policy != NULL) {
     if (grpc_lb_glb_trace) {
@@ -799,10 +786,11 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
                       &glb_policy->wc_arg);
 
-    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+    pick_done =
+        grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
                             (void **)&glb_policy->wc_arg.lb_token,
                             &glb_policy->wrapped_on_complete);
-    if (r != 0) {
+    if (pick_done) {
       /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
       if (grpc_lb_glb_trace) {
         gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
@@ -816,6 +804,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                       GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
     }
   } else {
+    /* else, the pending pick will be registered and taken care of by the
+     * pending pick list inside the RR policy (glb_policy->rr_policy) */
     grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
                                            glb_policy->base.interested_parties);
     add_pending_pick(&glb_policy->pending_picks, pick_args, target,
@@ -824,10 +814,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     if (!glb_policy->started_picking) {
       start_picking(exec_ctx, glb_policy);
     }
-    r = 0;
+    pick_done = false;
   }
   gpr_mu_unlock(&glb_policy->mu);
-  return r;
+  return pick_done;
 }
 
 static grpc_connectivity_state glb_check_connectivity(
@@ -923,6 +913,9 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
                                grpc_error *error);
 
 static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
+  GPR_ASSERT(glb_policy->server_name != NULL);
+  GPR_ASSERT(glb_policy->server_name[0] != '\0');
+
   lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
   memset(lb_client, 0, sizeof(lb_client_data));
 
@@ -934,8 +927,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
   grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
   grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
 
-  /* TODO(dgq): get the deadline from the parent channel. */
-  lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  lb_client->deadline = glb_policy->deadline;
 
   /* Note the following LB call progresses every time there's activity in \a
    * glb_policy->base.interested_parties, which is comprised of the polling
@@ -943,14 +935,14 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
   lb_client->lb_call = grpc_channel_create_pollset_set_call(
       glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
       glb_policy->base.interested_parties,
-      "/grpc.lb.v1.LoadBalancer/BalanceLoad", NULL, lb_client->deadline, NULL);
+      "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
+      lb_client->deadline, NULL);
 
   grpc_metadata_array_init(&lb_client->initial_metadata_recv);
   grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
 
-  grpc_grpclb_request *request = grpc_grpclb_request_create(
-      "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
-                                        balanced service from the resolver */
+  grpc_grpclb_request *request =
+      grpc_grpclb_request_create(glb_policy->server_name);
   gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
   lb_client->request_payload =
       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
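A recurring theme in this file is replacing field-by-field copies of the pick arguments with a single by-value grpc_lb_policy_pick_args member, which is why adding the new deadline field required no changes at the copy sites. A small sketch of that pattern, assuming (as here) that the pointed-to data outlives the queued pick; all names below are illustrative:

    #include <stdlib.h>

    /* Illustrative stand-in for grpc_lb_policy_pick_args. */
    typedef struct {
      unsigned flags;
      long deadline; /* a newly added field: copy sites are unaffected */
    } pick_args;

    typedef struct pending_pick {
      struct pending_pick *next;
      pick_args args; /* embedded by value instead of field-by-field */
    } pending_pick;

    static void add_pending_pick(pending_pick **root, const pick_args *args) {
      pending_pick *pp = calloc(1, sizeof(*pp)); /* NULL check omitted */
      pp->args = *args; /* one struct copy covers every field */
      pp->next = *root;
      *root = pp;
    }

    int main(void) {
      pending_pick *picks = NULL;
      pick_args args = {1u, 30};
      add_pending_pick(&picks, &args);
      free(picks);
      return 0;
    }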
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index e8ac1b12ae..fa33ffd7bd 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -53,16 +53,12 @@ typedef struct {
   /** base class: must be first */
   grpc_resolver base;
-  /** refcount */
-  gpr_refcount refs;
   /** target name */
   char *target_name;
   /** name to resolve (usually the same as target_name) */
   char *name_to_resolve;
   /** default port to use */
   char *default_port;
-  /** load balancing policy name */
-  char *lb_policy_name;
 
   /** mutex guarding the rest of the state */
   gpr_mu mu;
@@ -181,7 +177,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
     }
     grpc_resolved_addresses_destroy(r->addresses);
     result = grpc_resolver_result_create(r->target_name, addresses,
-                                         r->lb_policy_name, NULL);
+                                         NULL /* lb_policy_name */, NULL);
   } else {
     gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
     gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -245,13 +241,11 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
   gpr_free(r->target_name);
   gpr_free(r->name_to_resolve);
   gpr_free(r->default_port);
-  gpr_free(r->lb_policy_name);
   gpr_free(r);
 }
 
 static grpc_resolver *dns_create(grpc_resolver_args *args,
-                                 const char *default_port,
-                                 const char *lb_policy_name) {
+                                 const char *default_port) {
   if (0 != strcmp(args->uri->authority, "")) {
     gpr_log(GPR_ERROR, "authority based dns uri's not supported");
     return NULL;
@@ -264,7 +258,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
   // Create resolver.
   dns_resolver *r = gpr_malloc(sizeof(dns_resolver));
   memset(r, 0, sizeof(*r));
-  gpr_ref_init(&r->refs, 1);
   gpr_mu_init(&r->mu);
   grpc_resolver_init(&r->base, &dns_resolver_vtable);
   r->target_name = gpr_strdup(path);
@@ -272,7 +265,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
   r->default_port = gpr_strdup(default_port);
   gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
                    BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
-  r->lb_policy_name = gpr_strdup(lb_policy_name);
   return &r->base;
 }
 
@@ -286,7 +278,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {}
 static grpc_resolver *dns_factory_create_resolver(
     grpc_resolver_factory *factory, grpc_resolver_args *args) {
-  return dns_create(args, "https", "pick_first");
+  return dns_create(args, "https");
 }
 
 static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory,
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 74d2015e5c..5a7a32d7cb 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -49,14 +49,10 @@ typedef struct {
   /** base class: must be first */
   grpc_resolver base;
-  /** refcount */
-  gpr_refcount refs;
-  /** load balancing policy name */
-  char *lb_policy_name;
-
+  /** the path component of the uri passed in */
+  char *target_name;
   /** the addresses that we've 'resolved' */
   grpc_lb_addresses *addresses;
-
   /** mutex guarding the rest of the state */
   gpr_mu mu;
   /** have we published? */
@@ -121,8 +117,9 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
   if (r->next_completion != NULL && !r->published) {
     r->published = true;
     *r->target_result = grpc_resolver_result_create(
-        "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
-        r->lb_policy_name, NULL);
+        r->target_name,
+        grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
+        NULL /* lb_policy_name */, NULL);
     grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
     r->next_completion = NULL;
   }
@@ -132,7 +129,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
   sockaddr_resolver *r = (sockaddr_resolver *)gr;
   gpr_mu_destroy(&r->mu);
   grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
-  gpr_free(r->lb_policy_name);
+  gpr_free(r->target_name);
   gpr_free(r);
 }
 
@@ -161,78 +158,49 @@ char *unix_get_default_authority(grpc_resolver_factory *factory,
 
 static void do_nothing(void *ignored) {}
 
-static grpc_resolver *sockaddr_create(
-    grpc_resolver_args *args, const char *default_lb_policy_name,
-    int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
-  bool errors_found = false;
-  sockaddr_resolver *r;
-  gpr_slice path_slice;
-  gpr_slice_buffer path_parts;
-
+static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
+                                      int parse(grpc_uri *uri,
+                                                struct sockaddr_storage *dst,
+                                                size_t *len)) {
   if (0 != strcmp(args->uri->authority, "")) {
     gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme",
             args->uri->scheme);
     return NULL;
   }
-
-  r = gpr_malloc(sizeof(sockaddr_resolver));
-  memset(r, 0, sizeof(*r));
-
-  r->lb_policy_name =
-      gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy"));
-  const char *lb_enabled_qpart =
-      grpc_uri_get_query_arg(args->uri, "lb_enabled");
-  /* anything other than "0" is interpreted as true */
-  const bool lb_enabled =
-      (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0));
-
-  if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 &&
-      !lb_enabled) {
-    /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
-     * out, as this is meant mostly for tests. */
-    gpr_log(GPR_ERROR,
-            "Requested 'grpclb' LB policy but resolved addresses don't "
-            "support load balancing.");
-    abort();
-  }
-
-  if (r->lb_policy_name == NULL) {
-    r->lb_policy_name = gpr_strdup(default_lb_policy_name);
-  }
-
-  path_slice =
+  /* Construct addresses. */
+  gpr_slice path_slice =
       gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing);
+  gpr_slice_buffer path_parts;
   gpr_slice_buffer_init(&path_parts);
-
   gpr_slice_split(path_slice, ",", &path_parts);
-  r->addresses = grpc_lb_addresses_create(path_parts.count);
-  for (size_t i = 0; i < r->addresses->num_addresses; i++) {
+  grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count);
+  bool errors_found = false;
+  for (size_t i = 0; i < addresses->num_addresses; i++) {
     grpc_uri ith_uri = *args->uri;
     char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
     ith_uri.path = part_str;
-    if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i]
-                             .address.addr),
-               &r->addresses->addresses[i].address.len)) {
+    if (!parse(
+            &ith_uri,
+            (struct sockaddr_storage *)(&addresses->addresses[i].address.addr),
+            &addresses->addresses[i].address.len)) {
       errors_found = true;
     }
     gpr_free(part_str);
-    r->addresses->addresses[i].is_balancer = lb_enabled;
     if (errors_found) break;
  }
-
   gpr_slice_buffer_destroy(&path_parts);
   gpr_slice_unref(path_slice);
   if (errors_found) {
-    gpr_free(r->lb_policy_name);
-    grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
-    gpr_free(r);
+    grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
     return NULL;
   }
-
-  gpr_ref_init(&r->refs, 1);
+  /* Instantiate resolver. */
+  sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver));
+  memset(r, 0, sizeof(*r));
+  r->target_name = gpr_strdup(args->uri->path);
+  r->addresses = addresses;
   gpr_mu_init(&r->mu);
   grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
-
   return &r->base;
 }
 
@@ -247,7 +215,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
 #define DECL_FACTORY(name)                                        \
   static grpc_resolver *name##_factory_create_resolver(           \
       grpc_resolver_factory *factory, grpc_resolver_args *args) { \
-    return sockaddr_create(args, "pick_first", parse_##name);     \
+    return sockaddr_create(args, parse_##name);                   \
  }                                                                \
   static const grpc_resolver_factory_vtable name##_factory_vtable = { \
       sockaddr_factory_ref, sockaddr_factory_unref,               \
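The rewritten sockaddr_create follows a parse-first, allocate-last shape: all fallible work happens before the resolver object exists, so the error path only has to free the address list. A compact sketch of that shape with placeholder types (allocation-failure handling omitted, since gpr_malloc aborts on failure in the real code):

    #include <stdbool.h>
    #include <stdlib.h>

    typedef struct { int dummy; } addresses;  /* illustrative */
    typedef struct { addresses *addrs; } resolver;

    static bool parse_all(addresses *out) { (void)out; return true; }

    static resolver *create_resolver(void) {
      addresses *addrs = malloc(sizeof(*addrs));
      if (!parse_all(addrs)) {
        free(addrs); /* error path frees only what exists so far */
        return NULL;
      }
      /* nothing below this point can fail, so no partial-object cleanup */
      resolver *r = calloc(1, sizeof(*r));
      r->addrs = addrs;
      return r;
    }

    int main(void) {
      resolver *r = create_resolver();
      if (r != NULL) {
        free(r->addrs);
        free(r);
      }
      return 0;
    }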
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 366690acf2..25ad40b935 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -239,6 +239,14 @@ static const char *op_id_string(enum e_op_id i) {
   return "UNKNOWN";
 }
 
+static void free_read_buffer(stream_obj *s) {
+  if (s->state.rs.read_buffer &&
+      s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) {
+    gpr_free(s->state.rs.read_buffer);
+    s->state.rs.read_buffer = NULL;
+  }
+}
+
 /*
   Add a new stream op to op storage.
 */
@@ -341,6 +349,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
     gpr_free(s->state.ws.write_buffer);
     s->state.ws.write_buffer = NULL;
   }
+  free_read_buffer(s);
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
 }
@@ -363,6 +372,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
     gpr_free(s->state.ws.write_buffer);
     s->state.ws.write_buffer = NULL;
   }
+  free_read_buffer(s);
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
 }
@@ -377,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
   cronet_bidirectional_stream_destroy(s->cbs);
   s->state.state_callback_received[OP_SUCCEEDED] = true;
   s->cbs = NULL;
+  free_read_buffer(s);
   gpr_mu_unlock(&s->mu);
   execute_from_storage(s);
 }
@@ -531,7 +542,8 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
 */
 static void convert_metadata_to_cronet_headers(
     grpc_linked_mdelem *head, const char *host, char **pp_url,
-    cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
+    cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers,
+    const char **method) {
   grpc_linked_mdelem *curr = head;
   /* Walk the linked list and get number of header fields */
   size_t num_headers_available = 0;
@@ -558,11 +570,20 @@ static void convert_metadata_to_cronet_headers(
     curr = curr->next;
     const char *key = grpc_mdstr_as_c_string(mdelem->key);
     const char *value = grpc_mdstr_as_c_string(mdelem->value);
-    if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
+    if (mdelem->key == GRPC_MDSTR_SCHEME ||
         mdelem->key == GRPC_MDSTR_AUTHORITY) {
       /* Cronet populates these fields on its own */
       continue;
     }
+    if (mdelem->key == GRPC_MDSTR_METHOD) {
+      if (mdelem->value == GRPC_MDSTR_PUT) {
+        *method = "PUT";
+      } else {
+        /* POST method in default*/
+        *method = "POST";
+      }
+      continue;
+    }
     if (mdelem->key == GRPC_MDSTR_PATH) {
       /* Create URL by appending :path value to the hostname */
       gpr_asprintf(pp_url, "https://%s%s", host, value);
@@ -759,15 +780,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
       s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
                                                   &cronet_callbacks);
       CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
-      char *url;
+      char *url = NULL;
+      const char *method = "POST";
       s->header_array.headers = NULL;
       convert_metadata_to_cronet_headers(
           stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
-          &s->header_array.headers, &s->header_array.count);
+          &s->header_array.headers, &s->header_array.count, &method);
       s->header_array.capacity = s->header_array.count;
       CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
                  url);
-      cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+      cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
                                         false);
       stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
       result = ACTION_TAKEN_WITH_CALLBACK;
@@ -901,6 +923,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
       uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
       memcpy(dst_p, stream_state->rs.read_buffer,
              (size_t)stream_state->rs.length_field);
+      free_read_buffer(s);
       gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
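free_read_buffer centralizes a delicate invariant: the read pointer may alias an inline header buffer that must never be passed to gpr_free, and clearing it after freeing makes repeated calls from the various Cronet callbacks safe. An illustrative standalone version of the same pattern using plain malloc/free (names and field shapes are hypothetical):

    #include <stdlib.h>

    typedef struct {
      char header_bytes[5]; /* inline storage the pointer may alias */
      char *read_buffer;    /* either header_bytes or a heap allocation */
    } reader_state;

    static void free_read_buffer(reader_state *rs) {
      if (rs->read_buffer && rs->read_buffer != rs->header_bytes) {
        free(rs->read_buffer);
        rs->read_buffer = NULL; /* a later call becomes a no-op */
      }
    }

    int main(void) {
      reader_state rs = {{0}, NULL};
      rs.read_buffer = rs.header_bytes; /* aliasing case: not freed */
      free_read_buffer(&rs);
      rs.read_buffer = malloc(16);      /* heap case: freed and cleared */
      free_read_buffer(&rs);
      free_read_buffer(&rs);            /* safe to call again */
      return 0;
    }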
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 02fc68fc3a..f067a3a51c 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
     gpr_asprintf(&message_string,
                  "Received message larger than max (%u vs. %d)",
                  (*calld->recv_message)->length, chand->max_recv_size);
-    gpr_slice message = gpr_slice_from_copied_string(message_string);
+    grpc_error* new_error = grpc_error_set_int(
+        GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+        GRPC_STATUS_INVALID_ARGUMENT);
+    if (error == GRPC_ERROR_NONE) {
+      error = new_error;
+    } else {
+      error = grpc_error_add_child(error, new_error);
+      GRPC_ERROR_UNREF(new_error);
+    }
     gpr_free(message_string);
-    grpc_call_element_send_close_with_message(
-        exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
   }
   // Invoke the next callback.
   grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
 }
 
-// Start transport op.
+// Start transport stream op.
 static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
                                       grpc_call_element* elem,
                                       grpc_transport_stream_op* op) {
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 7e2fd7a3bd..00ace8a7a9 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -40,6 +40,10 @@
 #include <grpc/status.h>
 #include <grpc/support/time.h>
 
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /// Opaque representation of an error.
 /// Errors are refcounted objects that represent the result of an operation.
 /// Ownership laws:
@@ -204,4 +208,8 @@ bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
 #define GRPC_LOG_IF_ERROR(what, error) \
   grpc_log_if_error((what), (error), __FILE__, __LINE__)
 
+#ifdef __cplusplus
+}
+#endif
+
 #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
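For reference, the guard added to error.h is the standard pattern for making a C header usable from C++: without the extern "C" block, a C++ translation unit would name-mangle the declared symbols and fail to link against the C library. A minimal example header (names are illustrative):

    /* example_header.h */
    #ifndef EXAMPLE_HEADER_H
    #define EXAMPLE_HEADER_H

    #ifdef __cplusplus
    extern "C" {
    #endif

    /* keeps C linkage even when included from C++ */
    void example_c_function(int value);

    #ifdef __cplusplus
    }
    #endif

    #endif /* EXAMPLE_HEADER_H */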
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 48032412a2..edf7b133e9 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -38,7 +38,6 @@
 
 #include <grpc/support/port_platform.h>
 
-#ifdef GRPC_NEED_UDP
 #ifdef GPR_POSIX_SOCKET
 
 #include "src/core/lib/iomgr/udp_server.h"
@@ -171,6 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
       sp->destroyed_closure.cb = destroyed_port;
       sp->destroyed_closure.cb_arg = s;
 
+      /* Call the orphan_cb to signal that the FD is about to be closed and
+       * should no longer be used. */
       GPR_ASSERT(sp->orphan_cb);
       sp->orphan_cb(sp->emfd);
 
@@ -197,6 +198,12 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
   /* shutdown all fd's */
   if (s->active_ports) {
     for (i = 0; i < s->nports; i++) {
+      server_port *sp = &s->ports[i];
+      /* Call the orphan_cb to signal that the FD is about to be closed and
+       * should no longer be used. */
+      GPR_ASSERT(sp->orphan_cb);
+      sp->orphan_cb(sp->emfd);
+
       grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
     }
     gpr_mu_unlock(&s->mu);
@@ -439,4 +446,3 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
 }
 
 #endif
-#endif
diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c
index a093a37af3..054a6e6c58 100644
--- a/src/core/lib/surface/byte_buffer.c
+++ b/src/core/lib/surface/byte_buffer.c
@@ -72,8 +72,9 @@ grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
 grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
   switch (bb->type) {
     case GRPC_BB_RAW:
-      return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
-                                         bb->data.raw.slice_buffer.count);
+      return grpc_raw_compressed_byte_buffer_create(
+          bb->data.raw.slice_buffer.slices, bb->data.raw.slice_buffer.count,
+          bb->data.raw.compression);
   }
   GPR_UNREACHABLE_CODE(return NULL);
 }
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 5690bcab1e..b0f66f4f61 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
   }
 }
 
-static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
-                                  bool success) {
+static void process_data_after_md(grpc_exec_ctx *exec_ctx,
+                                  batch_control *bctl) {
   grpc_call *call = bctl->call;
   if (call->receiving_stream == NULL) {
     *call->receiving_buffer = NULL;
@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
     grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
                       bctl);
     continue_receiving_slices(exec_ctx, bctl);
-    /* early out */
-    return;
   }
 }
 
@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                    grpc_error *error) {
   batch_control *bctl = bctlp;
   grpc_call *call = bctl->call;
-
+  if (error != GRPC_ERROR_NONE) {
+    grpc_status_code status;
+    const char *msg;
+    grpc_error_get_status(error, &status, &msg);
+    close_with_status(exec_ctx, call, status, msg);
+  }
   gpr_mu_lock(&bctl->call->mu);
   if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
       call->receiving_stream == NULL) {
     gpr_mu_unlock(&bctl->call->mu);
-    process_data_after_md(exec_ctx, bctlp, error);
+    process_data_after_md(exec_ctx, bctlp);
   } else {
     call->saved_receiving_stream_ready_bctlp = bctlp;
     gpr_mu_unlock(&bctl->call->mu);
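The receiving_stream_ready change converts a transport error into a call status up front, instead of letting the batch complete silently. A toy model of that flow (the error type and the status-extraction and close helpers below are simplified stand-ins, not the gRPC APIs):

    #include <stdio.h>

    typedef enum { STATUS_OK = 0, STATUS_INTERNAL = 13 } status_code;

    typedef struct {
      int failed;
      const char *msg;
    } toy_error;

    /* stand-in for deriving (status, message) from a grpc_error */
    static void get_status(const toy_error *e, status_code *code,
                           const char **msg) {
      *code = e->failed ? STATUS_INTERNAL : STATUS_OK;
      *msg = e->msg ? e->msg : "";
    }

    static void close_call_with_status(status_code code, const char *msg) {
      printf("closing call: code=%d msg=%s\n", (int)code, msg);
    }

    static void on_stream_ready(const toy_error *error) {
      if (error->failed) {
        status_code code;
        const char *msg;
        get_status(error, &code, &msg);
        close_call_with_status(code, msg); /* surface the failure now */
      }
      /* ...then continue with normal batch processing, as in the patch */
    }

    int main(void) {
      toy_error err = {1, "transport closed"};
      on_stream_ready(&err);
      return 0;
    }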