diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.c | 114 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy.h | 2 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c | 20 | ||||
-rw-r--r-- | src/core/lib/http/httpcli.c | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 11 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 22 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 2 |
7 files changed, 97 insertions, 76 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 95578d989c..24843d52e9 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -760,12 +760,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) -typedef enum { - /* zero so that it can be default-initialized */ - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL -} subchannel_creation_phase; - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -793,7 +787,7 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call; gpr_arena *arena; - subchannel_creation_phase creation_phase; + bool pick_pending; grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; @@ -915,11 +909,10 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->creation_phase == - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + GPR_ASSERT(calld->pick_pending); + calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, @@ -988,8 +981,7 @@ static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready, - grpc_error *error); + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready); static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -999,52 +991,51 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - if (pick_subchannel_locked( - exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->subchannel_call_context, cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, + cpa->subchannel_call_context, cpa->on_ready)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } gpr_free(cpa); } +static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->lb_policy != NULL) { + grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, + &calld->connected_subchannel, + GRPC_ERROR_REF(error)); + } + for (grpc_closure *closure = chand->waiting_for_config_closures.head; + closure != NULL; closure = closure->next_data.next) { + continue_picking_args *cpa = closure->cb_arg; + if (cpa->connected_subchannel == &calld->connected_subchannel) { + cpa->connected_subchannel = NULL; + grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); + } + } + GRPC_ERROR_UNREF(error); +} + static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **connected_subchannel, - grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready, - grpc_error *error) { + grpc_call_context_element *subchannel_call_context, + grpc_closure *on_ready) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - continue_picking_args *cpa; - grpc_closure *closure; GPR_ASSERT(connected_subchannel); - if (initial_metadata == NULL) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - connected_subchannel, - GRPC_ERROR_REF(error)); - } - for (closure = chand->waiting_for_config_closures.head; closure != NULL; - closure = closure->next_data.next) { - cpa = closure->cb_arg; - if (cpa->connected_subchannel == connected_subchannel) { - cpa->connected_subchannel = NULL; - grpc_closure_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - } - GPR_TIMER_END("pick_subchannel", 0); - GRPC_ERROR_UNREF(error); - return true; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; @@ -1067,8 +1058,7 @@ static bool pick_subchannel_locked( } } const grpc_lb_policy_pick_args inputs = { - initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, - gpr_inf_future(GPR_CLOCK_MONOTONIC)}; + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; // Wrap the user-provided callback in order to hold a strong reference to // the LB policy for the duration of the pick. @@ -1101,7 +1091,7 @@ static bool pick_subchannel_locked( &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { - cpa = gpr_malloc(sizeof(*cpa)); + continue_picking_args *cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; @@ -1157,16 +1147,13 @@ static void start_transport_stream_op_batch_locked_inner( error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - switch (calld->creation_phase) { - case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; - case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel_locked( - exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, NULL, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; + if (calld->pick_pending) { + cancel_pick_locked( + exec_ctx, elem, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + } else { + fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); } grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, @@ -1176,9 +1163,9 @@ static void start_transport_stream_op_batch_locked_inner( } } /* if we don't have a subchannel, try to get one */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel == NULL && op->send_initial_metadata) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + if (!calld->pick_pending && calld->connected_subchannel == NULL && + op->send_initial_metadata) { + calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); @@ -1190,8 +1177,8 @@ static void start_transport_stream_op_batch_locked_inner( op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, &calld->connected_subchannel, calld->subchannel_call_context, - &calld->next_step, GRPC_ERROR_NONE)) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + &calld->next_step)) { + calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1199,8 +1186,7 @@ static void start_transport_stream_op_batch_locked_inner( } } /* if we've got a subchannel, then let's ask it to create a call */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel != NULL) { + if (!calld->pick_pending && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, @@ -1357,7 +1343,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1464,12 +1450,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete) { + grpc_connectivity_state *state, grpc_closure *closure) { channel_data *chand = elem->channel_data; external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; - w->on_complete = on_complete; + w->on_complete = closure; w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index fefcb4912c..184b2ef720 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -62,8 +62,6 @@ typedef struct grpc_lb_policy_pick_args { uint32_t initial_metadata_flags; /** Storage for LB token in \a initial_metadata, or NULL if not used */ grpc_linked_mdelem *lb_token_mdelem_storage; - /** Deadline for the call to the LB server */ - gpr_timespec deadline; } grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 18b36dde8a..cdec7a1a02 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -95,8 +95,7 @@ headers. Therefore, sockaddr.h must always be included first */ #include "src/core/lib/iomgr/sockaddr.h" -#include <errno.h> - +#include <limits.h> #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -310,8 +309,8 @@ typedef struct glb_lb_policy { grpc_client_channel_factory *cc_factory; grpc_channel_args *args; - /** deadline for the LB's call */ - gpr_timespec deadline; + /** timeout in milliseconds for the LB call. 0 means no deadline. */ + int lb_call_timeout_ms; /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -917,6 +916,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->cc_factory = args->client_channel_factory; GPR_ASSERT(glb_policy->cc_factory != NULL); + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = grpc_channel_arg_get_integer( + arg, (grpc_integer_options){0, 0, INT_MAX}); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, // since we use this to trigger the client_load_reporting filter. grpc_arg new_arg; @@ -1089,7 +1092,6 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - glb_policy->deadline = pick_args->deadline; bool pick_done; if (glb_policy->rr_policy != NULL) { @@ -1275,11 +1277,17 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); + gpr_timespec deadline = + glb_policy->lb_call_timeout_ms == 0 + ? gpr_inf_future(GPR_CLOCK_MONOTONIC) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(glb_policy->lb_call_timeout_ms, + GPR_TIMESPAN)); glb_policy->lb_call = grpc_channel_create_pollset_set_call( exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, - &host, glb_policy->deadline, NULL); + &host, deadline, NULL); grpc_slice_unref_internal(exec_ctx, host); if (glb_policy->client_stats != NULL) { diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 453a64b049..0ac2c2ad52 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, error); + grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error)); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 34b2623d34..502b6556b4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -37,9 +37,16 @@ cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 cdef class CompletionQueue: - def __cinit__(self): + def __cinit__(self, shutdown_cq=False): + cdef grpc_completion_queue_attributes c_attrs grpc_init() - with nogil: + if shutdown_cq: + c_attrs.version = 1 + c_attrs.cq_completion_type = GRPC_CQ_NEXT + c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING + self.c_completion_queue = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL); + else: self.c_completion_queue = grpc_completion_queue_create_for_next(NULL) self.is_shutting_down = False self.is_shutdown = False diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index f66f6e4122..1db2056d47 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -217,6 +217,20 @@ cdef extern from "grpc/grpc.h": GRPC_CALL_ERROR_INVALID_FLAGS GRPC_CALL_ERROR_INVALID_METADATA + ctypedef enum grpc_cq_completion_type: + GRPC_CQ_NEXT + GRPC_CQ_PLUCK + + ctypedef enum grpc_cq_polling_type: + GRPC_CQ_DEFAULT_POLLING + GRPC_CQ_NON_LISTENING + GRPC_CQ_NON_POLLING + + ctypedef struct grpc_completion_queue_attributes: + int version + grpc_cq_completion_type cq_completion_type + grpc_cq_polling_type cq_polling_type + ctypedef enum grpc_connectivity_state: GRPC_CHANNEL_IDLE GRPC_CHANNEL_CONNECTING @@ -309,6 +323,14 @@ cdef extern from "grpc/grpc.h": void grpc_init() nogil void grpc_shutdown() nogil + ctypedef struct grpc_completion_queue_factory: + pass + + grpc_completion_queue_factory *grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) nogil + grpc_completion_queue *grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attr, void* reserved) nogil grpc_completion_queue *grpc_completion_queue_create_for_next(void *reserved) nogil grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 97192efda7..5233edc789 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -85,7 +85,7 @@ cdef class Server: def start(self): if self.is_started: raise ValueError("the server has already started") - self.backup_shutdown_queue = CompletionQueue() + self.backup_shutdown_queue = CompletionQueue(shutdown_cq=True) self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: |