From b5585d4f723003e4c900f02c2f4ce25e6fb26d5e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 17 Nov 2015 07:18:31 -0800 Subject: Initial pass through to make subchannels single connect --- src/core/transport/transport.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/transport') diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f296ce8251..f1059801d4 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -96,7 +96,7 @@ typedef struct grpc_transport_stream_op { typedef struct grpc_transport_op { /** called when processing of this op is done */ grpc_closure *on_consumed; - /** connectivity monitoring */ + /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ -- cgit v1.2.3 From 7b4356194d33c4582c487d0004ef17ea7b138839 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 08:15:05 -0800 Subject: Finish initial scrape, clang-format --- src/core/channel/channel_stack.c | 6 +- src/core/channel/channel_stack.h | 13 ++-- src/core/channel/client_uchannel.c | 7 +- src/core/channel/client_uchannel.h | 2 +- src/core/channel/connected_channel.c | 6 +- src/core/channel/subchannel_call_holder.c | 4 +- src/core/client_config/subchannel.c | 15 ++--- src/core/client_config/subchannel.h | 4 +- src/core/iomgr/tcp_server.h | 3 +- src/core/iomgr/tcp_server_posix.c | 13 ++-- src/core/iomgr/tcp_server_windows.c | 10 +-- src/core/surface/call.c | 21 +++--- src/core/surface/channel.c | 43 ++++++------- src/core/surface/channel_connectivity.c | 33 ++-------- src/core/surface/channel_create.c | 1 - src/core/surface/lame_client.c | 6 +- src/core/transport/metadata.c | 6 +- src/core/transport/static_metadata.c | 103 +++++------------------------- test/core/channel/channel_stack_test.c | 25 +++++--- test/core/end2end/tests/hpack_size.c | 6 +- test/core/util/reconnect_server.c | 3 +- 21 files changed, 116 insertions(+), 214 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 53433a0923..54b0375c25 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -101,10 +101,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, return CALL_ELEMS_FROM_STACK(call_stack) + index; } -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, - grpc_iomgr_cb_func destroy, void *destroy_arg, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, + size_t filter_count, const grpc_channel_args *channel_args, grpc_channel_stack *stack) { size_t call_size = diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 593adcd7b5..766f543404 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -179,11 +179,10 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ -void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx,int initial_refs, - grpc_iomgr_cb_func destroy, void *destroy_arg, +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, - size_t filter_count, - const grpc_channel_args *args, + size_t filter_count, const grpc_channel_args *args, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, @@ -213,10 +212,12 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, #define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else -#define GRPC_CALL_STACK_REF(call_stack, reason) grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_REF(call_stack, reason) \ + grpc_stream_ref(&(call_stack)->refcount) #define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ grpc_stream_unref(exec_ctx, &(call_stack)->refcount) -#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount) #define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index b30d030197..ff50ebfb60 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -205,7 +205,7 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( } void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_state); @@ -219,8 +219,6 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - grpc_channel *master = grpc_subchannel_get_master(subchannel); - char *target = grpc_channel_get_target(master); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 0; @@ -232,9 +230,8 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, GPR_ASSERT(n <= MAX_FILTERS); channel = - grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1); + grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1); - gpr_free(target); return channel; } diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h index a5cf271042..92a831493c 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/channel/client_uchannel.h @@ -48,7 +48,7 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete); grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 73a7dcc81f..e8eb9dcfc5 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -89,9 +89,9 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - r = grpc_transport_init_stream(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - &args->call_stack->refcount, args->server_transport_data); + r = grpc_transport_init_stream( + exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + &args->call_stack->refcount, args->server_transport_data); GPR_ASSERT(r == 0); } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 63069baa84..9875dd8080 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -243,8 +243,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; } -char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call_holder *holder) { +char *grpc_subchannel_call_holder_get_peer( + grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); if (subchannel_call) { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6cc2364eaa..c84ef54eef 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -173,15 +173,16 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref( - grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref(grpc_connected_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), + REF_REASON); } /* @@ -432,8 +433,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { channel_stack_size = grpc_channel_stack_size(filters, num_filters); con = gpr_malloc(channel_stack_size); stk = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, num_filters, c->args, - stk); + grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, + num_filters, c->args, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -626,10 +627,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call( return call; } -grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) { - return subchannel->master; -} - grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b7db363866..b50d1e8ecc 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -77,8 +77,8 @@ void grpc_subchannel_ref(grpc_subchannel *channel void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_ref( - grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 3df36174e7..3294e13797 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -64,8 +64,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, - size_t addr_len); + const void *addr, size_t addr_len); /* Returns the file descriptor of the Nth listening socket on this server, or -1 if the index is out of bounds. diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 1439dfcd6e..0ece77c4e8 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -374,8 +374,8 @@ error: } static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, - const struct sockaddr *addr, - size_t addr_len) { + const struct sockaddr *addr, + size_t addr_len) { grpc_tcp_listener *sp = NULL; int port; char *addr_str; @@ -410,8 +410,7 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd, } grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, - size_t addr_len) { + const void *addr, size_t addr_len) { int allocated_port = -1; grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; @@ -499,7 +498,8 @@ done: int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { grpc_tcp_listener *sp; - for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); + for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) + ; if (port_index == 0 && sp) { return sp->fd; } else { @@ -527,8 +527,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, } sp->read_closure.cb = on_read; sp->read_closure.cb_arg = sp; - grpc_fd_notify_on_read(exec_ctx, sp->emfd, - &sp->read_closure); + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index e4a1d7f498..a2425cd4d2 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -352,8 +352,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) { } static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, - size_t addr_len) { + const struct sockaddr *addr, + size_t addr_len) { grpc_tcp_listener *sp = NULL; int port; int status; @@ -400,8 +400,7 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, } grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, - const void *addr, - size_t addr_len) { + const void *addr, size_t addr_len) { grpc_tcp_listener *sp; SOCKET sock; struct sockaddr_in6 addr6_v4mapped; @@ -459,7 +458,8 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s, int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { grpc_tcp_listener *sp; - for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--); + for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--) + ; if (port_index == 0 && sp) { return _open_osfhandle(sp->socket->socket, 0); } else { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 49a0c7e04e..315eeb0449 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -341,21 +341,18 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { } #ifdef GRPC_STREAM_REFCOUNT_DEBUG -void grpc_call_internal_ref(grpc_call *c, const char *reason) { - grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason); -} -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c, - const char *reason) { - grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason); -} +#define REF_REASON reason +#define REF_ARG , const char *reason #else -void grpc_call_internal_ref(grpc_call *c) { - grpc_call_stack_ref(CALL_STACK_FROM_CALL(c)); +#define REF_REASON "" +#define REF_ARG +#endif +void grpc_call_internal_ref(grpc_call *c REF_ARG) { + GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON); } -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) { - grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c)); +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { + GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -#endif static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { size_t i; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 1632b79d0e..a78fd0aae2 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -80,6 +80,8 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success); + grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_filter **filters, size_t num_filters, @@ -92,8 +94,6 @@ grpc_channel *grpc_channel_create_from_filters( channel->target = gpr_strdup(target); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy */ - gpr_ref_init(&channel->refs, 1); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -151,7 +151,8 @@ grpc_channel *grpc_channel_create_from_filters( gpr_free(default_authority); } - grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args, + grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, + num_filters, args, CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; @@ -248,17 +249,25 @@ grpc_call *grpc_channel_create_registered_call( rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline); } -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_ref(grpc_channel *c, const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: ref %p %d -> %d [%s]", c, c->refs.count, - c->refs.count + 1, reason); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define REF_REASON reason +#define REF_ARG , const char *reason #else -void grpc_channel_internal_ref(grpc_channel *c) { +#define REF_REASON "" +#define REF_ARG #endif - gpr_ref(&c->refs); +void grpc_channel_internal_ref(grpc_channel *c REF_ARG) { + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { +void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_channel *c REF_ARG) { + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON); +} + +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; @@ -277,20 +286,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { gpr_free(channel); } -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, - channel->refs.count, channel->refs.count - 1, reason); -#else -void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_channel *channel) { -#endif - if (gpr_unref(&channel->refs)) { - destroy_channel(exec_ctx, channel); - } -} - void grpc_channel_destroy(grpc_channel *channel) { grpc_transport_op op; grpc_channel_element *elem; diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index df2774b527..ad1c9b334e 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -83,7 +83,6 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; - int removed; grpc_closure on_complete; grpc_timer alarm; grpc_connectivity_state state; @@ -135,30 +134,15 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, int due_to_completion) { int delete = 0; - grpc_channel_element *client_channel_elem = NULL; - gpr_mu_lock(&w->mu); - if (w->removed == 0) { - w->removed = 1; - client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - if (client_channel_elem->filter == &grpc_client_channel_filter) { - grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); - } else { - grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); - } - } - gpr_mu_unlock(&w->mu); if (due_to_completion) { - gpr_mu_lock(&w->mu); - w->success = 1; - gpr_mu_unlock(&w->mu); grpc_timer_cancel(exec_ctx, &w->alarm); } gpr_mu_lock(&w->mu); + if (due_to_completion) { + w->success = 1; + } switch (w->phase) { case WAITING: w->phase = CALLING_BACK; @@ -212,7 +196,6 @@ void grpc_channel_watch_connectivity_state( w->phase = WAITING; w->state = last_observed_state; w->success = 0; - w->removed = 0; w->cq = cq; w->tag = tag; w->channel = channel; @@ -223,16 +206,14 @@ void grpc_channel_watch_connectivity_state( if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - &w->state, &w->on_complete); + grpc_cq_pollset(cq), &w->state, + &w->on_complete); } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); - grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq)); grpc_client_uchannel_watch_connectivity_state( - &exec_ctx, client_channel_elem, &w->state, &w->on_complete); + &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &w->on_complete); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 026592db95..d7722f26a7 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -151,7 +151,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 4a55544ac1..a60e9d20da 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,7 +49,6 @@ typedef struct { } call_data; typedef struct { - grpc_channel *master; grpc_status_code error_code; const char *error_message; } channel_data; @@ -84,8 +83,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_channel_get_target(chand->master); + return NULL; } static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -111,10 +109,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_last); - chand->master = args->master; } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index a76d1ad3a5..56e83a0a75 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -217,7 +217,8 @@ void grpc_mdctx_global_shutdown(void) { gc_mdtab(shard); /* TODO(ctiller): GPR_ASSERT(shard->count == 0); */ if (shard->count != 0) { - gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", shard->count); + gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", + shard->count); } gpr_free(shard->elems); } @@ -226,7 +227,8 @@ void grpc_mdctx_global_shutdown(void) { gpr_mu_destroy(&shard->mu); /* TODO(ctiller): GPR_ASSERT(shard->count == 0); */ if (shard->count != 0) { - gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", shard->count); + gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", + shard->count); } gpr_free(shard->strs); } diff --git a/src/core/transport/static_metadata.c b/src/core/transport/static_metadata.c index e7aff325c2..c2bfef0397 100644 --- a/src/core/transport/static_metadata.c +++ b/src/core/transport/static_metadata.c @@ -65,92 +65,23 @@ const gpr_uint8 80, 81, 82, 33, 83, 33, 84, 33, 85, 33, 86, 33}; const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { - "0", - "1", - "2", - "200", - "204", - "206", - "304", - "400", - "404", - "500", - "accept", - "accept-charset", - "accept-encoding", - "accept-language", - "accept-ranges", - "access-control-allow-origin", - "age", - "allow", - "application/grpc", - ":authority", - "authorization", - "cache-control", - "content-disposition", - "content-encoding", - "content-language", - "content-length", - "content-location", - "content-range", - "content-type", - "cookie", - "date", - "deflate", - "deflate,gzip", - "", - "etag", - "expect", - "expires", - "from", - "GET", - "grpc", - "grpc-accept-encoding", - "grpc-encoding", - "grpc-internal-encoding-request", - "grpc-message", - "grpc-status", - "grpc-timeout", - "gzip", - "gzip, deflate", - "host", - "http", - "https", - "identity", - "identity,deflate", - "identity,deflate,gzip", - "identity,gzip", - "if-match", - "if-modified-since", - "if-none-match", - "if-range", - "if-unmodified-since", - "last-modified", - "link", - "location", - "max-forwards", - ":method", - ":path", - "POST", - "proxy-authenticate", - "proxy-authorization", - "range", - "referer", - "refresh", - "retry-after", - ":scheme", - "server", - "set-cookie", - "/", - "/index.html", - ":status", - "strict-transport-security", - "te", - "trailers", - "transfer-encoding", - "user-agent", - "vary", - "via", + "0", "1", "2", "200", "204", "206", "304", "400", "404", "500", "accept", + "accept-charset", "accept-encoding", "accept-language", "accept-ranges", + "access-control-allow-origin", "age", "allow", "application/grpc", + ":authority", "authorization", "cache-control", "content-disposition", + "content-encoding", "content-language", "content-length", + "content-location", "content-range", "content-type", "cookie", "date", + "deflate", "deflate,gzip", "", "etag", "expect", "expires", "from", "GET", + "grpc", "grpc-accept-encoding", "grpc-encoding", + "grpc-internal-encoding-request", "grpc-message", "grpc-status", + "grpc-timeout", "gzip", "gzip, deflate", "host", "http", "https", + "identity", "identity,deflate", "identity,deflate,gzip", "identity,gzip", + "if-match", "if-modified-since", "if-none-match", "if-range", + "if-unmodified-since", "last-modified", "link", "location", "max-forwards", + ":method", ":path", "POST", "proxy-authenticate", "proxy-authorization", + "range", "referer", "refresh", "retry-after", ":scheme", "server", + "set-cookie", "/", "/index.html", ":status", "strict-transport-security", + "te", "trailers", "transfer-encoding", "user-agent", "vary", "via", "www-authenticate"}; const gpr_uint8 grpc_static_accept_encoding_metadata[8] = {0, 29, 26, 30, diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 9dbb879300..cab31bc69d 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -81,6 +81,16 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return gpr_strdup("peer"); } +static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_channel_stack_destroy(exec_ctx, arg); + gpr_free(arg); +} + +static void free_call(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_call_stack_destroy(exec_ctx, arg); + gpr_free(arg); +} + static void test_create_channel_stack(void) { const grpc_channel_filter filter = { call_func, channel_func, sizeof(int), call_init_func, @@ -105,16 +115,16 @@ static void test_create_channel_stack(void) { chan_args.args = &arg; channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1)); - grpc_channel_stack_init(&exec_ctx, &filters, 1, NULL, &chan_args, - channel_stack); + grpc_channel_stack_init(&exec_ctx, 1, free_channel, channel_stack, &filters, + 1, &chan_args, channel_stack); GPR_ASSERT(channel_stack->count == 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = (int *)channel_elem->channel_data; GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_call_stack_init(&exec_ctx, channel_stack, 0, NULL, NULL, NULL, NULL, - call_stack); + grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, NULL, + NULL, call_stack); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); GPR_ASSERT(call_elem->filter == channel_elem->filter); @@ -123,12 +133,11 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*call_data == 0); GPR_ASSERT(*channel_data == 1); - grpc_call_stack_destroy(&exec_ctx, call_stack); - gpr_free(call_stack); + GRPC_CALL_STACK_UNREF(&exec_ctx, call_stack, "done"); + grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(*channel_data == 2); - grpc_channel_stack_destroy(&exec_ctx, channel_stack); - gpr_free(channel_stack); + GRPC_CHANNEL_STACK_UNREF(&exec_ctx, channel_stack, "done"); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 297ea8d542..997969d3cc 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -262,9 +262,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index c064fb32c6..f5d5d3c403 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -122,8 +122,7 @@ void reconnect_server_start(reconnect_server *server, int port) { memset(&addr.sin_addr, 0, sizeof(addr.sin_addr)); server->tcp_server = grpc_tcp_server_create(); - listener = - grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); + listener = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); port_added = grpc_tcp_listener_get_port(listener); GPR_ASSERT(port_added == port); -- cgit v1.2.3 From 27e5aa47a734929f902481b66fab795a8bba65ae Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 Nov 2015 16:28:54 -0800 Subject: Test fixes --- src/core/channel/channel_stack.c | 8 ++++---- src/core/client_config/lb_policies/pick_first.c | 6 +++--- src/core/surface/call.c | 9 ++++++++- src/core/surface/channel.h | 2 +- src/core/transport/chttp2_transport.c | 14 ++++++++++---- src/core/transport/transport.c | 21 +++++++++++++++++---- src/core/transport/transport.h | 15 ++++++++++++++- test/core/end2end/fixtures/h2_uchannel.c | 6 ++++++ test/core/end2end/tests/binary_metadata.c | 2 +- test/core/end2end/tests/cancel_after_accept.c | 2 +- test/core/end2end/tests/cancel_after_client_done.c | 2 +- test/core/end2end/tests/cancel_after_invoke.c | 2 +- test/core/end2end/tests/cancel_before_invoke.c | 2 +- test/core/end2end/tests/cancel_in_a_vacuum.c | 2 +- test/core/end2end/tests/cancel_with_status.c | 2 +- test/core/end2end/tests/census_simple_request.c | 2 +- test/core/end2end/tests/compressed_payload.c | 2 +- test/core/end2end/tests/empty_batch.c | 2 +- test/core/end2end/tests/graceful_server_shutdown.c | 2 +- test/core/end2end/tests/high_initial_seqno.c | 2 +- test/core/end2end/tests/hpack_size.c | 2 +- test/core/end2end/tests/invoke_large_request.c | 2 +- test/core/end2end/tests/large_metadata.c | 2 +- test/core/end2end/tests/max_concurrent_streams.c | 2 +- test/core/end2end/tests/max_message_length.c | 2 +- test/core/end2end/tests/metadata.c | 2 +- test/core/end2end/tests/negative_deadline.c | 2 +- test/core/end2end/tests/no_op.c | 2 +- test/core/end2end/tests/payload.c | 2 +- test/core/end2end/tests/ping_pong_streaming.c | 2 +- test/core/end2end/tests/registered_call.c | 2 +- test/core/end2end/tests/request_with_flags.c | 2 +- test/core/end2end/tests/request_with_payload.c | 2 +- test/core/end2end/tests/server_finishes_request.c | 2 +- test/core/end2end/tests/shutdown_finishes_calls.c | 2 +- test/core/end2end/tests/shutdown_finishes_tags.c | 2 +- test/core/end2end/tests/simple_request.c | 2 +- test/core/end2end/tests/trailing_metadata.c | 2 +- 38 files changed, 93 insertions(+), 48 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 54b0375c25..559ad0a32c 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -116,8 +116,8 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, size_t i; stack->count = filter_count; - gpr_ref_init(&stack->refcount.refs, initial_refs); - grpc_closure_init(&stack->refcount.destroy, destroy, destroy_arg); + GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, + "CHANNEL_STACK"); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + @@ -169,8 +169,8 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, size_t i; call_stack->count = count; - gpr_ref_init(&call_stack->refcount.refs, initial_refs); - grpc_closure_init(&call_stack->refcount.destroy, destroy, destroy_arg); + GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + destroy_arg, "CALL_STACK"); call_elems = CALL_ELEMS_FROM_STACK(call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e093c3e9a9..1c9652fc47 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -363,9 +363,9 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) - continue; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) { + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); + } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } if (p->selected) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 315eeb0449..1dac6efa11 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -739,8 +739,15 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, char *grpc_call_get_peer(grpc_call *call) { grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result = elem->filter->get_peer(&exec_ctx, elem); + char *result; GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } grpc_exec_ctx_finish(&exec_ctx); return result; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 7dea609ebc..3d2ff23542 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, const char *reason); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index dedd208cbf..79ebaa70fe 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -898,10 +898,16 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); - if (op->on_connectivity_state_change) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); + if (op->on_connectivity_state_change != NULL) { + if (op->connectivity_state != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + } else { + grpc_connectivity_state_change_unsubscribe( + exec_ctx, &t->channel_callback.state_tracker, + op->on_connectivity_state_change); + } } if (op->send_goaway) { diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index f2bebc62f3..2ab978be46 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -40,8 +40,8 @@ #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val + 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val + 1, reason); #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif @@ -52,8 +52,8 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) { void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason) { gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count); - gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount, - refcount->destroy.cb_arg, val, val - 1, reason); + gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type, + refcount, refcount->destroy.cb_arg, val, val - 1, reason); #else void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { @@ -63,6 +63,19 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, } } +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type) { + refcount->object_type = object_type; +#else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg) { +#endif + gpr_ref_init(&refcount->refs, initial_refs); + grpc_closure_init(&refcount->destroy, cb, cb_arg); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f1059801d4..f94f0ae76e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,19 +50,32 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + const char *object_type; +#endif } grpc_stream_refcount; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ #ifdef GRPC_STREAM_REFCOUNT_DEBUG +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg, + const char *object_type); void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount, const char *reason); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype) #else +void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, + grpc_iomgr_cb_func cb, void *cb_arg); void grpc_stream_ref(grpc_stream_refcount *refcount); void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); +#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \ + grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif /* Transport stream op: a set of operations to perform on a transport diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 6c1638590f..d194df1c2d 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -241,6 +241,10 @@ static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { } } +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { + grpc_pollset_destroy(arg); +} + static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset pollset; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -259,6 +263,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); } + grpc_pollset_shutdown(&exec_ctx, &pollset, + grpc_closure_create(destroy_pollset, &pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_subchannel_del_interested_party(&exec_ctx, c, &pollset); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/end2end/tests/binary_metadata.c b/test/core/end2end/tests/binary_metadata.c index 58636ac2a2..e6404d6f51 100644 --- a/test/core/end2end/tests/binary_metadata.c +++ b/test/core/end2end/tests/binary_metadata.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index d384cd1150..68bd4bc36e 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -55,8 +55,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_after_client_done.c b/test/core/end2end/tests/cancel_after_client_done.c index e267d80493..1e919ce19e 100644 --- a/test/core/end2end/tests/cancel_after_client_done.c +++ b/test/core/end2end/tests/cancel_after_client_done.c @@ -55,8 +55,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index ef9165ee25..a84f9be14e 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s/%s", test_name, config.name, mode.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index ce2b402f1b..61574df3d0 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c index 6c57299e1a..6435d22ee9 100644 --- a/test/core/end2end/tests/cancel_in_a_vacuum.c +++ b/test/core/end2end/tests/cancel_in_a_vacuum.c @@ -55,8 +55,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c index 2005e5f881..2e36902a51 100644 --- a/test/core/end2end/tests/cancel_with_status.c +++ b/test/core/end2end/tests/cancel_with_status.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index 29f52ed35a..b747b639d1 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/compressed_payload.c b/test/core/end2end/tests/compressed_payload.c index f321fe1e7c..0d07110aef 100644 --- a/test/core/end2end/tests/compressed_payload.c +++ b/test/core/end2end/tests/compressed_payload.c @@ -59,8 +59,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/empty_batch.c b/test/core/end2end/tests/empty_batch.c index 59eb8f18f9..f331aa92e0 100644 --- a/test/core/end2end/tests/empty_batch.c +++ b/test/core/end2end/tests/empty_batch.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index 6b786aa89a..8efa5a34d0 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index 75bb133439..578fdf7b35 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 997969d3cc..f16883ecfd 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -241,8 +241,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index 67cd303fa6..c612af91e3 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/large_metadata.c b/test/core/end2end/tests/large_metadata.c index 75d3f71cae..763f75d59d 100644 --- a/test/core/end2end/tests/large_metadata.c +++ b/test/core/end2end/tests/large_metadata.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index bb64200d9e..d39aabaf70 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index b3d8304d0b..c311f0a44e 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/metadata.c b/test/core/end2end/tests/metadata.c index c325d5a37c..2593cde027 100644 --- a/test/core/end2end/tests/metadata.c +++ b/test/core/end2end/tests/metadata.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/negative_deadline.c b/test/core/end2end/tests/negative_deadline.c index 8fe9e7bcc5..23b8591e25 100644 --- a/test/core/end2end/tests/negative_deadline.c +++ b/test/core/end2end/tests/negative_deadline.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/no_op.c b/test/core/end2end/tests/no_op.c index ec33af78ef..dbaad3004e 100644 --- a/test/core/end2end/tests/no_op.c +++ b/test/core/end2end/tests/no_op.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c index b440fbab21..df44c0de1e 100644 --- a/test/core/end2end/tests/payload.c +++ b/test/core/end2end/tests/payload.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index 804862ff58..27180dd679 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index eea91e6c3b..ef4d5063b5 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/request_with_flags.c b/test/core/end2end/tests/request_with_flags.c index 5ea845e0e5..0ad5a4612e 100644 --- a/test/core/end2end/tests/request_with_flags.c +++ b/test/core/end2end/tests/request_with_flags.c @@ -55,8 +55,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 56c199baf3..ee5b071372 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c index c77e31bca3..94863e7280 100644 --- a/test/core/end2end/tests/server_finishes_request.c +++ b/test/core/end2end/tests/server_finishes_request.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/shutdown_finishes_calls.c b/test/core/end2end/tests/shutdown_finishes_calls.c index ad7def09a9..aa679081ec 100644 --- a/test/core/end2end/tests/shutdown_finishes_calls.c +++ b/test/core/end2end/tests/shutdown_finishes_calls.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/shutdown_finishes_tags.c b/test/core/end2end/tests/shutdown_finishes_tags.c index 9b678a1754..53a1573e16 100644 --- a/test/core/end2end/tests/shutdown_finishes_tags.c +++ b/test/core/end2end/tests/shutdown_finishes_tags.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index e9965a91ba..ce5df86a92 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -56,8 +56,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } diff --git a/test/core/end2end/tests/trailing_metadata.c b/test/core/end2end/tests/trailing_metadata.c index 306ef7e3aa..71f10eb8f5 100644 --- a/test/core/end2end/tests/trailing_metadata.c +++ b/test/core/end2end/tests/trailing_metadata.c @@ -54,8 +54,8 @@ static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, grpc_end2end_test_fixture f; gpr_log(GPR_INFO, "%s/%s", test_name, config.name); f = config.create_fixture(client_args, server_args); - config.init_client(&f, client_args); config.init_server(&f, server_args); + config.init_client(&f, client_args); return f; } -- cgit v1.2.3 From 53ee0407bb24daf1f6a0f71530ad76215b94f424 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 25 Nov 2015 09:07:09 -0800 Subject: Get ephemeral connections working again --- src/core/client_config/subchannel.c | 2 ++ src/core/transport/connectivity_state.c | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b210e7f679..16f9346a35 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -590,6 +590,7 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_subchannel_call *c = call; GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c)); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call"); gpr_free(c); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } @@ -633,6 +634,7 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call( gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; + GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call, NULL, NULL, callstk); grpc_call_stack_set_pollset(exec_ctx, callstk, pollset); diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 09b298c131..a435ba49f2 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -88,7 +88,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %s: get %s", tracker->name, + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: get %s", tracker, tracker->name, grpc_connectivity_state_name(tracker->current_state)); } return tracker->current_state; @@ -98,8 +98,8 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p", - tracker->name, grpc_connectivity_state_name(*current), + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", + tracker, tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(tracker->current_state), notify); } if (tracker->current_state != *current) { @@ -142,7 +142,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, const char *reason) { grpc_connectivity_state_watcher *w; if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name, + gpr_log(GPR_DEBUG, "SET: %p %s: %s --> %s [%s]", tracker, tracker->name, grpc_connectivity_state_name(tracker->current_state), grpc_connectivity_state_name(state), reason); } -- cgit v1.2.3 From 86c99580a0891697f3c5227ae2fd2911734098fc Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 25 Nov 2015 15:22:26 -0800 Subject: Load balancing interest management fixes --- src/core/channel/client_channel.c | 36 ++++++++++++++++- src/core/client_config/lb_policies/pick_first.c | 40 +++++-------------- src/core/client_config/lb_policies/round_robin.c | 49 +++++++----------------- src/core/client_config/lb_policy.c | 1 + src/core/client_config/lb_policy.h | 1 + src/core/client_config/subchannel.c | 38 +++++++++++------- src/core/client_config/subchannel.h | 10 +---- src/core/iomgr/pollset_set.h | 22 +++++++---- src/core/iomgr/pollset_set_posix.c | 44 +++++++++++++++++++++ src/core/iomgr/pollset_set_posix.h | 4 ++ src/core/iomgr/pollset_set_windows.c | 8 ++++ src/core/transport/transport.h | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 6 ++- 13 files changed, 161 insertions(+), 100 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index ae1f3cf4c2..5fec87c67c 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,6 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; + /** interested parties */ + grpc_pollset_set interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -177,6 +179,10 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; + if (lb_policy != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + } + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; @@ -220,6 +226,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -263,6 +270,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; @@ -391,6 +399,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_pollset_set_init(&chand->interested_parties); } /* Destructor for channel_data */ @@ -403,9 +412,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(&chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -465,12 +476,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } +typedef struct { + channel_data *chand; + grpc_pollset *pollset; + grpc_closure *on_complete; + grpc_closure my_closure; +} external_connectivity_watcher; + +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + external_connectivity_watcher *w = arg; + grpc_closure *follow_up = w->on_complete; + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); +} + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; + external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + w->chand = chand; + w->pollset = pollset; + w->on_complete = on_complete; + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, on_complete); + exec_ctx, &chand->state_tracker, state, &w->my_closure); gpr_mu_unlock(&chand->mu_config); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4ecfc11cdd..c0f1d3fd94 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,24 +76,6 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - -static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; @@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - del_interested_parties_locked(exec_ctx, p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels]); GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { @@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); goto loop; } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index ca0d6abd07..10688b3fa5 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -200,23 +200,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p, - const size_t subchannel_idx) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } -} - void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); } @@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } - p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - size_t i; gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pp->pollset); - } + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { for (i = 0; i < p->num_subchannels; i++) { p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], + &p->base.interested_parties, &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); @@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; @@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pollset); - } + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, + p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_CONNECTING: @@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); /* remove from ready list if still present */ @@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked( p, p->subchannel_index_to_readylist_node[this_idx]); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5605f788a5..8d9287c36a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_ref_init(&policy->refs, 1); + grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 2889b8e55d..985c96630f 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_refcount refs; + grpc_pollset_set interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 16f9346a35..3872cacfa0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } } -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); -} - -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); -} - static gpr_uint32 random_seed() { return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } @@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } +typedef struct { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; +} external_state_watcher; + +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + external_state_watcher *w = arg; + grpc_closure *follow_up = w->notify; + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, success); +} + void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; + external_state_watcher *w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, notify)) { + exec_ctx, &c->state_tracker, state, &w->closure)) { do_connect = 1; c->connecting = 1; /* released by connection */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b50d1e8ecc..20d74e9f10 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( @@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_closure *subscribed_notify); -/** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); -/** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 0fdcba01a4..e93a3dbb56 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -49,13 +49,19 @@ #include "src/core/iomgr/pollset_set_windows.h" #endif -void grpc_pollset_set_init(grpc_pollset_set* pollset_set); -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set); -void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); -void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); +void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c86ed3d5da..f29ef7cdcf 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -55,6 +55,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); } gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); } @@ -99,6 +100,43 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pollset_set->mu); } +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (grpc_fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset"); + } else { + grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd) { size_t i; @@ -113,6 +151,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } @@ -129,6 +170,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, break; } } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 05234fb642..4820a61e4b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -44,6 +44,10 @@ typedef struct grpc_pollset_set { size_t pollset_capacity; grpc_pollset **pollsets; + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + size_t fd_count; size_t fd_capacity; grpc_fd **fds; diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 53d5d3dcd4..04d88839cb 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f94f0ae76e..08f34ff0aa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ +#define GRPC_STREAM_REFCOUNT_DEBUG typedef struct grpc_stream_refcount { gpr_refcount refs; diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index a1b64573b8..25540256c4 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -247,9 +247,11 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset pollset; + grpc_pollset_set interested_parties; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); - grpc_subchannel_add_interested_party(&exec_ctx, c, &pollset); + grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset); + grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties); grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); @@ -266,7 +268,7 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset_shutdown(&exec_ctx, &pollset, grpc_closure_create(destroy_pollset, &pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); - grpc_subchannel_del_interested_party(&exec_ctx, c, &pollset); + grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties); grpc_exec_ctx_finish(&exec_ctx); return grpc_subchannel_get_connected_subchannel(c); } -- cgit v1.2.3 From 50ec2670a45799b95f2910f26a5a9f79ab2e8404 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 27 Nov 2015 21:45:11 -0800 Subject: Most of the way to auto-cleanup subchannels --- src/core/channel/channel_stack.c | 3 +- src/core/channel/channel_stack.h | 1 + src/core/channel/client_channel.c | 9 ----- src/core/client_config/lb_policies/pick_first.c | 44 +-------------------- src/core/client_config/lb_policies/round_robin.c | 25 +----------- src/core/client_config/lb_policy.c | 6 +-- src/core/client_config/lb_policy.h | 7 ---- src/core/client_config/subchannel.c | 50 +++++------------------- src/core/client_config/subchannel.h | 3 -- src/core/iomgr/fd_posix.c | 6 ++- src/core/iomgr/pollset_set_posix.c | 13 +++--- src/core/surface/channel.c | 2 +- src/core/transport/transport.h | 2 - test/core/channel/channel_stack_test.c | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 12 +++--- 15 files changed, 36 insertions(+), 149 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 559ad0a32c..d2f6a90ca8 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,6 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -117,7 +118,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, stack->count = filter_count; GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, - "CHANNEL_STACK"); + name); elems = CHANNEL_ELEMS_FROM_STACK(stack); user_data = ((char *)elems) + diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 766f543404..bb7081b2a2 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,6 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5fec87c67c..5ad2e075c3 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -260,10 +260,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } lb_policy = chand->lb_policy; - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "broadcast"); - } - if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); @@ -282,11 +278,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, destroy_resolver); GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); } - - if (lb_policy) { - grpc_lb_policy_broadcast(exec_ctx, lb_policy, op); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast"); - } } typedef struct { diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index c0f1d3fd94..d83f3718c2 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -185,7 +185,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; size_t i; - grpc_transport_op op; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; grpc_connected_subchannel *exclude_subchannel; @@ -199,12 +198,6 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { - if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != - exclude_subchannel) { - memset(&op, 0, sizeof(op)); - op.disconnect = 1; - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); - } GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); } @@ -323,41 +316,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&p->mu); } -static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - grpc_connected_subchannel *selected; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - selected = p->selected; - if (selected) { - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); - } - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - if (selected != grpc_subchannel_get_connected_subchannel(subchannels[i])) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - } - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); - } - if (p->selected) { - grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, - "pf_broadcast_to_selected"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -380,7 +338,7 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, - pf_broadcast, pf_check_connectivity, pf_notify_on_state_change}; + pf_check_connectivity, pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 10688b3fa5..16afd8c10e 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -451,29 +451,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } } -static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_transport_op *op) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; - size_t n; - grpc_subchannel **subchannels; - - gpr_mu_lock(&p->mu); - n = p->num_subchannels; - subchannels = gpr_malloc(n * sizeof(*subchannels)); - for (i = 0; i < n; i++) { - subchannels[i] = p->subchannels[i]; - GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); - } - gpr_mu_unlock(&p->mu); - - for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast"); - } - gpr_free(subchannels); -} - static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; @@ -497,7 +474,7 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, - rr_broadcast, rr_check_connectivity, rr_notify_on_state_change}; + rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 8d9287c36a..2b874daef6 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -61,6 +61,7 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { #endif if (gpr_unref(&policy->refs)) { + grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } @@ -83,11 +84,6 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, policy->vtable->cancel_pick(exec_ctx, policy, target); } -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op) { - policy->vtable->broadcast(exec_ctx, policy, op); -} - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->exit_idle(exec_ctx, policy); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 985c96630f..96b2bdf4ca 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -66,10 +66,6 @@ struct grpc_lb_policy_vtable { /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** broadcast a transport op to all subchannels */ - void (*broadcast)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - /** check the current connectivity of the lb_policy */ grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -118,9 +114,6 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); -void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_transport_op *op); - void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 1c66a73146..434a37cf6b 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -238,7 +238,7 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == 0) { + if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); @@ -351,7 +351,7 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, do_connect = 1; c->connecting = 1; /* released by connection */ - GRPC_SUBCHANNEL_REF(c, "connecting"); + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); } gpr_mu_unlock(&c->mu); @@ -369,40 +369,6 @@ void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&c->mu); } -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_transport_op *op) { - grpc_connected_subchannel *con; - int cancel_alarm = 0; - gpr_mu_lock(&c->mu); - con = GET_CONNECTED_SUBCHANNEL(c, no_barrier); - if (con != NULL) { - GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op"); - } - if (op->disconnect) { - c->disconnected = 1; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - if (c->have_alarm) { - cancel_alarm = 1; - } - } - gpr_mu_unlock(&c->mu); - - if (con != NULL) { - grpc_connected_subchannel_process_transport_op(exec_ctx, con, op); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op"); - } - - if (cancel_alarm) { - grpc_timer_cancel(exec_ctx, &c->alarm); - } - - if (op->disconnect) { - grpc_connector_shutdown(exec_ctx, c->connector); - } -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -488,7 +454,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { con = gpr_malloc(channel_stack_size); stk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack_init(exec_ctx, 1, connection_destroy, con, filters, - num_filters, c->args, stk); + num_filters, c->args, "CONNECTED_SUBCHANNEL", stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free((void *)c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -507,7 +473,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_free(sw_subchannel); gpr_free((void *)filters); grpc_channel_stack_destroy(exec_ctx, stk); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + gpr_free(con); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); return; } @@ -519,7 +486,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure); @@ -588,17 +555,18 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); } else { - GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { grpc_subchannel *c = arg; + if (c->connecting_result.transport != NULL) { publish_transport(exec_ctx, c); } else if (c->disconnected) { - /* do nothing */ + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b64a26561b..66c13990e9 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -105,9 +105,6 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call( grpc_pollset *pollset); /** process a transport level op */ -void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_transport_op *op); void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *subchannel, grpc_transport_op *op); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7ff80e6cf8..81c19ca797 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -43,6 +43,7 @@ #include #include +#include #include #define CLOSURE_NOT_READY ((grpc_closure *)0) @@ -158,7 +159,10 @@ void grpc_fd_global_shutdown(void) { grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - grpc_iomgr_register_object(&r->iomgr_object, name); + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); #ifdef GRPC_FD_REF_COUNT_DEBUG gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); #endif diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index f29ef7cdcf..7f0b34c36b 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -52,7 +52,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); @@ -74,7 +74,7 @@ void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pollset_set->pollsets[pollset_set->pollset_count++] = pollset; for (i = 0, j = 0; i < pollset_set->fd_count; i++) { if (grpc_fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } else { grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; @@ -107,12 +107,13 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { if (grpc_fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset"); + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); } else { grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); bag->fds[j++] = bag->fds[i]; @@ -130,7 +131,9 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, + bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a78fd0aae2..92fd3dadd7 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,7 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, + num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 08f34ff0aa..4de72d7d75 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,8 +50,6 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -#define GRPC_STREAM_REFCOUNT_DEBUG - typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index cab31bc69d..f1bb37c0bf 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -116,7 +116,7 @@ static void test_create_channel_stack(void) { channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1)); grpc_channel_stack_init(&exec_ctx, 1, free_channel, channel_stack, &filters, - 1, &chan_args, channel_stack); + 1, &chan_args, "test", channel_stack); GPR_ASSERT(channel_stack->count == 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = (int *)channel_elem->channel_data; diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 25540256c4..0c062d798a 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -233,11 +233,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( } grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; +grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_state, grpc_closure_create(state_changed, arg)); + exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -247,12 +248,11 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, int success) { static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset pollset; - grpc_pollset_set interested_parties; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_init(&pollset); - grpc_pollset_set_add_pollset(&exec_ctx, &interested_parties, &pollset); - grpc_subchannel_add_interested_parties(&exec_ctx, c, &interested_parties); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_state, + grpc_pollset_set_init(&g_interested_parties); + grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); @@ -267,8 +267,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { } grpc_pollset_shutdown(&exec_ctx, &pollset, grpc_closure_create(destroy_pollset, &pollset)); + grpc_pollset_set_destroy(&g_interested_parties); gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); - grpc_subchannel_del_interested_parties(&exec_ctx, c, &interested_parties); grpc_exec_ctx_finish(&exec_ctx); return grpc_subchannel_get_connected_subchannel(c); } -- cgit v1.2.3 From 486130455f16d30b3c1f792bf302f0858b334d3d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 29 Nov 2015 14:45:11 -0800 Subject: Sanitize unsubscription to be callback preserving --- src/core/channel/client_channel.c | 2 - src/core/channel/client_uchannel.c | 5 +- src/core/client_config/lb_policies/pick_first.c | 20 +++-- src/core/client_config/lb_policy.c | 58 ++++++++----- src/core/client_config/lb_policy.h | 18 ++++- src/core/client_config/subchannel.c | 103 +++++++++++++----------- src/core/client_config/subchannel.h | 9 --- src/core/iomgr/fd_posix.h | 1 + src/core/iomgr/iomgr.c | 1 + src/core/transport/chttp2_transport.c | 12 +-- src/core/transport/connectivity_state.c | 69 ++++++++-------- src/core/transport/connectivity_state.h | 11 +-- src/core/transport/transport.h | 2 + 13 files changed, 173 insertions(+), 138 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 5ad2e075c3..9d3690dc7d 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -227,7 +227,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, if (old_lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -267,7 +266,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); - grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index f8c621b8eb..f0682d5946 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -166,8 +166,9 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_connected_subchannel_state_change_unsubscribe( - exec_ctx, chand->connected_subchannel, &chand->connectivity_cb); + /* cancel subscription */ + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index d83f3718c2..a622d98317 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -96,11 +96,17 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_shutdown: %p", p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); + if (p->selected != NULL) { + grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + } else { + grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_connectivity], NULL, NULL, &p->connectivity_changed); + } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; @@ -139,7 +145,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], &p->base.interested_parties, @@ -195,7 +201,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, p->subchannels = NULL; exclude_subchannel = p->selected; gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); @@ -212,9 +218,11 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&p->mu); + gpr_log(GPR_DEBUG, "LB_POLICY: pf_connectivity_changed: %p success=%d shutdown=%d", p, iomgr_success, p->shutdown); + if (p->shutdown) { gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); return; } else if (p->selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -228,7 +236,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, p->selected, &p->checking_connectivity, &p->connectivity_changed); } else { - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } } else { loop: @@ -242,7 +250,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(p->selected); GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first"); /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); /* update any calls that were waiting for a pick */ @@ -300,7 +308,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 2b874daef6..4208c15b3f 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -33,41 +33,59 @@ #include "src/core/client_config/lb_policy.h" +#define WEAK_REF_BITS 16 + void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; - gpr_ref_init(&policy->refs, 1); + gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason); +#define REF_FUNC_EXTRA_ARGS , const char *file, int line, const char *reason +#define REF_MUTATE_EXTRA_ARGS REF_FUNC_EXTRA_ARGS, const char *purpose +#define REF_FUNC_PASS_ARGS(new_reason) , file, line, new_reason +#define REF_MUTATE_PASS_ARGS(purpose) , file, line, reason, purpose #else -void grpc_lb_policy_ref(grpc_lb_policy *policy) { +#define REF_FUNC_EXTRA_ARGS +#define REF_MUTATE_EXTRA_ARGS +#define REF_FUNC_PASS_ARGS(new_reason) +#define REF_MUTATE_PASS_ARGS(x) #endif - gpr_ref(&policy->refs); -} +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, - grpc_closure_list *closure_list, const char *file, - int line, const char *reason) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", - policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); -#else -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); #endif - if (gpr_unref(&policy->refs)) { - grpc_pollset_set_destroy(&policy->interested_parties); - policy->vtable->destroy(exec_ctx, policy); + return old_val; +} + +void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); +} + +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); + gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); + gpr_atm check = 1 << WEAK_REF_BITS; + if ((old_val & mask) == check) { + policy->vtable->shutdown(exec_ctx, policy); } + grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); +} + +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - policy->vtable->shutdown(exec_ctx, policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); + if (old_val == 1) { + grpc_pollset_set_destroy(&policy->interested_parties); + policy->vtable->destroy(exec_ctx, policy); + } } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 96b2bdf4ca..988789c934 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -47,7 +47,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; - gpr_refcount refs; + gpr_atm ref_pair; grpc_pollset_set interested_parties; }; @@ -78,29 +78,39 @@ struct grpc_lb_policy_vtable { grpc_closure *closure); }; +#define GRPC_LB_POLICY_REFCOUNT_DEBUG #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) \ + grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ + grpc_lb_policy_weak_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, + const char *reason); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) +#define GRPC_LB_POLICY_WEAK_REF(p, r) grpc_lb_policy_weak_ref((p)) +#define GRPC_LB_POLICY_WEAK_UNREF(cl, p, r) grpc_lb_policy_weak_unref((cl), (p)) void grpc_lb_policy_ref(grpc_lb_policy *policy); void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +void grpc_lb_policy_weak_ref(grpc_lb_policy *policy); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); #endif /** called by concrete implementations to initialize the base struct */ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 434a37cf6b..9f802f1cc3 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -62,13 +62,19 @@ typedef struct { grpc_closure closure; - union { - grpc_subchannel *subchannel; - grpc_connected_subchannel *connected_subchannel; - } whom; + grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; } state_watcher; +typedef struct external_state_watcher { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; + struct external_state_watcher *next; + struct external_state_watcher *prev; +} external_state_watcher; + struct grpc_subchannel { grpc_connector *connector; @@ -114,6 +120,8 @@ struct grpc_subchannel { /** connectivity state tracking */ grpc_connectivity_state_tracker state_tracker; + external_state_watcher root_external_state_watcher; + /** next connect attempt time */ gpr_timespec next_attempt; /** amount to backoff each failure */ @@ -201,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); @@ -277,6 +285,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -316,17 +325,16 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -typedef struct { - grpc_subchannel *subchannel; - grpc_pollset_set *pollset_set; - grpc_closure *notify; - grpc_closure closure; -} external_state_watcher; - static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + if (w->pollset_set != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + } + gpr_mu_lock(&w->subchannel->mu); + w->next->prev = w->prev; + w->prev->next = w->next; + gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, success); @@ -338,37 +346,47 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; - external_state_watcher *w = gpr_malloc(sizeof(*w)); - w->subchannel = c; - w->pollset_set = interested_parties; - w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w); - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); - GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); - gpr_mu_lock(&c->mu); - if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, &w->closure)) { - do_connect = 1; - c->connecting = 1; - /* released by connection */ - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + external_state_watcher *w; + + if (state == NULL) { + gpr_mu_lock(&c->mu); + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; + w = w->next) { + if (w->notify == notify) { + grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + } + } + gpr_mu_unlock(&c->mu); + } else { + w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + if (interested_parties != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + } + GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); + gpr_mu_lock(&c->mu); + w->next = &c->root_external_state_watcher; + w->prev = w->next->prev; + w->next->prev = w->prev->next = w; + if (grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, state, &w->closure)) { + do_connect = 1; + c->connecting = 1; + /* released by connection */ + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + } + gpr_mu_unlock(&c->mu); } - gpr_mu_unlock(&c->mu); if (do_connect) { start_connect(exec_ctx, c); } } -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_closure *subscribed_notify) { - gpr_mu_lock(&c->mu); - grpc_connectivity_state_change_unsubscribe(exec_ctx, &c->state_tracker, - subscribed_notify); - gpr_mu_unlock(&c->mu); -} - void grpc_connected_subchannel_process_transport_op( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) { @@ -380,7 +398,7 @@ void grpc_connected_subchannel_process_transport_op( static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) { state_watcher *sw = p; - grpc_subchannel *c = sw->whom.subchannel; + grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; gpr_mu_lock(mu); @@ -423,16 +441,9 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) { - GPR_ASSERT(state != NULL); connected_subchannel_state_op(exec_ctx, con, state, closure); } -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_closure *closure) { - connected_subchannel_state_op(exec_ctx, con, NULL, closure); -} - static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; @@ -461,7 +472,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* initialize state watcher */ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel)); - sw_subchannel->whom.subchannel = c; + sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 66c13990e9..b735d2ccab 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -124,15 +124,6 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present. */ -void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_closure *subscribed_notify); -void grpc_connected_subchannel_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, - grpc_closure *subscribed_notify); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index dc917ebbc0..8717aa5103 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -168,6 +168,7 @@ void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Reference counting for fds */ +#define GRPC_FD_REF_COUNT_DEBUG #ifdef GRPC_FD_REF_COUNT_DEBUG void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 212ce5534d..5bba40bc9b 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -116,6 +116,7 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + abort(); } break; } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ff0c616386..91c50dd2cb 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -911,15 +911,9 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); if (op->on_connectivity_state_change != NULL) { - if (op->connectivity_state != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - } else { - grpc_connectivity_state_change_unsubscribe( - exec_ctx, &t->channel_callback.state_tracker, - op->on_connectivity_state_change); - } + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state, + op->on_connectivity_state_change); } if (op->send_goaway) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index a435ba49f2..569d3c30b2 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -98,42 +98,47 @@ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), - grpc_connectivity_state_name(tracker->current_state), notify); + if (current == NULL) { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", + tracker, tracker->name, notify); + } else { + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", + tracker, tracker->name, grpc_connectivity_state_name(*current), + grpc_connectivity_state_name(tracker->current_state), notify); + } } - if (tracker->current_state != *current) { - *current = tracker->current_state; - grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + if (current == NULL) { + grpc_connectivity_state_watcher *w = tracker->watchers; + if (w != NULL && w->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + tracker->watchers = w->next; + gpr_free(w); + return 0; + } + while (w != NULL) { + grpc_connectivity_state_watcher *rm_candidate = w->next; + if (rm_candidate != NULL && rm_candidate->notify == notify) { + grpc_exec_ctx_enqueue(exec_ctx, notify, 0); + w->next = w->next->next; + gpr_free(rm_candidate); + return 0; + } + w = w->next; + } + return 0; } else { - grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); - w->current = current; - w->notify = notify; - w->next = tracker->watchers; - tracker->watchers = w; - } - return tracker->current_state == GRPC_CHANNEL_IDLE; -} - -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify) { - grpc_connectivity_state_watcher *w = tracker->watchers; - if (w != NULL && w->notify == subscribed_notify) { - tracker->watchers = w->next; - gpr_free(w); - return 1; - } - while (w != NULL) { - grpc_connectivity_state_watcher *rm_candidate = w->next; - if (rm_candidate != NULL && rm_candidate->notify == subscribed_notify) { - w->next = w->next->next; - gpr_free(rm_candidate); - return 1; + if (tracker->current_state != *current) { + *current = tracker->current_state; + grpc_exec_ctx_enqueue(exec_ctx, notify, 1); + } else { + grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); + w->current = current; + w->notify = notify; + w->next = tracker->watchers; + tracker->watchers = w; } - w = w->next; + return tracker->current_state == GRPC_CHANNEL_IDLE; } - return 0; } void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 119b1c1554..d8b5b38da0 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -73,16 +73,11 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); -/** Return 1 if the channel should start connecting, 0 otherwise */ +/** Return 1 if the channel should start connecting, 0 otherwise. + If current==NULL cancel notify if it is already queued (success==0 in that + case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current, grpc_closure *notify); -/** Remove \a subscribed_notify from the list of closures to be called on a - * state change if present, returning 1. Otherwise, nothing is done and return - * 0. */ -int grpc_connectivity_state_change_unsubscribe( - grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, - grpc_closure *subscribed_notify); - #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 4de72d7d75..08f34ff0aa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,6 +50,8 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; +#define GRPC_STREAM_REFCOUNT_DEBUG + typedef struct grpc_stream_refcount { gpr_refcount refs; grpc_closure destroy; -- cgit v1.2.3 From 54914ee42965bb0b5a1382b1b411a5ccb94012e3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 1 Dec 2015 06:23:44 -0800 Subject: Turn off refcount debugging --- src/core/client_config/lb_policies/round_robin.c | 2 -- src/core/client_config/lb_policy.h | 2 +- src/core/client_config/subchannel.c | 2 -- src/core/transport/transport.h | 2 +- 4 files changed, 2 insertions(+), 6 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index 457b5d9252..f0b8ebe6fe 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -231,8 +231,6 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pending_pick *pp; size_t i; -gpr_log(GPR_DEBUG, "LB_POLICY: rr_shutdown: p=%p num_subchannels=%d", p, p->num_subchannels); - gpr_mu_lock(&p->mu); p->shutdown = 1; diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 988789c934..6754661745 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -78,7 +78,7 @@ struct grpc_lb_policy_vtable { grpc_closure *closure); }; -#define GRPC_LB_POLICY_REFCOUNT_DEBUG +/*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a97c0b823c..9f802f1cc3 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -348,8 +348,6 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, int do_connect = 0; external_state_watcher *w; - gpr_log(GPR_DEBUG, "sc.nosc: c=%p s=%p n=%p", c, state, notify); - if (state == NULL) { gpr_mu_lock(&c->mu); for (w = c->root_external_state_watcher.next; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 08f34ff0aa..f94f0ae76e 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -#define GRPC_STREAM_REFCOUNT_DEBUG +/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ typedef struct grpc_stream_refcount { gpr_refcount refs; -- cgit v1.2.3 From 1d881fbed6601f34ad0919cbb10d7367fe431148 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 1 Dec 2015 07:39:04 -0800 Subject: clang-format --- src/core/channel/channel_stack.c | 3 +- src/core/channel/channel_stack.h | 3 +- src/core/channel/client_channel.c | 30 +++++++---- src/core/client_config/lb_policies/pick_first.c | 34 +++++++----- src/core/client_config/lb_policies/round_robin.c | 61 +++++++++------------- src/core/client_config/lb_policy.c | 26 ++++++--- src/core/client_config/lb_policy.h | 4 +- src/core/client_config/subchannel.c | 60 ++++++++++++--------- src/core/client_config/subchannel.h | 18 +++---- src/core/iomgr/pollset_set.h | 8 +-- src/core/iomgr/pollset_set_posix.c | 10 ++-- src/core/iomgr/pollset_set_windows.c | 12 ++--- src/core/surface/channel.c | 3 +- src/core/transport/connectivity_state.c | 8 +-- src/core/transport/connectivity_state.h | 2 +- test/core/client_config/lb_policies_test.c | 13 +++-- .../set_initial_connect_string_test.c | 8 +-- test/core/end2end/fixtures/h2_uchannel.c | 12 +++-- test/core/end2end/tests/hpack_size.c | 6 +-- 19 files changed, 177 insertions(+), 144 deletions(-) (limited to 'src/core/transport') diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index d2f6a90ca8..5e09a050ee 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -106,8 +106,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *channel_args, - const char *name, - grpc_channel_stack *stack) { + const char *name, grpc_channel_stack *stack) { size_t call_size = ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index bb7081b2a2..c01050e717 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -183,8 +183,7 @@ void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args, - const char *name, - grpc_channel_stack *stack); + const char *name, grpc_channel_stack *stack); /* Destroy a channel stack */ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_channel_stack *stack); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 9d3690dc7d..60f2acb9ef 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -120,7 +120,8 @@ static void on_lb_policy_state_changed_locked( /* check if the notification is for a stale policy */ if (w->lb_policy != w->chand->lb_policy) return; - if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && w->chand->resolver != NULL) { + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && + w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); } @@ -180,7 +181,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, + &chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -226,7 +228,9 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &old_lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -265,7 +269,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -401,7 +407,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); + grpc_pollset_set_del_pollset_set(exec_ctx, + &chand->lb_policy->interested_parties, + &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); @@ -472,11 +480,14 @@ typedef struct { grpc_closure my_closure; } external_connectivity_watcher; -static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); gpr_free(w); follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); } @@ -491,7 +502,8 @@ void grpc_client_channel_watch_connectivity_state( w->on_complete = on_complete; grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); - GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, + "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, state, &w->my_closure); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index acaf356df4..3bfa7a86fb 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -102,15 +102,19 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); if (p->selected != NULL) { - grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected, NULL, &p->connectivity_changed); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, p->selected, NULL, &p->connectivity_changed); } else { - grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); + grpc_subchannel_notify_on_state_change( + exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, + &p->connectivity_changed); } gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -127,7 +131,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -147,8 +152,8 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { @@ -174,7 +179,8 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -254,7 +260,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -273,8 +280,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); } else { goto loop; } @@ -286,8 +293,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); + &p->base.interested_parties, &p->checking_connectivity, + &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: p->num_subchannels--; @@ -305,7 +312,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, + "pick_first_connectivity"); } else { grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index f0b8ebe6fe..b86dba20ee 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -244,9 +244,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - NULL, - NULL, + grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } gpr_mu_unlock(&p->mu); @@ -262,7 +260,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -279,15 +278,15 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { size_t i; p->started_picking = 1; - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, p->num_subchannels); + gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, + p->num_subchannels); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + grpc_subchannel_notify_on_state_change( + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } } @@ -323,7 +322,8 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, + pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -355,8 +355,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_READY, "connecting_ready"); /* add the newly connected subchannel to the list of connected ones. * Note that it goes to the "end of the line". */ - sd->ready_list_node = - add_connected_sc_locked(p, sd->subchannel); + sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ @@ -375,34 +374,29 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, - sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->connectivity_state, "connecting_changed"); + sd->connectivity_state, + "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, - &p->base.interested_parties, - &sd->connectivity_state, - &sd->connectivity_changed_closure); + exec_ctx, sd->subchannel, &p->base.interested_parties, + &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ if (sd->ready_list_node != NULL) { @@ -415,16 +409,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, break; case GRPC_CHANNEL_FATAL_FAILURE: if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked( - p, sd->ready_list_node); + remove_disconnected_sc_locked(p, sd->ready_list_node); sd->ready_list_node = NULL; } p->num_subchannels--; GPR_SWAP(subchannel_data *, p->subchannels[sd->index], p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, - "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); p->subchannels[sd->index]->index = sd->index; gpr_free(sd); @@ -491,8 +483,7 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); p->num_subchannels = args->num_subchannels; - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); + p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels); memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); @@ -505,8 +496,8 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, sd->policy = p; sd->index = i; sd->subchannel = args->subchannels[i]; - grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, - sd); + grpc_closure_init(&sd->connectivity_changed_closure, + rr_connectivity_changed, sd); } /* The (dummy node) root of the ready list */ diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 4208c15b3f..d254161546 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -54,10 +54,14 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, #define REF_MUTATE_PASS_ARGS(x) #endif -static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } @@ -66,22 +70,28 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); } -void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, (gpr_atm)1-(gpr_atm)(1 << WEAK_REF_BITS), 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); +void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, (gpr_atm)1 - (gpr_atm)(1 << WEAK_REF_BITS), + 1 REF_MUTATE_PASS_ARGS("STRONG_UNREF")); gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { policy->vtable->shutdown(exec_ctx, policy); } - grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); + grpc_lb_policy_weak_unref(exec_ctx, + policy REF_FUNC_PASS_ARGS("strong-unref")); } void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1, 0 REF_MUTATE_PASS_ARGS("WEAK_REF")); } -void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { - gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); +void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { + gpr_atm old_val = + ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { grpc_pollset_set_destroy(&policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 6754661745..2f8d655558 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -93,9 +93,9 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const char *file, int line, const char *reason); void grpc_lb_policy_weak_ref(grpc_lb_policy *policy, const char *file, int line, - const char *reason); + const char *reason); void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const char *file, int line, const char *reason); + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) #define GRPC_LB_POLICY_UNREF(cl, p, r) grpc_lb_policy_unref((cl), (p)) diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a0e51d57ec..d89a5e9be1 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -78,9 +78,9 @@ typedef struct external_state_watcher { struct grpc_subchannel { grpc_connector *connector; - /** refcount - - lower INTERNAL_REF_BITS bits are for internal references: - these do not keep the subchannel open. + /** refcount + - lower INTERNAL_REF_BITS bits are for internal references: + these do not keep the subchannel open. - upper remaining bits are for public references: these do keep the subchannel open */ gpr_atm ref_pair; @@ -155,7 +155,8 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, #define UNREF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \ (name), (p), (p)->refs.count, (p)->refs.count - 1, reason) -#define REF_MUTATE_EXTRA_ARGS GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose +#define REF_MUTATE_EXTRA_ARGS \ + GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose #define REF_MUTATE_PURPOSE(x) , file, line, reason, x #else #define REF_REASON "" @@ -209,21 +210,27 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_STREAM_REFCOUNT_DEBUG - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, old_val + delta, reason); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, + old_val + delta, reason); #endif return old_val; } void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); + old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), + 0 REF_MUTATE_PURPOSE("STRONG_REF")); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); } -void grpc_subchannel_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -246,7 +253,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; - old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); + old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), + 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { disconnect(exec_ctx, c); } @@ -254,7 +262,8 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 0) { @@ -286,7 +295,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->random = random_seed(); - c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = + &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); @@ -326,11 +336,13 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, + int success) { external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); w->next->prev = w->prev; @@ -341,21 +353,20 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, i follow_up->cb(exec_ctx, follow_up->cb_arg, success); } -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify) { +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify) { int do_connect = 0; external_state_watcher *w; if (state == NULL) { gpr_mu_lock(&c->mu); - for (w = c->root_external_state_watcher.next; - w != &c->root_external_state_watcher; - w = w->next) { + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change(exec_ctx, &c->state_tracker, NULL, &w->closure); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &c->state_tracker, NULL, &w->closure); } } gpr_mu_unlock(&c->mu); @@ -366,7 +377,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b735d2ccab..e45708f91d 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -68,7 +68,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) grpc_subchannel_weak_unref((cl), (p)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ + grpc_subchannel_weak_unref((cl), (p)) #define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p)) #define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \ grpc_connected_subchannel_unref((cl), (p)) @@ -84,10 +85,10 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_ref(grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, @@ -115,11 +116,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the channel */ -void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *notify); +void grpc_subchannel_notify_on_state_change( + grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, + grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_connectivity_state *state, grpc_closure *notify); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index e93a3dbb56..09c04438f7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -58,10 +58,10 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_pollset *pollset); void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item); + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 7f0b34c36b..4ec92202e3 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -107,8 +107,9 @@ void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { @@ -131,9 +132,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, for (i = 0; i < bag->pollset_set_count; i++) { if (bag->pollset_sets[i] == item) { bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, - bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); break; } } diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 04d88839cb..157b46ec32 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,12 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} -void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} -void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 92fd3dadd7..b4959069c6 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -152,7 +152,8 @@ grpc_channel *grpc_channel_create_from_filters( } grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, - num_filters, args, is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", + num_filters, args, + is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 569d3c30b2..c409d983da 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -99,11 +99,11 @@ int grpc_connectivity_state_notify_on_state_change( grpc_connectivity_state *current, grpc_closure *notify) { if (grpc_connectivity_state_trace) { if (current == NULL) { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", - tracker, tracker->name, notify); + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: unsubscribe notify=%p", tracker, + tracker->name, notify); } else { - gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", - tracker, tracker->name, grpc_connectivity_state_name(*current), + gpr_log(GPR_DEBUG, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker, + tracker->name, grpc_connectivity_state_name(*current), grpc_connectivity_state_name(tracker->current_state), notify); } } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index d8b5b38da0..1f3a130e90 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -74,7 +74,7 @@ grpc_connectivity_state grpc_connectivity_state_check( grpc_connectivity_state_tracker *tracker); /** Return 1 if the channel should start connecting, 0 otherwise. - If current==NULL cancel notify if it is already queued (success==0 in that + If current==NULL cancel notify if it is already queued (success==0 in that case) */ int grpc_connectivity_state_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connectivity_state_tracker *tracker, diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index 5aa8140e08..6f218e7f08 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -135,9 +135,8 @@ static void kill_server(const servers_fixture *f, size_t i) { gpr_log(GPR_INFO, "KILLING SERVER %d", i); GPR_ASSERT(f->servers[i] != NULL); grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT( - grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); f->servers[i] = NULL; } @@ -203,8 +202,8 @@ static void teardown_servers(servers_fixture *f) { if (f->servers[i] == NULL) continue; grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), - n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + n_millis_time(5000), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); } grpc_completion_queue_shutdown(f->cq); @@ -304,8 +303,8 @@ int *perform_request(servers_fixture *f, grpc_channel *client, s_idx = -1; while ((ev = grpc_completion_queue_next( - f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)) - .type != GRPC_QUEUE_TIMEOUT) { + f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type != + GRPC_QUEUE_TIMEOUT) { GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); read_tag = ((int)(gpr_intptr)ev.tag); gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d", diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 13517b7c1f..ceca56c833 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -66,15 +66,15 @@ static grpc_closure on_read; static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) { GPR_ASSERT(success); - gpr_slice_buffer_move_into( - &state.temp_incoming_buffer, &state.incoming_buffer); + gpr_slice_buffer_move_into(&state.temp_incoming_buffer, + &state.incoming_buffer); if (state.incoming_buffer.length > strlen(magic_connect_string)) { state.done = 1; grpc_endpoint_shutdown(exec_ctx, state.tcp); grpc_endpoint_destroy(exec_ctx, state.tcp); } else { - grpc_endpoint_read( - exec_ctx, state.tcp, &state.temp_incoming_buffer, &on_read); + grpc_endpoint_read(exec_ctx, state.tcp, &state.temp_incoming_buffer, + &on_read); } } diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 6123bf061e..dd9a625ac2 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -239,7 +239,8 @@ grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, int success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); + exec_ctx, arg, &g_interested_parties, &g_state, + grpc_closure_create(state_changed, arg)); } } @@ -253,7 +254,8 @@ static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { grpc_pollset_init(&pollset); grpc_pollset_set_init(&g_interested_parties); grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, + &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); @@ -330,9 +332,9 @@ static void chttp2_tear_down_micro_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/micro_fullstack", 0, - chttp2_create_fixture_micro_fullstack, chttp2_init_client_micro_fullstack, - chttp2_init_server_micro_fullstack, chttp2_tear_down_micro_fullstack}, + {"chttp2/micro_fullstack", 0, chttp2_create_fixture_micro_fullstack, + chttp2_init_client_micro_fullstack, chttp2_init_server_micro_fullstack, + chttp2_tear_down_micro_fullstack}, }; int main(int argc, char **argv) { diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 0b1a9faad8..f16883ecfd 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -262,9 +262,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) - .type == GRPC_OP_COMPLETE); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } -- cgit v1.2.3