diff options
Diffstat (limited to 'src/core/ext')
46 files changed, 2058 insertions, 1301 deletions
diff --git a/src/core/ext/client_channel/subchannel_factory.h b/src/core/ext/census/trace_context.c index 2b67806767..fbb20d3448 100644 --- a/src/core/ext/client_channel/subchannel_factory.h +++ b/src/core/ext/census/trace_context.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,36 +31,56 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H +#include "src/core/ext/census/trace_context.h" -#include "src/core/ext/client_channel/subchannel.h" -#include "src/core/lib/channel/channel_stack.h" +#include <grpc/census.h> +#include <grpc/support/log.h> +#include <stdbool.h> -typedef struct grpc_subchannel_factory grpc_subchannel_factory; -typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable; +#include "third_party/nanopb/pb_decode.h" +#include "third_party/nanopb/pb_encode.h" -/** Constructor for new configured channels. - Creating decorators around this type is encouraged to adapt behavior. */ -struct grpc_subchannel_factory { - const grpc_subchannel_factory_vtable *vtable; -}; +// This function assumes the TraceContext is valid. +size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t buf_size) { + // Create a stream that will write to our buffer. + pb_ostream_t stream = pb_ostream_from_buffer(buffer, buf_size); -struct grpc_subchannel_factory_vtable { - void (*ref)(grpc_subchannel_factory *factory); - void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory); - grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory, - grpc_subchannel_args *args); -}; + // encode message + bool status = pb_encode(&stream, google_trace_TraceContext_fields, ctxt); -void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); -void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel_factory *factory); + if (!status) { + gpr_log(GPR_DEBUG, "TraceContext encoding failed: %s", + PB_GET_ERROR(&stream)); + return 0; + } -/** Create a new grpc_subchannel */ -grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory, - grpc_subchannel_args *args); + return stream.bytes_written; +} -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ +bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t nbytes) { + // Create a stream that reads nbytes from the buffer. + pb_istream_t stream = pb_istream_from_buffer(buffer, nbytes); + + // decode message + bool status = pb_decode(&stream, google_trace_TraceContext_fields, ctxt); + + if (!status) { + gpr_log(GPR_DEBUG, "TraceContext decoding failed: %s", + PB_GET_ERROR(&stream)); + return false; + } + + // check fields + if (!ctxt->has_trace_id) { + gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id"); + return false; + } + if (!ctxt->has_span_id) { + gpr_log(GPR_DEBUG, "Invalid TraceContext: missing span_id"); + return false; + } + + return true; +} diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h new file mode 100644 index 0000000000..ee71fef460 --- /dev/null +++ b/src/core/ext/census/trace_context.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Functions for manipulating trace contexts as defined in + src/proto/census/trace.proto */ +#ifndef GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H +#define GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H + +#include "src/core/ext/census/gen/trace_context.pb.h" + +/* Maximum number of bytes required to encode a TraceContext (31) +1 byte for trace_id field +1 byte for trace_id length +1 byte for trace_id.hi field +8 bytes for trace_id.hi (uint64_t) +1 byte for trace_id.lo field +8 bytes for trace_id.lo (uint64_t) +1 byte for span_id field +8 bytes for span_id (uint64_t) +1 byte for is_sampled field +1 byte for is_sampled (bool) */ +#define TRACE_MAX_CONTEXT_SIZE 31 + +/* Encode a trace context (ctxt) into proto format to the buffer provided. The +size of buffer must be at least TRACE_MAX_CONTEXT_SIZE. On success, returns the +number of bytes successfully encoded into buffer. On failure, returns 0. */ +size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t buf_size); + +/* Decode a proto-encoded TraceContext from the provided buffer into the +TraceContext structure (ctxt). The function expects to be supplied the number +of bytes to be read from buffer (nbytes). This function will also validate that +the TraceContext has a span_id and a trace_id, and will return false if either +of these do not exist. On success, returns true and false otherwise. */ +bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer, + const size_t nbytes); + +#endif diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c index 3b5d6dab2b..8f0e12296d 100644 --- a/src/core/ext/census/tracing.c +++ b/src/core/ext/census/tracing.c @@ -31,15 +31,19 @@ * */ +//#include "src/core/ext/census/tracing.h" + #include <grpc/census.h> /* TODO(aveitch): These are all placeholder implementations. */ -int census_trace_mask(const census_context *context) { - return CENSUS_TRACE_MASK_NONE; -} +// int census_trace_mask(const census_context *context) { +// return CENSUS_TRACE_MASK_NONE; +// } + +// void census_set_trace_mask(int trace_mask) {} -void census_set_trace_mask(int trace_mask) {} +// void census_trace_print(census_context *context, uint32_t type, +// const char *buffer, size_t n) {} -void census_trace_print(census_context *context, uint32_t type, - const char *buffer, size_t n) {} +// void SetTracerParams(const Params& params); diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 7e8579de60..77291412a5 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -42,9 +42,11 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -63,6 +65,8 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ bool started_resolving; + /** client channel factory */ + grpc_client_channel_factory *client_channel_factory; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -111,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_cancel_picks( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, - /* check= */ 0); + /* check= */ 0, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); @@ -173,20 +177,53 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->resolver_result != NULL) { - lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); + grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); + lb_policy_args.addresses = + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + lb_policy_args.client_channel_factory = chand->client_channel_factory; + + // Special case: If all of the addresses are balancer addresses, + // assume that we should use the grpclb policy, regardless of what the + // resolver actually specified. + const char *lb_policy_name = + grpc_resolver_result_get_lb_policy_name(chand->resolver_result); + bool found_backend_address = false; + for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) { + if (!lb_policy_args.addresses->addresses[i].is_balancer) { + found_backend_address = true; + break; + } + } + if (!found_backend_address) { + if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) { + gpr_log(GPR_INFO, + "resolver requested LB policy %s but provided only balancer " + "addresses, no backend addresses -- forcing use of grpclb LB " + "policy", + (lb_policy_name == NULL ? "(none)" : lb_policy_name)); + } + lb_policy_name = "grpclb"; + } + // Use pick_first if nothing was specified and we didn't select grpclb + // above. + if (lb_policy_name == NULL) lb_policy_name = "pick_first"; + + lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } - chand->resolver_result = NULL; - if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); @@ -346,6 +383,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } + if (chand->client_channel_factory != NULL) { + grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -377,6 +417,17 @@ typedef enum { for initial metadata before trying to create a call object, and handling cancellation gracefully. */ typedef struct client_channel_call_data { + // State for handling deadlines. + // The code in deadline_filter.c requires this to be the first field. + // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state + // and this struct both independently store a pointer to the call + // stack and each has its own mutex. If/when we have time, find a way + // to avoid this without breaking the grpc_deadline_state abstraction. + grpc_deadline_state deadline_state; + gpr_timespec deadline; + + grpc_error *cancel_error; + /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; @@ -387,13 +438,15 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; - grpc_transport_stream_op *waiting_ops; + grpc_transport_stream_op **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; grpc_closure next_step; grpc_call_stack *owning_call; + + grpc_linked_mdelem lb_token_mdelem; } call_data; static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { @@ -404,7 +457,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) { gpr_realloc(calld->waiting_ops, calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); } - calld->waiting_ops[calld->waiting_ops_count++] = *op; + calld->waiting_ops[calld->waiting_ops_count++] = op; GPR_TIMER_END("add_waiting_locked", 0); } @@ -413,14 +466,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, size_t i; for (i = 0; i < calld->waiting_ops_count; i++) { grpc_transport_stream_op_finish_with_failure( - exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error)); + exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); } calld->waiting_ops_count = 0; GRPC_ERROR_UNREF(error); } typedef struct { - grpc_transport_stream_op *ops; + grpc_transport_stream_op **ops; size_t nops; grpc_subchannel_call *call; } retry_ops_args; @@ -429,7 +482,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { retry_ops_args *a = args; size_t i; for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); } GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); gpr_free(a->ops); @@ -437,6 +490,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { } static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { + if (calld->waiting_ops_count == 0) { + return; + } + retry_ops_args *a = gpr_malloc(sizeof(*a)); a->ops = calld->waiting_ops; a->nops = calld->waiting_ops_count; @@ -465,7 +522,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( "Failed to create subchannel", &error, 1)); - } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) { + } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING( @@ -473,7 +530,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); @@ -511,7 +568,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready); + grpc_closure *on_ready, grpc_error *error); static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -522,7 +579,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); @@ -532,7 +590,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { + grpc_closure *on_ready, grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -546,29 +604,35 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel); + connected_subchannel, GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); } } gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); + GRPC_ERROR_UNREF(error); return true; } + GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { grpc_lb_policy *lb_policy = chand->lb_policy; int r; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); gpr_mu_unlock(&chand->mu); - r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent, - initial_metadata, initial_metadata_flags, - connected_subchannel, on_ready); + // TODO(dgq): make this deadline configurable somehow. + const grpc_lb_policy_pick_args inputs = { + calld->pollent, initial_metadata, initial_metadata_flags, + &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; + r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel, + NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); return r; @@ -609,12 +673,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); /* try to (atomically) get the call */ grpc_subchannel_call *call = GET_CALL(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -630,8 +695,8 @@ retry: call = GET_CALL(calld); if (call == CANCELLED_CALL) { gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -647,18 +712,24 @@ retry: (gpr_atm)(uintptr_t)CANCELLED_CALL)) { goto retry; } else { + // Stash a copy of cancel_error in our call data, so that we can use + // it for subsequent operations. This ensures that if the call is + // cancelled before any ops are passed down (e.g., if the deadline + // is in the past when the call starts), we can return the right + // error to the caller when the first op does get passed down. + calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL); + NULL, GRPC_ERROR_REF(op->cancel_error)); break; } gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -672,7 +743,8 @@ retry: GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step)) { + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -682,7 +754,7 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; @@ -705,6 +777,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; + grpc_deadline_state_init(exec_ctx, elem, args); + calld->deadline = args->deadline; + calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; @@ -723,6 +798,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; + grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); @@ -759,10 +836,12 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory) { /* post construction initialization: set the transport setup pointer */ + GPR_ASSERT(client_channel_factory != NULL); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); @@ -776,6 +855,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } + chand->client_channel_factory = client_channel_factory; + grpc_client_channel_factory_ref(client_channel_factory); gpr_mu_unlock(&chand->mu); } diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/client_channel/client_channel.h index 5dea45de43..ab5a84fdfb 100644 --- a/src/core/ext/client_channel/client_channel.h +++ b/src/core/ext/client_channel/client_channel.h @@ -31,9 +31,10 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H +#include "src/core/ext/client_channel/client_channel_factory.h" #include "src/core/ext/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" @@ -46,12 +47,12 @@ extern const grpc_channel_filter grpc_client_channel_filter; -/* post-construction initializer to let the client channel know which - transport setup it should cancel upon destruction, or initiate when it needs - a connection */ -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver); +/* Post-construction initializer to give the client channel its resolver + and factory. */ +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory); grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); @@ -60,4 +61,4 @@ void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/ext/client_channel/client_channel_factory.h b/src/core/ext/client_channel/client_channel_factory.h index cf6724a24b..28828c2eb6 100644 --- a/src/core/ext/client_channel/client_channel_factory.h +++ b/src/core/ext/client_channel/client_channel_factory.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H #include <grpc/impl/codegen/grpc_types.h> @@ -82,4 +82,4 @@ grpc_channel *grpc_client_channel_factory_create_channel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory, const char *target, grpc_client_channel_type type, grpc_channel_args *args); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_FACTORY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */ diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 954f0db3bb..a3e5079843 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,10 +43,6 @@ #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" -#ifndef GRPC_DEFAULT_NAME_PREFIX -#define GRPC_DEFAULT_NAME_PREFIX "dns:///" -#endif - static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); @@ -79,7 +75,7 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, void grpc_client_channel_init(void) { grpc_lb_policy_registry_init(); - grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); + grpc_resolver_registry_init(); grpc_subchannel_index_init(); grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN, set_default_host_if_unset, NULL); diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index ea9d23706e..e08244b2c0 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -89,4 +89,4 @@ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, void grpc_connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *connector); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_CONNECTOR_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H */ diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c new file mode 100644 index 0000000000..fda1df173e --- /dev/null +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -0,0 +1,275 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_config/http_connect_handshaker.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/client_config/uri_parser.h" +#include "src/core/lib/http/format_request.h" +#include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/env.h" + +typedef struct http_connect_handshaker { + // Base class. Must be first. + grpc_handshaker base; + + char* proxy_server; + char* server_name; + + // State saved while performing the handshake. + grpc_endpoint* endpoint; + grpc_channel_args* args; + grpc_handshaker_done_cb cb; + void* user_data; + + // Objects for processing the HTTP CONNECT request and response. + gpr_slice_buffer write_buffer; + gpr_slice_buffer* read_buffer; // Ownership passes through this object. + grpc_closure request_done_closure; + grpc_closure response_read_closure; + grpc_http_parser http_parser; + grpc_http_response http_response; + grpc_timer timeout_timer; + + gpr_refcount refcount; +} http_connect_handshaker; + +// Unref and clean up handshaker. +static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { + if (gpr_unref(&handshaker->refcount)) { + gpr_free(handshaker->proxy_server); + gpr_free(handshaker->server_name); + gpr_slice_buffer_destroy(&handshaker->write_buffer); + grpc_http_parser_destroy(&handshaker->http_parser); + grpc_http_response_destroy(&handshaker->http_response); + gpr_free(handshaker); + } +} + +// Callback invoked when deadline is exceeded. +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. + grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); + } + http_connect_handshaker_unref(handshaker); +} + +// Callback invoked when finished writing HTTP CONNECT request. +static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + // If the write failed, invoke the callback immediately with the error. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, + GRPC_ERROR_REF(error)); + } else { + // Otherwise, read the response. + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + } +} + +// Callback invoked for reading HTTP CONNECT response. +static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + http_connect_handshaker* handshaker = arg; + if (error != GRPC_ERROR_NONE) { + GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + goto done; + } + // Add buffer to parser. + for (size_t i = 0; i < handshaker->read_buffer->count; ++i) { + if (GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i]) > 0) { + size_t body_start_offset = 0; + error = grpc_http_parser_parse(&handshaker->http_parser, + handshaker->read_buffer->slices[i], + &body_start_offset); + if (error != GRPC_ERROR_NONE) goto done; + if (handshaker->http_parser.state == GRPC_HTTP_BODY) { + // We've gotten back a successul response, so stop the timeout timer. + grpc_timer_cancel(exec_ctx, &handshaker->timeout_timer); + // Remove the data we've already read from the read buffer, + // leaving only the leftover bytes (if any). + gpr_slice_buffer tmp_buffer; + gpr_slice_buffer_init(&tmp_buffer); + if (body_start_offset < + GPR_SLICE_LENGTH(handshaker->read_buffer->slices[i])) { + gpr_slice_buffer_add( + &tmp_buffer, + gpr_slice_split_tail(&handshaker->read_buffer->slices[i], + body_start_offset)); + } + gpr_slice_buffer_addn(&tmp_buffer, + &handshaker->read_buffer->slices[i + 1], + handshaker->read_buffer->count - i - 1); + gpr_slice_buffer_swap(handshaker->read_buffer, &tmp_buffer); + gpr_slice_buffer_destroy(&tmp_buffer); + break; + } + } + } + // If we're not done reading the response, read more data. + // TODO(roth): In practice, I suspect that the response to a CONNECT + // request will never include a body, in which case this check is + // sufficient. However, the language of RFC-2817 doesn't explicitly + // forbid the response from including a body. If there is a body, + // it's possible that we might have parsed part but not all of the + // body, in which case this check will cause us to fail to parse the + // remainder of the body. If that ever becomes an issue, we may + // need to fix the HTTP parser to understand when the body is + // complete (e.g., handling chunked transfer encoding or looking + // at the Content-Length: header). + if (handshaker->http_parser.state != GRPC_HTTP_BODY) { + gpr_slice_buffer_reset_and_unref(handshaker->read_buffer); + grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, + &handshaker->response_read_closure); + return; + } + // Make sure we got a 2xx response. + if (handshaker->http_response.status < 200 || + handshaker->http_response.status >= 300) { + char* msg; + gpr_asprintf(&msg, "HTTP proxy returned response code %d", + handshaker->http_response.status); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + } +done: + // Invoke handshake-done callback. + handshaker->cb(exec_ctx, handshaker->endpoint, handshaker->args, + handshaker->read_buffer, handshaker->user_data, error); +} + +// +// Public handshaker methods +// + +static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker_in) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + http_connect_handshaker_unref(handshaker); +} + +static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) {} + +static void http_connect_handshaker_do_handshake( + grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in, + grpc_endpoint* endpoint, grpc_channel_args* args, + gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, + void* user_data) { + http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; + // Save state in the handshaker object. + handshaker->endpoint = endpoint; + handshaker->args = args; + handshaker->cb = cb; + handshaker->user_data = user_data; + handshaker->read_buffer = read_buffer; + // Send HTTP CONNECT request. + gpr_log(GPR_INFO, "Connecting to server %s via HTTP proxy %s", + handshaker->server_name, handshaker->proxy_server); + grpc_httpcli_request request; + memset(&request, 0, sizeof(request)); + request.host = handshaker->proxy_server; + request.http.method = "CONNECT"; + request.http.path = handshaker->server_name; + request.handshaker = &grpc_httpcli_plaintext; + gpr_slice request_slice = grpc_httpcli_format_connect_request(&request); + gpr_slice_buffer_add(&handshaker->write_buffer, request_slice); + grpc_endpoint_write(exec_ctx, endpoint, &handshaker->write_buffer, + &handshaker->request_done_closure); + // Set timeout timer. The timer gets a reference to the handshaker. + gpr_ref(&handshaker->refcount); + grpc_timer_init(exec_ctx, &handshaker->timeout_timer, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + on_timeout, handshaker, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static const struct grpc_handshaker_vtable http_connect_handshaker_vtable = { + http_connect_handshaker_destroy, http_connect_handshaker_shutdown, + http_connect_handshaker_do_handshake}; + +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name) { + GPR_ASSERT(proxy_server != NULL); + GPR_ASSERT(server_name != NULL); + http_connect_handshaker* handshaker = + gpr_malloc(sizeof(http_connect_handshaker)); + memset(handshaker, 0, sizeof(*handshaker)); + grpc_handshaker_init(&http_connect_handshaker_vtable, &handshaker->base); + handshaker->proxy_server = gpr_strdup(proxy_server); + handshaker->server_name = gpr_strdup(server_name); + gpr_slice_buffer_init(&handshaker->write_buffer); + grpc_closure_init(&handshaker->request_done_closure, on_write_done, + handshaker); + grpc_closure_init(&handshaker->response_read_closure, on_read_done, + handshaker); + grpc_http_parser_init(&handshaker->http_parser, GRPC_HTTP_RESPONSE, + &handshaker->http_response); + gpr_ref_init(&handshaker->refcount, 1); + return &handshaker->base; +} + +char* grpc_get_http_proxy_server() { + char* uri_str = gpr_getenv("http_proxy"); + if (uri_str == NULL) return NULL; + grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + char* proxy_name = NULL; + if (uri == NULL || uri->authority == NULL) { + gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); + goto done; + } + if (strcmp(uri->scheme, "http") != 0) { + gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme); + goto done; + } + if (strchr(uri->authority, '@') != NULL) { + gpr_log(GPR_ERROR, "userinfo not supported in proxy URI"); + goto done; + } + proxy_name = gpr_strdup(uri->authority); +done: + gpr_free(uri_str); + grpc_uri_destroy(uri); + return proxy_name; +} diff --git a/src/core/ext/client_channel/subchannel_factory.c b/src/core/ext/client_channel/http_connect_handshaker.h index bfb979159b..c689df2b2b 100644 --- a/src/core/ext/client_channel/subchannel_factory.c +++ b/src/core/ext/client_channel/http_connect_handshaker.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,19 +31,17 @@ * */ -#include "src/core/ext/client_channel/subchannel_factory.h" +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H -void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) { - factory->vtable->ref(factory); -} +#include "src/core/lib/channel/handshaker.h" -void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx, - grpc_subchannel_factory* factory) { - factory->vtable->unref(exec_ctx, factory); -} +/// Does NOT take ownership of \a proxy_server or \a server_name. +grpc_handshaker* grpc_http_connect_handshaker_create(const char* proxy_server, + const char* server_name); -grpc_subchannel* grpc_subchannel_factory_create_subchannel( - grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory, - grpc_subchannel_args* args) { - return factory->vtable->create_subchannel(exec_ctx, factory, args); -} +/// Returns the name of the proxy to use, or NULL if no proxy is configured. +/// Caller takes ownership of result. +char* grpc_get_http_proxy_server(); + +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_CONNECT_HANDSHAKER_H */ diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h index 06f0767832..39f6465fc9 100644 --- a/src/core/ext/client_channel/initial_connect_string.h +++ b/src/core/ext/client_channel/initial_connect_string.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H #include <grpc/support/slice.h> #include "src/core/lib/iomgr/sockaddr.h" @@ -47,4 +47,4 @@ void grpc_test_set_initial_connect_string_function( void grpc_set_initial_connect_string(struct sockaddr **addr, size_t *addr_len, gpr_slice *connect_string); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_INITIAL_CONNECT_STRING_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */ diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 9ca895ec6c..45ee72e2f0 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -100,26 +100,26 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata, - initial_metadata_flags, target, on_complete); + return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, + on_complete); } void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target) { - policy->vtable->cancel_pick(exec_ctx, policy, target); + grpc_connected_subchannel **target, + grpc_error *error) { + policy->vtable->cancel_pick(exec_ctx, policy, target, error); } void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, - initial_metadata_flags_eq); + initial_metadata_flags_eq, error); } void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index a5838cdb9f..cc24a62d9d 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -53,23 +53,42 @@ struct grpc_lb_policy { grpc_pollset_set *interested_parties; }; +/** Extra arguments for an LB pick */ +typedef struct grpc_lb_policy_pick_args { + /** Parties interested in the pick's progress */ + grpc_polling_entity *pollent; + /** Initial metadata associated with the picking call. */ + grpc_metadata_batch *initial_metadata; + /** Bitmask used for selective cancelling. See \a + * grpc_lb_policy_cancel_picks() and \a GRPC_INITIAL_METADATA_* in + * grpc_types.h */ + uint32_t initial_metadata_flags; + /** Storage for LB token in \a initial_metadata, or NULL if not used */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /** Deadline for the call to the LB server */ + gpr_timespec deadline; +} grpc_lb_policy_pick_args; + struct grpc_lb_policy_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** implement grpc_lb_policy_pick */ + /** \see grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, grpc_closure *on_complete); + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete); + + /** \see grpc_lb_policy_cancel_pick */ void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, grpc_error *error); + + /** \see grpc_lb_policy_cancel_picks */ void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, grpc_error *error); + /** \see grpc_lb_policy_ping_one */ void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); @@ -83,8 +102,7 @@ struct grpc_lb_policy_vtable { /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy. Calling with a NULL \a - state cancels the subscription. - */ + state cancels the subscription. */ void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, @@ -124,30 +142,40 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Given initial metadata in \a initial_metadata, find an appropriate - target for this rpc, and 'return' it by calling \a on_complete after setting - \a target. - Picking can be asynchronous. Any IO should be done under \a pollent. */ +/** Find an appropriate target for this call, based on \a pick_args. + Picking can be synchronous or asynchronous. In the synchronous case, when a + pick is readily available, it'll be returned in \a target and a non-zero + value will be returned. + In the asynchronous case, zero is returned and \a on_complete will be called + once \a target and \a user_data have been set. Any IO should be done under + \a pick_args->pollent. The opaque \a user_data output argument corresponds + to information that may need be propagated from the LB policy. It may be + NULL. Errors are signaled by receiving a NULL \a *target. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete); +/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) + against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); +/** Cancel picks for \a target. + The \a on_complete callback of the pending picks will be invoked with \a + *target set to NULL. */ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target); + grpc_connected_subchannel **target, + grpc_error *error); -/** Cancel all pending picks which have: - (initial_metadata_flags & initial_metadata_flags_mask) == - initial_metadata_flags_eq */ +/** Cancel all pending picks for which their \a initial_metadata_flags (as given + in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq + when AND'd with \a initial_metadata_flags_mask */ void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq); + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** Try to enter a READY connectivity state */ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -163,4 +191,4 @@ grpc_connectivity_state grpc_lb_policy_check_connectivity( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_H */ diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c index 56deac7d3d..45753e6942 100644 --- a/src/core/ext/client_channel/lb_policy_factory.c +++ b/src/core/ext/client_channel/lb_policy_factory.c @@ -31,8 +31,66 @@ * */ +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + #include "src/core/ext/client_channel/lb_policy_factory.h" +grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) { + grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses)); + addresses->num_addresses = num_addresses; + const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses; + addresses->addresses = gpr_malloc(addresses_size); + memset(addresses->addresses, 0, addresses_size); + return addresses; +} + +grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses, + void* (*user_data_copy)(void*)) { + grpc_lb_addresses* new_addresses = + grpc_lb_addresses_create(addresses->num_addresses); + memcpy(new_addresses->addresses, addresses->addresses, + sizeof(grpc_lb_address) * addresses->num_addresses); + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (new_addresses->addresses[i].balancer_name != NULL) { + new_addresses->addresses[i].balancer_name = + gpr_strdup(new_addresses->addresses[i].balancer_name); + } + if (user_data_copy != NULL) { + new_addresses->addresses[i].user_data = + user_data_copy(new_addresses->addresses[i].user_data); + } + } + return new_addresses; +} + +void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, + void* address, size_t address_len, + bool is_balancer, char* balancer_name, + void* user_data) { + GPR_ASSERT(index < addresses->num_addresses); + grpc_lb_address* target = &addresses->addresses[index]; + memcpy(target->address.addr, address, address_len); + target->address.len = address_len; + target->is_balancer = is_balancer; + target->balancer_name = balancer_name; + target->user_data = user_data; +} + +void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses, + void (*user_data_destroy)(void*)) { + for (size_t i = 0; i < addresses->num_addresses; ++i) { + gpr_free(addresses->addresses[i].balancer_name); + if (user_data_destroy != NULL) { + user_data_destroy(addresses->addresses[i].user_data); + } + } + gpr_free(addresses->addresses); + gpr_free(addresses); +} + void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { factory->vtable->ref(factory); } diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h index eedade191d..3e53972445 100644 --- a/src/core/ext/client_channel/lb_policy_factory.h +++ b/src/core/ext/client_channel/lb_policy_factory.h @@ -31,14 +31,14 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H #include "src/core/ext/client_channel/client_channel_factory.h" #include "src/core/ext/client_channel/lb_policy.h" -#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/resolve_address.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -47,9 +47,54 @@ struct grpc_lb_policy_factory { const grpc_lb_policy_factory_vtable *vtable; }; +/** A resolved address alongside any LB related information associated with it. + * \a user_data, if not NULL, contains opaque data meant to be consumed by the + * gRPC LB policy. Note that no all LB policies support \a user_data as input. + * Those who don't will simply ignore it and will correspondingly return NULL in + * their namesake pick() output argument. */ +typedef struct grpc_lb_address { + grpc_resolved_address address; + bool is_balancer; + char *balancer_name; /* For secure naming. */ + void *user_data; +} grpc_lb_address; + +typedef struct grpc_lb_addresses { + size_t num_addresses; + grpc_lb_address *addresses; +} grpc_lb_addresses; + +/** Returns a grpc_addresses struct with enough space for + * \a num_addresses addresses. */ +grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses); + +/** Creates a copy of \a addresses. If \a user_data_copy is not NULL, + * it will be invoked to copy the \a user_data field of each address. */ +grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses, + void *(*user_data_copy)(void *)); + +/** Sets the value of the address at index \a index of \a addresses. + * \a address is a socket address of length \a address_len. + * Takes ownership of \a balancer_name. */ +void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index, + void *address, size_t address_len, + bool is_balancer, char *balancer_name, + void *user_data); + +/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will + * be invoked to destroy the \a user_data field of each address. */ +void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses, + void (*user_data_destroy)(void *)); + +/** Arguments passed to LB policies. */ +/* TODO(roth, ctiller): Consider replacing this struct with + grpc_channel_args. See comment in resolver_result.h for details. */ typedef struct grpc_lb_policy_args { - grpc_resolved_addresses *addresses; + const char *server_name; + grpc_lb_addresses *addresses; grpc_client_channel_factory *client_channel_factory; + /* Can be used to pass implementation-specific parameters to the LB policy. */ + grpc_channel_args *additional_args; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { @@ -73,4 +118,4 @@ grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_FACTORY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_FACTORY_H */ diff --git a/src/core/ext/client_channel/lb_policy_registry.h b/src/core/ext/client_channel/lb_policy_registry.h index 301861553a..21c468e021 100644 --- a/src/core/ext/client_channel/lb_policy_registry.h +++ b/src/core/ext/client_channel/lb_policy_registry.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H #include "src/core/ext/client_channel/lb_policy_factory.h" #include "src/core/lib/iomgr/exec_ctx.h" @@ -52,4 +52,4 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory); grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name, grpc_lb_policy_args *args); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_LB_POLICY_REGISTRY_H */ diff --git a/src/core/ext/client_channel/parse_address.h b/src/core/ext/client_channel/parse_address.h index 1504d3916d..3aa6c41546 100644 --- a/src/core/ext/client_channel/parse_address.h +++ b/src/core/ext/client_channel/parse_address.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H #include <stddef.h> @@ -53,4 +53,4 @@ int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); * host:port pair. Returns true upon success. */ int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 017c55bb80..0951613175 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H #include "src/core/ext/client_channel/resolver_result.h" #include "src/core/ext/client_channel/subchannel.h" @@ -91,4 +91,4 @@ void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, grpc_resolver_result **result, grpc_closure *on_complete); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 1068acfdd4..1ad018ce1f 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H #include "src/core/ext/client_channel/client_channel_factory.h" #include "src/core/ext/client_channel/resolver.h" @@ -47,10 +47,7 @@ struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args { - grpc_uri *uri; - grpc_client_channel_factory *client_channel_factory; -} grpc_resolver_args; +typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args; struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); @@ -79,4 +76,4 @@ grpc_resolver *grpc_resolver_factory_create_resolver( char *grpc_resolver_factory_get_default_authority( grpc_resolver_factory *factory, grpc_uri *uri); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_FACTORY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_FACTORY_H */ diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5827819efb..ee43329874 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -40,22 +40,20 @@ #include <grpc/support/string_util.h> #define MAX_RESOLVERS 10 +#define DEFAULT_RESOLVER_PREFIX_MAX_LENGTH 32 static grpc_resolver_factory *g_all_of_the_resolvers[MAX_RESOLVERS]; static int g_number_of_resolvers = 0; -static char *g_default_resolver_prefix; +static char g_default_resolver_prefix[DEFAULT_RESOLVER_PREFIX_MAX_LENGTH] = + "dns:///"; -void grpc_resolver_registry_init(const char *default_resolver_prefix) { - g_default_resolver_prefix = gpr_strdup(default_resolver_prefix); -} +void grpc_resolver_registry_init() {} void grpc_resolver_registry_shutdown(void) { - int i; - for (i = 0; i < g_number_of_resolvers; i++) { + for (int i = 0; i < g_number_of_resolvers; i++) { grpc_resolver_factory_unref(g_all_of_the_resolvers[i]); } - gpr_free(g_default_resolver_prefix); // FIXME(ctiller): this should live in grpc_resolver_registry_init, // however that would have the client_channel plugin call this AFTER we start // registering resolvers from third party plugins, and so they'd never show @@ -65,6 +63,17 @@ void grpc_resolver_registry_shutdown(void) { g_number_of_resolvers = 0; } +void grpc_resolver_registry_set_default_prefix( + const char *default_resolver_prefix) { + const size_t len = strlen(default_resolver_prefix); + GPR_ASSERT(len < DEFAULT_RESOLVER_PREFIX_MAX_LENGTH && + "default resolver prefix too long"); + GPR_ASSERT(len > 0 && "default resolver prefix can't be empty"); + // By the previous assert, default_resolver_prefix is safe to be copied with a + // plain strcpy. + strcpy(g_default_resolver_prefix, default_resolver_prefix); +} + void grpc_register_resolver_type(grpc_resolver_factory *factory) { int i; for (i = 0; i < g_number_of_resolvers; i++) { @@ -108,35 +117,27 @@ static grpc_resolver_factory *resolve_factory(const char *target, *uri = grpc_uri_parse(target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { - if (g_default_resolver_prefix != NULL) { - grpc_uri_destroy(*uri); - gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(tmp, 1); - factory = lookup_factory_by_uri(*uri); - if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(tmp, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, - tmp); - } - gpr_free(tmp); - } else { + grpc_uri_destroy(*uri); + gpr_asprintf(&tmp, "%s%s", g_default_resolver_prefix, target); + *uri = grpc_uri_parse(tmp, 1); + factory = lookup_factory_by_uri(*uri); + if (factory == NULL) { grpc_uri_destroy(grpc_uri_parse(target, 0)); - gpr_log(GPR_ERROR, "don't know how to resolve '%s'", target); + grpc_uri_destroy(grpc_uri_parse(tmp, 0)); + gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, tmp); } + gpr_free(tmp); } return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory) { +grpc_resolver *grpc_resolver_create(const char *target) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; - args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index cdf5dff8ba..0a39d4a187 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -31,14 +31,17 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H #include "src/core/ext/client_channel/resolver_factory.h" -void grpc_resolver_registry_init(const char *default_prefix); +void grpc_resolver_registry_init(); void grpc_resolver_registry_shutdown(void); +/** Set the default URI prefix to \a default_prefix. */ +void grpc_resolver_registry_set_default_prefix(const char *default_prefix); + /** Register a resolver type. URI's of \a scheme will be resolved with the given resolver. If \a priority is greater than zero, then the resolver will be eligible @@ -55,8 +58,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory); +grpc_resolver *grpc_resolver_create(const char *target); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ @@ -66,4 +68,4 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name); representing the default authority to pass from a client. */ char *grpc_get_default_authority(const char *target); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_REGISTRY_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */ diff --git a/src/core/ext/client_channel/resolver_result.c b/src/core/ext/client_channel/resolver_result.c index 3a4162b0ff..ab1c215955 100644 --- a/src/core/ext/client_channel/resolver_result.c +++ b/src/core/ext/client_channel/resolver_result.c @@ -1,75 +1,94 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ +// +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// #include "src/core/ext/client_channel/resolver_result.h" #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/channel/channel_args.h" struct grpc_resolver_result { gpr_refcount refs; - grpc_lb_policy *lb_policy; + char* server_name; + grpc_lb_addresses* addresses; + char* lb_policy_name; + grpc_channel_args* lb_policy_args; }; -grpc_resolver_result *grpc_resolver_result_create() { - grpc_resolver_result *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); - return c; +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { + grpc_resolver_result* result = gpr_malloc(sizeof(*result)); + memset(result, 0, sizeof(*result)); + gpr_ref_init(&result->refs, 1); + result->server_name = gpr_strdup(server_name); + result->addresses = addresses; + result->lb_policy_name = gpr_strdup(lb_policy_name); + result->lb_policy_args = lb_policy_args; + return result; } -void grpc_resolver_result_ref(grpc_resolver_result *c) { gpr_ref(&c->refs); } +void grpc_resolver_result_ref(grpc_resolver_result* result) { + gpr_ref(&result->refs); +} -void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx, - grpc_resolver_result *c) { - if (gpr_unref(&c->refs)) { - if (c->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result"); - } - gpr_free(c); +void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, + grpc_resolver_result* result) { + if (gpr_unref(&result->refs)) { + gpr_free(result->server_name); + grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); + gpr_free(result->lb_policy_name); + grpc_channel_args_destroy(result->lb_policy_args); + gpr_free(result); } } -void grpc_resolver_result_set_lb_policy(grpc_resolver_result *c, - grpc_lb_policy *lb_policy) { - GPR_ASSERT(c->lb_policy == NULL); - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "resolver_result"); - } - c->lb_policy = lb_policy; +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) { + return result->server_name; +} + +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result) { + return result->addresses; +} + +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result) { + return result->lb_policy_name; } -grpc_lb_policy *grpc_resolver_result_get_lb_policy(grpc_resolver_result *c) { - return c->lb_policy; +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( + grpc_resolver_result* result) { + return result->lb_policy_args; } diff --git a/src/core/ext/client_channel/resolver_result.h b/src/core/ext/client_channel/resolver_result.h index 258ac7c36f..7107d2ed59 100644 --- a/src/core/ext/client_channel/resolver_result.h +++ b/src/core/ext/client_channel/resolver_result.h @@ -1,52 +1,73 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H - -#include "src/core/ext/client_channel/lb_policy.h" - -/** Results reported from a grpc_resolver. */ +// +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H + +#include "src/core/ext/client_channel/lb_policy_factory.h" + +// TODO(roth, ctiller): In the long term, we are considering replacing +// the resolver_result data structure with grpc_channel_args. The idea is +// that the resolver will return a set of channel args that contains the +// information that is currently in the resolver_result struct. For +// example, there will be specific args indicating the set of addresses +// and the name of the LB policy to instantiate. Note that if we did +// this, we would probably want to change the data structure of +// grpc_channel_args such to a hash table or AVL or some other data +// structure that does not require linear search to find keys. + +/// Results reported from a grpc_resolver. typedef struct grpc_resolver_result grpc_resolver_result; -grpc_resolver_result *grpc_resolver_result_create(); -void grpc_resolver_result_ref(grpc_resolver_result *client_config); -void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx, - grpc_resolver_result *client_config); +/// Takes ownership of \a addresses and \a lb_policy_args. +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args); +void grpc_resolver_result_ref(grpc_resolver_result* result); +void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result); -void grpc_resolver_result_set_lb_policy(grpc_resolver_result *client_config, - grpc_lb_policy *lb_policy); -grpc_lb_policy *grpc_resolver_result_get_lb_policy( - grpc_resolver_result *client_config); +/// Caller does NOT take ownership of result. +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( + grpc_resolver_result* result); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index c97ff39e40..ad229e045b 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -33,6 +33,7 @@ #include "src/core/ext/client_channel/subchannel.h" +#include <limits.h> #include <string.h> #include <grpc/support/alloc.h> @@ -219,8 +220,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, - old_val + delta, reason); + "SUBCHANNEL: %p %12s 0x%08d -> 0x%08d [%s]", c, purpose, (int)old_val, + (int)(old_val + delta), reason); #endif return old_val; } @@ -331,41 +332,40 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - gpr_backoff_init(&c->backoff_state, - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + int initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; if (c->args) { for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff")) { GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - gpr_backoff_init(&c->backoff_state, 1.0, 0.0, - c->args->args[i].value.integer, - c->args->args[i].value.integer); - } - if (0 == - strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - if (c->args->args[i].type == GRPC_ARG_INTEGER) { - if (c->args->args[i].value.integer >= 0) { - gpr_backoff_init( - &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GPR_MIN(c->args->args[i].value.integer, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), - c->args->args[i].value.integer); - } else { - gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS - " : must be non-negative"); - } - } else { - gpr_log(GPR_ERROR, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer"); - } + fixed_reconnect_backoff = true; + initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(c->args->args[i].key, + GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){max_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(c->args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); } } } + gpr_backoff_init( + &c->backoff_state, + fixed_reconnect_backoff ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, + initial_backoff_ms, max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -504,14 +504,13 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.connectivity_state = state; - op.on_connectivity_state_change = closure; - op.bind_pollset_set = interested_parties; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } void grpc_connected_subchannel_notify_on_state_change( @@ -525,12 +524,11 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) { - grpc_transport_op op; + grpc_transport_op *op = grpc_make_transport_op(NULL); grpc_channel_element *elem; - memset(&op, 0, sizeof(op)); - op.send_ping = closure; + op->send_ping = closure; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); - elem->filter->start_transport_op(exec_ctx, elem, &op); + elem->filter->start_transport_op(exec_ctx, elem, op); } static void publish_transport_locked(grpc_exec_ctx *exec_ctx, @@ -635,7 +633,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, c->have_alarm = 1; grpc_connectivity_state_set( exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); const char *errmsg = grpc_error_string(error); @@ -704,14 +704,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_subchannel_call **call) { + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, callstk); + NULL, NULL, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index b4183b459f..2ba35c7f1b 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" @@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -162,6 +163,8 @@ struct grpc_subchannel_args { size_t filter_count; /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; + /** Server name */ + const char *server_name; /** Address to connect to */ struct sockaddr *addr; size_t addr_len; @@ -172,4 +175,4 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */ diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c index daf62eac17..bf22975fba 100644 --- a/src/core/ext/client_channel/subchannel_index.c +++ b/src/core/ext/client_channel/subchannel_index.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/avl.h> +#include <grpc/support/string_util.h> #include <grpc/support/tls.h> #include "src/core/lib/channel/channel_args.h" @@ -85,6 +86,7 @@ static grpc_subchannel_key *create_key( } else { k->args.filters = NULL; } + k->args.server_name = gpr_strdup(args->server_name); k->args.addr_len = args->addr_len; k->args.addr = gpr_malloc(args->addr_len); if (k->args.addr_len > 0) { @@ -111,6 +113,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a, if (c != 0) return c; c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; + c = strcmp(a->args.server_name, b->args.server_name); + if (c != 0) return c; if (a->args.addr_len) { c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); if (c != 0) return c; @@ -126,9 +130,10 @@ static int subchannel_key_compare(grpc_subchannel_key *a, void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { grpc_connector_unref(exec_ctx, k->connector); - gpr_free(k->args.addr); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); + gpr_free((void *)k->args.server_name); + gpr_free(k->args.addr); gpr_free(k); } diff --git a/src/core/ext/client_channel/subchannel_index.h b/src/core/ext/client_channel/subchannel_index.h index a34389d564..5b3af6b054 100644 --- a/src/core/ext/client_channel/subchannel_index.h +++ b/src/core/ext/client_channel/subchannel_index.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H #include "src/core/ext/client_channel/connector.h" #include "src/core/ext/client_channel/subchannel.h" @@ -74,4 +74,4 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */ diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h index 875a7cb07c..5fe0e8f35e 100644 --- a/src/core/ext/client_channel/uri_parser.h +++ b/src/core/ext/client_channel/uri_parser.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H -#define GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H +#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H +#define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H #include <stddef.h> @@ -60,4 +60,4 @@ const char *grpc_uri_get_query_arg(const grpc_uri *uri, const char *key); /** destroy a uri */ void grpc_uri_destroy(grpc_uri *uri); -#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_URI_PARSER_H */ +#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 5006e7e3f0..9cd1c077d8 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -76,9 +76,9 @@ * operations in progress over the old RR instance. This is done by * decreasing the reference count on the old policy. The moment no more * references are held on the old RR policy, it'll be destroyed and \a - * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state. - * At this point we can transition to a new RR instance safely, which is done - * once again via \a rr_handover(). + * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN + * state. At this point we can transition to a new RR instance safely, which + * is done once again via \a rr_handover(). * * * Once a RR policy instance is in place (and getting updated as described), @@ -96,6 +96,8 @@ * - Implement LB service forwarding (point 2c. in the doc's diagram). */ +#include <errno.h> + #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -103,24 +105,53 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> +#include <grpc/support/time.h> #include "src/core/ext/client_channel/client_channel_factory.h" +#include "src/core/ext/client_channel/lb_policy_factory.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/static_metadata.h" int grpc_lb_glb_trace = 0; +/* add lb_token of selected subchannel (address) to the call's initial + * metadata */ +static void initial_metadata_add_lb_token( + grpc_metadata_batch *initial_metadata, + grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) { + GPR_ASSERT(lb_token_mdelem_storage != NULL); + GPR_ASSERT(lb_token != NULL); + grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + typedef struct wrapped_rr_closure_arg { /* the original closure. Usually a on_complete/notify cb for pick() and ping() * calls against the internal RR instance, respectively. */ grpc_closure *wrapped_closure; + /* the pick's initial metadata, kept in order to append the LB token for the + * pick */ + grpc_metadata_batch *initial_metadata; + + /* the picked target, used to determine which LB token to add to the pick's + * initial metadata */ + grpc_connected_subchannel **target; + + /* the LB token associated with the pick */ + grpc_mdelem *lb_token; + + /* storage for the lb token initial metadata mdelem */ + grpc_linked_mdelem *lb_token_mdelem_storage; + /* The RR instance related to the closure */ grpc_lb_policy *rr_policy; @@ -141,9 +172,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + + /* if target is NULL, no pick has been made by the RR policy (eg, all + * addresses failed to connect). There won't be any user_data/token + * available */ + if (wc_arg->target != NULL) { + initial_metadata_add_lb_token(wc_arg->initial_metadata, + wc_arg->lb_token_mdelem_storage, + GRPC_MDELEM_REF(wc_arg->lb_token)); + } } GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL); + + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); gpr_free(wc_arg->owning_pending_node); } @@ -158,15 +200,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, typedef struct pending_pick { struct pending_pick *next; - /* polling entity for the pick()'s async notification */ - grpc_polling_entity *pollent; - - /* the initial metadata for the pick. See grpc_lb_policy_pick() */ - grpc_metadata_batch *initial_metadata; - - /* bitmask passed to pick() and used for selective cancelling. See - * grpc_lb_policy_cancel_picks() */ - uint32_t initial_metadata_flags; + /* original pick()'s arguments */ + grpc_lb_policy_pick_args pick_args; /* output argument where to store the pick()ed connected subchannel, or NULL * upon error. */ @@ -180,20 +215,21 @@ typedef struct pending_pick { wrapped_rr_closure_arg wrapped_on_complete_arg; } pending_pick; -static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, +static void add_pending_pick(pending_pick **root, + const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { pending_pick *pp = gpr_malloc(sizeof(*pp)); memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->pollent = pollent; + pp->pick_args = *pick_args; pp->target = target; - pp->initial_metadata = initial_metadata; - pp->initial_metadata_flags = initial_metadata_flags; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; + pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; + pp->wrapped_on_complete_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure, &pp->wrapped_on_complete_arg); *root = pp; @@ -235,8 +271,13 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; + /** who the client is trying to communicate with */ + const char *server_name; grpc_client_channel_factory *cc_factory; + /** deadline for the LB's call */ + gpr_timespec deadline; + /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -252,6 +293,9 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** addresses from \a serverlist */ + grpc_lb_addresses *addresses; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -279,58 +323,133 @@ struct rr_connectivity_data { glb_lb_policy *glb_policy; }; -static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, - const grpc_grpclb_serverlist *serverlist, - glb_lb_policy *glb_policy) { - /* TODO(dgq): support mixed ip version */ - GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, - serverlist->servers[i]->port); +static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, + bool log) { + const grpc_grpclb_ip_address *ip = &server->ip_address; + if (server->port >> 16 != 0) { + if (log) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %zu of serverlist. Ignoring.", + server->port, idx); + } + return false; } - size_t uri_path_len; - char *concat_ipports = gpr_strjoin_sep( - (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); + if (ip->size != 4 && ip->size != 16) { + if (log) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %zu of " + "serverlist. Ignoring", + ip->size, idx); + } + return false; + } + return true; +} - grpc_lb_policy_args args; - args.client_channel_factory = glb_policy->cc_factory; - args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - args.addresses->naddrs = serverlist->num_servers; - args.addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); - size_t out_addrs_idx = 0; +/* Returns addresses extracted from \a serverlist. */ +static grpc_lb_addresses *process_serverlist( + const grpc_grpclb_serverlist *serverlist) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_uri uri; - struct sockaddr_storage sa; - size_t sa_len; - uri.path = host_ports[i]; - if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ - memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); - args.addresses->addrs[out_addrs_idx].len = sa_len; - ++out_addrs_idx; + if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; + } + if (num_valid == 0) return NULL; + + grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid); + + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation. + * Given that the validity tests are very cheap, they are performed again + * instead of marking the valid ones during the first pass, as this would + * incurr in an allocation due to the arbitrary number of server */ + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + GPR_ASSERT(addr_idx < num_valid); + const grpc_grpclb_server *server = serverlist->servers[sl_idx]; + if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + + /* address processing */ + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + grpc_resolved_address addr; + memset(&addr, 0, sizeof(addr)); + if (ip->size == 4) { + addr.len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr.len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } + + /* lb token processing */ + void *user_data; + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); } else { - gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", - host_ports[i]); + gpr_log(GPR_ERROR, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr)); + user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; } + + grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, + false /* is_balancer */, + NULL /* balancer_name */, user_data); + ++addr_idx; } + GPR_ASSERT(addr_idx == num_valid); + + return lb_addresses; +} + +/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */ +static void lb_token_destroy(void *token) { + if (token != NULL) GRPC_MDELEM_UNREF(token); +} + +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, + const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *glb_policy) { + GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); + + grpc_lb_policy_args args; + memset(&args, 0, sizeof(args)); + args.server_name = glb_policy->server_name; + args.client_channel_factory = glb_policy->cc_factory; + args.addresses = process_serverlist(serverlist); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - gpr_free(concat_ipports); - for (size_t i = 0; i < serverlist->num_servers; i++) { - gpr_free(host_ports[i]); + if (glb_policy->addresses != NULL) { + /* dispose of the previous version */ + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); } - gpr_free(host_ports); - gpr_free(args.addresses->addrs); - gpr_free(args.addresses); + glb_policy->addresses = args.addresses; + return rr; } static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { - GRPC_ERROR_REF(error); + GPR_ASSERT(glb_policy->serverlist != NULL && + glb_policy->serverlist->num_servers > 0); glb_policy->rr_policy = create_rr(exec_ctx, glb_policy->serverlist, glb_policy); @@ -345,8 +464,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, &glb_policy->rr_connectivity->on_change); grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, error, - "rr_handover"); + glb_policy->rr_connectivity->state, + GRPC_ERROR_REF(error), "rr_handover"); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -359,9 +478,10 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent, - pp->initial_metadata, pp->initial_metadata_flags, - pp->target, &pp->wrapped_on_complete); + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, + pp->target, + (void **)&pp->wrapped_on_complete_arg.lb_token, + &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; } @@ -378,13 +498,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, &pping->wrapped_notify); pping->wrapped_notify_arg.owning_pending_node = pping; } - GRPC_ERROR_UNREF(error); } -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; + if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { if (glb_policy->serverlist != NULL) { /* a RR policy is shutting down but there's a serverlist available -> @@ -398,8 +518,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { /* RR not shutting down. Mimic the RR's policy state */ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, error, - "rr_connectivity_changed"); + rr_conn_data->state, GRPC_ERROR_REF(error), + "glb_rr_connectivity_changed"); /* resubscribe */ grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, &rr_conn_data->state, @@ -408,41 +528,64 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(rr_conn_data); } } - GRPC_ERROR_UNREF(error); } static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. At that + * time, this should be changed to allow a list with no balancer addresses, + * since the resolver might fail to return a balancer address even when + * this is the right LB policy to use. */ + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; ++i) { + if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); memset(glb_policy, 0, sizeof(*glb_policy)); /* All input addresses in args->addresses come from a resolver that claims - * they are LB services. It's the resolver's responsibility to make sure this + * they are LB services. It's the resolver's responsibility to make sure + * this * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ + glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); - if (args->addresses->naddrs == 0) { - return NULL; - } - /* construct a target from the args->addresses, in the form + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ - char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); - addr_strs[0] = - grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); - for (size_t i = 1; i < args->addresses->naddrs; i++) { - GPR_ASSERT(grpc_sockaddr_to_string( - &addr_strs[i], - (const struct sockaddr *)&args->addresses->addrs[i], - true) == 0); + char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs); + size_t addr_index = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (args->addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + if (args->addresses->addresses[i].is_balancer) { + if (addr_index == 0) { + addr_strs[addr_index++] = grpc_sockaddr_to_uri( + (const struct sockaddr *)&args->addresses->addresses[i] + .address.addr); + } else { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + (const struct sockaddr *)&args->addresses->addresses[i] + .address.addr, + true) > 0); + } + } } size_t uri_path_len; - char *target_uri_str = gpr_strjoin_sep( - (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs, + num_grpclb_addrs, ",", &uri_path_len); /* will pick using pick_first */ glb_policy->lb_channel = grpc_client_channel_factory_create_channel( @@ -450,7 +593,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); gpr_free(target_uri_str); - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < num_grpclb_addrs; i++) { gpr_free(addr_strs[i]); } gpr_free(addr_strs); @@ -463,7 +606,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, rr_connectivity_data *rr_connectivity = gpr_malloc(sizeof(rr_connectivity_data)); memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed, + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, rr_connectivity); rr_connectivity->glb_policy = glb_policy; glb_policy->rr_connectivity = rr_connectivity; @@ -479,6 +622,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); + gpr_free((void *)glb_policy->server_name); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -486,6 +630,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); + grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy); gpr_free(glb_policy); } @@ -505,7 +650,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE, NULL); - gpr_free(pp); pp = next; } @@ -533,7 +677,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; @@ -542,11 +687,11 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); + exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -554,12 +699,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_client != NULL) { @@ -570,13 +717,13 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, glb_policy->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, - GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); + exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); + grpc_exec_ctx_sched( + exec_ctx, &pp->wrapped_on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -584,6 +731,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&glb_policy->mu); + GRPC_ERROR_UNREF(error); } static void query_for_backends(grpc_exec_ctx *exec_ctx, @@ -603,14 +751,23 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { + if (pick_args->lb_token_mdelem_storage == NULL) { + *target = NULL; + grpc_exec_ctx_sched( + exec_ctx, on_complete, + GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " + "won't work without it. Failing"), + NULL); + return 1; + } + glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); - int r; + glb_policy->deadline = pick_args->deadline; + bool pick_done; if (glb_policy->rr_policy != NULL) { if (grpc_lb_glb_trace) { @@ -620,37 +777,47 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg)); glb_policy->wc_arg.rr_policy = glb_policy->rr_policy; + glb_policy->wc_arg.target = target; glb_policy->wc_arg.wrapped_closure = on_complete; + glb_policy->wc_arg.lb_token_mdelem_storage = + pick_args->lb_token_mdelem_storage; + glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata; + glb_policy->wc_arg.owning_pending_node = NULL; grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); - r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent, - initial_metadata, initial_metadata_flags, target, + + pick_done = + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); - if (r != 0) { - /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR - * policy and notify the original callback */ - glb_policy->wc_arg.wrapped_closure = NULL; + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)glb_policy->wc_arg.rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick"); - grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure, - GRPC_ERROR_NONE, NULL); + + /* add the load reporting initial metadata */ + initial_metadata_add_lb_token( + pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, + GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); } } else { - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + /* else, the pending pick will be registered and taken care of by the + * pending pick list inside the RR policy (glb_policy->rr_policy) */ + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, glb_policy->base.interested_parties); - add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata, - initial_metadata_flags, target, on_complete); + add_pending_pick(&glb_policy->pending_picks, pick_args, target, + on_complete); if (!glb_policy->started_picking) { start_picking(exec_ctx, glb_policy); } - r = 0; + pick_done = false; } gpr_mu_unlock(&glb_policy->mu); - return r; + return pick_done; } static grpc_connectivity_state glb_check_connectivity( @@ -702,9 +869,6 @@ typedef struct lb_client_data { /* called once initial metadata's been sent */ grpc_closure md_sent; - /* called once initial metadata's been received */ - grpc_closure md_rcvd; - /* called once the LoadBalanceRequest has been sent to the LB server. See * src/proto/grpc/.../load_balancer.proto */ grpc_closure req_sent; @@ -741,7 +905,6 @@ typedef struct lb_client_data { } lb_client_data; static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, @@ -750,38 +913,36 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->server_name != NULL); + GPR_ASSERT(glb_policy->server_name[0] != '\0'); + lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data)); memset(lb_client, 0, sizeof(lb_client_data)); gpr_mu_init(&lb_client->mu); grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client); grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - /* TODO(dgq): get the deadline from the client config instead of fabricating - * one here. */ - lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(3, GPR_TIMESPAN)); + lb_client->deadline = glb_policy->deadline; /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling * entities passed to glb_pick(). */ lb_client->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, - glb_policy->base.interested_parties, "/BalanceLoad", - NULL, /* FIXME(dgq): which "host" value to use? */ + glb_policy->base.interested_parties, + "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, lb_client->deadline, NULL); grpc_metadata_array_init(&lb_client->initial_metadata_recv); grpc_metadata_array_init(&lb_client->trailing_metadata_recv); - grpc_grpclb_request *request = grpc_grpclb_request_create( - "load.balanced.service.name"); /* FIXME(dgq): get the name of the load - balanced service from the resolver */ + grpc_grpclb_request *request = + grpc_grpclb_request_create(glb_policy->server_name); gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); lb_client->request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); @@ -855,23 +1016,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op ops[1]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->md_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = lb_client->request_payload; @@ -886,11 +1030,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_client_data *lb_client = arg; + GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &lb_client->response_payload; op->flags = 0; @@ -909,8 +1060,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op *op = ops; if (lb_client->response_payload != NULL) { /* Received data from the LB server. Look inside - * lb_client->response_payload, for - * a serverlist. */ + * lb_client->response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); @@ -947,7 +1097,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { /* unref the RR policy, eventually leading to its substitution with a * new one constructed from the received serverlist (see - * rr_connectivity_changed) */ + * glb_rr_connectivity_changed) */ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, "serverlist_received"); } @@ -1010,8 +1160,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_client->status, lb_client->status_details, lb_client->status_details_capacity); } - /* TODO(dgq): deal with stream termination properly (fire up another one? fail - * the original call?) */ + /* TODO(dgq): deal with stream termination properly (fire up another one? + * fail the original call?) */ } /* Code wiring the policy with the rest of the core */ diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index f4720a1345..a8881004a0 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -57,6 +57,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, if (dec_arg->first_pass) { /* count how many server do we have */ grpc_grpclb_server server; if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); return false; } dec_arg->num_servers++; @@ -69,6 +70,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); } if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); return false; } dec_arg->servers[dec_arg->decoding_idx++] = server; @@ -118,6 +120,7 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } grpc_grpclb_initial_response *initial_res = @@ -145,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( arg.first_pass = true; status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } @@ -152,6 +156,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index a63a4c40aa..079a64a3f3 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -45,6 +45,7 @@ extern "C" { #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 +typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address; typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c index 52e11c40bb..afecb716fb 100644 --- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c @@ -31,10 +31,11 @@ * */ /* Automatically generated nanopb constant definitions */ -/* Generated by nanopb-0.3.5-dev */ +/* Generated by nanopb-0.3.7-dev */ #include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" +/* @@protoc_insertion_point(includes) */ #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. #endif @@ -72,8 +73,8 @@ const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = { }; const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = { - PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0), - PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields), + PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0), + PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields), PB_LAST_FIELD }; @@ -84,7 +85,7 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = { }; const pb_field_t grpc_lb_v1_Server_fields[5] = { - PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0), + PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0), PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0), PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0), PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0), @@ -116,3 +117,4 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) #endif +/* @@protoc_insertion_point(eof) */ diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index 46fe588f72..53fed22bae 100644 --- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -31,11 +31,12 @@ * */ /* Automatically generated nanopb header */ -/* Generated by nanopb-0.3.5-dev */ +/* Generated by nanopb-0.3.7-dev */ -#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H -#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H +#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED +#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED #include "third_party/nanopb/pb.h" +/* @@protoc_insertion_point(includes) */ #if PB_PROTO_HEADER_VERSION != 30 #error Regenerate this file with the current version of nanopb generator. #endif @@ -52,6 +53,7 @@ typedef struct _grpc_lb_v1_ClientStats { int64_t client_rpc_errors; bool has_dropped_requests; int64_t dropped_requests; +/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */ } grpc_lb_v1_ClientStats; typedef struct _grpc_lb_v1_Duration { @@ -59,22 +61,26 @@ typedef struct _grpc_lb_v1_Duration { int64_t seconds; bool has_nanos; int32_t nanos; +/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */ } grpc_lb_v1_Duration; typedef struct _grpc_lb_v1_InitialLoadBalanceRequest { bool has_name; char name[128]; +/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */ } grpc_lb_v1_InitialLoadBalanceRequest; +typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t; typedef struct _grpc_lb_v1_Server { bool has_ip_address; - char ip_address[46]; + grpc_lb_v1_Server_ip_address_t ip_address; bool has_port; int32_t port; bool has_load_balance_token; - char load_balance_token[64]; + char load_balance_token[65]; bool has_drop_request; bool drop_request; +/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */ } grpc_lb_v1_Server; typedef struct _grpc_lb_v1_InitialLoadBalanceResponse { @@ -82,6 +88,7 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse { char load_balancer_delegate[64]; bool has_client_stats_report_interval; grpc_lb_v1_Duration client_stats_report_interval; +/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */ } grpc_lb_v1_InitialLoadBalanceResponse; typedef struct _grpc_lb_v1_LoadBalanceRequest { @@ -89,12 +96,14 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest { grpc_lb_v1_InitialLoadBalanceRequest initial_request; bool has_client_stats; grpc_lb_v1_ClientStats client_stats; +/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */ } grpc_lb_v1_LoadBalanceRequest; typedef struct _grpc_lb_v1_ServerList { pb_callback_t servers; bool has_expiration_interval; grpc_lb_v1_Duration expiration_interval; +/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */ } grpc_lb_v1_ServerList; typedef struct _grpc_lb_v1_LoadBalanceResponse { @@ -102,6 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { grpc_lb_v1_InitialLoadBalanceResponse initial_response; bool has_server_list; grpc_lb_v1_ServerList server_list; +/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */ } grpc_lb_v1_LoadBalanceResponse; /* Default values for struct fields */ @@ -114,7 +124,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default} #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default} #define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default} -#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0} +#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0} #define grpc_lb_v1_Duration_init_zero {false, 0, false, 0} #define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero} #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""} @@ -122,7 +132,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero} #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero} #define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero} -#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0} +#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0} /* Field tags (for use in manual encoding/decoding) */ #define grpc_lb_v1_ClientStats_total_requests_tag 1 @@ -135,8 +145,8 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_Server_port_tag 2 #define grpc_lb_v1_Server_load_balance_token_tag 3 #define grpc_lb_v1_Server_drop_request_tag 4 -#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2 -#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3 +#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1 +#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2 #define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1 #define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2 #define grpc_lb_v1_ServerList_servers_tag 1 @@ -161,7 +171,8 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5]; #define grpc_lb_v1_ClientStats_size 33 #define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size) #define grpc_lb_v1_InitialLoadBalanceResponse_size 90 -#define grpc_lb_v1_Server_size 127 +/* grpc_lb_v1_ServerList_size depends on runtime parameters */ +#define grpc_lb_v1_Server_size 98 /* Message IDs (where set with "msgid" option) */ #ifdef PB_MSGID @@ -174,5 +185,6 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5]; #ifdef __cplusplus } /* extern "C" */ #endif +/* @@protoc_insertion_point(eof) */ #endif diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 276189689e..ac0c09f482 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { @@ -199,10 +205,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; @@ -225,13 +229,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); @@ -443,20 +447,37 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); - if (args->addresses->naddrs == 0) return NULL; + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; + + if (args->addresses->addresses[i].user_data != NULL) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + /* server_name will be copied as part of the subchannel creation. This makes + * the copying of args->server_name (a borrowed pointer) OK. */ + sc_args.server_name = args->server_name; + sc_args.addr = + (struct sockaddr *)(&args->addresses->addresses[i].address.addr); + sc_args.addr_len = args->addresses->addresses[i].address.len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index b2ee834b52..c344312e39 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -66,6 +66,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/static_metadata.h" typedef struct round_robin_lb_policy round_robin_lb_policy; @@ -76,15 +77,32 @@ int grpc_lb_round_robin_trace = 0; * Once a pick is available, \a target is updated and \a on_complete called. */ typedef struct pending_pick { struct pending_pick *next; + + /* polling entity for the pick()'s async notification */ grpc_polling_entity *pollent; + + /* output argument where to store the pick()ed user_data. It'll be NULL if no + * such data is present or there's an error (the definite test for errors is + * \a target being NULL). */ + void **user_data; + + /* bitmask passed to pick() and used for selective cancelling. See + * grpc_lb_policy_cancel_picks() */ uint32_t initial_metadata_flags; + + /* output argument where to store the pick()ed connected subchannel, or NULL + * upon error. */ grpc_connected_subchannel **target; + + /* to be invoked once the pick() has completed (regardless of success) */ grpc_closure *on_complete; } pending_pick; /** List of subchannels in a connectivity READY state */ typedef struct ready_list { grpc_subchannel *subchannel; + /* references namesake entry in subchannel_data */ + void *user_data; struct ready_list *next; struct ready_list *prev; } ready_list; @@ -102,12 +120,17 @@ typedef struct { ready_list *ready_list_node; /** last observed connectivity */ grpc_connectivity_state connectivity_state; + /** the subchannel's target user data */ + void *user_data; } subchannel_data; struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; + /** total number of addresses received at creation time */ + size_t num_addresses; + /** all our subchannels */ size_t num_subchannels; subchannel_data **subchannels; @@ -166,16 +189,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); + (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel); } } /** Prepends (relative to the root at p->ready_list) the connected subchannel \a * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *sc) { + subchannel_data *sd) { ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = sc; + memset(new_elem, 0, sizeof(ready_list)); + new_elem->subchannel = sd->subchannel; + new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { /* first element */ new_elem->next = &p->ready_list; @@ -189,7 +215,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, p->ready_list.prev = new_elem; } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); + gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)", + (void *)new_elem, (void *)sd->subchannel); } return new_elem; } @@ -216,8 +243,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); + gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node, + (void *)node->subchannel); } node->next = NULL; @@ -229,9 +256,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); gpr_free(sd); @@ -251,6 +277,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(elem); elem = tmp; } + gpr_free(p); } @@ -281,7 +308,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { + grpc_connected_subchannel **target, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -293,8 +321,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, - NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -303,11 +332,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq) { + uint32_t initial_metadata_flags_eq, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -320,8 +351,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, - NULL); + grpc_exec_ctx_sched( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -330,6 +362,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp = next; } gpr_mu_unlock(&p->mu); + GRPC_ERROR_UNREF(error); } static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { @@ -337,7 +370,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { p->started_picking = 1; if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p, + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p, p->num_subchannels); } @@ -361,38 +394,43 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_polling_entity *pollent, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **target, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { + /* readily available, report right away */ gpr_mu_unlock(&p->mu); *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + + if (user_data != NULL) { + *user_data = selected->user_data; + } if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target, - selected); + "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", + (void *)*target, (void *)selected); } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); return 1; } else { + /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, p->base.interested_parties); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollent = pollent; + pp->pollent = pick_args->pollent; pp->target = target; pp->on_complete = on_complete; - pp->initial_metadata_flags = initial_metadata_flags; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->user_data = user_data; p->pending_picks = pp; gpr_mu_unlock(&p->mu); return 0; @@ -421,7 +459,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -433,12 +471,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } while ((pp = p->pending_picks)) { p->pending_picks = pp->next; + *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + if (pp->user_data != NULL) { + *pp->user_data = selected->user_data; + } if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); + (void *)selected->subchannel, (void *)selected); } grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, p->base.interested_parties); @@ -571,19 +613,34 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); + /* Find the number of backend addresses. We ignore balancer + * addresses, since we don't know how to handle them. */ + size_t num_addrs = 0; + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + if (!args->addresses->addresses[i].is_balancer) ++num_addrs; + } + if (num_addrs == 0) return NULL; + round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); memset(p, 0, sizeof(*p)); - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); + p->num_addresses = num_addrs; + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs); + memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { + for (size_t i = 0; i < args->addresses->num_addresses; i++) { + /* Skip balancer addresses, since we only know how to handle backends. */ + if (args->addresses->addresses[i].is_balancer) continue; + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; + /* server_name will be copied as part of the subchannel creation. This makes + * the copying of args->server_name (a borrowed pointer) OK. */ + sc_args.server_name = args->server_name; + sc_args.addr = + (struct sockaddr *)(&args->addresses->addresses[i].address.addr); + sc_args.addr_len = args->addresses->addresses[i].address.len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); @@ -595,12 +652,14 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; + sd->user_data = args->addresses->addresses[i].user_data; ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); } } if (subchannel_idx == 0) { + /* couldn't create any subchannel. Bail out */ gpr_free(p->subchannels); gpr_free(p); return NULL; diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index d70b498ae8..b4ceee340e 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -37,6 +37,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> +#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -52,21 +53,17 @@ typedef struct { /** base class: must be first */ grpc_resolver base; - /** refcount */ - gpr_refcount refs; - /** name to resolve */ - char *name; + /** target name */ + char *target_name; + /** name to resolve (usually the same as target_name) */ + char *name_to_resolve; /** default port to use */ char *default_port; - /** subchannel factory */ - grpc_client_channel_factory *client_channel_factory; - /** load balancing policy name */ - char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; /** are we currently resolving? */ - int resolving; + bool resolving; /** which version of the result have we published? */ int published_version; /** which version of the result is current? */ @@ -166,24 +163,21 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { dns_resolver *r = arg; grpc_resolver_result *result = NULL; - grpc_lb_policy *lb_policy; gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); - r->resolving = 0; - grpc_resolved_addresses *addresses = r->addresses; - if (addresses != NULL) { - grpc_lb_policy_args lb_policy_args; - result = grpc_resolver_result_create(); - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = addresses; - lb_policy_args.client_channel_factory = r->client_channel_factory; - lb_policy = - grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - if (lb_policy != NULL) { - grpc_resolver_result_set_lb_policy(result, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + r->resolving = false; + if (r->addresses != NULL) { + grpc_lb_addresses *addresses = + grpc_lb_addresses_create(r->addresses->naddrs); + for (size_t i = 0; i < r->addresses->naddrs; ++i) { + grpc_lb_addresses_set_address( + addresses, i, &r->addresses->addrs[i].addr, + r->addresses->addrs[i].len, false /* is_balancer */, + NULL /* balancer_name */, NULL /* user_data */); } - grpc_resolved_addresses_destroy(addresses); + grpc_resolved_addresses_destroy(r->addresses); + result = grpc_resolver_result_create(r->target_name, addresses, + NULL /* lb_policy_name */, NULL); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); @@ -218,9 +212,9 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); - r->resolving = 1; + r->resolving = true; r->addresses = NULL; - grpc_resolve_address(exec_ctx, r->name, r->default_port, + grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, grpc_closure_create(dns_on_resolved, r), &r->addresses); } @@ -244,38 +238,33 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { if (r->resolved_result) { grpc_resolver_result_unref(exec_ctx, r->resolved_result); } - grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory); - gpr_free(r->name); + gpr_free(r->target_name); + gpr_free(r->name_to_resolve); gpr_free(r->default_port); - gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *dns_create(grpc_resolver_args *args, - const char *default_port, - const char *lb_policy_name) { - dns_resolver *r; - const char *path = args->uri->path; - + const char *default_port) { if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return NULL; } - + // Get name from args. + const char *path = args->uri->path; if (path[0] == '/') ++path; - - r = gpr_malloc(sizeof(dns_resolver)); + // Get proxy name, if any. + char *proxy_name = grpc_get_http_proxy_server(); + // Create resolver. + dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &dns_resolver_vtable); - r->name = gpr_strdup(path); + r->target_name = gpr_strdup(path); + r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); - r->client_channel_factory = args->client_channel_factory; gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); - grpc_client_channel_factory_ref(r->client_channel_factory); - r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -289,7 +278,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( grpc_resolver_factory *factory, grpc_resolver_args *args) { - return dns_create(args, "https", "pick_first"); + return dns_create(args, "https"); } static char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index f18e10a754..4293c4d23f 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -40,7 +40,6 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> -#include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -50,20 +49,14 @@ typedef struct { /** base class: must be first */ grpc_resolver base; - /** refcount */ - gpr_refcount refs; - /** subchannel factory */ - grpc_client_channel_factory *client_channel_factory; - /** load balancing policy name */ - char *lb_policy_name; - + /** the path component of the uri passed in */ + char *target_name; /** the addresses that we've 'resolved' */ - grpc_resolved_addresses *addresses; - + grpc_lb_addresses *addresses; /** mutex guarding the rest of the state */ gpr_mu mu; /** have we published? */ - int published; + bool published; /** pending next completion, or NULL */ grpc_closure *next_completion; /** target result address for next completion */ @@ -102,7 +95,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; gpr_mu_lock(&r->mu); - r->published = 0; + r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); gpr_mu_unlock(&r->mu); } @@ -122,17 +115,11 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { if (r->next_completion != NULL && !r->published) { - grpc_resolver_result *result = grpc_resolver_result_create(); - grpc_lb_policy_args lb_policy_args; - memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = r->addresses; - lb_policy_args.client_channel_factory = r->client_channel_factory; - grpc_lb_policy *lb_policy = - grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - grpc_resolver_result_set_lb_policy(result, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); - r->published = 1; - *r->target_result = result; + r->published = true; + *r->target_result = grpc_resolver_result_create( + r->target_name, + grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), + NULL /* lb_policy_name */, NULL); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } @@ -141,9 +128,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); - grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory); - grpc_resolved_addresses_destroy(r->addresses); - gpr_free(r->lb_policy_name); + grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */); + gpr_free(r->target_name); gpr_free(r); } @@ -172,83 +158,49 @@ char *unix_get_default_authority(grpc_resolver_factory *factory, static void do_nothing(void *ignored) {} -static grpc_resolver *sockaddr_create( - grpc_resolver_args *args, const char *default_lb_policy_name, - int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - int errors_found = 0; /* GPR_FALSE */ - sockaddr_resolver *r; - gpr_slice path_slice; - gpr_slice_buffer path_parts; - +static grpc_resolver *sockaddr_create(grpc_resolver_args *args, + int parse(grpc_uri *uri, + struct sockaddr_storage *dst, + size_t *len)) { if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", args->uri->scheme); return NULL; } - - r = gpr_malloc(sizeof(sockaddr_resolver)); - memset(r, 0, sizeof(*r)); - - r->lb_policy_name = - gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy")); - const char *lb_enabled_qpart = - grpc_uri_get_query_arg(args->uri, "lb_enabled"); - /* anything other than "0" is interpreted as true */ - const bool lb_enabled = - (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0)); - - if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 && - !lb_enabled) { - /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail - * out, as this is meant mostly for tests. */ - gpr_log(GPR_ERROR, - "Requested 'grpclb' LB policy but resolved addresses don't " - "support load balancing."); - abort(); - } - - if (r->lb_policy_name == NULL) { - r->lb_policy_name = gpr_strdup(default_lb_policy_name); - } - - path_slice = + /* Construct addresses. */ + gpr_slice path_slice = gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); + gpr_slice_buffer path_parts; gpr_slice_buffer_init(&path_parts); - gpr_slice_split(path_slice, ",", &path_parts); - r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); - r->addresses->naddrs = path_parts.count; - r->addresses->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs); - - for (size_t i = 0; i < r->addresses->naddrs; i++) { + grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count); + bool errors_found = false; + for (size_t i = 0; i < addresses->num_addresses; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse(&ith_uri, - (struct sockaddr_storage *)(&r->addresses->addrs[i].addr), - &r->addresses->addrs[i].len)) { - errors_found = 1; /* GPR_TRUE */ + if (!parse( + &ith_uri, + (struct sockaddr_storage *)(&addresses->addresses[i].address.addr), + &addresses->addresses[i].address.len)) { + errors_found = true; } gpr_free(part_str); if (errors_found) break; } - gpr_slice_buffer_destroy(&path_parts); gpr_slice_unref(path_slice); if (errors_found) { - gpr_free(r->lb_policy_name); - grpc_resolved_addresses_destroy(r->addresses); - gpr_free(r); + grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */); return NULL; } - - gpr_ref_init(&r->refs, 1); + /* Instantiate resolver. */ + sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver)); + memset(r, 0, sizeof(*r)); + r->target_name = gpr_strdup(args->uri->path); + r->addresses = addresses; gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); - r->client_channel_factory = args->client_channel_factory; - grpc_client_channel_factory_ref(r->client_channel_factory); - return &r->base; } @@ -263,7 +215,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} #define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_resolver_args *args) { \ - return sockaddr_create(args, "pick_first", parse_##name); \ + return sockaddr_create(args, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index f848335f67..4b1764ad43 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -41,6 +41,7 @@ #include <grpc/support/slice_buffer.h> #include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" @@ -160,7 +161,6 @@ typedef struct { grpc_client_channel_factory base; gpr_refcount refs; grpc_channel_args *merge_args; - grpc_channel *master; } client_channel_factory; static void client_channel_factory_ref( @@ -173,10 +173,6 @@ static void client_channel_factory_unref( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { client_channel_factory *f = (client_channel_factory *)cc_factory; if (gpr_unref(&f->refs)) { - if (f->master != NULL) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, - "client_channel_factory"); - } grpc_channel_args_destroy(f->merge_args); gpr_free(f); } @@ -194,6 +190,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); + } args->args = final_args; s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); @@ -210,15 +213,15 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base); + grpc_resolver *resolver = grpc_resolver_create(target); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); return NULL; } - grpc_client_channel_set_resolver( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_finish_initialization( + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); return channel; @@ -250,12 +253,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); - if (channel != NULL) { - f->master = channel; - GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create"); - } - grpc_client_channel_factory_unref(&exec_ctx, &f->base); + grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index b3f7cf4efb..2a1d0d6087 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -41,6 +41,7 @@ #include <grpc/support/slice_buffer.h> #include "src/core/ext/client_channel/client_channel.h" +#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" @@ -220,7 +221,6 @@ typedef struct { gpr_refcount refs; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; - grpc_channel *master; } client_channel_factory; static void client_channel_factory_ref( @@ -235,10 +235,6 @@ static void client_channel_factory_unref( if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "client_channel_factory"); - if (f->master != NULL) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, - "client_channel_factory"); - } grpc_channel_args_destroy(f->merge_args); gpr_free(f); } @@ -256,6 +252,13 @@ static grpc_subchannel *client_channel_factory_create_subchannel( c->base.vtable = &connector_vtable; c->security_connector = f->security_connector; c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, args->server_name)); + gpr_free(proxy_name); + } gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; @@ -276,10 +279,10 @@ static grpc_channel *client_channel_factory_create_channel( GRPC_CLIENT_CHANNEL, NULL); grpc_channel_args_destroy(final_args); - grpc_resolver *resolver = grpc_resolver_create(target, &f->base); + grpc_resolver *resolver = grpc_resolver_create(target); if (resolver != NULL) { - grpc_client_channel_set_resolver( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver); + grpc_client_channel_finish_initialization( + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create"); } else { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, @@ -356,10 +359,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); - if (channel != NULL) { - f->master = channel; - GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create"); - } grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index 4350543c27..9af17fb5ae 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -50,10 +50,10 @@ #include "src/core/lib/surface/server.h" void grpc_server_add_insecure_channel_from_fd(grpc_server *server, - grpc_completion_queue *cq, - int fd) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + void *reserved, int fd) { + GPR_ASSERT(reserved == NULL); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; char *name; gpr_asprintf(&name, "fd:%d", fd); @@ -65,7 +65,15 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, const grpc_channel_args *server_args = grpc_server_get_channel_args(server); grpc_transport *transport = grpc_create_chttp2_transport( &exec_ctx, server_args, server_endpoint, 0 /* is_client */); - grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq)); + + grpc_pollset **pollsets; + size_t num_pollsets = 0; + grpc_server_get_pollsets(server, &pollsets, &num_pollsets); + + for (size_t i = 0; i < num_pollsets; i++) { + grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, pollsets[i]); + } + grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args); grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); grpc_exec_ctx_finish(&exec_ctx); @@ -74,8 +82,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, #else // !GPR_SUPPORT_CHANNELS_FROM_FD void grpc_server_add_insecure_channel_from_fd(grpc_server *server, - grpc_completion_queue *cq, - int fd) { + void *reserved, int fd) { GPR_ASSERT(0); } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 00999e3b94..1dd7fef76f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -33,6 +33,7 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include <limits.h> #include <math.h> #include <stdio.h> #include <string.h> @@ -46,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" @@ -89,13 +91,20 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); -static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, - grpc_error *error); +static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); +static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error, - const char *reason); + grpc_chttp2_transport *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -105,11 +114,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -/** Perform a transport_op */ -static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *transport_op); - /** Cancel a stream: coming from the transport API */ static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -121,22 +125,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global, grpc_error *error); -/** Add endpoint from this transport to pollset */ -static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, void *pollset); -static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, - void *pollset_set); - /** Start new streams that have been created if we can */ static void maybe_start_some_streams( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); -static void finish_global_actions(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t); - static void connectivity_state_set( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_connectivity_state state, grpc_error *error, const char *reason); @@ -149,14 +141,16 @@ static void incoming_byte_stream_update_flow_control( grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *byte_stream); + void *byte_stream, + grpc_error *error_ignored); static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error); +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state state, const char *reason); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -165,9 +159,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { size_t i; - gpr_mu_lock(&t->executor.mu); - - GPR_ASSERT(t->ep == NULL); + grpc_endpoint_destroy(exec_ctx, t->ep); gpr_slice_buffer_destroy(&t->global.qbuf); @@ -191,8 +183,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->new_stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - gpr_mu_unlock(&t->executor.mu); - gpr_mu_destroy(&t->executor.mu); + grpc_combiner_destroy(exec_ctx, t->executor.combiner); /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ @@ -250,12 +241,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, memset(t, 0, sizeof(*t)); t->base.vtable = &vtable; + t->executor.write_state = GRPC_CHTTP2_WRITES_CORKED; t->ep = ep; - /* one ref is for destroy, the other for when ep becomes NULL */ - gpr_ref_init(&t->refs, 2); + /* one ref is for destroy */ + gpr_ref_init(&t->refs, 1); /* ref is dropped at transport close() */ gpr_ref_init(&t->shutdown_ep_refs, 1); - gpr_mu_init(&t->executor.mu); + t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep)); t->peer_string = grpc_endpoint_get_peer(ep); t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -281,23 +273,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor); grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); + grpc_closure_init(&t->reading_action_locked, reading_action_locked, t); grpc_closure_init(&t->parsing_action, parsing_action, t); - grpc_closure_init(&t->initiate_writing, initiate_writing, t); + grpc_closure_init(&t->post_parse_locked, post_parse_locked, t); + grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t); + grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t); + grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked, + t); + grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser); - grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, - &t->writing); gpr_slice_buffer_init(&t->read_buffer); - if (is_client) { - gpr_slice_buffer_add( - &t->global.qbuf, - gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); - } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet large enough that the exponential growth should happen nicely when it's @@ -320,6 +311,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->global.sent_local_settings = 0; + if (is_client) { + gpr_slice_buffer_add( + &t->writing.outbuf, + gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); + } + /* configure http2 the way we like it */ if (is_client) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); @@ -337,76 +335,65 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { gpr_log(GPR_ERROR, "%s: is ignored on the client", GRPC_ARG_MAX_CONCURRENT_STREAMS); - } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, - (uint32_t)channel_args->args[i].value.integer); + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + push_setting(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + (uint32_t)value); + } } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER); - } else if ((t->global.next_stream_id & 1) != - (channel_args->args[i].value.integer & 1)) { - gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", - GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, - t->global.next_stream_id & 1, - is_client ? "client" : "server"); - } else { - t->global.next_stream_id = - (uint32_t)channel_args->args[i].value.integer; + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + if ((t->global.next_stream_id & 1) != (value & 1)) { + gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", + GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, + t->global.next_stream_id & 1, + is_client ? "client" : "server"); + } else { + t->global.next_stream_id = (uint32_t)value; + } } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES); - } else if (channel_args->args[i].value.integer <= 5) { - gpr_log(GPR_ERROR, "%s: must be at least 5", - GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES); - } else { - t->global.stream_lookahead = - (uint32_t)channel_args->args[i].value.integer; + const grpc_integer_options options = {-1, 5, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { + t->global.stream_lookahead = (uint32_t)value; } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, - (uint32_t)channel_args->args[i].value.integer); + (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { grpc_chttp2_hpack_compressor_set_max_usable_size( - &t->writing.hpack_compressor, - (uint32_t)channel_args->args[i].value.integer); + &t->writing.hpack_compressor, (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_MAX_METADATA_SIZE)) { - if (channel_args->args[i].type != GRPC_ARG_INTEGER) { - gpr_log(GPR_ERROR, "%s: must be an integer", - GRPC_ARG_MAX_METADATA_SIZE); - } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_ERROR, "%s: must be non-negative", - GRPC_ARG_MAX_METADATA_SIZE); - } else { + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + if (value >= 0) { push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, - (uint32_t)channel_args->args[i].value.integer); + (uint32_t)value); } } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MAX_FRAME_SIZE)) { @@ -424,47 +411,39 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } } + + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "uncork"); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "init"); } -static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, - void *arg_ignored) { +static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; t->destroying = 1; drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); + UNREF_TRANSPORT(exec_ctx, t, "destroy"); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked, - NULL, 0); - UNREF_TRANSPORT(exec_ctx, t, "destroy"); + grpc_combiner_execute(exec_ctx, t->executor.combiner, + grpc_closure_create(destroy_transport_locked, t), + GRPC_ERROR_NONE); } /** block grpc_endpoint_shutdown being called until a paired allow_endpoint_shutdown is made */ static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { - GPR_ASSERT(t->ep); gpr_ref(&t->shutdown_ep_refs); } static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { if (gpr_unref(&t->shutdown_ep_refs)) { - if (t->ep) { - grpc_endpoint_shutdown(exec_ctx, t->ep); - } + grpc_endpoint_shutdown(exec_ctx, t->ep); } } -static void destroy_endpoint(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_endpoint_destroy(exec_ctx, t->ep); - t->ep = NULL; - /* safe because we'll still have the ref for write */ - UNREF_TRANSPORT(exec_ctx, t, "disconnect"); -} - static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { @@ -475,9 +454,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); - if (t->ep) { - allow_endpoint_shutdown_locked(exec_ctx, t); - } + allow_endpoint_shutdown_locked(exec_ctx, t); /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream_global *stream_global; @@ -511,21 +488,23 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, } #endif -static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *arg_ignored) { - grpc_chttp2_register_stream(t, s); +static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, + grpc_error *error) { + grpc_chttp2_stream *s = sp; + grpc_chttp2_register_stream(s->t, s); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init"); } static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data) { + GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; memset(s, 0, sizeof(*s)); + s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively @@ -560,16 +539,21 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.in_stream_map = true; } - grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked, - NULL, 0); + grpc_closure_init(&s->init_stream, finish_init_stream_locked, s); + GRPC_CHTTP2_STREAM_REF(&s->global, "init"); + grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream, + GRPC_ERROR_NONE); + + GPR_TIMER_END("init_stream", 0); return 0; } -static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *arg) { +static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, + grpc_error *error) { grpc_byte_stream *bs; + grpc_chttp2_stream *s = sp; + grpc_chttp2_transport *t = s->t; GPR_TIMER_BEGIN("destroy_stream", 0); @@ -588,7 +572,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) { - incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, @@ -625,16 +609,20 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("destroy_stream", 0); - gpr_free(arg); + gpr_free(s->destroy_stream_arg); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, void *and_free_memory) { + GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked, - and_free_memory, 0); + s->destroy_stream_arg = and_free_memory; + grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s); + grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream, + GRPC_ERROR_NONE); + GPR_TIMER_END("destroy_stream", 0); } grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream( @@ -665,12 +653,10 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( static const char *write_state_name(grpc_chttp2_write_state state) { switch (state) { + case GRPC_CHTTP2_WRITES_CORKED: + return "CORKED"; case GRPC_CHTTP2_WRITING_INACTIVE: return "INACTIVE"; - case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: - return "REQUESTED[p=0]"; - case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: - return "REQUESTED[p=1]"; case GRPC_CHTTP2_WRITE_SCHEDULED: return "SCHEDULED"; case GRPC_CHTTP2_WRITING: @@ -693,120 +679,18 @@ static void set_write_state(grpc_chttp2_transport *t, t->executor.write_state = state; } -static void finish_global_actions(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_executor_action_header *hdr; - grpc_chttp2_executor_action_header *next; - - GPR_TIMER_BEGIN("finish_global_actions", 0); - - for (;;) { - check_read_ops(exec_ctx, &t->global); - - gpr_mu_lock(&t->executor.mu); - if (t->executor.pending_actions_head != NULL) { - hdr = t->executor.pending_actions_head; - t->executor.pending_actions_head = t->executor.pending_actions_tail = - NULL; - gpr_mu_unlock(&t->executor.mu); - while (hdr != NULL) { - GPR_TIMER_BEGIN("chttp2:locked_action", 0); - hdr->action(exec_ctx, t, hdr->stream, hdr->arg); - GPR_TIMER_END("chttp2:locked_action", 0); - next = hdr->next; - gpr_free(hdr); - UNREF_TRANSPORT(exec_ctx, t, "pending_action"); - hdr = next; - } - continue; - } else { - t->executor.global_active = false; - switch (t->executor.write_state) { - case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: - set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); - REF_TRANSPORT(t, "initiate_writing"); - gpr_mu_unlock(&t->executor.mu); - grpc_exec_ctx_sched( - exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, - t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); - break; - case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: - start_writing(exec_ctx, t); - gpr_mu_unlock(&t->executor.mu); - break; - case GRPC_CHTTP2_WRITING_INACTIVE: - case GRPC_CHTTP2_WRITING: - case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: - case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: - case GRPC_CHTTP2_WRITE_SCHEDULED: - gpr_mu_unlock(&t->executor.mu); - break; - } - } - break; - } - - GPR_TIMER_END("finish_global_actions", 0); +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED); + start_writing(exec_ctx, t); } -void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *optional_stream, - grpc_chttp2_locked_action action, - void *arg, size_t sizeof_arg) { - grpc_chttp2_executor_action_header *hdr; - - GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0); - - REF_TRANSPORT(t, "run_global"); - gpr_mu_lock(&t->executor.mu); - - for (;;) { - if (!t->executor.global_active) { - t->executor.global_active = 1; - gpr_mu_unlock(&t->executor.mu); - - GPR_TIMER_BEGIN("chttp2:locked_action", 0); - action(exec_ctx, t, optional_stream, arg); - GPR_TIMER_END("chttp2:locked_action", 0); - - finish_global_actions(exec_ctx, t); - } else { - gpr_mu_unlock(&t->executor.mu); - - hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg); - hdr->stream = optional_stream; - hdr->action = action; - if (sizeof_arg == 0) { - hdr->arg = arg; - } else { - hdr->arg = hdr + 1; - memcpy(hdr->arg, arg, sizeof_arg); - } - - gpr_mu_lock(&t->executor.mu); - if (!t->executor.global_active) { - /* global lock was released while allocating memory: release & retry */ - gpr_free(hdr); - continue; - } - hdr->next = NULL; - if (t->executor.pending_actions_head != NULL) { - t->executor.pending_actions_tail = - t->executor.pending_actions_tail->next = hdr; - } else { - t->executor.pending_actions_tail = t->executor.pending_actions_head = - hdr; - } - REF_TRANSPORT(t, "pending_action"); - gpr_mu_unlock(&t->executor.mu); - } - break; - } - - UNREF_TRANSPORT(exec_ctx, t, "run_global"); - - GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0); +static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + grpc_chttp2_transport *t = tp; + t->executor.check_read_ops_scheduled = false; + check_read_ops(exec_ctx, &t->global); } /******************************************************************************* @@ -816,11 +700,12 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, bool covered_by_poller, const char *reason) { + GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); + /* Perform state checks, and transition to a scheduled state if appropriate. - Each time we finish the global lock execution, we check if we need to - write. If we do: - - (if there is a poller surrounding the write) schedule - initiate_writing, which locks and calls initiate_writing_locked to... + If we are inactive, schedule a write chain to begin once the transport + combiner finishes any executions in its current batch (which may be + scheduled AFTER this code executes). The write chain will: - call start_writing, which verifies (under the global lock) that there are things that need to be written by calling grpc_chttp2_unlocking_check_writes, and if so schedules writing_action @@ -830,31 +715,28 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, to do *another* write immediately, and if so loops back to start_writing. - Current problems: + Current problems: - too much lock entry/exiting - the writing thread can become stuck indefinitely (punt through the workqueue periodically to fix) */ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); switch (t->executor.write_state) { - case GRPC_CHTTP2_WRITING_INACTIVE: - set_write_state(t, covered_by_poller - ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER - : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, - reason); + case GRPC_CHTTP2_WRITES_CORKED: break; - case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: - /* nothing to do: write already requested */ + case GRPC_CHTTP2_WRITING_INACTIVE: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason); + REF_TRANSPORT(t, "writing"); + grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, + &t->initiate_writing, GRPC_ERROR_NONE, + covered_by_poller); break; - case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: if (covered_by_poller) { /* upgrade to note poller is available to cover the write */ - set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); + grpc_combiner_force_async_finally(t->executor.combiner); } break; - case GRPC_CHTTP2_WRITE_SCHEDULED: - /* nothing to do: write already scheduled */ - break; case GRPC_CHTTP2_WRITING: set_write_state(t, covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER @@ -871,15 +753,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, } break; } + GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || - t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); + GPR_TIMER_BEGIN("start_writing", 0); + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED); if (!t->closed && grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); - REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } else { @@ -890,25 +772,10 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "start_writing:nothing_to_write"); } - end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write"); - if (t->ep && !t->endpoint_reading) { - destroy_endpoint(exec_ctx, t); - } + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE); + UNREF_TRANSPORT(exec_ctx, t, "writing"); } -} - -static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, - void *arg_ignored) { - start_writing(exec_ctx, t); - UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); -} - -static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, - NULL, 0); + GPR_TIMER_END("start_writing", 0); } void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, @@ -942,15 +809,10 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* error may be GRPC_ERROR_NONE if there is no error allocated yet. In that case, use "reason" as the text for a new error. */ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error, - const char *reason) { + grpc_chttp2_transport *t, grpc_error *error) { grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { - if (error == GRPC_ERROR_NONE && reason != NULL) { - /* create error object. */ - error = GRPC_ERROR_CREATE(reason); - } fail_pending_writes(exec_ctx, &t->global, stream_global, GRPC_ERROR_REF(error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); @@ -958,12 +820,10 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_ignored, - void *a) { - grpc_error *error = a; - +static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + GPR_TIMER_BEGIN("terminate_writing_with_lock", 0); + grpc_chttp2_transport *t = tp; allow_endpoint_shutdown_locked(exec_ctx, t); if (error != GRPC_ERROR_NONE) { @@ -972,39 +832,46 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - end_waiting_for_write(exec_ctx, t, error, NULL); + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error)); switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITES_CORKED: case GRPC_CHTTP2_WRITING_INACTIVE: - case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: - case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: case GRPC_CHTTP2_WRITE_SCHEDULED: GPR_UNREACHABLE_CODE(break); case GRPC_CHTTP2_WRITING: + GPR_TIMER_MARK("state=writing", 0); set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); break; case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: - set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, - "terminate_writing"); + GPR_TIMER_MARK("state=writing_stale_with_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); + REF_TRANSPORT(t, "writing"); + grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, + &t->initiate_writing, GRPC_ERROR_NONE, + true); break; case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: - set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, - "terminate_writing"); + GPR_TIMER_MARK("state=writing_stale_no_poller", 0); + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing"); + REF_TRANSPORT(t, "writing"); + grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, + &t->initiate_writing, GRPC_ERROR_NONE, + false); break; } - if (t->ep && !t->endpoint_reading) { - destroy_endpoint(exec_ctx, t); - } - UNREF_TRANSPORT(exec_ctx, t, "writing"); + GPR_TIMER_END("terminate_writing_with_lock", 0); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, void *transport_writing, grpc_error *error) { + GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0); grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); - grpc_chttp2_run_with_global_lock( - exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0); + grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing, + GRPC_ERROR_REF(error)); + GPR_TIMER_END("grpc_chttp2_terminate_writing", 0); } static void writing_action(grpc_exec_ctx *exec_ctx, void *gt, @@ -1148,15 +1015,22 @@ static int contains_non_ok_status( static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} -static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *stream_op) { +static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, + grpc_error *error_ignored) { GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; + grpc_chttp2_transport *t = op->transport_private.args[0]; + grpc_chttp2_stream *s = op->transport_private.args[1]; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; + if (grpc_http_trace) { + char *str = grpc_transport_stream_op_string(op); + gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str); + gpr_free(str); + } + grpc_closure *on_complete = op->on_complete; if (on_complete == NULL) { on_complete = grpc_closure_create(do_nothing, NULL); @@ -1211,7 +1085,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (!stream_global->write_closed) { if (transport_global->is_client) { @@ -1278,7 +1153,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (stream_global->write_closed) { stream_global->send_trailing_metadata = NULL; @@ -1303,7 +1179,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, stream_global->recv_initial_metadata_ready = op->recv_initial_metadata_ready; stream_global->recv_initial_metadata = op->recv_initial_metadata; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (op->recv_message != NULL) { @@ -1317,7 +1194,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, exec_ctx, transport_global, stream_global, transport_global->stream_lookahead, 0); } - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (op->recv_trailing_metadata != NULL) { @@ -1326,21 +1204,30 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, add_closure_barrier(on_complete); stream_global->recv_trailing_metadata = op->recv_trailing_metadata; stream_global->final_metadata_requested = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global, &on_complete, GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op_locked", 0); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "perform_stream_op"); } static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("perform_stream_op", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op, - sizeof(*op)); + grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked, + op); + op->transport_private.args[0] = gt; + op->transport_private.args[1] = gs; + GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op"); + grpc_combiner_execute(exec_ctx, t->executor.combiner, + &op->transport_private.closure, GRPC_ERROR_NONE); + GPR_TIMER_END("perform_stream_op", 0); } static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1362,13 +1249,20 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } -static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *opaque_8bytes) { +typedef struct ack_ping_args { + grpc_closure closure; + grpc_chttp2_transport *t; + uint8_t opaque_8bytes[8]; +} ack_ping_args; + +static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a, + grpc_error *error_ignored) { + ack_ping_args *args = a; grpc_chttp2_outstanding_ping *ping; - grpc_chttp2_transport_global *transport_global = &t->global; + grpc_chttp2_transport_global *transport_global = &args->t->global; for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { - if (0 == memcmp(opaque_8bytes, ping->id, 8)) { + if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) { grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; @@ -1376,21 +1270,27 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, break; } } + UNREF_TRANSPORT(exec_ctx, args->t, "ack_ping"); + gpr_free(args); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, const uint8_t *opaque_8bytes) { - grpc_chttp2_run_with_global_lock( - exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL, - ack_ping_locked, (void *)opaque_8bytes, 8); + ack_ping_args *args = gpr_malloc(sizeof(*args)); + args->t = TRANSPORT_FROM_PARSING(transport_parsing); + memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes)); + grpc_closure_init(&args->closure, ack_ping_locked, args); + REF_TRANSPORT(args->t, "ack_ping"); + grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure, + GRPC_ERROR_NONE); } static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, - void *stream_op) { + void *stream_op, + grpc_error *error_ignored) { grpc_transport_op *op = stream_op; + grpc_chttp2_transport *t = op->transport_private.args[0]; grpc_error *close_transport = op->disconnect_with_error; /* If there's a set_accept_stream ensure that we're not parsing @@ -1402,8 +1302,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); - if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, @@ -1429,11 +1327,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->bind_pollset) { - add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset); + grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset); } if (op->bind_pollset_set) { - add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set); + grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); } if (op->send_ping) { @@ -1443,13 +1341,21 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (close_transport != GRPC_ERROR_NONE) { close_transport_locked(exec_ctx, t, close_transport); } + + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + + UNREF_TRANSPORT(exec_ctx, t, "transport_op"); } static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - grpc_chttp2_run_with_global_lock( - exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op)); + op->transport_private.args[0] = gt; + grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked, + op); + REF_TRANSPORT(t, "transport_op"); + grpc_combiner_execute(exec_ctx, t->executor.combiner, + &op->transport_private.closure, GRPC_ERROR_NONE); } /******************************************************************************* @@ -1458,6 +1364,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) { + GPR_TIMER_BEGIN("check_read_ops", 0); grpc_chttp2_stream_global *stream_global; grpc_byte_stream *bs; while ( @@ -1467,7 +1374,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, if (stream_global->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } if (stream_global->exceeded_metadata_size) { cancel_from_api( @@ -1490,7 +1397,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, stream_global->seen_error && (bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( @@ -1511,7 +1418,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, if (stream_global->seen_error) { while ((bs = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames)) != NULL) { - incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); + incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); } if (stream_global->exceeded_metadata_size) { cancel_from_api( @@ -1532,6 +1439,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } } } + GPR_TIMER_END("check_read_ops", 0); } static void decrement_active_streams_locked( @@ -1539,7 +1447,8 @@ static void decrement_active_streams_locked( grpc_chttp2_stream_global *stream_global) { if ((stream_global->all_incoming_byte_streams_finished = gpr_unref(&stream_global->active_streams))) { - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } } @@ -1643,7 +1552,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, } if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, due_to_error); @@ -1655,7 +1565,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_status_code status, gpr_slice *slice) { if (status != GRPC_STATUS_OK) { stream_global->seen_error = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } /* stream_global->recv_trailing_metadata_finished gives us a last chance replacement: we've received trailing metadata, @@ -1679,7 +1590,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_mdstr_from_slice(gpr_slice_ref(*slice)))); } stream_global->published_trailing_metadata = true; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (slice) { gpr_slice_unref(*slice); @@ -1739,7 +1651,8 @@ void grpc_chttp2_mark_stream_closed( GRPC_ERROR_UNREF(error); return; } - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); if (close_reads && !stream_global->read_closed) { stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; @@ -1920,6 +1833,10 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); end_all_the_calls(exec_ctx, t, error); } @@ -1955,16 +1872,12 @@ static void update_global_window(void *args, uint32_t id, void *stream) { * INPUT PROCESSING - PARSING */ -static void reading_action_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg); static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg); -static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg); +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { @@ -1972,16 +1885,20 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp, reading_action_locked -> (parse_unlocked -> post_parse_locked)? -> post_reading_action_locked */ - grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked, - GRPC_ERROR_REF(error), 0); + GPR_TIMER_BEGIN("reading_action", 0); + grpc_chttp2_transport *t = tp; + grpc_combiner_execute(exec_ctx, t->executor.combiner, + &t->reading_action_locked, GRPC_ERROR_REF(error)); + GPR_TIMER_END("reading_action", 0); } -static void reading_action_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg) { +static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp, + grpc_error *error) { + GPR_TIMER_BEGIN("reading_action_locked", 0); + + grpc_chttp2_transport *t = tp; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; - grpc_error *error = arg; GPR_ASSERT(!t->executor.parsing_active); if (!t->closed) { @@ -1990,10 +1907,13 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error), + NULL); } else { - post_reading_action_locked(exec_ctx, t, s_unused, arg); + post_reading_action_locked(exec_ctx, t, error); } + + GPR_TIMER_END("reading_action_locked", 0); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2008,7 +1928,8 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_error *parse_error = GRPC_ERROR_NONE; for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { - parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + parse_error = + grpc_http_parser_parse(&parser, t->read_buffer.slices[i], NULL); } if (parse_error == GRPC_ERROR_NONE && (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { @@ -2045,13 +1966,15 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) { GRPC_ERROR_UNREF(errors[i]); } + grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked, + err); GPR_TIMER_END("reading_action.parse", 0); - grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err, - 0); } -static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *arg) { +static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_TIMER_BEGIN("post_parse_locked", 0); + grpc_chttp2_transport *t = arg; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ @@ -2077,7 +2000,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (t->post_parsing_op) { grpc_transport_op *op = t->post_parsing_op; t->post_parsing_op = NULL; - perform_transport_op_locked(exec_ctx, t, NULL, op); + perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE); gpr_free(op); } /* if a stream is in the stream map, and gets cancelled, we need to @@ -2094,39 +2017,32 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } - post_reading_action_locked(exec_ctx, t, s_unused, arg); + post_reading_action_locked(exec_ctx, t, error); + GPR_TIMER_END("post_parse_locked", 0); } -static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, - void *arg) { - grpc_error *error = arg; +static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_TIMER_BEGIN("post_reading_action_locked", 0); + grpc_chttp2_transport *t = arg; bool keep_reading = false; + GRPC_ERROR_REF(error); if (error == GRPC_ERROR_NONE && t->closed) { error = GRPC_ERROR_CREATE("Transport closed"); } if (error != GRPC_ERROR_NONE) { - if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; if (grpc_http_write_state_trace) { gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, write_state_name(t->executor.write_state)); } - if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { - destroy_endpoint(exec_ctx, t); - } } else if (!t->closed) { keep_reading = true; REF_TRANSPORT(t, "keep_reading"); prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); - GRPC_ERROR_UNREF(error); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); @@ -2135,6 +2051,9 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } else { UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } + GRPC_ERROR_UNREF(error); + + GPR_TIMER_END("post_reading_action_locked", 0); } /******************************************************************************* @@ -2156,36 +2075,16 @@ static void connectivity_state_set( * POLLSET STUFF */ -static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, void *pollset) { - if (t->ep) { - grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); - } -} - -static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s_unused, - void *pollset_set) { - if (t->ep) { - grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); - } -} - static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) { - /* TODO(ctiller): keep pollset alive */ - grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, - (grpc_chttp2_stream *)gs, - add_to_pollset_locked, pollset, 0); + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset); } static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset_set *pollset_set) { - grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, - (grpc_chttp2_stream *)gs, - add_to_pollset_set_locked, pollset_set, 0); + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set); } /******************************************************************************* @@ -2197,6 +2096,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&bs->refs)) { GRPC_ERROR_UNREF(bs->error); gpr_slice_buffer_destroy(&bs->slices); + gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); } } @@ -2242,38 +2142,34 @@ static void incoming_byte_stream_update_flow_control( } } -typedef struct { - grpc_chttp2_incoming_byte_stream *byte_stream; - gpr_slice *slice; - size_t max_size_hint; - grpc_closure *on_complete; -} incoming_byte_stream_next_arg; - static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *argp) { - incoming_byte_stream_next_arg *arg = argp; - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)arg->byte_stream; + void *argp, + grpc_error *error_ignored) { + grpc_chttp2_incoming_byte_stream *bs = argp; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - incoming_byte_stream_update_flow_control(exec_ctx, transport_global, - stream_global, arg->max_size_hint, - bs->slices.length); - } + gpr_mu_lock(&bs->slice_mu); + size_t cur_length = bs->slices.length; + gpr_mu_unlock(&bs->slice_mu); + incoming_byte_stream_update_flow_control( + exec_ctx, transport_global, stream_global, + bs->next_action.max_size_hint, cur_length); + } + gpr_mu_lock(&bs->slice_mu); if (bs->slices.count > 0) { - *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); - } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices); + grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE, NULL); + } else if (bs->error != GRPC_ERROR_NONE) { + grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, + GRPC_ERROR_REF(bs->error), NULL); } else { - bs->on_next = arg->on_complete; - bs->next = arg->slice; + bs->on_next = bs->next_action.on_complete; + bs->next = bs->next_action.slice; } + gpr_mu_unlock(&bs->slice_mu); incoming_byte_stream_unref(exec_ctx, bs); } @@ -2281,13 +2177,18 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, grpc_closure *on_complete) { + GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete}; gpr_ref(&bs->refs); - grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_next_locked, &arg, - sizeof(arg)); + bs->next_action.slice = slice; + bs->next_action.max_size_hint = max_size_hint; + bs->next_action.on_complete = on_complete; + grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked, + bs); + grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, + &bs->next_action.closure, GRPC_ERROR_NONE); + GPR_TIMER_END("incoming_byte_stream_next", 0); return 0; } @@ -2295,9 +2196,8 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *byte_stream) { + void *byte_stream, + grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = byte_stream; GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); decrement_active_streams_locked(exec_ctx, &bs->transport->global, @@ -2307,10 +2207,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { + GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0); grpc_chttp2_incoming_byte_stream *bs = (grpc_chttp2_incoming_byte_stream *)byte_stream; - grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_destroy_locked, bs, 0); + grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked, + bs); + grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, + &bs->destroy_action, GRPC_ERROR_NONE); + GPR_TIMER_END("incoming_byte_stream_destroy", 0); } typedef struct { @@ -2318,90 +2222,45 @@ typedef struct { gpr_slice slice; } incoming_byte_stream_push_arg; -static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *argp) { - incoming_byte_stream_push_arg *arg = argp; - grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; +void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + gpr_slice slice) { + gpr_mu_lock(&bs->slice_mu); if (bs->on_next != NULL) { - *bs->next = arg->slice; + *bs->next = slice; grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { - gpr_slice_buffer_add(&bs->slices, arg->slice); + gpr_slice_buffer_add(&bs->slices, slice); } - incoming_byte_stream_unref(exec_ctx, bs); -} - -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - gpr_slice slice) { - incoming_byte_stream_push_arg arg = {bs, slice}; - gpr_ref(&bs->refs); - grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_push_locked, &arg, - sizeof(arg)); -} - -typedef struct { - grpc_chttp2_incoming_byte_stream *bs; - grpc_error *error; -} bs_fail_args; - -static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs, - grpc_error *error) { - bs_fail_args *a = gpr_malloc(sizeof(*a)); - a->bs = bs; - a->error = error; - return a; -} - -static void incoming_byte_stream_finished_failed_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, - void *argp) { - bs_fail_args *a = argp; - grpc_chttp2_incoming_byte_stream *bs = a->bs; - grpc_error *error = a->error; - gpr_free(a); - grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); - bs->on_next = NULL; - GRPC_ERROR_UNREF(bs->error); - bs->error = error; - incoming_byte_stream_unref(exec_ctx, bs); + gpr_mu_unlock(&bs->slice_mu); } -static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - void *argp) { - grpc_chttp2_incoming_byte_stream *bs = argp; +static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx, + void *bsp, grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs = bsp; + if (error != GRPC_ERROR_NONE) { + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + bs->on_next = NULL; + GRPC_ERROR_UNREF(bs->error); + bs->error = error; + } incoming_byte_stream_unref(exec_ctx, bs); } void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error, int from_parsing_thread) { + GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0); if (from_parsing_thread) { - if (error == GRPC_ERROR_NONE) { - grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_finished_ok_locked, - bs, 0); - } else { - grpc_chttp2_run_with_global_lock( - exec_ctx, bs->transport, bs->stream, - incoming_byte_stream_finished_failed_locked, - make_bs_fail_args(bs, error), 0); - } + grpc_closure_init(&bs->finished_action, + incoming_byte_stream_finished_locked, bs); + grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner, + &bs->finished_action, GRPC_ERROR_REF(error)); } else { - if (error == GRPC_ERROR_NONE) { - incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport, - bs->stream, bs); - } else { - incoming_byte_stream_finished_failed_locked( - exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error)); - } + incoming_byte_stream_finished_locked(exec_ctx, bs, error); } + GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0); } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -2414,6 +2273,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.next = incoming_byte_stream_next; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + gpr_mu_init(&incoming_byte_stream->slice_mu); gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d67c014e54..04b788b702 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/transport_impl.h" @@ -161,9 +162,20 @@ struct grpc_chttp2_incoming_byte_stream { grpc_chttp2_transport *transport; grpc_chttp2_stream *stream; int is_tail; + + gpr_mu slice_mu; // protects slices, on_next gpr_slice_buffer slices; grpc_closure *on_next; gpr_slice *next; + + struct { + grpc_closure closure; + gpr_slice *slice; + size_t max_size_hint; + grpc_closure *on_complete; + } next_action; + grpc_closure destroy_action; + grpc_closure finished_action; }; typedef struct { @@ -296,23 +308,11 @@ struct grpc_chttp2_transport_parsing { int64_t outgoing_window; }; -typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, void *arg); - -typedef struct grpc_chttp2_executor_action_header { - grpc_chttp2_stream *stream; - grpc_chttp2_locked_action action; - struct grpc_chttp2_executor_action_header *next; - void *arg; -} grpc_chttp2_executor_action_header; - typedef enum { + /** no writing activity allowed */ + GRPC_CHTTP2_WRITES_CORKED, /** no writing activity */ GRPC_CHTTP2_WRITING_INACTIVE, - /** write has been requested, but not scheduled yet */ - GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, - GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, /** write has been requested and scheduled against the workqueue */ GRPC_CHTTP2_WRITE_SCHEDULED, /** write has been initiated after being reaped from the workqueue */ @@ -333,7 +333,7 @@ struct grpc_chttp2_transport { gpr_refcount shutdown_ep_refs; struct { - gpr_mu mu; + grpc_combiner *combiner; /** is a thread currently in the global lock */ bool global_active; @@ -341,9 +341,8 @@ struct grpc_chttp2_transport { bool parsing_active; /** write execution state of the transport */ grpc_chttp2_write_state write_state; - - grpc_chttp2_executor_action_header *pending_actions_head; - grpc_chttp2_executor_action_header *pending_actions_tail; + /** has a check_read_ops been scheduled */ + bool check_read_ops_scheduled; } executor; /** is the transport destroying itself? */ @@ -380,10 +379,16 @@ struct grpc_chttp2_transport { grpc_closure writing_action; /** closure to start reading from the endpoint */ grpc_closure reading_action; + grpc_closure reading_action_locked; + grpc_closure post_parse_locked; /** closure to actually do parsing */ grpc_closure parsing_action; /** closure to initiate writing */ grpc_closure initiate_writing; + /** closure to finish writing */ + grpc_closure terminate_writing; + /** closure to flush read state up the stack */ + grpc_closure initiate_read_flush_locked; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -527,11 +532,16 @@ struct grpc_chttp2_stream_parsing { }; struct grpc_chttp2_stream { + grpc_chttp2_transport *t; grpc_stream_refcount *refcount; grpc_chttp2_stream_global global; grpc_chttp2_stream_writing writing; grpc_chttp2_stream_parsing parsing; + grpc_closure init_stream; + grpc_closure destroy_stream; + void *destroy_stream_arg; + grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; }; @@ -626,7 +636,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( grpc_chttp2_stream_global **stream_global); void grpc_chttp2_list_add_check_read_ops( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); bool grpc_chttp2_list_remove_check_read_ops( grpc_chttp2_transport_global *transport_global, @@ -706,12 +716,6 @@ void grpc_chttp2_complete_closure_step( grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure, grpc_error *error); -void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *transport, - grpc_chttp2_stream *optional_stream, - grpc_chttp2_locked_action action, - void *arg, size_t sizeof_arg); - #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \ (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1) diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 482cd55c44..0e6d579ba9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -177,7 +177,8 @@ void grpc_chttp2_publish_reads( stream_global->seen_error = true; stream_global->exceeded_metadata_size = stream_parsing->exceeded_metadata_size; - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } /* flush stats to global stream state */ @@ -203,7 +204,8 @@ void grpc_chttp2_publish_reads( stream_global->incoming_frames.tail->is_tail = 0; } if (stream_parsing->data_parser.incoming_frames.head != NULL) { - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } grpc_chttp2_incoming_frame_queue_merge( &stream_global->incoming_frames, @@ -219,7 +221,8 @@ void grpc_chttp2_publish_reads( GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, stream_parsing->metadata_buffer[0], stream_global->received_initial_metadata); - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (!stream_global->published_trailing_metadata && stream_parsing->got_metadata_on_parse[1]) { @@ -228,7 +231,8 @@ void grpc_chttp2_publish_reads( GPR_SWAP(grpc_chttp2_incoming_metadata_buffer, stream_parsing->metadata_buffer[1], stream_global->received_trailing_metadata); - grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global, + stream_global); } if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 2eb5f5f632..4dc4968248 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -298,8 +298,15 @@ int grpc_chttp2_list_pop_waiting_for_concurrency( } void grpc_chttp2_list_add_check_read_ops( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + if (!t->executor.check_read_ops_scheduled) { + grpc_combiner_execute_finally(exec_ctx, t->executor.combiner, + &t->initiate_read_flush_locked, + GRPC_ERROR_NONE, false); + t->executor.check_read_ops_scheduled = true; + } stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_CHECK_READ_OPS); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 311b26e354..979515bd54 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -55,15 +55,6 @@ int grpc_chttp2_unlocking_check_writes( transport_global->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; - /* simple writes are queued to qbuf, and flushed here */ - gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf); - GPR_ASSERT(transport_global->qbuf.count == 0); - - grpc_chttp2_hpack_compressor_set_max_table_size( - &transport_writing->hpack_compressor, - transport_global->settings[GRPC_PEER_SETTINGS] - [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); - if (transport_global->dirtied_local_settings && !transport_global->sent_local_settings) { gpr_slice_buffer_add( @@ -77,6 +68,16 @@ int grpc_chttp2_unlocking_check_writes( transport_global->sent_local_settings = 1; } + /* simple writes are queued to qbuf, and flushed here */ + gpr_slice_buffer_move_into(&transport_global->qbuf, + &transport_writing->outbuf); + GPR_ASSERT(transport_global->qbuf.count == 0); + + grpc_chttp2_hpack_compressor_set_max_table_size( + &transport_writing->hpack_compressor, + transport_global->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); + GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); if (transport_writing->outgoing_window > 0) { @@ -344,6 +345,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, void grpc_chttp2_cleanup_writing( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { + GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0); grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; @@ -382,4 +384,5 @@ void grpc_chttp2_cleanup_writing( GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); + GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 029c15014e..00a5be419e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -239,6 +239,14 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } +static void free_read_buffer(stream_obj *s) { + if (s->state.rs.read_buffer && + s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { + gpr_free(s->state.rs.read_buffer); + s->state.rs.read_buffer = NULL; + } +} + /* Add a new stream op to op storage. */ @@ -294,7 +302,7 @@ static void remove_from_storage(struct stream_obj *s, /* Cycle through ops and try to take next action. Break when either an action with callback is taken, or no action is possible. - This can be executed from the Cronet network thread via cronet callback + This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ static void execute_from_storage(stream_obj *s) { @@ -329,6 +337,7 @@ static void execute_from_storage(stream_obj *s) { static void on_failed(cronet_bidirectional_stream *stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); cronet_bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_FAILED] = true; s->cbs = NULL; @@ -340,6 +349,8 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } + free_read_buffer(s); + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -349,6 +360,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) { static void on_canceled(cronet_bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); cronet_bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_CANCELED] = true; s->cbs = NULL; @@ -360,6 +372,8 @@ static void on_canceled(cronet_bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } + free_read_buffer(s); + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -369,9 +383,12 @@ static void on_canceled(cronet_bidirectional_stream *stream) { static void on_succeeded(cronet_bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); cronet_bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; + free_read_buffer(s); + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -381,6 +398,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { static void on_request_headers_sent(cronet_bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; /* Free the memory allocated for headers */ @@ -388,6 +406,7 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) { gpr_free(s->header_array.headers); s->header_array.headers = NULL; } + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -401,6 +420,7 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream, headers, negotiated_protocol); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); @@ -412,6 +432,7 @@ static void on_response_headers_received( grpc_mdstr_from_string(headers->headers[i].value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -422,11 +443,13 @@ static void on_write_completed(cronet_bidirectional_stream *stream, const char *data) { stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); + gpr_mu_lock(&s->mu); if (s->state.ws.write_buffer) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } s->state.state_callback_received[OP_SEND_MESSAGE] = true; + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -438,6 +461,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); + gpr_mu_lock(&s->mu); s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0) { s->state.rs.received_bytes += count; @@ -448,11 +472,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data, cronet_bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); + gpr_mu_unlock(&s->mu); } else { + gpr_mu_unlock(&s->mu); execute_from_storage(s); } } else { s->state.rs.read_stream_closed = true; + gpr_mu_unlock(&s->mu); execute_from_storage(s); } } @@ -466,6 +493,7 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; + gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; @@ -481,6 +509,7 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = true; } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; + gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -513,7 +542,8 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, */ static void convert_metadata_to_cronet_headers( grpc_linked_mdelem *head, const char *host, char **pp_url, - cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) { + cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers, + const char ** method) { grpc_linked_mdelem *curr = head; /* Walk the linked list and get number of header fields */ size_t num_headers_available = 0; @@ -540,11 +570,20 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; const char *key = grpc_mdstr_as_c_string(mdelem->key); const char *value = grpc_mdstr_as_c_string(mdelem->value); - if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME || + if (mdelem->key == GRPC_MDSTR_SCHEME || mdelem->key == GRPC_MDSTR_AUTHORITY) { /* Cronet populates these fields on its own */ continue; } + if (mdelem->key == GRPC_MDSTR_METHOD) { + if (mdelem->value == GRPC_MDSTR_PUT) { + *method = "PUT"; + } else { + /* POST method in default*/ + *method = "POST"; + } + continue; + } if (mdelem->key == GRPC_MDSTR_PATH) { /* Create URL by appending :path value to the hostname */ gpr_asprintf(pp_url, "https://%s%s", host, value); @@ -741,15 +780,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); - char *url; + char *url = NULL; + const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, - &s->header_array.headers, &s->header_array.count); + &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs, url); - cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array, + cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; result = ACTION_TAKEN_WITH_CALLBACK; @@ -757,14 +797,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (!stream_state->state_op_done[OP_CANCEL_ERROR]) { + if (stream_state->state_op_done[OP_CANCEL_ERROR] || + stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED, NULL); + } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL); - } else { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED, NULL); } stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -772,32 +813,40 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); - gpr_slice_buffer write_slice_buffer; - gpr_slice slice; - gpr_slice_buffer_init(&write_slice_buffer); - grpc_byte_stream_next(NULL, stream_op->send_message, &slice, - stream_op->send_message->length, NULL); - /* Check that compression flag is OFF. We don't support compression yet. */ - if (stream_op->send_message->flags != 0) { - gpr_log(GPR_ERROR, "Compression is not supported"); - GPR_ASSERT(stream_op->send_message->flags == 0); - } - gpr_slice_buffer_add(&write_slice_buffer, slice); - if (write_slice_buffer.count != 1) { - /* Empty request not handled yet */ - gpr_log(GPR_ERROR, "Empty request is not supported"); - GPR_ASSERT(write_slice_buffer.count == 1); - } - if (write_slice_buffer.count > 0) { - size_t write_buffer_size; - create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size); - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", - s->cbs, stream_state->ws.write_buffer); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, - (int)write_buffer_size, false); - result = ACTION_TAKEN_WITH_CALLBACK; + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + gpr_slice_buffer write_slice_buffer; + gpr_slice slice; + gpr_slice_buffer_init(&write_slice_buffer); + grpc_byte_stream_next(NULL, stream_op->send_message, &slice, + stream_op->send_message->length, NULL); + /* Check that compression flag is OFF. We don't support compression yet. + */ + if (stream_op->send_message->flags != 0) { + gpr_log(GPR_ERROR, "Compression is not supported"); + GPR_ASSERT(stream_op->send_message->flags == 0); + } + gpr_slice_buffer_add(&write_slice_buffer, slice); + if (write_slice_buffer.count != 1) { + /* Empty request not handled yet */ + gpr_log(GPR_ERROR, "Empty request is not supported"); + GPR_ASSERT(write_slice_buffer.count == 1); + } + if (write_slice_buffer.count > 0) { + size_t write_buffer_size; + create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, + &write_buffer_size); + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)", + s->cbs, stream_state->ws.write_buffer); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, + (int)write_buffer_size, false); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + result = NO_ACTION_POSSIBLE; + } } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; @@ -805,7 +854,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + if (stream_state->state_op_done[OP_CANCEL_ERROR] || + stream_state->state_callback_received[OP_FAILED]) { + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -861,8 +912,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + result = NO_ACTION_POSSIBLE; } - result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_state->rs.remaining_bytes == 0) { CRONET_LOG(GPR_DEBUG, "read operation complete"); gpr_slice read_data_slice = @@ -870,6 +923,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); + free_read_buffer(s); gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer); gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -903,11 +957,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, "", 0, true); + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", + s->cbs); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + cronet_bidirectional_stream_write(s->cbs, "", 0, true); + result = ACTION_TAKEN_WITH_CALLBACK; + } stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; - result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->cancel_error && op_can_be_run(stream_op, stream_state, &oas->state, OP_CANCEL_ERROR)) { |