diff options
author | 2015-09-09 12:30:59 -0700 | |
---|---|---|
committer | 2015-09-09 12:30:59 -0700 | |
commit | 8664ca6463e8387b711f32fe283e0115e79c4c54 (patch) | |
tree | c7bacd643339d4473ddf3eb5db7cd8003d2986d3 /src/core | |
parent | 73b6606629bb65551bc804fa9e96b43ee3aa6583 (diff) |
Starting to convert code to work queues
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 30 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 9 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 3 | ||||
-rw-r--r-- | src/core/security/credentials.c | 13 | ||||
-rw-r--r-- | src/core/surface/channel.h | 3 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 4 |
8 files changed, 11 insertions, 57 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 2e25033813..2624fcdd53 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -600,7 +600,8 @@ static void cc_start_transport_op(grpc_channel_element *elem, } if (on_consumed) { - grpc_iomgr_add_callback(on_consumed); + grpc_workqueue_push(grpc_channel_get_workqueue(chand->master), on_consumed, + 1); } } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 1dd03992ae..cf74a16c0c 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -46,39 +46,9 @@ static gpr_mu g_mu; static gpr_cv g_rcv; -static grpc_iomgr_closure *g_cbs_head = NULL; -static grpc_iomgr_closure *g_cbs_tail = NULL; static int g_shutdown; -static gpr_event g_background_callback_executor_done; static grpc_iomgr_object g_root_object; -/* Execute followup callbacks continuously. - Other threads may check in and help during pollset_work() */ -static void background_callback_executor(void *ignored) { - gpr_mu_lock(&g_mu); - while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec short_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN)); - if (g_cbs_head) { - grpc_iomgr_closure *closure = g_cbs_head; - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - closure->cb(closure->cb_arg, closure->success); - gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC), - &deadline)) { - } else { - gpr_mu_unlock(&g_mu); - gpr_sleep_until(gpr_time_min(short_deadline, deadline)); - gpr_mu_lock(&g_mu); - } - } - gpr_mu_unlock(&g_mu); - gpr_event_set(&g_background_callback_executor_done, (void *)1); -} - void grpc_kick_poller(void) { /* Empty. The background callback executor polls periodically. The activity * the kicker is trying to draw the executor's attention to will be picked up diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 261c17366a..f1d2e6439d 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -68,13 +68,4 @@ void grpc_iomgr_init(void); /** Signals the intention to shutdown the iomgr. */ void grpc_iomgr_shutdown(void); -/** Registers a closure to be invoked at some point in the future. - * - * Can be called from within a callback or from anywhere else */ -void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); - -/** As per grpc_iomgr_add_callback, with the ability to set the success - argument. */ -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); - #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 4cec973ba0..f266732c96 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -43,9 +43,6 @@ typedef struct grpc_iomgr_object { struct grpc_iomgr_object *prev; } grpc_iomgr_object; -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); - void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 6bd1b61f24..97a9fa863e 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -178,9 +178,6 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); - if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { - goto done; - } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { goto done; } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index a764413300..f151fba959 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -47,6 +47,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include <grpc/support/time.h> /* -- Common. -- */ @@ -54,7 +55,6 @@ struct grpc_credentials_metadata_request { grpc_credentials *creds; grpc_credentials_metadata_cb cb; - grpc_iomgr_closure *on_simulated_token_fetch_done_closure; void *user_data; }; @@ -66,8 +66,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds, gpr_malloc(sizeof(grpc_credentials_metadata_request)); r->creds = grpc_credentials_ref(creds); r->cb = cb; - r->on_simulated_token_fetch_done_closure = - gpr_malloc(sizeof(grpc_iomgr_closure)); r->user_data = user_data; return r; } @@ -75,7 +73,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds, static void grpc_credentials_metadata_request_destroy( grpc_credentials_metadata_request *r) { grpc_credentials_unref(r->creds); - gpr_free(r->on_simulated_token_fetch_done_closure); gpr_free(r); } @@ -746,11 +743,10 @@ static int md_only_test_has_request_metadata_only( return 1; } -void on_simulated_token_fetch_done(void *user_data, int success) { +void on_simulated_token_fetch_done(void *user_data) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds; - GPR_ASSERT(success); r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries, GRPC_CREDENTIALS_OK); grpc_credentials_metadata_request_destroy(r); @@ -764,11 +760,10 @@ static void md_only_test_get_request_metadata(grpc_credentials *creds, grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds; if (c->is_async) { + gpr_thd_id thd_id; grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); - grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure, - on_simulated_token_fetch_done, cb_arg); - grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure); + gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL); } else { cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index f271616f60..593faec7df 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -36,6 +36,7 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/subchannel_factory.h" +#include "src/core/iomgr/workqueue.h" grpc_channel *grpc_channel_create_from_filters( const char *target, const grpc_channel_filter **filters, size_t count, @@ -61,6 +62,8 @@ grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); +grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel); + #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 35b60bdbef..1b6d1b3f46 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -95,7 +95,7 @@ static void on_secure_transport_setup_done(void *arg, } notify = c->notify; c->notify = NULL; - grpc_iomgr_add_callback(notify); + notify->cb(notify->cb_arg, 1); } static void connected(void *arg, grpc_endpoint *tcp) { @@ -108,7 +108,7 @@ static void connected(void *arg, grpc_endpoint *tcp) { memset(c->result, 0, sizeof(*c->result)); notify = c->notify; c->notify = NULL; - grpc_iomgr_add_callback(notify); + notify->cb(notify->cb_arg, 1); } } |