diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/alarm.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 3 | ||||
-rw-r--r-- | src/core/security/secure_endpoint.c | 11 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 11 | ||||
-rw-r--r-- | src/core/surface/call.c | 2 | ||||
-rw-r--r-- | src/core/surface/channel.c | 10 | ||||
-rw-r--r-- | src/core/surface/channel.h | 7 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 9 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 9 | ||||
-rw-r--r-- | src/core/surface/server.c | 3 | ||||
-rw-r--r-- | src/core/surface/server.h | 1 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 10 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.h | 1 |
17 files changed, 39 insertions, 61 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 8ea774bebc..c9a200eb8c 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -97,7 +97,7 @@ static void add_interested_parties_locked(pick_first_lb_policy *p, void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; - GPR_ASSERT(p->shutdown); + GPR_ASSERT(p->pending_picks == NULL); for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list); } @@ -180,7 +180,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success, gpr_mu_lock(&p->mu); if (p->shutdown) { + gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); + return; } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, "selected_changed", call_list); diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 146bda477d..da265d4e50 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -224,7 +224,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list) { shard_type *shard = &g_shards[shard_idx(alarm)]; gpr_mu_lock(&shard->mu); if (!alarm->triggered) { - grpc_call_list_add(call_list, &alarm->closure, 1); + grpc_call_list_add(call_list, &alarm->closure, 0); alarm->triggered = 1; if (alarm->heap_index == INVALID_HEAP_INDEX) { list_remove(alarm); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index f435e2d3f9..607cba6181 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -36,7 +36,6 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" -#include "src/core/iomgr/workqueue.h" #include <grpc/support/atm.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 029c689982..5cb20a7ba0 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -194,13 +194,14 @@ int grpc_call_list_empty(grpc_call_list call_list) { } void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) { - if (dst->head == NULL) { - *dst = *src; - return; - } if (src->head == NULL) { return; } - dst->tail->next = src->head; - dst->tail = src->tail; + if (dst->head == NULL) { + *dst = *src; + } else { + dst->tail->next = src->head; + dst->tail = src->tail; + } + src->head = src->tail = NULL; } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 1040716179..daca2f6daa 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -205,7 +205,7 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now, - NULL); + call_list); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); } else { @@ -400,6 +400,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args = gpr_malloc(sizeof(*up_args)); up_args->fd = fd; up_args->original_vtable = pollset->vtable; + up_args->pollset = pollset; up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index a501544341..a647345796 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -82,16 +82,17 @@ static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) { /*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/ #ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG -#define SECURE_ENDPOINT_UNREF(ep, reason) \ - secure_endpoint_unref((ep), (reason), __FILE__, __LINE__) +#define SECURE_ENDPOINT_UNREF(ep, reason, cl) \ + secure_endpoint_unref((ep), (cl), (reason), __FILE__, __LINE__) #define SECURE_ENDPOINT_REF(ep, reason) \ secure_endpoint_ref((ep), (reason), __FILE__, __LINE__) -static void secure_endpoint_unref(secure_endpoint *ep, const char *reason, +static void secure_endpoint_unref(secure_endpoint *ep, + grpc_call_list *call_list, const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d", ep, reason, ep->ref.count, ep->ref.count - 1); if (gpr_unref(&ep->ref)) { - destroy(ep); + destroy(ep, call_list); } } @@ -219,6 +220,7 @@ static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, ep->read_buffer = slices; gpr_slice_buffer_reset_and_unref(ep->read_buffer); + SECURE_ENDPOINT_REF(ep, "read"); if (ep->leftover_bytes.count) { gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); GPR_ASSERT(ep->leftover_bytes.count == 0); @@ -226,7 +228,6 @@ static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices, return; } - SECURE_ENDPOINT_REF(ep, "read"); grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, call_list); } diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 0662839105..829082507c 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -87,8 +87,7 @@ static void state_unref(grpc_server_secure_state *state) { } static void setup_transport(void *statep, grpc_transport *transport, - grpc_mdctx *mdctx, grpc_workqueue *workqueue, - grpc_call_list *call_list) { + grpc_mdctx *mdctx, grpc_call_list *call_list) { static grpc_channel_filter const *extra_filters[] = { &grpc_server_auth_filter, &grpc_http_server_filter}; grpc_server_secure_state *state = statep; @@ -101,8 +100,8 @@ static void setup_transport(void *statep, grpc_transport *transport, grpc_server_get_channel_args(state->server), args_to_add, GPR_ARRAY_SIZE(args_to_add)); grpc_server_setup_transport(state->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, - args_copy, call_list); + GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy, + call_list); grpc_channel_args_destroy(args_copy); } @@ -135,17 +134,15 @@ static void on_secure_transport_setup_done(void *statep, grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; - grpc_workqueue *workqueue; if (status == GRPC_SECURITY_OK) { gpr_mu_lock(&state->mu); remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); - workqueue = grpc_workqueue_create(call_list); transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, 0, call_list); - setup_transport(state, transport, mdctx, workqueue, call_list); + setup_transport(state, transport, mdctx, call_list); grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list); } else { /* We need to consume this here, because the server may already have gone diff --git a/src/core/surface/call.c b/src/core/surface/call.c index bbaf65759d..9920e0daaf 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -650,8 +650,6 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) { if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) { call->bound_pollset = 1; op.bind_pollset = grpc_cq_pollset(call->cq); - grpc_workqueue_add_to_pollset(grpc_channel_get_workqueue(call->channel), - op.bind_pollset, call_list); start_op = 1; } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 46bea13936..b00096f6fa 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -78,7 +78,6 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; char *target; - grpc_workqueue *workqueue; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -92,8 +91,8 @@ struct grpc_channel { grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t num_filters, - const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - int is_client, grpc_call_list *call_list) { + const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client, + grpc_call_list *call_list) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); @@ -105,7 +104,6 @@ grpc_channel *grpc_channel_create_from_filters( /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; - channel->workqueue = workqueue; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); @@ -371,7 +369,3 @@ grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { return channel->max_message_length; } - -grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel) { - return channel->workqueue; -} diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 3f51164fcc..2a249c9c51 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -36,12 +36,11 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/subchannel_factory.h" -#include "src/core/iomgr/workqueue.h" grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t count, - const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue, - int is_client, grpc_call_list *call_list); + const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client, + grpc_call_list *call_list); /** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); @@ -63,8 +62,6 @@ grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel); - #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_channel *channel, const char *reason, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 7ac76da10a..f48fa4ed83 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -59,7 +59,6 @@ typedef struct { grpc_endpoint *tcp; grpc_mdctx *mdctx; - grpc_workqueue *workqueue; grpc_closure connected; } connector; @@ -73,7 +72,6 @@ static void connector_unref(grpc_connector *con, grpc_call_list *call_list) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { grpc_mdctx_unref(c->mdctx); - GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list); gpr_free(c); } } @@ -156,6 +154,8 @@ static grpc_subchannel *subchannel_factory_create_subchannel( grpc_subchannel *s; memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; + c->mdctx = f->mdctx; + grpc_mdctx_ref(c->mdctx); gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; @@ -184,7 +184,6 @@ grpc_channel *grpc_insecure_channel_create(const char *target, subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_call_list call_list = GRPC_CALL_LIST_INIT; - grpc_workqueue *workqueue = grpc_workqueue_create(&call_list); size_t n = 0; GPR_ASSERT(!reserved); if (grpc_channel_args_is_census_enabled(args)) { @@ -194,8 +193,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, - workqueue, 1, &call_list); + channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1, + &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index c5cf33f1f9..2af028960d 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -157,8 +157,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, grpc_call_list call_list = GRPC_CALL_LIST_INIT; static const grpc_channel_filter *filters[] = {&lame_filter}; channel = grpc_channel_create_from_filters( - target, filters, 1, NULL, grpc_mdctx_create(), - grpc_workqueue_create(&call_list), 1, &call_list); + target, filters, 1, NULL, grpc_mdctx_create(), 1, &call_list); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GPR_ASSERT(elem->filter == &lame_filter); chand = (channel_data *)elem->channel_data; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index b5b9ee173e..9f10cd29fd 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -69,7 +69,6 @@ typedef struct { grpc_closure connected_closure; grpc_mdctx *mdctx; - grpc_workqueue *workqueue; } connector; static void connector_ref(grpc_connector *con) { @@ -81,7 +80,6 @@ static void connector_unref(grpc_connector *con, grpc_call_list *call_list) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { grpc_mdctx_unref(c->mdctx); - GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list); gpr_free(c); } } @@ -214,11 +212,10 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->security_connector = f->security_connector; c->mdctx = f->mdctx; grpc_mdctx_ref(c->mdctx); - c->workqueue = grpc_channel_get_workqueue(f->master); - GRPC_WORKQUEUE_REF(c->workqueue, "connector"); gpr_ref_init(&c->refs, 1); args->args = final_args; args->master = f->master; + args->mdctx = f->mdctx; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base, call_list); grpc_channel_args_destroy(final_args); @@ -243,7 +240,6 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_channel_args *new_args_from_connector; grpc_channel_security_connector *connector; grpc_mdctx *mdctx; - grpc_workqueue *workqueue; grpc_resolver *resolver; subchannel_factory *f; #define MAX_FILTERS 3 @@ -267,7 +263,6 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, "Failed to create security connector."); } mdctx = grpc_mdctx_create(); - workqueue = grpc_workqueue_create(&call_list); connector_arg = grpc_security_connector_to_arg(&connector->base); args_copy = grpc_channel_args_copy_and_add( @@ -281,7 +276,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(target, filters, n, args_copy, - mdctx, workqueue, 1, &call_list); + mdctx, 1, &call_list); f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 24545c67e1..4249e6f9bd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -907,7 +907,6 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, - grpc_workqueue *workqueue, const grpc_channel_args *args, grpc_call_list *call_list) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; @@ -943,7 +942,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, } channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, - mdctx, workqueue, 0, call_list); + mdctx, 0, call_list); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 2f2c5b8948..d08a038639 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -58,7 +58,6 @@ void grpc_server_add_listener( void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport, grpc_channel_filter const **extra_filters, size_t num_extra_filters, grpc_mdctx *mdctx, - grpc_workqueue *workqueue, const grpc_channel_args *args, grpc_call_list *call_list); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index df63d99dea..395c88827d 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -43,12 +43,11 @@ #include <grpc/support/useful.h> static void setup_transport(void *server, grpc_transport *transport, - grpc_mdctx *mdctx, grpc_workqueue *workqueue, - grpc_call_list *call_list) { + grpc_mdctx *mdctx, grpc_call_list *call_list) { static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; grpc_server_setup_transport(server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, + GPR_ARRAY_SIZE(extra_filters), mdctx, grpc_server_get_channel_args(server), call_list); } @@ -62,10 +61,9 @@ static void new_transport(void *server, grpc_endpoint *tcp, * case. */ grpc_mdctx *mdctx = grpc_mdctx_create(); - grpc_workqueue *workqueue = grpc_workqueue_create(call_list); grpc_transport *transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list); - setup_transport(server, transport, mdctx, workqueue, call_list); + setup_transport(server, transport, mdctx, call_list); grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list); } @@ -144,5 +142,5 @@ error: done: grpc_call_list_run(&call_list); - return 0; + return port_num; } diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h index 3f92b22d5d..6ade8e19f7 100644 --- a/src/core/transport/connectivity_state.h +++ b/src/core/transport/connectivity_state.h @@ -36,7 +36,6 @@ #include <grpc/grpc.h> #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/workqueue.h" typedef struct grpc_connectivity_state_watcher { /** we keep watchers in a linked list */ |