aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c4
-rw-r--r--src/core/iomgr/alarm.c2
-rw-r--r--src/core/iomgr/fd_posix.h1
-rw-r--r--src/core/iomgr/iomgr.c13
-rw-r--r--src/core/iomgr/pollset_posix.c3
-rw-r--r--src/core/security/secure_endpoint.c11
-rw-r--r--src/core/security/server_secure_chttp2.c11
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/channel.c10
-rw-r--r--src/core/surface/channel.h7
-rw-r--r--src/core/surface/channel_create.c9
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/secure_channel_create.c9
-rw-r--r--src/core/surface/server.c3
-rw-r--r--src/core/surface/server.h1
-rw-r--r--src/core/surface/server_chttp2.c10
-rw-r--r--src/core/transport/connectivity_state.h1
17 files changed, 39 insertions, 61 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 8ea774bebc..c9a200eb8c 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -97,7 +97,7 @@ static void add_interested_parties_locked(pick_first_lb_policy *p,
void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
- GPR_ASSERT(p->shutdown);
+ GPR_ASSERT(p->pending_picks == NULL);
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list);
}
@@ -180,7 +180,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success,
gpr_mu_lock(&p->mu);
if (p->shutdown) {
+ gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list);
+ return;
} else if (p->selected != NULL) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
"selected_changed", call_list);
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index 146bda477d..da265d4e50 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -224,7 +224,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list) {
shard_type *shard = &g_shards[shard_idx(alarm)];
gpr_mu_lock(&shard->mu);
if (!alarm->triggered) {
- grpc_call_list_add(call_list, &alarm->closure, 1);
+ grpc_call_list_add(call_list, &alarm->closure, 0);
alarm->triggered = 1;
if (alarm->heap_index == INVALID_HEAP_INDEX) {
list_remove(alarm);
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index f435e2d3f9..607cba6181 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -36,7 +36,6 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
-#include "src/core/iomgr/workqueue.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 029c689982..5cb20a7ba0 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -194,13 +194,14 @@ int grpc_call_list_empty(grpc_call_list call_list) {
}
void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) {
- if (dst->head == NULL) {
- *dst = *src;
- return;
- }
if (src->head == NULL) {
return;
}
- dst->tail->next = src->head;
- dst->tail = src->tail;
+ if (dst->head == NULL) {
+ *dst = *src;
+ } else {
+ dst->tail->next = src->head;
+ dst->tail = src->tail;
+ }
+ src->head = src->tail = NULL;
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 1040716179..daca2f6daa 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -205,7 +205,7 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
added_worker = 1;
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now,
- NULL);
+ call_list);
locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
} else {
@@ -400,6 +400,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args = gpr_malloc(sizeof(*up_args));
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
+ up_args->pollset = pollset;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index a501544341..a647345796 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -82,16 +82,17 @@ static void destroy(secure_endpoint *secure_ep, grpc_call_list *call_list) {
/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
-#define SECURE_ENDPOINT_UNREF(ep, reason) \
- secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
+#define SECURE_ENDPOINT_UNREF(ep, reason, cl) \
+ secure_endpoint_unref((ep), (cl), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
-static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
+static void secure_endpoint_unref(secure_endpoint *ep,
+ grpc_call_list *call_list, const char *reason,
const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
ep, reason, ep->ref.count, ep->ref.count - 1);
if (gpr_unref(&ep->ref)) {
- destroy(ep);
+ destroy(ep, call_list);
}
}
@@ -219,6 +220,7 @@ static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
ep->read_buffer = slices;
gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ SECURE_ENDPOINT_REF(ep, "read");
if (ep->leftover_bytes.count) {
gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
GPR_ASSERT(ep->leftover_bytes.count == 0);
@@ -226,7 +228,6 @@ static void endpoint_read(grpc_endpoint *secure_ep, gpr_slice_buffer *slices,
return;
}
- SECURE_ENDPOINT_REF(ep, "read");
grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read,
call_list);
}
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 0662839105..829082507c 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -87,8 +87,7 @@ static void state_unref(grpc_server_secure_state *state) {
}
static void setup_transport(void *statep, grpc_transport *transport,
- grpc_mdctx *mdctx, grpc_workqueue *workqueue,
- grpc_call_list *call_list) {
+ grpc_mdctx *mdctx, grpc_call_list *call_list) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_server_auth_filter, &grpc_http_server_filter};
grpc_server_secure_state *state = statep;
@@ -101,8 +100,8 @@ static void setup_transport(void *statep, grpc_transport *transport,
grpc_server_get_channel_args(state->server), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(state->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
- args_copy, call_list);
+ GPR_ARRAY_SIZE(extra_filters), mdctx, args_copy,
+ call_list);
grpc_channel_args_destroy(args_copy);
}
@@ -135,17 +134,15 @@ static void on_secure_transport_setup_done(void *statep,
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
- grpc_workqueue *workqueue;
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
- workqueue = grpc_workqueue_create(call_list);
transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(state->server), secure_endpoint, mdctx,
0, call_list);
- setup_transport(state, transport, mdctx, workqueue, call_list);
+ setup_transport(state, transport, mdctx, call_list);
grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list);
} else {
/* We need to consume this here, because the server may already have gone
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index bbaf65759d..9920e0daaf 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -650,8 +650,6 @@ static void unlock(grpc_call *call, grpc_call_list *call_list) {
if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
call->bound_pollset = 1;
op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_workqueue_add_to_pollset(grpc_channel_get_workqueue(call->channel),
- op.bind_pollset, call_list);
start_op = 1;
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 46bea13936..b00096f6fa 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -78,7 +78,6 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
char *target;
- grpc_workqueue *workqueue;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -92,8 +91,8 @@ struct grpc_channel {
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t num_filters,
- const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
- int is_client, grpc_call_list *call_list) {
+ const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client,
+ grpc_call_list *call_list) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
@@ -105,7 +104,6 @@ grpc_channel *grpc_channel_create_from_filters(
/* decremented by grpc_channel_destroy */
gpr_ref_init(&channel->refs, 1);
channel->metadata_context = mdctx;
- channel->workqueue = workqueue;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
@@ -371,7 +369,3 @@ grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
return channel->max_message_length;
}
-
-grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel) {
- return channel->workqueue;
-}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 3f51164fcc..2a249c9c51 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -36,12 +36,11 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/subchannel_factory.h"
-#include "src/core/iomgr/workqueue.h"
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count,
- const grpc_channel_args *args, grpc_mdctx *mdctx, grpc_workqueue *workqueue,
- int is_client, grpc_call_list *call_list);
+ const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client,
+ grpc_call_list *call_list);
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
@@ -63,8 +62,6 @@ grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
-grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel);
-
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason,
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 7ac76da10a..f48fa4ed83 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -59,7 +59,6 @@ typedef struct {
grpc_endpoint *tcp;
grpc_mdctx *mdctx;
- grpc_workqueue *workqueue;
grpc_closure connected;
} connector;
@@ -73,7 +72,6 @@ static void connector_unref(grpc_connector *con, grpc_call_list *call_list) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
grpc_mdctx_unref(c->mdctx);
- GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list);
gpr_free(c);
}
}
@@ -156,6 +154,8 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
+ c->mdctx = f->mdctx;
+ grpc_mdctx_ref(c->mdctx);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;
@@ -184,7 +184,6 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
- grpc_workqueue *workqueue = grpc_workqueue_create(&call_list);
size_t n = 0;
GPR_ASSERT(!reserved);
if (grpc_channel_args_is_census_enabled(args)) {
@@ -194,8 +193,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
- channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx,
- workqueue, 1, &call_list);
+ channel = grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1,
+ &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index c5cf33f1f9..2af028960d 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -157,8 +157,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
grpc_call_list call_list = GRPC_CALL_LIST_INIT;
static const grpc_channel_filter *filters[] = {&lame_filter};
channel = grpc_channel_create_from_filters(
- target, filters, 1, NULL, grpc_mdctx_create(),
- grpc_workqueue_create(&call_list), 1, &call_list);
+ target, filters, 1, NULL, grpc_mdctx_create(), 1, &call_list);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GPR_ASSERT(elem->filter == &lame_filter);
chand = (channel_data *)elem->channel_data;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index b5b9ee173e..9f10cd29fd 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -69,7 +69,6 @@ typedef struct {
grpc_closure connected_closure;
grpc_mdctx *mdctx;
- grpc_workqueue *workqueue;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -81,7 +80,6 @@ static void connector_unref(grpc_connector *con, grpc_call_list *call_list) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
grpc_mdctx_unref(c->mdctx);
- GRPC_WORKQUEUE_UNREF(c->workqueue, "connector", call_list);
gpr_free(c);
}
}
@@ -214,11 +212,10 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
c->security_connector = f->security_connector;
c->mdctx = f->mdctx;
grpc_mdctx_ref(c->mdctx);
- c->workqueue = grpc_channel_get_workqueue(f->master);
- GRPC_WORKQUEUE_REF(c->workqueue, "connector");
gpr_ref_init(&c->refs, 1);
args->args = final_args;
args->master = f->master;
+ args->mdctx = f->mdctx;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(&c->base, call_list);
grpc_channel_args_destroy(final_args);
@@ -243,7 +240,6 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
- grpc_workqueue *workqueue;
grpc_resolver *resolver;
subchannel_factory *f;
#define MAX_FILTERS 3
@@ -267,7 +263,6 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
"Failed to create security connector.");
}
mdctx = grpc_mdctx_create();
- workqueue = grpc_workqueue_create(&call_list);
connector_arg = grpc_security_connector_to_arg(&connector->base);
args_copy = grpc_channel_args_copy_and_add(
@@ -281,7 +276,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(target, filters, n, args_copy,
- mdctx, workqueue, 1, &call_list);
+ mdctx, 1, &call_list);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 24545c67e1..4249e6f9bd 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -907,7 +907,6 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
- grpc_workqueue *workqueue,
const grpc_channel_args *args,
grpc_call_list *call_list) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
@@ -943,7 +942,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
}
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
- mdctx, workqueue, 0, call_list);
+ mdctx, 0, call_list);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 2f2c5b8948..d08a038639 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -58,7 +58,6 @@ void grpc_server_add_listener(
void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
grpc_channel_filter const **extra_filters,
size_t num_extra_filters, grpc_mdctx *mdctx,
- grpc_workqueue *workqueue,
const grpc_channel_args *args,
grpc_call_list *call_list);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index df63d99dea..395c88827d 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -43,12 +43,11 @@
#include <grpc/support/useful.h>
static void setup_transport(void *server, grpc_transport *transport,
- grpc_mdctx *mdctx, grpc_workqueue *workqueue,
- grpc_call_list *call_list) {
+ grpc_mdctx *mdctx, grpc_call_list *call_list) {
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
grpc_server_setup_transport(server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
+ GPR_ARRAY_SIZE(extra_filters), mdctx,
grpc_server_get_channel_args(server), call_list);
}
@@ -62,10 +61,9 @@ static void new_transport(void *server, grpc_endpoint *tcp,
* case.
*/
grpc_mdctx *mdctx = grpc_mdctx_create();
- grpc_workqueue *workqueue = grpc_workqueue_create(call_list);
grpc_transport *transport = grpc_create_chttp2_transport(
grpc_server_get_channel_args(server), tcp, mdctx, 0, call_list);
- setup_transport(server, transport, mdctx, workqueue, call_list);
+ setup_transport(server, transport, mdctx, call_list);
grpc_chttp2_transport_start_reading(transport, NULL, 0, call_list);
}
@@ -144,5 +142,5 @@ error:
done:
grpc_call_list_run(&call_list);
- return 0;
+ return port_num;
}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 3f92b22d5d..6ade8e19f7 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -36,7 +36,6 @@
#include <grpc/grpc.h>
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/workqueue.h"
typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */