aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-09 12:30:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-09 12:30:59 -0700
commit8664ca6463e8387b711f32fe283e0115e79c4c54 (patch)
treec7bacd643339d4473ddf3eb5db7cd8003d2986d3 /src/core
parent73b6606629bb65551bc804fa9e96b43ee3aa6583 (diff)
Starting to convert code to work queues
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c3
-rw-r--r--src/core/iomgr/iomgr.c30
-rw-r--r--src/core/iomgr/iomgr.h9
-rw-r--r--src/core/iomgr/iomgr_internal.h3
-rw-r--r--src/core/iomgr/pollset_posix.c3
-rw-r--r--src/core/security/credentials.c13
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/secure_channel_create.c4
8 files changed, 11 insertions, 57 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 2e25033813..2624fcdd53 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -600,7 +600,8 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (on_consumed) {
- grpc_iomgr_add_callback(on_consumed);
+ grpc_workqueue_push(grpc_channel_get_workqueue(chand->master), on_consumed,
+ 1);
}
}
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 1dd03992ae..cf74a16c0c 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -46,39 +46,9 @@
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static grpc_iomgr_closure *g_cbs_head = NULL;
-static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
-static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;
-/* Execute followup callbacks continuously.
- Other threads may check in and help during pollset_work() */
-static void background_callback_executor(void *ignored) {
- gpr_mu_lock(&g_mu);
- while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- gpr_timespec short_deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN));
- if (g_cbs_head) {
- grpc_iomgr_closure *closure = g_cbs_head;
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- closure->cb(closure->cb_arg, closure->success);
- gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC),
- &deadline)) {
- } else {
- gpr_mu_unlock(&g_mu);
- gpr_sleep_until(gpr_time_min(short_deadline, deadline));
- gpr_mu_lock(&g_mu);
- }
- }
- gpr_mu_unlock(&g_mu);
- gpr_event_set(&g_background_callback_executor_done, (void *)1);
-}
-
void grpc_kick_poller(void) {
/* Empty. The background callback executor polls periodically. The activity
* the kicker is trying to draw the executor's attention to will be picked up
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 261c17366a..f1d2e6439d 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -68,13 +68,4 @@ void grpc_iomgr_init(void);
/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);
-/** Registers a closure to be invoked at some point in the future.
- *
- * Can be called from within a callback or from anywhere else */
-void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
-
-/** As per grpc_iomgr_add_callback, with the ability to set the success
- argument. */
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 4cec973ba0..f266732c96 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -43,9 +43,6 @@ typedef struct grpc_iomgr_object {
struct grpc_iomgr_object *prev;
} grpc_iomgr_object;
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 6bd1b61f24..97a9fa863e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -178,9 +178,6 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- goto done;
- }
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
goto done;
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index a764413300..f151fba959 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -47,6 +47,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
#include <grpc/support/time.h>
/* -- Common. -- */
@@ -54,7 +55,6 @@
struct grpc_credentials_metadata_request {
grpc_credentials *creds;
grpc_credentials_metadata_cb cb;
- grpc_iomgr_closure *on_simulated_token_fetch_done_closure;
void *user_data;
};
@@ -66,8 +66,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
gpr_malloc(sizeof(grpc_credentials_metadata_request));
r->creds = grpc_credentials_ref(creds);
r->cb = cb;
- r->on_simulated_token_fetch_done_closure =
- gpr_malloc(sizeof(grpc_iomgr_closure));
r->user_data = user_data;
return r;
}
@@ -75,7 +73,6 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
static void grpc_credentials_metadata_request_destroy(
grpc_credentials_metadata_request *r) {
grpc_credentials_unref(r->creds);
- gpr_free(r->on_simulated_token_fetch_done_closure);
gpr_free(r);
}
@@ -746,11 +743,10 @@ static int md_only_test_has_request_metadata_only(
return 1;
}
-void on_simulated_token_fetch_done(void *user_data, int success) {
+void on_simulated_token_fetch_done(void *user_data) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
- GPR_ASSERT(success);
r->cb(r->user_data, c->md_store->entries, c->md_store->num_entries,
GRPC_CREDENTIALS_OK);
grpc_credentials_metadata_request_destroy(r);
@@ -764,11 +760,10 @@ static void md_only_test_get_request_metadata(grpc_credentials *creds,
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
if (c->is_async) {
+ gpr_thd_id thd_id;
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
- grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure,
- on_simulated_token_fetch_done, cb_arg);
- grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure);
+ gpr_thd_new(&thd_id, on_simulated_token_fetch_done, cb_arg, NULL);
} else {
cb(user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index f271616f60..593faec7df 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -36,6 +36,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/subchannel_factory.h"
+#include "src/core/iomgr/workqueue.h"
grpc_channel *grpc_channel_create_from_filters(
const char *target, const grpc_channel_filter **filters, size_t count,
@@ -61,6 +62,8 @@ grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
+grpc_workqueue *grpc_channel_get_workqueue(grpc_channel *channel);
+
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 35b60bdbef..1b6d1b3f46 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -95,7 +95,7 @@ static void on_secure_transport_setup_done(void *arg,
}
notify = c->notify;
c->notify = NULL;
- grpc_iomgr_add_callback(notify);
+ notify->cb(notify->cb_arg, 1);
}
static void connected(void *arg, grpc_endpoint *tcp) {
@@ -108,7 +108,7 @@ static void connected(void *arg, grpc_endpoint *tcp) {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
- grpc_iomgr_add_callback(notify);
+ notify->cb(notify->cb_arg, 1);
}
}