diff options
-rw-r--r-- | src/core/census/grpc_filter.c | 69 | ||||
-rw-r--r-- | src/core/channel/channel_stack.c | 50 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 57 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 494 | ||||
-rw-r--r-- | src/core/channel/client_uchannel.c | 355 | ||||
-rw-r--r-- | src/core/channel/compress_filter.c | 293 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 29 | ||||
-rw-r--r-- | src/core/channel/connected_channel.h | 2 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 113 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 148 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 22 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 282 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.h | 83 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 37 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 40 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 17 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 19 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 133 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 23 |
19 files changed, 1085 insertions, 1181 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 872543057e..daa1a7ac61 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -53,28 +53,24 @@ typedef struct call_data { int error; /* recv callback */ - grpc_stream_op_buffer *recv_ops; + grpc_metadata_batch *recv_initial_metadata; grpc_closure *on_done_recv; + grpc_closure finish_recv; } call_data; typedef struct channel_data { grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */ } channel_data; -static void extract_and_annotate_method_tag(grpc_stream_op_buffer *sopb, +static void extract_and_annotate_method_tag(grpc_metadata_batch *md, call_data *calld, channel_data *chand) { grpc_linked_mdelem *m; - size_t i; - for (i = 0; i < sopb->nops; i++) { - grpc_stream_op *op = &sopb->ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - for (m = op->data.metadata.list.head; m != NULL; m = m->next) { - if (m->md->key == chand->path_str) { - gpr_log(GPR_DEBUG, "%s", - (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); - /* Add method tag here */ - } + for (m = md->list.head; m != NULL; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, "%s", + (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); + /* Add method tag here */ } } } @@ -83,8 +79,8 @@ static void client_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (op->send_ops) { - extract_and_annotate_method_tag(op->send_ops, calld, chand); + if (op->send_initial_metadata) { + extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand); } } @@ -101,7 +97,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (success) { - extract_and_annotate_method_tag(calld->recv_ops, calld, chand); + extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand); } calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } @@ -109,11 +105,11 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_ops) { + if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = calld->on_done_recv; + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->on_complete; + op->on_complete = &calld->finish_recv; } } @@ -128,13 +124,11 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx, static void client_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); - if (initial_op) client_mutate_op(elem, initial_op); } static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -146,15 +140,13 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); /* TODO(hongyu): call census_tracing_start_op here. */ - grpc_closure_init(d->on_done_recv, server_on_done_recv, elem); - if (initial_op) server_mutate_op(elem, initial_op); + grpc_closure_init(&d->finish_recv, server_on_done_recv, elem); } static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -165,12 +157,11 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, } static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; GPR_ASSERT(chand != NULL); - chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); + chand->path_str = grpc_mdstr_from_string(args->metadata_context, ":path"); } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, @@ -183,15 +174,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_client_census_filter = { - client_start_transport_op, grpc_channel_next_op, - sizeof(call_data), client_init_call_elem, - client_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, "census-client"}; + client_start_transport_op, grpc_channel_next_op, sizeof(call_data), + client_init_call_elem, grpc_call_stack_ignore_set_pollset, + client_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, grpc_call_next_get_peer, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_start_transport_op, grpc_channel_next_op, - sizeof(call_data), server_init_call_elem, - server_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, "census-server"}; + server_start_transport_op, grpc_channel_next_op, sizeof(call_data), + server_init_call_elem, grpc_call_stack_ignore_set_pollset, + server_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, grpc_call_next_get_peer, "census-server"}; diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index abd7f719e7..02e33a09ab 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -104,13 +104,14 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, const grpc_channel_filter **filters, size_t filter_count, grpc_channel *master, - const grpc_channel_args *args, + const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); grpc_channel_element *elems; + grpc_channel_element_args args; char *user_data; size_t i; @@ -122,11 +123,14 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, /* init per-filter data */ for (i = 0; i < filter_count; i++) { + args.master = master; + args.channel_args = channel_args; + args.metadata_context = metadata_context; + args.is_first = i == 0; + args.is_last = i == (filter_count - 1); elems[i].filter = filters[i]; elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem(exec_ctx, &elems[i], master, args, - metadata_context, i == 0, - i == (filter_count - 1)); + elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); } @@ -151,33 +155,63 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, + grpc_channel_stack *channel_stack, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, - grpc_transport_stream_op *initial_op, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); + grpc_call_element_args args; size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; call_stack->count = count; + gpr_ref_init(&call_stack->refcount.refs, initial_refs); + grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg); call_elems = CALL_ELEMS_FROM_STACK(call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ for (i = 0; i < count; i++) { + args.refcount = &call_stack->refcount; + args.server_transport_data = transport_server_data; + args.context = context; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], - transport_server_data, initial_op); + call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } } +void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_stack *call_stack, + grpc_pollset *pollset) { + size_t count = call_stack->count; + grpc_call_element *call_elems; + char *user_data; + size_t i; + + call_elems = CALL_ELEMS_FROM_STACK(call_stack); + user_data = ((char *)call_elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); + + /* init per-filter data */ + for (i = 0; i < count; i++) { + call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset); + user_data += + ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } +} + +void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_pollset *pollset) {} + void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 6732cc3018..1279fec080 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,6 +51,20 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; +typedef struct { + grpc_channel *master; + const grpc_channel_args *channel_args; + grpc_mdctx *metadata_context; + int is_first; + int is_last; +} grpc_channel_element_args; + +typedef struct { + grpc_stream_refcount *refcount; + const void *server_transport_data; + grpc_call_context_element *context; +} grpc_call_element_args; + /* Channel filters specify: 1. the amount of memory needed in the channel & call (via the sizeof_XXX members) @@ -84,8 +98,9 @@ typedef struct { transport and is on the server. Most filters want to ignore this argument. */ void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op); + grpc_call_element_args *args); + void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset); /* Destroy per call data. The filter does not need to do any chaining */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); @@ -99,9 +114,7 @@ typedef struct { useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_channel *master, const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last); + grpc_channel_element_args *args); /* Destroy per channel data. The filter does not need to do any chaining */ void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, @@ -141,7 +154,14 @@ typedef struct { /* A call stack tracks a set of related filters for one call, and guarantees they live within a single malloc() allocation */ -typedef struct { size_t count; } grpc_call_stack; +typedef struct { + /* shared refcount for this channel stack. + MUST be the first element: the underlying code calls destroy + with the address of the refcount, but higher layers prefer to think + about the address of the call stack itself. */ + grpc_stream_refcount refcount; + size_t count; +} grpc_call_stack; /* Get a channel element given a channel stack and its index */ grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, @@ -170,13 +190,34 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, expected to be NULL on a client, or an opaque transport owned pointer on the server. */ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, + grpc_channel_stack *channel_stack, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, - grpc_transport_stream_op *initial_op, grpc_call_stack *call_stack); +/* Set a pollset for a call stack: must occur before the first op is started */ +void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_stack *call_stack, + grpc_pollset *pollset); + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define grpc_call_stack_ref(call_stack, reason) \ + grpc_stream_ref(&(call_stack)->refcount, reason) +#define grpc_call_stack_unref(exec_ctx, call_stack, reason) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#else +#define grpc_call_stack_ref(call_stack) grpc_stream_ref(&(call_stack)->refcount) +#define grpc_call_stack_unref(exec_ctx, call_stack) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#endif + /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); +/* Ignore set pollset */ +void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_pollset *pollset); /* Call the next operation in a call stack */ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9f85557ea1..16d91d4277 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -43,6 +43,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/channel/subchannel_call_holder.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" @@ -51,7 +52,7 @@ /* Client channel implementation */ -typedef struct call_data call_data; +typedef grpc_subchannel_call_holder call_data; typedef struct client_channel_channel_data { /** metadata context for this channel */ @@ -98,360 +99,22 @@ typedef struct { grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -typedef enum { - CALL_CREATED, - CALL_WAITING_FOR_SEND, - CALL_WAITING_FOR_CONFIG, - CALL_WAITING_FOR_PICK, - CALL_WAITING_FOR_CALL, - CALL_ACTIVE, - CALL_CANCELLED -} call_state; - -struct call_data { - /* owning element */ - grpc_call_element *elem; - - gpr_mu mu_state; - - call_state state; - gpr_timespec deadline; - grpc_subchannel *picked_channel; - grpc_closure async_setup_task; - grpc_transport_stream_op waiting_op; - /* our child call stack */ - grpc_subchannel_call *subchannel_call; - grpc_linked_mdelem status; - grpc_linked_mdelem details; -}; - -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) - GRPC_MUST_USE_RESULT; - -static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - if (op->send_ops) { - grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops) { - char status[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->status.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); - calld->details.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata(op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); - } -} - typedef struct { grpc_closure closure; grpc_call_element *elem; } waiting_call; -static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation); - -static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - waiting_call *wc = arg; - call_data *calld = wc->elem->call_data; - perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1); - gpr_free(wc); -} - -static void add_to_lb_policy_wait_queue_locked_state_config( - grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - waiting_call *wc = gpr_malloc(sizeof(*wc)); - grpc_closure_init(&wc->closure, continue_with_pick, wc); - wc->elem = elem; - grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1); -} - -static int is_empty(void *p, int len) { - char *ptr = p; - int i; - for (i = 0; i < len; i++) { - if (ptr[i] != 0) return 0; - } - return 1; -} - -static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - call_data *calld = arg; - grpc_transport_stream_op op; - int have_waiting; - - if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { - memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); - } else if (calld->state == CALL_WAITING_FOR_CALL) { - have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); - if (calld->subchannel_call != NULL) { - calld->state = CALL_ACTIVE; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, - &calld->waiting_op); - } - } else { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); - } - } - } else { - GPR_ASSERT(calld->state == CALL_CANCELLED); - gpr_mu_unlock(&calld->mu_state); - } -} - -static void started_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - call_data *calld = arg; - gpr_mu_lock(&calld->mu_state); - started_call_locked(exec_ctx, arg, iomgr_success); -} - -static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - call_data *calld = arg; - grpc_pollset *pollset; - grpc_subchannel_call_create_status call_creation_status; - - GPR_TIMER_BEGIN("picked_target", 0); - - if (calld->picked_channel == NULL) { - /* treat this like a cancellation */ - calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1); - } else { - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_CANCELLED) { - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); - } else { - GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); - calld->state = CALL_WAITING_FOR_CALL; - pollset = calld->waiting_op.bind_pollset; - grpc_closure_init(&calld->async_setup_task, started_call, calld); - call_creation_status = grpc_subchannel_create_call( - exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, - &calld->async_setup_task); - if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { - started_call_locked(exec_ctx, calld, iomgr_success); - } else { - gpr_mu_unlock(&calld->mu_state); - } - } - } - - GPR_TIMER_END("picked_target", 0); -} - -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) { - call_data *calld = elem->call_data; - grpc_closure *consumed_op = NULL; - grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); - GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); - if (new_op->send_ops != NULL) { - waiting_op->send_ops = new_op->send_ops; - waiting_op->is_last_send = new_op->is_last_send; - waiting_op->on_done_send = new_op->on_done_send; - } - if (new_op->recv_ops != NULL) { - waiting_op->recv_ops = new_op->recv_ops; - waiting_op->recv_state = new_op->recv_state; - waiting_op->on_done_recv = new_op->on_done_recv; - } - if (new_op->on_consumed != NULL) { - if (waiting_op->on_consumed != NULL) { - consumed_op = waiting_op->on_consumed; - } - waiting_op->on_consumed = new_op->on_consumed; - } - if (new_op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op->cancel_with_status = new_op->cancel_with_status; - } - return consumed_op; -} - static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_subchannel_call *subchannel_call; - char *result; - - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_ACTIVE) { - subchannel_call = calld->subchannel_call; - GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); - gpr_mu_unlock(&calld->mu_state); - result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); - return result; - } else { - gpr_mu_unlock(&calld->mu_state); - return grpc_channel_get_target(chand->master); - } -} - -static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_subchannel_call *subchannel_call; - grpc_lb_policy *lb_policy; - grpc_transport_stream_op op2; - GPR_TIMER_BEGIN("perform_transport_stream_op", 0); - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - gpr_mu_lock(&calld->mu_state); - switch (calld->state) { - case CALL_ACTIVE: - GPR_ASSERT(!continuation); - subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); - break; - case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - break; - case CALL_WAITING_FOR_SEND: - GPR_ASSERT(!continuation); - grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); - if (!calld->waiting_op.send_ops && - calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { - gpr_mu_unlock(&calld->mu_state); - break; - } - *op = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); - continuation = 1; - /* fall through */ - case CALL_WAITING_FOR_CONFIG: - case CALL_WAITING_FOR_PICK: - case CALL_WAITING_FOR_CALL: - if (!continuation) { - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - op2 = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); - if (op->on_consumed) { - calld->waiting_op.on_consumed = op->on_consumed; - op->on_consumed = NULL; - } else if (op2.on_consumed) { - calld->waiting_op.on_consumed = op2.on_consumed; - op2.on_consumed = NULL; - } - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - handle_op_after_cancellation(exec_ctx, elem, &op2); - } else { - grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); - gpr_mu_unlock(&calld->mu_state); - } - break; - } - /* fall through */ - case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - } else { - calld->waiting_op = *op; - - if (op->send_ops == NULL) { - /* need to have some send ops before we can select the - lb target */ - calld->state = CALL_WAITING_FOR_SEND; - gpr_mu_unlock(&calld->mu_state); - } else { - gpr_mu_lock(&chand->mu_config); - lb_policy = chand->lb_policy; - if (lb_policy) { - grpc_transport_stream_op *waiting_op = &calld->waiting_op; - grpc_pollset *bind_pollset = waiting_op->bind_pollset; - grpc_metadata_batch *initial_metadata = - &waiting_op->send_ops->ops[0].data.metadata; - GRPC_LB_POLICY_REF(lb_policy, "pick"); - gpr_mu_unlock(&chand->mu_config); - calld->state = CALL_WAITING_FOR_PICK; - - GPR_ASSERT(waiting_op->bind_pollset); - GPR_ASSERT(waiting_op->send_ops); - GPR_ASSERT(waiting_op->send_ops->nops >= 1); - GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA); - gpr_mu_unlock(&calld->mu_state); - - grpc_closure_init(&calld->async_setup_task, picked_target, calld); - grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset, - initial_metadata, &calld->picked_channel, - &calld->async_setup_task); - - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick"); - } else if (chand->resolver != NULL) { - calld->state = CALL_WAITING_FOR_CONFIG; - add_to_lb_policy_wait_queue_locked_state_config(elem); - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); - chand->started_resolving = 1; - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_configuration, - &chand->on_config_changed); - } - gpr_mu_unlock(&chand->mu_config); - gpr_mu_unlock(&calld->mu_state); - } else { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&chand->mu_config); - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - } - } - } - break; - } - - GPR_TIMER_END("perform_transport_stream_op", 0); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, + chand->master); } static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - perform_transport_stream_op(exec_ctx, elem, op, 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); } static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, @@ -593,11 +256,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->connectivity_state = NULL; } - if (!is_empty(op, sizeof(*op))) { - lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } + lb_policy = chand->lb_policy; + if (lb_policy) { + GRPC_LB_POLICY_REF(lb_policy, "broadcast"); } if (op->disconnect && chand->resolver != NULL) { @@ -624,67 +285,110 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } } -/* Constructor for call_data */ -static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { +typedef struct { + grpc_metadata_batch *initial_metadata; + grpc_subchannel **subchannel; + grpc_closure *on_ready; + grpc_call_element *elem; + grpc_closure closure; +} continue_picking_args; + +static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **subchannel, + grpc_closure *on_ready); + +static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) { + continue_picking_args *cpa = arg; + if (!success) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + } else if (cpa->subchannel == NULL) { + /* cancelled, do nothing */ + } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->subchannel, cpa->on_ready)) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1); + } + gpr_free(cpa); +} + +static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **subchannel, + grpc_closure *on_ready) { + grpc_call_element *elem = elemp; + channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + continue_picking_args *cpa; + grpc_closure *closure; - /* TODO(ctiller): is there something useful we can do here? */ - GPR_ASSERT(initial_op == NULL); + GPR_ASSERT(subchannel); - GPR_ASSERT(elem->filter == &grpc_client_channel_filter); - GPR_ASSERT(server_transport_data == NULL); - gpr_mu_init(&calld->mu_state); - calld->elem = elem; - calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + gpr_mu_lock(&chand->mu_config); + if (initial_metadata == NULL) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel); + } + for (closure = chand->waiting_for_config_closures.head; closure != NULL; + closure = grpc_closure_next(closure)) { + cpa = closure->cb_arg; + if (cpa->subchannel == subchannel) { + cpa->subchannel = NULL; + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0); + } + } + gpr_mu_unlock(&chand->mu_config); + return 1; + } + if (chand->lb_policy != NULL) { + int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset, + initial_metadata, subchannel, on_ready); + gpr_mu_unlock(&chand->mu_config); + return r; + } + if (chand->resolver != NULL && !chand->started_resolving) { + chand->started_resolving = 1; + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + cpa = gpr_malloc(sizeof(*cpa)); + cpa->initial_metadata = initial_metadata; + cpa->subchannel = subchannel; + cpa->on_ready = on_ready; + cpa->elem = elem; + grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1); + gpr_mu_unlock(&chand->mu_config); + return 0; +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem); } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call; - - /* if the call got activated, we need to destroy the child stack also, and - remove it from the in-flight requests tracked by the child_entry we - picked */ - gpr_mu_lock(&calld->mu_state); - switch (calld->state) { - case CALL_ACTIVE: - subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel"); - break; - case CALL_CREATED: - case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); - break; - case CALL_WAITING_FOR_PICK: - case CALL_WAITING_FOR_CONFIG: - case CALL_WAITING_FOR_CALL: - case CALL_WAITING_FOR_SEND: - GPR_UNREACHABLE_CODE(return ); - } + grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); } /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); - GPR_ASSERT(is_last); + GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); gpr_mu_init(&chand->mu_config); - chand->mdctx = metadata_context; - chand->master = master; + chand->mdctx = args->metadata_context; + chand->master = args->master; grpc_pollset_set_init(&chand->pollset_set); grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); @@ -709,10 +413,16 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_destroy(&chand->mu_config); } +static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset) { + call_data *calld = elem->call_data; + calld->pollset = pollset; +} + const grpc_channel_filter grpc_client_channel_filter = { cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, cc_get_peer, "client-channel", + init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel", }; void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 510677a844..ec6b02381a 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/compress_filter.h" +#include "src/core/channel/subchannel_call_holder.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/channel.h" @@ -52,8 +53,6 @@ /** Microchannel (uchannel) implementation: a lightweight channel without any * load-balancing mechanisms meant for communication from within the core. */ -typedef struct call_data call_data; - typedef struct client_uchannel_channel_data { /** metadata context for this channel */ grpc_mdctx *mdctx; @@ -80,85 +79,7 @@ typedef struct client_uchannel_channel_data { gpr_mu mu_state; } channel_data; -typedef enum { - CALL_CREATED, - CALL_WAITING_FOR_SEND, - CALL_WAITING_FOR_CALL, - CALL_ACTIVE, - CALL_CANCELLED -} call_state; - -struct call_data { - /* owning element */ - grpc_call_element *elem; - - gpr_mu mu_state; - - call_state state; - gpr_timespec deadline; - grpc_closure async_setup_task; - grpc_transport_stream_op waiting_op; - /* our child call stack */ - grpc_subchannel_call *subchannel_call; - grpc_linked_mdelem status; - grpc_linked_mdelem details; -}; - -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) - GRPC_MUST_USE_RESULT; - -static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - if (op->send_ops) { - grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops) { - char status[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->status.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); - calld->details.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata(op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); - } -} - -typedef struct { - grpc_closure closure; - grpc_call_element *elem; -} waiting_call; - -static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation); - -static int is_empty(void *p, int len) { - char *ptr = p; - int i; - for (i = 0; i < len; i++) { - if (ptr[i] != 0) return 0; - } - return 1; -} +typedef grpc_subchannel_call_holder call_data; static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { @@ -171,201 +92,17 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, &chand->connectivity_cb); } -static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - call_data *calld = arg; - grpc_transport_stream_op op; - int have_waiting; - - if (calld->state == CALL_CANCELLED && iomgr_success == 0) { - have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); - } - } else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { - memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op); - } else if (calld->state == CALL_WAITING_FOR_CALL) { - have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); - if (calld->subchannel_call != NULL) { - calld->state = CALL_ACTIVE; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, - &calld->waiting_op); - } - } else { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - if (have_waiting) { - handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); - } - } - } else { - GPR_ASSERT(calld->state == CALL_CANCELLED); - gpr_mu_unlock(&calld->mu_state); - have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); - if (have_waiting) { - handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op); - } - } -} - -static void started_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { - call_data *calld = arg; - gpr_mu_lock(&calld->mu_state); - started_call_locked(exec_ctx, arg, iomgr_success); -} - -static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, - grpc_transport_stream_op *new_op) { - call_data *calld = elem->call_data; - grpc_closure *consumed_op = NULL; - grpc_transport_stream_op *waiting_op = &calld->waiting_op; - GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1); - GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1); - if (new_op->send_ops != NULL) { - waiting_op->send_ops = new_op->send_ops; - waiting_op->is_last_send = new_op->is_last_send; - waiting_op->on_done_send = new_op->on_done_send; - } - if (new_op->recv_ops != NULL) { - waiting_op->recv_ops = new_op->recv_ops; - waiting_op->recv_state = new_op->recv_state; - waiting_op->on_done_recv = new_op->on_done_recv; - } - if (new_op->on_consumed != NULL) { - if (waiting_op->on_consumed != NULL) { - consumed_op = waiting_op->on_consumed; - } - waiting_op->on_consumed = new_op->on_consumed; - } - if (new_op->cancel_with_status != GRPC_STATUS_OK) { - waiting_op->cancel_with_status = new_op->cancel_with_status; - } - return consumed_op; -} - static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_subchannel_call *subchannel_call; - char *result; - - gpr_mu_lock(&calld->mu_state); - if (calld->state == CALL_ACTIVE) { - subchannel_call = calld->subchannel_call; - GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); - gpr_mu_unlock(&calld->mu_state); - result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer"); - return result; - } else { - gpr_mu_unlock(&calld->mu_state); - return grpc_channel_get_target(chand->master); - } -} - -static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op, - int continuation) { - call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_subchannel_call *subchannel_call; - grpc_transport_stream_op op2; - GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - gpr_mu_lock(&calld->mu_state); - /* make sure the wrapped subchannel has been set (see - * grpc_client_uchannel_set_subchannel) */ - GPR_ASSERT(chand->subchannel != NULL); - - switch (calld->state) { - case CALL_ACTIVE: - GPR_ASSERT(!continuation); - subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op); - break; - case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - break; - case CALL_WAITING_FOR_SEND: - GPR_ASSERT(!continuation); - grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); - if (!calld->waiting_op.send_ops && - calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { - gpr_mu_unlock(&calld->mu_state); - break; - } - *op = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); - continuation = 1; - /* fall through */ - case CALL_WAITING_FOR_CALL: - if (!continuation) { - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - op2 = calld->waiting_op; - memset(&calld->waiting_op, 0, sizeof(calld->waiting_op)); - if (op->on_consumed) { - calld->waiting_op.on_consumed = op->on_consumed; - op->on_consumed = NULL; - } else if (op2.on_consumed) { - calld->waiting_op.on_consumed = op2.on_consumed; - op2.on_consumed = NULL; - } - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - handle_op_after_cancellation(exec_ctx, elem, &op2); - grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1); - } else { - grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1); - gpr_mu_unlock(&calld->mu_state); - } - break; - } - /* fall through */ - case CALL_CREATED: - if (op->cancel_with_status != GRPC_STATUS_OK) { - calld->state = CALL_CANCELLED; - gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(exec_ctx, elem, op); - } else { - calld->waiting_op = *op; - if (op->send_ops == NULL) { - calld->state = CALL_WAITING_FOR_SEND; - gpr_mu_unlock(&calld->mu_state); - } else { - grpc_subchannel_call_create_status call_creation_status; - grpc_pollset *pollset = calld->waiting_op.bind_pollset; - calld->state = CALL_WAITING_FOR_CALL; - grpc_closure_init(&calld->async_setup_task, started_call, calld); - call_creation_status = grpc_subchannel_create_call( - exec_ctx, chand->subchannel, pollset, &calld->subchannel_call, - &calld->async_setup_task); - if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { - started_call_locked(exec_ctx, calld, 1); - } else { - gpr_mu_unlock(&calld->mu_state); - } - } - } - break; - } + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, + chand->master); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - perform_transport_stream_op(exec_ctx, elem, op, 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); } static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -392,64 +129,40 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, } } +static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **subchannel, + grpc_closure *on_ready) { + channel_data *chand = arg; + GPR_ASSERT(initial_metadata != NULL); + *subchannel = chand->subchannel; + return 1; +} + /* Constructor for call_data */ static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { - call_data *calld = elem->call_data; - memset(calld, 0, sizeof(call_data)); - - /* TODO(ctiller): is there something useful we can do here? */ - GPR_ASSERT(initial_op == NULL); - - GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - GPR_ASSERT(server_transport_data == NULL); - gpr_mu_init(&calld->mu_state); - calld->elem = elem; - calld->state = CALL_CREATED; - calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_call_element_args *args) { + grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, + elem->channel_data); } /* Destructor for call_data */ static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call; - - /* if the call got activated, we need to destroy the child stack also, and - remove it from the in-flight requests tracked by the child_entry we - picked */ - gpr_mu_lock(&calld->mu_state); - switch (calld->state) { - case CALL_ACTIVE: - subchannel_call = calld->subchannel_call; - gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel"); - break; - case CALL_CREATED: - case CALL_CANCELLED: - gpr_mu_unlock(&calld->mu_state); - break; - case CALL_WAITING_FOR_CALL: - case CALL_WAITING_FOR_SEND: - GPR_UNREACHABLE_CODE(return ); - } + grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); } /* Constructor for channel_data */ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last) { + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); - GPR_ASSERT(is_last); + GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->mdctx = metadata_context; - chand->master = master; + chand->mdctx = args->metadata_context; + chand->master = args->master; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -465,17 +178,17 @@ static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, gpr_mu_destroy(&chand->mu_state); } +static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset) { + call_data *calld = elem->call_data; + calld->pollset = pollset; +} + const grpc_channel_filter grpc_client_uchannel_filter = { - cuc_start_transport_stream_op, - cuc_start_transport_op, - sizeof(call_data), - cuc_init_call_elem, - cuc_destroy_call_elem, - sizeof(channel_data), - cuc_init_channel_elem, - cuc_destroy_channel_elem, - cuc_get_peer, - "client-uchannel", + cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data), + cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem, + sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem, + cuc_get_peer, "client-uchannel", }; grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 20b5084044..f23d8052f3 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -50,13 +50,20 @@ typedef struct call_data { grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; gpr_uint32 remaining_slice_bytes; - /**< Input data to be read, as per BEGIN_MESSAGE */ - int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ grpc_compression_algorithm compression_algorithm; /** If true, contents of \a compression_algorithm are authoritative */ int has_compression_algorithm; + + grpc_transport_stream_op send_op; + gpr_uint32 send_length; + gpr_uint32 send_flags; + gpr_slice incoming_slice; + grpc_slice_buffer_stream replacement_stream; + grpc_closure *post_send; + grpc_closure send_done; + grpc_closure got_slice; } call_data; typedef struct channel_data { @@ -76,24 +83,6 @@ typedef struct channel_data { grpc_compression_options compression_options; } channel_data; -/** Compress \a slices in place using \a algorithm. Returns 1 if compression did - * actually happen, 0 otherwise (for example if the compressed output size was - * larger than the raw input). - * - * Returns 1 if the data was actually compress and 0 otherwise. */ -static int compress_send_sb(grpc_compression_algorithm algorithm, - gpr_slice_buffer *slices) { - int did_compress; - gpr_slice_buffer tmp; - gpr_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(algorithm, slices, &tmp); - if (did_compress) { - gpr_slice_buffer_swap(slices, &tmp); - } - gpr_slice_buffer_destroy(&tmp); - return did_compress; -} - /** For each \a md element from the incoming metadata, filter out the entry for * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ @@ -127,7 +116,9 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { return md; } -static int skip_compression(channel_data *channeld, call_data *calld) { +static int skip_compression(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; if (calld->has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { return 1; @@ -138,169 +129,127 @@ static int skip_compression(channel_data *channeld, call_data *calld) { return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; } -/** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying - * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length, - * flags indicating compression is in effect) and replaces \a send_ops with it. - * */ -static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, - grpc_call_element *elem) { - size_t i; +/** Filter initial metadata */ +static void process_send_initial_metadata( + grpc_call_element *elem, grpc_metadata_batch *initial_metadata) { call_data *calld = elem->call_data; - int new_slices_added = 0; /* GPR_FALSE */ - grpc_metadata_batch metadata; - grpc_stream_op_buffer new_send_ops; - grpc_sopb_init(&new_send_ops); - - for (i = 0; i < send_ops->nops; i++) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX); - grpc_sopb_add_begin_message( - &new_send_ops, (gpr_uint32)calld->slices.length, - sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); - break; - case GRPC_OP_SLICE: - /* Once we reach the slices section of the original buffer, simply add - * all the new (compressed) slices. We obviously want to do this only - * once, hence the "new_slices_added" guard. */ - if (!new_slices_added) { - size_t j; - for (j = 0; j < calld->slices.count; ++j) { - grpc_sopb_add_slice(&new_send_ops, - gpr_slice_ref(calld->slices.slices[j])); - } - new_slices_added = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_METADATA: - /* move the metadata to the new buffer. */ - grpc_metadata_batch_move(&metadata, &sop->data.metadata); - grpc_sopb_add_metadata(&new_send_ops, metadata); - break; - case GRPC_NO_OP: - break; - } + channel_data *channeld = elem->channel_data; + /* Parse incoming request for compression. If any, it'll be available + * at calld->compression_algorithm */ + grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ } - grpc_sopb_swap(send_ops, &new_send_ops); - grpc_sopb_destroy(&new_send_ops); + /* hint compression algorithm */ + grpc_metadata_batch_add_tail( + initial_metadata, &calld->compression_algorithm_storage, + GRPC_MDELEM_REF( + channeld + ->mdelem_compression_algorithms[calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_tail( + initial_metadata, &calld->accept_encoding_storage, + GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); } -/** Filter's "main" function, called for any incoming grpc_transport_stream_op - * instance that holds a non-zero number of send operations, accesible to this - * function in \a send_ops. */ -static void process_send_ops(grpc_call_element *elem, - grpc_stream_op_buffer *send_ops) { - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - size_t i; - int did_compress = 0; +static void continue_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem); - /* In streaming calls, we need to reset the previously accumulated slices */ +static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) { + grpc_call_element *elem = elemp; + call_data *calld = elem->call_data; gpr_slice_buffer_reset_and_unref(&calld->slices); - for (i = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - switch (sop->type) { - case GRPC_OP_BEGIN_MESSAGE: - /* buffer up slices until we've processed all the expected ones (as - * given by GRPC_OP_BEGIN_MESSAGE) */ - calld->remaining_slice_bytes = sop->data.begin_message.length; - if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - calld->compression_algorithm = GRPC_COMPRESS_NONE; - } - break; - case GRPC_OP_METADATA: - if (!calld->written_initial_metadata) { - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - grpc_metadata_batch_filter(&(sop->data.metadata), - compression_md_filter, elem); - if (!calld->has_compression_algorithm) { - /* If no algorithm was found in the metadata and we aren't - * exceptionally skipping compression, fall back to the channel - * default */ - calld->compression_algorithm = - channeld->default_compression_algorithm; - calld->has_compression_algorithm = 1; /* GPR_TRUE */ - } - /* hint compression algorithm */ - grpc_metadata_batch_add_tail( - &(sop->data.metadata), &calld->compression_algorithm_storage, - GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms - [calld->compression_algorithm])); - - /* convey supported compression algorithms */ - grpc_metadata_batch_add_tail( - &(sop->data.metadata), &calld->accept_encoding_storage, - GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); - - calld->written_initial_metadata = 1; /* GPR_TRUE */ - } - break; - case GRPC_OP_SLICE: - if (skip_compression(channeld, calld)) continue; - GPR_ASSERT(calld->remaining_slice_bytes > 0); - /* Increase input ref count, gpr_slice_buffer_add takes ownership. */ - gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); - GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) <= - calld->remaining_slice_bytes); - calld->remaining_slice_bytes -= - (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice); - if (calld->remaining_slice_bytes == 0) { - did_compress = - compress_send_sb(calld->compression_algorithm, &calld->slices); - } - break; - case GRPC_NO_OP: - break; - } - } + calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success); +} - /* Modify the send_ops stream_op_buffer depending on whether compression was - * carried out */ +static void finish_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + int did_compress; + gpr_slice_buffer tmp; + gpr_slice_buffer_init(&tmp); + did_compress = + grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - finish_compressed_sopb(send_ops, elem); + gpr_slice_buffer_swap(&calld->slices, &tmp); + calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } + gpr_slice_buffer_destroy(&tmp); + + grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, + calld->send_flags); + calld->send_op.send_message = &calld->replacement_stream.base; + calld->post_send = calld->send_op.on_complete; + calld->send_op.on_complete = &calld->send_done; + + grpc_call_next_op(exec_ctx, elem, &calld->send_op); +} + +static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) { + grpc_call_element *elem = elemp; + call_data *calld = elem->call_data; + gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + if (calld->send_length == calld->slices.length) { + finish_send_message(exec_ctx, elem); + } else { + continue_send_message(exec_ctx, elem); + } +} + +static void continue_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, + &calld->incoming_slice, ~(size_t)0, + &calld->got_slice)) { + gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + if (calld->send_length == calld->slices.length) { + finish_send_message(exec_ctx, elem); + break; + } } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); - if (op->send_ops && op->send_ops->nops > 0) { - process_send_ops(elem, op->send_ops); + if (op->send_initial_metadata) { + process_send_initial_metadata(elem, op->send_initial_metadata); + } + if (op->send_message != NULL && !skip_compression(elem) && + 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { + calld->send_op = *op; + calld->send_length = op->send_message->length; + calld->send_flags = op->send_message->flags; + continue_send_message(exec_ctx, elem); + } else { + /* pass control down the stack */ + grpc_call_next_op(exec_ctx, elem, op); } GPR_TIMER_END("compress_start_transport_stream_op", 0); - - /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ gpr_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; - calld->written_initial_metadata = 0; /* GPR_FALSE */ - - if (initial_op) { - if (initial_op->send_ops && initial_op->send_ops->nops > 0) { - process_send_ops(elem, initial_op->send_ops); - } - } + grpc_closure_init(&calld->got_slice, got_slice, elem); + grpc_closure_init(&calld->send_done, send_done, elem); } /* Destructor for call_data */ @@ -313,9 +262,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1]; @@ -325,24 +273,25 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_compression_options_init(&channeld->compression_options); channeld->compression_options.enabled_algorithms_bitset = - (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args); + (gpr_uint32)grpc_channel_args_compression_algorithm_get_states( + args->channel_args); channeld->default_compression_algorithm = - grpc_channel_args_get_compression_algorithm(args); + grpc_channel_args_get_compression_algorithm(args->channel_args); /* Make sure the default isn't disabled. */ GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( &channeld->compression_options, channeld->default_compression_algorithm)); channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; - channeld->mdstr_request_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); + channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string( + args->metadata_context, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); channeld->mdstr_outgoing_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, "grpc-encoding"); + grpc_mdstr_from_string(args->metadata_context, "grpc-encoding"); channeld->mdstr_compression_capabilities_key = - grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); + grpc_mdstr_from_string(args->metadata_context, "grpc-accept-encoding"); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorithm_name; @@ -354,9 +303,9 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( - mdctx, + args->metadata_context, GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), - grpc_mdstr_from_string(mdctx, algorithm_name)); + grpc_mdstr_from_string(args->metadata_context, algorithm_name)); if (algo_idx > 0) { supported_algorithms_names[supported_algorithms_idx++] = algorithm_name; } @@ -369,11 +318,12 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, &accept_encoding_str_len); channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings( - mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), - grpc_mdstr_from_string(mdctx, accept_encoding_str)); + args->metadata_context, + GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), + grpc_mdstr_from_string(args->metadata_context, accept_encoding_str)); gpr_free(accept_encoding_str); - GPR_ASSERT(!is_last); + GPR_ASSERT(!args->is_last); } /* Destructor for channel data */ @@ -393,5 +343,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, const grpc_channel_filter grpc_compress_filter = { compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, grpc_call_next_get_peer, "compress"}; + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "compress"}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 6d4d7be632..0e1efd965a 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -83,8 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; int r; @@ -92,10 +91,18 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); r = grpc_transport_init_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - server_transport_data, initial_op); + args->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_transport_set_pollset(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset); +} + /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -108,11 +115,10 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *cd = (channel_data *)elem->channel_data; - GPR_ASSERT(is_last); + GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; } @@ -132,8 +138,8 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { const grpc_channel_filter grpc_connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, con_get_peer, "connected", + init_call_elem, set_pollset, destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, con_get_peer, "connected", }; void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, @@ -154,3 +160,8 @@ void grpc_connected_channel_bind_transport(grpc_channel_stack *channel_stack, channel. */ channel_stack->call_stack_size += grpc_transport_stream_size(transport); } + +grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) { + call_data *calld = elem->call_data; + return TRANSPORT_STREAM_FROM_CALL_DATA(calld); +} diff --git a/src/core/channel/connected_channel.h b/src/core/channel/connected_channel.h index eac6eb7ebe..95c1834bfa 100644 --- a/src/core/channel/connected_channel.h +++ b/src/core/channel/connected_channel.h @@ -46,4 +46,6 @@ extern const grpc_channel_filter grpc_connected_channel_filter; void grpc_connected_channel_bind_transport(grpc_channel_stack* channel_stack, grpc_transport* transport); +grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index f78a5cc315..3a0f68f30f 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -45,10 +45,8 @@ typedef struct call_data { grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; - int sent_initial_metadata; - int got_initial_metadata; - grpc_stream_op_buffer *recv_ops; + grpc_metadata_batch *recv_initial_metadata; /** Closure to call when finished with the hc_on_recv hook */ grpc_closure *on_done_recv; @@ -91,18 +89,11 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - client_recv_filter_args a; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a); - } + client_recv_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, + &a); calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } @@ -123,40 +114,29 @@ static void hc_mutate_op(grpc_call_element *elem, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - size_t i; - if (op->send_ops && !calld->sent_initial_metadata) { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *stream_op = &ops[i]; - if (stream_op->type != GRPC_OP_METADATA) continue; - calld->sent_initial_metadata = 1; - grpc_metadata_batch_filter(&stream_op->data.metadata, client_strip_filter, - elem); - /* Send : prefixed headers, which have to be before any application - layer headers. */ - grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->method, - GRPC_MDELEM_REF(channeld->method)); - grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->scheme, - GRPC_MDELEM_REF(channeld->scheme)); - grpc_metadata_batch_add_tail(&stream_op->data.metadata, - &calld->te_trailers, - GRPC_MDELEM_REF(channeld->te_trailers)); - grpc_metadata_batch_add_tail(&stream_op->data.metadata, - &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); - grpc_metadata_batch_add_tail(&stream_op->data.metadata, - &calld->user_agent, - GRPC_MDELEM_REF(channeld->user_agent)); - break; - } + if (op->send_initial_metadata != NULL) { + grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter, + elem); + /* Send : prefixed headers, which have to be before any application + layer headers. */ + grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, + GRPC_MDELEM_REF(channeld->method)); + grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, + GRPC_MDELEM_REF(channeld->scheme)); + grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, + GRPC_MDELEM_REF(channeld->te_trailers)); + grpc_metadata_batch_add_tail(op->send_initial_metadata, + &calld->content_type, + GRPC_MDELEM_REF(channeld->content_type)); + grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent, + GRPC_MDELEM_REF(channeld->user_agent)); } - if (op->recv_ops && !calld->got_initial_metadata) { + if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hc_on_recv; + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->on_complete; + op->on_complete = &calld->hc_on_recv; } } @@ -172,14 +152,10 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *calld = elem->call_data; - calld->sent_initial_metadata = 0; - calld->got_initial_metadata = 0; calld->on_done_recv = NULL; grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); - if (initial_op) hc_mutate_op(elem, initial_op); } /* Destructor for call_data */ @@ -250,28 +226,31 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *channel_args, - grpc_mdctx *mdctx, int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT(!is_last); + GPR_ASSERT(!args->is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); - channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", - scheme_from_args(channel_args)); - channeld->content_type = - grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); - channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); + channeld->te_trailers = + grpc_mdelem_from_strings(args->metadata_context, "te", "trailers"); + channeld->method = + grpc_mdelem_from_strings(args->metadata_context, ":method", "POST"); + channeld->scheme = grpc_mdelem_from_strings( + args->metadata_context, ":scheme", scheme_from_args(args->channel_args)); + channeld->content_type = grpc_mdelem_from_strings( + args->metadata_context, "content-type", "application/grpc"); + channeld->status = + grpc_mdelem_from_strings(args->metadata_context, ":status", "200"); channeld->user_agent = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_from_string(mdctx, "user-agent"), - user_agent_from_args(mdctx, channel_args)); + args->metadata_context, + grpc_mdstr_from_string(args->metadata_context, "user-agent"), + user_agent_from_args(args->metadata_context, args->channel_args)); } /* Destructor for channel data */ @@ -290,6 +269,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, const grpc_channel_filter grpc_http_client_filter = { hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-client"}; + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 99e5066a4e..2adfe2bb61 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -39,7 +39,6 @@ #include "src/core/profiling/timers.h" typedef struct call_data { - gpr_uint8 got_initial_metadata; gpr_uint8 seen_path; gpr_uint8 seen_post; gpr_uint8 sent_status; @@ -49,7 +48,7 @@ typedef struct call_data { grpc_linked_mdelem status; grpc_linked_mdelem content_type; - grpc_stream_op_buffer *recv_ops; + grpc_metadata_batch *recv_initial_metadata; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -154,43 +153,35 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (success) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - server_filter_args a; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - a.elem = elem; - a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(&op->data.metadata, server_filter, &a); - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && - calld->seen_path && calld->seen_authority) { - /* do nothing */ - } else { - if (!calld->seen_path) { - gpr_log(GPR_ERROR, "Missing :path header"); - } - if (!calld->seen_authority) { - gpr_log(GPR_ERROR, "Missing :authority header"); - } - if (!calld->seen_post) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - success = 0; - grpc_call_element_send_cancel(exec_ctx, elem); + server_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path && calld->seen_authority) { + /* do nothing */ + } else { + if (!calld->seen_path) { + gpr_log(GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_authority) { + gpr_log(GPR_ERROR, "Missing :authority header"); + } + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel(exec_ctx, elem); } } calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); @@ -201,29 +192,21 @@ static void hs_mutate_op(grpc_call_element *elem, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - size_t i; - if (op->send_ops && !calld->sent_status) { - size_t nops = op->send_ops->nops; - grpc_stream_op *ops = op->send_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *stream_op = &ops[i]; - if (stream_op->type != GRPC_OP_METADATA) continue; - calld->sent_status = 1; - grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->status, - GRPC_MDELEM_REF(channeld->status_ok)); - grpc_metadata_batch_add_tail(&stream_op->data.metadata, - &calld->content_type, - GRPC_MDELEM_REF(channeld->content_type)); - break; - } + if (op->send_initial_metadata != NULL && !calld->sent_status) { + calld->sent_status = 1; + grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status, + GRPC_MDELEM_REF(channeld->status_ok)); + grpc_metadata_batch_add_tail(op->send_initial_metadata, + &calld->content_type, + GRPC_MDELEM_REF(channeld->content_type)); } - if (op->recv_ops && !calld->got_initial_metadata) { + if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->hs_on_recv; + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->on_complete; + op->on_complete = &calld->hs_on_recv; } } @@ -239,14 +222,12 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ memset(calld, 0, sizeof(*calld)); grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); - if (initial_op) hs_mutate_op(elem, initial_op); } /* Destructor for call_data */ @@ -255,34 +236,39 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT(!args->is_last); /* initialize members */ - channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); - channeld->status_ok = grpc_mdelem_from_strings(mdctx, ":status", "200"); + channeld->te_trailers = + grpc_mdelem_from_strings(args->metadata_context, "te", "trailers"); + channeld->status_ok = + grpc_mdelem_from_strings(args->metadata_context, ":status", "200"); channeld->status_not_found = - grpc_mdelem_from_strings(mdctx, ":status", "404"); - channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http"); - channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https"); - channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); - channeld->path_key = grpc_mdstr_from_string(mdctx, ":path"); - channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority"); - channeld->host_key = grpc_mdstr_from_string(mdctx, "host"); - channeld->content_type = - grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); + grpc_mdelem_from_strings(args->metadata_context, ":status", "404"); + channeld->method_post = + grpc_mdelem_from_strings(args->metadata_context, ":method", "POST"); + channeld->http_scheme = + grpc_mdelem_from_strings(args->metadata_context, ":scheme", "http"); + channeld->https_scheme = + grpc_mdelem_from_strings(args->metadata_context, ":scheme", "https"); + channeld->grpc_scheme = + grpc_mdelem_from_strings(args->metadata_context, ":scheme", "grpc"); + channeld->path_key = grpc_mdstr_from_string(args->metadata_context, ":path"); + channeld->authority_key = + grpc_mdstr_from_string(args->metadata_context, ":authority"); + channeld->host_key = grpc_mdstr_from_string(args->metadata_context, "host"); + channeld->content_type = grpc_mdelem_from_strings( + args->metadata_context, "content-type", "application/grpc"); - channeld->mdctx = mdctx; + channeld->mdctx = args->metadata_context; } /* Destructor for channel data */ @@ -306,6 +292,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, - "http-server"}; + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 48f6b1c650..2fbf1c06bb 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -73,16 +73,13 @@ static void noop_start_transport_stream_op(grpc_exec_ctx *exec_ctx, /* Constructor for call_data */ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; /* initialize members */ calld->unused = channeld->unused; - - if (initial_op) noop_mutate_op(elem, initial_op); } /* Destructor for call_data */ @@ -91,17 +88,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; - /* The first and the last filters tend to be implemented differently to - handle the case that there's no 'next' filter to call on the up or down + /* The last filter tends to be implemented differently to + handle the case that there's no 'next' filter to call on the down path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT(!args->is_last); /* initialize members */ channeld->unused = 0; @@ -118,5 +113,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, const grpc_channel_filter grpc_no_op_filter = { noop_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, grpc_call_next_get_peer, "no-op"}; + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "no-op"}; diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c new file mode 100644 index 0000000000..d39a099234 --- /dev/null +++ b/src/core/channel/subchannel_call_holder.c @@ -0,0 +1,282 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/subchannel_call_holder.h" + +#include <grpc/support/alloc.h> + +#include "src/core/profiling/timers.h" + +#define GET_CALL(holder) \ + ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call))) + +#define CANCELLED_CALL ((grpc_subchannel_call *)1) + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, + int success); +static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success); +static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, + int success); + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg) { + gpr_atm_rel_store(&holder->subchannel_call, 0); + holder->pick_subchannel = pick_subchannel; + holder->pick_subchannel_arg = pick_subchannel_arg; + gpr_mu_init(&holder->mu); + holder->subchannel = NULL; + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; +} + +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + grpc_subchannel_call *call = GET_CALL(holder); + if (call != NULL && call != CANCELLED_CALL) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder"); + } + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + gpr_mu_destroy(&holder->mu); + GPR_ASSERT(holder->waiting_ops_count == 0); + gpr_free(holder->waiting_ops); +} + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(holder); + GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* we failed; lock and figure out what to do */ + gpr_mu_lock(&holder->mu); +retry: + /* need to recheck that another thread hasn't set the call */ + call = GET_CALL(holder); + if (call == CANCELLED_CALL) { + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + gpr_mu_unlock(&holder->mu); + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* if this is a cancellation, then we can raise our cancelled flag */ + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) { + goto retry; + } else { + switch (holder->creation_phase) { + case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: + break; + case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL: + grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel, + &holder->subchannel_call); + break; + case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: + holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, + &holder->subchannel, NULL); + break; + } + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + } + /* if we don't have a subchannel, try to get one */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->subchannel == NULL && op->send_initial_metadata != NULL) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + grpc_closure_init(&holder->next_step, subchannel_ready, holder); + if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, + op->send_initial_metadata, &holder->subchannel, + &holder->next_step)) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + } + } + /* if we've got a subchannel, then let's ask it to create a call */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->subchannel != NULL) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL; + grpc_closure_init(&holder->next_step, call_ready, holder); + if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, + holder->pollset, &holder->subchannel_call, + &holder->next_step)) { + /* got one immediately - continue the op (and any waiting ops) */ + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + retry_waiting_locked(exec_ctx, holder); + goto retry; + } + } + /* nothing to be done but wait */ + add_waiting_locked(holder, op); + gpr_mu_unlock(&holder->mu); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); +} + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_subchannel_call_holder *holder = arg; + grpc_subchannel_call *call; + gpr_mu_lock(&holder->mu); + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + call = GET_CALL(holder); + GPR_ASSERT(call == NULL || call == CANCELLED_CALL); + if (holder->subchannel == NULL) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + fail_locked(exec_ctx, holder); + } else { + grpc_closure_init(&holder->next_step, call_ready, holder); + if (grpc_subchannel_create_call(exec_ctx, holder->subchannel, + holder->pollset, &holder->subchannel_call, + &holder->next_step)) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + /* got one immediately - continue the op (and any waiting ops) */ + retry_waiting_locked(exec_ctx, holder); + } + } + gpr_mu_unlock(&holder->mu); +} + +static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_subchannel_call_holder *holder = arg; + GPR_TIMER_BEGIN("call_ready", 0); + gpr_mu_lock(&holder->mu); + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL); + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + if (GET_CALL(holder) != NULL) { + retry_waiting_locked(exec_ctx, holder); + } else { + fail_locked(exec_ctx, holder); + } + gpr_mu_unlock(&holder->mu); + GPR_TIMER_END("call_ready", 0); +} + +typedef struct { + grpc_transport_stream_op *ops; + size_t nops; + grpc_subchannel_call *call; +} retry_ops_args; + +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + retry_ops_args *a = gpr_malloc(sizeof(*a)); + a->ops = holder->waiting_ops; + a->nops = holder->waiting_ops_count; + a->call = GET_CALL(holder); + if (a->call == CANCELLED_CALL) { + gpr_free(a); + fail_locked(exec_ctx, holder); + return; + } + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1); +} + +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) { + retry_ops_args *a = args; + size_t i; + for (i = 0; i < a->nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + } + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); + gpr_free(a->ops); + gpr_free(a); +} + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("add_waiting_locked", 0); + if (holder->waiting_ops_count == holder->waiting_ops_capacity) { + holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity); + holder->waiting_ops = + gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity * + sizeof(*holder->waiting_ops)); + } + holder->waiting_ops[holder->waiting_ops_count++] = *op; + GPR_TIMER_END("add_waiting_locked", 0); +} + +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + size_t i; + for (i = 0; i < holder->waiting_ops_count; i++) { + grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, + 0); + } + holder->waiting_ops_count = 0; +} + +char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_channel *master) { + grpc_subchannel_call *subchannel_call = GET_CALL(holder); + + if (subchannel_call) { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + } else { + return grpc_channel_get_target(master); + } +} diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h new file mode 100644 index 0000000000..3dd43c9c3f --- /dev/null +++ b/src/core/channel/subchannel_call_holder.h @@ -0,0 +1,83 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H +#define GRPC_INTERNAL_CORE_CHANNEL_SUBCHANNEL_CALL_HOLDER_H + +#include "src/core/client_config/subchannel.h" + +typedef int (*grpc_subchannel_call_holder_pick_subchannel)( + grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, + grpc_subchannel **subchannel, grpc_closure *on_ready); + +typedef enum { + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL, + GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL +} grpc_subchannel_call_holder_creation_phase; + +typedef struct grpc_subchannel_call_holder { + /* either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ + gpr_atm subchannel_call; + grpc_subchannel_call_holder_pick_subchannel pick_subchannel; + void *pick_subchannel_arg; + + gpr_mu mu; + + grpc_subchannel_call_holder_creation_phase creation_phase; + grpc_subchannel *subchannel; + grpc_pollset *pollset; + + grpc_transport_stream_op *waiting_ops; + size_t waiting_ops_count; + size_t waiting_ops_capacity; + + grpc_closure next_step; +} grpc_subchannel_call_holder; + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg); +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_channel *master); + +#endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e5bf0680ff..93312abb00 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -130,6 +130,30 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_subchannel **target) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + pending_pick *pp; + gpr_mu_lock(&p->mu); + pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + grpc_subchannel_del_interested_party( + exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + *target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; @@ -149,16 +173,16 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { +int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); if (p->selected) { gpr_mu_unlock(&p->mu); *target = p->selected; - grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1); + return 1; } else { if (!p->started_picking) { start_picking(exec_ctx, p); @@ -172,6 +196,7 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + return 0; } } @@ -365,8 +390,8 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast, - pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, + pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index d0b60a0df2..1ffe32fff2 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -264,6 +264,33 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } +static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_subchannel **target) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + pending_pick *pp; + size_t i; + gpr_mu_lock(&p->mu); + pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + for (i = 0; i < p->num_subchannels; i++) { + grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], + pp->pollset); + } + *target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; @@ -286,9 +313,9 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_mu_unlock(&p->mu); } -void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { +int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, grpc_subchannel **target, + grpc_closure *on_complete) { size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; @@ -303,7 +330,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); - on_complete->cb(exec_ctx, on_complete->cb_arg, 1); + return 1; } else { if (!p->started_picking) { start_picking(exec_ctx, p); @@ -319,6 +346,7 @@ void rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->on_complete = on_complete; p->pending_picks = pp; gpr_mu_unlock(&p->mu); + return 0; } } @@ -487,8 +515,8 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_exit_idle, rr_broadcast, - rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, + rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index c955186f7f..36a2454309 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -68,12 +68,17 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->shutdown(exec_ctx, policy); } -void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete) { - policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, target, - on_complete); +int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete) { + return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata, + target, on_complete); +} + +void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target) { + policy->vtable->cancel_pick(exec_ctx, policy, target); } void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 0eefe64991..a696c3ce64 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -56,9 +56,11 @@ struct grpc_lb_policy_vtable { void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** implement grpc_lb_policy_pick */ - void (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); + int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); + void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target); /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -106,10 +108,13 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); target for this rpc, and 'return' it by calling \a on_complete after setting \a target. Picking can be asynchronous. Any IO should be done under \a pollset. */ -void grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, grpc_closure *on_complete); +int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_pollset *pollset, + grpc_metadata_batch *initial_metadata, + grpc_subchannel **target, grpc_closure *on_complete); + +void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_subchannel **target); void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_transport_op *op); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 0401dd3868..e911a46faf 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -22,7 +22,7 @@ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT @@ -41,8 +41,10 @@ #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/timer.h" -#include "src/core/transport/connectivity_state.h" +#include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" +#include "src/core/transport/connectivity_state.h" +#include "src/core/transport/connectivity_state.h" #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 @@ -69,7 +71,7 @@ typedef struct waiting_for_connect { struct waiting_for_connect *next; grpc_closure *notify; grpc_pollset *pollset; - grpc_subchannel_call **target; + gpr_atm *target; grpc_subchannel *subchannel; grpc_closure continuation; } waiting_for_connect; @@ -137,14 +139,16 @@ struct grpc_subchannel { struct grpc_subchannel_call { connection *connection; - gpr_refcount refs; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) +#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ + (((grpc_subchannel_call *)(callstack)) - 1) static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con); + connection *con, + grpc_pollset *pollset); static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, const char *reason); @@ -163,7 +167,7 @@ static grpc_subchannel *connection_unref_locked( connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ subchannel_ref_locked((p), __FILE__, __LINE__, (r)) #define SUBCHANNEL_UNREF_LOCKED(p, r) \ @@ -173,6 +177,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_UNREF_LOCKED(cl, p, r) \ connection_unref_locked((cl), (p), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason +#define REF_PASS_REASON , reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ (name), (p), (p)->refs, (p)->refs + 1, reason) @@ -185,6 +190,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c); #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p)) #define REF_PASS_ARGS +#define REF_PASS_REASON #define REF_LOG(name, p) \ do { \ } while (0) @@ -312,9 +318,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success) { +static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + int iomgr_success) { waiting_for_connect *w4c; gpr_mu_lock(&subchannel->mu); w4c = subchannel->waiting; @@ -335,6 +341,37 @@ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, } } +void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + gpr_atm *target) { + waiting_for_connect *w4c; + int unref_count = 0; + gpr_mu_lock(&subchannel->mu); + w4c = subchannel->waiting; + subchannel->waiting = NULL; + while (w4c != NULL) { + waiting_for_connect *next = w4c->next; + if (w4c->target == target) { + grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, + w4c->pollset); + grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0); + + unref_count++; + gpr_free(w4c); + } else { + w4c->next = subchannel->waiting; + subchannel->waiting = w4c; + } + + w4c = next; + } + gpr_mu_unlock(&subchannel->mu); + + while (unref_count-- > 0) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect"); + } +} + static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; @@ -358,29 +395,35 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - grpc_subchannel_call_create_status call_creation_status; + int call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); call_creation_status = grpc_subchannel_create_call( exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + GPR_ASSERT(call_creation_status == 1); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -grpc_subchannel_call_create_status grpc_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, - grpc_subchannel_call **target, grpc_closure *notify) { +int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset *pollset, gpr_atm *target, + grpc_closure *notify) { connection *con; + grpc_subchannel_call *call; + GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0); gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); - *target = create_call(exec_ctx, con); - return GRPC_SUBCHANNEL_CALL_CREATE_READY; + call = create_call(exec_ctx, con, pollset); + if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set"); + } + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 1; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -405,7 +448,8 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call( } else { gpr_mu_unlock(&c->mu); } - return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; + GPR_TIMER_END("grpc_subchannel_create_call", 0); + return 0; } } @@ -687,7 +731,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success); + cancel_waiting_calls(exec_ctx, c, iomgr_success); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); } @@ -747,26 +791,40 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx, * grpc_subchannel_call implementation */ +static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, + int success) { + grpc_subchannel_call *c = call; + gpr_mu *mu = &c->connection->subchannel->mu; + grpc_subchannel *destroy; + GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); + gpr_mu_lock(mu); + destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); + gpr_mu_unlock(mu); + gpr_free(c); + if (destroy != NULL) { + subchannel_destroy(exec_ctx, destroy); + } + GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); +} + void grpc_subchannel_call_ref(grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_ref(&c->refs); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (gpr_unref(&c->refs)) { - gpr_mu *mu = &c->connection->subchannel->mu; - grpc_subchannel *destroy; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); - gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call"); - gpr_mu_unlock(mu); - gpr_free(c); - if (destroy != NULL) { - subchannel_destroy(exec_ctx, destroy); - } - } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason); +#else + grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); +#endif } char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, @@ -785,14 +843,16 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, } static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx, - connection *con) { + connection *con, + grpc_pollset *pollset) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; - gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk); + grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call, + NULL, NULL, callstk); + grpc_call_stack_set_pollset(exec_ctx, callstk, pollset); return call; } @@ -803,3 +863,8 @@ grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) { grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { return subchannel->master; } + +grpc_call_stack *grpc_subchannel_call_get_call_stack( + grpc_subchannel_call *subchannel_call) { + return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); +} diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index ec1cc7cc69..4769a6de68 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -44,7 +44,7 @@ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; -#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ @@ -75,11 +75,6 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -typedef enum { - GRPC_SUBCHANNEL_CALL_CREATE_READY, - GRPC_SUBCHANNEL_CALL_CREATE_PENDING -} grpc_subchannel_call_create_status; - /** construct a subchannel call (possibly asynchronously). * * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will @@ -88,14 +83,15 @@ typedef enum { * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the * subchannel call will be created asynchronously, invoking the \a notify * callback upon completion. */ -grpc_subchannel_call_create_status grpc_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, - grpc_subchannel_call **target, grpc_closure *notify); +int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + grpc_pollset *pollset, gpr_atm *target, + grpc_closure *notify); /** cancel \a call in the waiting state. */ -void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - int iomgr_success); +void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx, + grpc_subchannel *subchannel, + gpr_atm *target); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, @@ -138,6 +134,9 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +grpc_call_stack *grpc_subchannel_call_get_call_stack( + grpc_subchannel_call *subchannel_call); + struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely want to mutate this */ |