diff options
author | 2015-06-01 20:10:13 -0700 | |
---|---|---|
committer | 2015-06-01 20:10:13 -0700 | |
commit | 3d67c7cf45f694a793be1adacafa2cfb98ecd75f (patch) | |
tree | 46e6bdb57e06d264321e06618435c050acad0725 /src/core | |
parent | fa275a97b968060383fe27c26b1d85f08d9582f9 (diff) | |
parent | 795b24225155b32e4985b4ecb8a478458b2be8cb (diff) |
Merge github.com:grpc/grpc into count-the-things
Conflicts:
src/core/iomgr/fd_posix.h
src/core/iomgr/iomgr.c
Diffstat (limited to 'src/core')
25 files changed, 538 insertions, 132 deletions
diff --git a/src/core/census/README.md b/src/core/census/README.md new file mode 100644 index 0000000000..fb615a2194 --- /dev/null +++ b/src/core/census/README.md @@ -0,0 +1,76 @@ +<!--- + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +--> + +# Census - a resource measurement and tracing system + +This directory contains code for Census, which will ultimately provide the +following features for any gRPC-using system: +* A [dapper](http://research.google.com/pubs/pub36356.html)-like tracing + system, enabling tracing across a distributed infrastructure. +* RPC statistics and measurements for key metrics, such as latency, bytes + transferred, number of errors etc. +* Resource measurement framework which can be used for measuring custom + metrics. Through the use of [tags](#Tags), these can be broken down across + the entire distributed stack. +* Easy integration of the above with + [Google Cloud Trace](https://cloud.google.com/tools/cloud-trace) and + [Google Cloud Monitoring](https://cloud.google.com/monitoring/). + +## Concepts + +### Context + +### Operations + +### Tags + +### Metrics + +## API + +### Internal/RPC API + +### External/Client API + +### RPC API + +## Files in this directory + +Note that files and functions in this directory can be split into two +categories: +* Files that define core census library functions. Functions etc. in these + files are named census\_\*, and constitute the core census library + functionality. At some time in the future, these will become a standalone + library. +* Files that define functions etc. that provide a convenient interface between + grpc and the core census functionality. These files are all named + grpc\_\*.{c,h}, and define function names beginning with grpc\_census\_\*. + diff --git a/src/core/census/context.c b/src/core/census/context.c new file mode 100644 index 0000000000..1358c5127b --- /dev/null +++ b/src/core/census/context.c @@ -0,0 +1,59 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "context.h" + +#include <string.h> +#include <grpc/census.h> +#include <grpc/support/alloc.h> + +/* Placeholder implementation only. */ + +size_t census_context_serialize(const census_context *context, char *buffer, + size_t buf_size) { + /* TODO(aveitch): implement serialization */ + return 0; +} + +int census_context_deserialize(const char *buffer, census_context **context) { + int ret = 0; + if (buffer != NULL) { + /* TODO(aveitch): implement deserialization. */ + ret = 1; + } + *context = gpr_malloc(sizeof(census_context)); + memset(*context, 0, sizeof(census_context)); + return ret; +} + +void census_context_destroy(census_context *context) { gpr_free(context); } diff --git a/src/core/census/context.h b/src/core/census/context.h new file mode 100644 index 0000000000..d43a69f7e5 --- /dev/null +++ b/src/core/census/context.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H +#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H + +#include <grpc/census.h> + +/* census_context is the in-memory representation of information needed to + * maintain tracing, RPC statistics and resource usage information. */ +struct census_context { + gpr_uint64 op_id; /* Operation identifier - unique per-context */ + gpr_uint64 trace_id; /* Globally unique trace identifier */ + /* TODO(aveitch) Add census tags: + const census_tag_set *tags; + */ +}; + +#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */ diff --git a/src/core/census/grpc_context.c b/src/core/census/grpc_context.c new file mode 100644 index 0000000000..cf2353199f --- /dev/null +++ b/src/core/census/grpc_context.c @@ -0,0 +1,45 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> +#include "src/core/census/grpc_context.h" + +void *grpc_census_context_create() { + census_context *context; + census_context_deserialize(NULL, &context); + return (void *)context; +} + +void grpc_census_context_destroy(void *context) { + census_context_destroy((census_context *)context); +} diff --git a/src/core/census/grpc_context.h b/src/core/census/grpc_context.h new file mode 100644 index 0000000000..f610f6ce21 --- /dev/null +++ b/src/core/census/grpc_context.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* GRPC <--> CENSUS context interface */ + +#ifndef CENSUS_GRPC_CONTEXT_H +#define CENSUS_GRPC_CONTEXT_H + +void *grpc_census_context_create(); +void grpc_census_context_destroy(void *context); + +#endif /* CENSUS_GRPC_CONTEXT_H */ diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c new file mode 100644 index 0000000000..057ac78ee7 --- /dev/null +++ b/src/core/census/initialize.c @@ -0,0 +1,50 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> + +static int census_fns_enabled = CENSUS_NONE; + +int census_initialize(int functions) { + if (census_fns_enabled != CENSUS_NONE) { + return 1; + } + if (functions != CENSUS_NONE) { + return 1; + } else { + census_fns_enabled = functions; + return 0; + } +} + +void census_shutdown() { census_fns_enabled = CENSUS_NONE; } diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index a2f3c54290..600f7df1bf 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -58,6 +58,9 @@ typedef struct { gpr_uint8 sending_farewell; /* have we sent farewell (goaway + disconnect) */ gpr_uint8 sent_farewell; + + grpc_iomgr_closure finally_destroy_channel_closure; + grpc_iomgr_closure send_farewells_closure; } lb_channel_data; typedef struct { grpc_child_channel *channel; } lb_call_data; @@ -213,12 +216,16 @@ static void maybe_destroy_channel(grpc_child_channel *channel) { lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data; if (chand->destroyed && chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && !chand->calling_back) { - grpc_iomgr_add_callback(finally_destroy_channel, channel); + chand->finally_destroy_channel_closure.cb = finally_destroy_channel; + chand->finally_destroy_channel_closure.cb_arg = channel; + grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure); } else if (chand->destroyed && !chand->disconnected && chand->active_calls == 0 && !chand->sending_farewell && !chand->sent_farewell) { chand->sending_farewell = 1; - grpc_iomgr_add_callback(send_farewells, channel); + chand->send_farewells_closure.cb = send_farewells; + chand->send_farewells_closure.cb_arg = channel; + grpc_iomgr_add_callback(&chand->send_farewells_closure); } } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 4ba06676ab..28ed7708f7 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -90,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->set_state_mu); gpr_mu_init(&r->watcher_mu); } + gpr_atm_rel_store(&r->refst, 1); gpr_atm_rel_store(&r->readst, NOT_READY); gpr_atm_rel_store(&r->writest, NOT_READY); @@ -115,8 +116,7 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - close(fd->fd); - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + grpc_iomgr_add_callback(&fd->on_done_closure); freelist_fd(fd); grpc_iomgr_unregister_object(&fd->iomgr_object); } else { @@ -179,8 +179,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { - fd->on_done = on_done ? on_done : do_nothing; - fd->on_done_user_data = user_data; + grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing, + user_data); shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); @@ -194,21 +194,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, +static void process_callback(grpc_iomgr_closure *closure, int success, int allow_synchronous_callback) { if (allow_synchronous_callback) { - cb(arg, success); + closure->cb(closure->cb_arg, success); } else { - grpc_iomgr_add_delayed_callback(cb, arg, success); + grpc_iomgr_add_delayed_callback(closure, success); } } -static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback) { +static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, + int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback); + process_callback(callbacks + i, success, allow_synchronous_callback); } } @@ -233,10 +232,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, /* swap was unsuccessful due to an intervening set_ready call. Fall through to the READY code below */ case READY: - assert(gpr_atm_no_barrier_load(st) == READY); + GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - make_callback(closure->cb, closure->cb_arg, - !gpr_atm_acq_load(&fd->shutdown), + process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), allow_synchronous_callback); return; default: /* WAITING */ @@ -250,7 +248,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, +static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, size_t *ncallbacks) { gpr_intptr state = gpr_atm_acq_load(st); @@ -268,9 +266,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, Fall through to the WAITING code below */ state = gpr_atm_acq_load(st); default: /* waiting */ - assert(gpr_atm_no_barrier_load(st) != READY && - gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state; + GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && + gpr_atm_no_barrier_load(st) != NOT_READY); + callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state; gpr_atm_rel_store(st, NOT_READY); return; } @@ -281,25 +279,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure cb; + grpc_iomgr_closure* closure; size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); + set_ready_locked(st, &closure, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); - make_callbacks(&cb, ncb, success, allow_synchronous_callback); + GPR_ASSERT(ncb <= 1); + if (ncb > 0) { + process_callbacks(closure, ncb, success, allow_synchronous_callback); + } } void grpc_fd_shutdown(grpc_fd *fd) { - grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); + set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb); + set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb); gpr_mu_unlock(&fd->set_state_mu); - make_callbacks(cb, ncb, 0, 0); + GPR_ASSERT(ncb <= 2); + process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */, + 0 /* GPR_FALSE */); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index a15ed7c90d..0fa71850e3 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -40,11 +40,6 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -typedef struct { - grpc_iomgr_cb_func cb; - void *cb_arg; -} grpc_iomgr_closure; - typedef struct grpc_fd grpc_fd; typedef struct grpc_fd_watcher { @@ -96,10 +91,11 @@ struct grpc_fd { gpr_atm readst; gpr_atm writest; - grpc_iomgr_cb_func on_done; - void *on_done_user_data; struct grpc_fd *freelist_next; + grpc_iomgr_closure on_done_closure; + grpc_iomgr_closure *shutdown_closures[2]; + grpc_iomgr_object iomgr_object; }; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 1377b6dc9e..249228a214 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,17 +43,10 @@ #include <grpc/support/thd.h> #include <grpc/support/sync.h> -typedef struct delayed_callback { - grpc_iomgr_cb_func cb; - void *cb_arg; - int success; - struct delayed_callback *next; -} delayed_callback; - static gpr_mu g_mu; static gpr_cv g_rcv; -static delayed_callback *g_cbs_head = NULL; -static delayed_callback *g_cbs_tail = NULL; +static grpc_iomgr_closure *g_cbs_head = NULL; +static grpc_iomgr_closure *g_cbs_tail = NULL; static int g_shutdown; static gpr_event g_background_callback_executor_done; static grpc_iomgr_object g_root_object; @@ -67,12 +60,11 @@ static void background_callback_executor(void *ignored) { gpr_timespec short_deadline = gpr_time_add(gpr_now(), gpr_time_from_millis(100)); if (g_cbs_head) { - delayed_callback *cb = g_cbs_head; - g_cbs_head = cb->next; + grpc_iomgr_closure *closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, cb->success); - gpr_free(cb); + closure->cb(closure->cb_arg, closure->success); gpr_mu_lock(&g_mu); } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { } else { @@ -114,8 +106,8 @@ static size_t count_objects(void) { } void grpc_iomgr_shutdown(void) { - delayed_callback *cb; grpc_iomgr_object *obj; + grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); @@ -128,13 +120,12 @@ void grpc_iomgr_shutdown(void) { g_cbs_head ? " and executing final callbacks" : ""); if (g_cbs_head) { do { - cb = g_cbs_head; - g_cbs_head = cb->next; + closure = g_cbs_head; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); - cb->cb(cb->cb_arg, 0); - gpr_free(cb); + closure->cb(closure->cb_arg, 0); gpr_mu_lock(&g_mu); } while (g_cbs_head); continue; @@ -190,42 +181,48 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success) { - delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); - dcb->cb = cb; - dcb->cb_arg = cb_arg; - dcb->success = success; + +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg) { + closure->cb = cb; + closure->cb_arg = cb_arg; + closure->next = NULL; +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { + closure->success = success; gpr_mu_lock(&g_mu); - dcb->next = NULL; + closure->next = NULL; if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = dcb; + g_cbs_head = g_cbs_tail = closure; } else { - g_cbs_tail->next = dcb; - g_cbs_tail = dcb; + g_cbs_tail->next = closure; + g_cbs_tail = closure; } gpr_mu_unlock(&g_mu); } -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); + +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { + grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); } + int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { int n = 0; gpr_mu *retake_mu = NULL; - delayed_callback *cb; + grpc_iomgr_closure *closure; for (;;) { /* check for new work */ if (!gpr_mu_trylock(&g_mu)) { break; } - cb = g_cbs_head; - if (!cb) { + closure = g_cbs_head; + if (!closure) { gpr_mu_unlock(&g_mu); break; } - g_cbs_head = cb->next; + g_cbs_head = closure->next; if (!g_cbs_head) g_cbs_tail = NULL; gpr_mu_unlock(&g_mu); /* if we have a mutex to drop, do so before executing work */ @@ -234,8 +231,7 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { retake_mu = drop_mu; drop_mu = NULL; } - cb->cb(cb->cb_arg, success && cb->success); - gpr_free(cb); + closure->cb(closure->cb_arg, success && closure->success); n++; } if (retake_mu) { diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 1f5d23fdda..a10e481e48 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,14 +34,43 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H -/* gRPC Callback definition */ +/** gRPC Callback definition. + * + * \param arg Arbitrary input. + * \param success An indication on the state of the iomgr. On false, cleanup + * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(void *arg, int success); +/** A closure over a grpc_iomgr_cb_func. */ +typedef struct grpc_iomgr_closure { + /** Bound callback. */ + grpc_iomgr_cb_func cb; + + /** Arguments to be passed to "cb". */ + void *cb_arg; + + /** Internal. A boolean indication to "cb" on the state of the iomgr. + * For instance, closures created during a shutdown would have this field set + * to false. */ + int success; + + /**< Internal. Do not touch */ + struct grpc_iomgr_closure *next; +} grpc_iomgr_closure; + +/** Initializes \a closure with \a cb and \a cb_arg. */ +void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg); + +/** Initializes the iomgr. */ void grpc_iomgr_init(void); + +/** Signals the intention to shutdown the iomgr. */ void grpc_iomgr_shutdown(void); -/* This function is called from within a callback or from anywhere else - and causes the invocation of a callback at some point in the future */ -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg); +/** Registers a closure to be invoked at some point in the future. + * + * Can be called from within a callback or from anywhere else */ +void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 54eadf1edc..6c1e0e1799 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> typedef struct grpc_iomgr_object { @@ -45,8 +44,7 @@ typedef struct grpc_iomgr_object { } grpc_iomgr_object; int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, - int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 826c792990..a8e6069002 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -257,6 +257,7 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; + grpc_iomgr_closure promotion_closure; } grpc_unary_promote_args; static void unary_poll_do_promote(void *args, int success) { @@ -279,7 +280,7 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + grpc_iomgr_add_callback(&up_args->promotion_closure); gpr_mu_unlock(&pollset->mu); return; } @@ -363,7 +364,9 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; - grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + up_args->promotion_closure.cb = unary_poll_do_promote; + up_args->promotion_closure.cb_arg = up_args; + grpc_iomgr_add_callback(&up_args->promotion_closure); grpc_pollset_kick(pollset); } diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 5af0685f9d..b1f4c09a2c 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -66,15 +66,15 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { gpr_timespec now; now = gpr_now(); if (gpr_time_cmp(now, deadline) > 0) { - return 0; + return 0 /* GPR_FALSE */; } - if (grpc_maybe_call_delayed_callbacks(NULL, 1)) { - return 1; + if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) { + return 1 /* GPR_TRUE */; } if (grpc_alarm_check(NULL, now, &deadline)) { - return 1; + return 1 /* GPR_TRUE */; } - return 0; + return 0 /* GPR_FALSE */; } void grpc_pollset_kick(grpc_pollset *p) { } diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index ee5150a696..805fa8a4fc 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -39,7 +39,6 @@ #include <grpc/support/log.h> #include "src/core/iomgr/iocp_windows.h" -#include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" @@ -64,13 +63,15 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { gpr_mu_lock(&socket->state_mu); if (socket->read_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->read_info.cb, - socket->read_info.opaque, 0); + grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb, + socket->read_info.opaque); + grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); } if (socket->write_info.cb) { callbacks_set++; - grpc_iomgr_add_delayed_callback(socket->write_info.cb, - socket->write_info.opaque, 0); + grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb, + socket->write_info.opaque); + grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); } gpr_mu_unlock(&socket->state_mu); return callbacks_set; diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index b27eb14219..d5fee39604 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -39,6 +39,8 @@ #include <grpc/support/sync.h> #include <grpc/support/atm.h> +#include "src/core/iomgr/iomgr.h" + /* This holds the data for an outstanding read or write on a socket. The mutex to protect the concurrent access to that data is the one inside the winsocket wrapper. */ @@ -93,6 +95,8 @@ typedef struct grpc_winsocket { there is a pending operation that the IO Completion Port will have to wait for. The socket will be collected at that time. */ int orphan; + + grpc_iomgr_closure shutdown_closure; } grpc_winsocket; /* Create a wrapped windows handle. This takes ownership of it, meaning that diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index cd6b2ecae6..2f19f9d442 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -280,6 +280,8 @@ typedef struct { grpc_iomgr_closure read_closure; grpc_iomgr_closure write_closure; + + grpc_iomgr_closure handle_read_closure; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp); + tcp->handle_read_closure.cb_arg = tcp; + grpc_iomgr_add_callback(&tcp->handle_read_closure); } } @@ -592,6 +595,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = grpc_tcp_handle_write; tcp->write_closure.cb_arg = tcp; + + tcp->handle_read_closure.cb = grpc_tcp_handle_read; return &tcp->base; } diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index ae22bf47a0..9bf5c32e74 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -54,6 +54,7 @@ typedef struct { grpc_credentials *creds; grpc_credentials_metadata_cb cb; + grpc_iomgr_closure *on_simulated_token_fetch_done_closure; void *user_data; } grpc_credentials_metadata_request; @@ -65,6 +66,8 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds, gpr_malloc(sizeof(grpc_credentials_metadata_request)); r->creds = grpc_credentials_ref(creds); r->cb = cb; + r->on_simulated_token_fetch_done_closure = + gpr_malloc(sizeof(grpc_iomgr_closure)); r->user_data = user_data; return r; } @@ -72,6 +75,7 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds, static void grpc_credentials_metadata_request_destroy( grpc_credentials_metadata_request *r) { grpc_credentials_unref(r->creds); + gpr_free(r->on_simulated_token_fetch_done_closure); gpr_free(r); } @@ -831,9 +835,11 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds, grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds; if (c->is_async) { - grpc_iomgr_add_callback( - on_simulated_token_fetch_done, - grpc_credentials_metadata_request_create(creds, cb, user_data)); + grpc_credentials_metadata_request *cb_arg = + grpc_credentials_metadata_request_create(creds, cb, user_data); + grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure, + on_simulated_token_fetch_done, cb_arg); + grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure); } else { cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e3995a407b..88ff5cfbce 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -31,6 +31,7 @@ * */ +#include "src/core/census/grpc_context.h" #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" @@ -226,6 +227,7 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; + grpc_iomgr_closure destroy_closure; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -242,9 +244,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op); static void execute_op(grpc_call *call, grpc_transport_op *op); static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata); static void finish_read_ops(grpc_call *call); -static grpc_call_error cancel_with_status( - grpc_call *c, grpc_status_code status, const char *description, - gpr_uint8 locked); +static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, + const char *description, + gpr_uint8 locked); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -268,6 +270,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, if (call->is_client) { call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; + call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create(); + call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy; } GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); for (i = 0; i < add_initial_metadata_count; i++) { @@ -367,7 +371,9 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { if (allow_immediate_deletion) { destroy_call(c, 1); } else { - grpc_iomgr_add_callback(destroy_call, c); + c->destroy_closure.cb = destroy_call; + c->destroy_closure.cb_arg = c; + grpc_iomgr_add_callback(&c->destroy_closure); } } } @@ -403,7 +409,8 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || - (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || + (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && + grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || is_op_live(call, GRPC_IOREQ_RECV_STATUS) || is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || @@ -556,13 +563,13 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, break; case GRPC_IOREQ_RECV_INITIAL_METADATA: GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0], - *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA] - .recv_metadata); + *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA] + .recv_metadata); break; case GRPC_IOREQ_RECV_TRAILING_METADATA: GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1], - *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA] - .recv_metadata); + *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA] + .recv_metadata); break; case GRPC_IOREQ_OP_COUNT: abort(); @@ -676,9 +683,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { } /* we have to be reading a message to know what to do here */ if (!call->reading_message) { - cancel_with_status( - call, GRPC_STATUS_INVALID_ARGUMENT, - "Received payload data while not reading a message", 1); + cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, + "Received payload data while not reading a message", 1); return 0; } /* append the slice to the incoming buffer */ @@ -1025,9 +1031,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return cancel_with_status(c, status, description, 0); } -static grpc_call_error cancel_with_status( - grpc_call *c, grpc_status_code status, const char *description, - gpr_uint8 locked) { +static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, + const char *description, + gpr_uint8 locked) { grpc_transport_op op; grpc_mdstr *details = description ? grpc_mdstr_from_string(c->metadata_context, description) @@ -1294,12 +1300,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, - tag); + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } -void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value, - void (*destroy)(void *value)) { +void grpc_call_context_set(grpc_call *call, grpc_context_index elem, + void *value, void (*destroy)(void *value)) { if (call->context[elem].destroy) { call->context[elem].destroy(call->context[elem].value); } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index be9da2b7f9..947011c613 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -61,6 +61,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; + grpc_iomgr_closure destroy_closure; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -193,7 +194,9 @@ static void destroy_channel(void *p, int ok) { void grpc_channel_internal_unref(grpc_channel *channel) { if (gpr_unref(&channel->refs)) { - grpc_iomgr_add_callback(destroy_channel, channel); + channel->destroy_closure.cb = destroy_channel; + channel->destroy_closure.cb_arg = channel; + grpc_iomgr_add_callback(&channel->destroy_closure); } } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index daa8d3a7c6..9fa6696bf6 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -195,9 +195,10 @@ grpc_channel *grpc_channel_create(const char *target, const grpc_channel_filter *filters[MAX_FILTERS]; int n = 0; filters[n++] = &grpc_client_surface_filter; + /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; - } + } */ filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index d6eb9b2c24..ac6871c6f2 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -31,11 +31,11 @@ * */ +#include <grpc/census.h> #include <grpc/grpc.h> #include "src/core/channel/channel_stack.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" -#include "src/core/statistics/census_interface.h" #include "src/core/profiling/timers.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" @@ -64,7 +64,9 @@ void grpc_init(void) { grpc_security_pre_init(); grpc_iomgr_init(); grpc_tracer_init("GRPC_TRACE"); - census_init(); + if (census_initialize(CENSUS_NONE)) { + gpr_log(GPR_ERROR, "Could not initialize census."); + } grpc_timers_global_init(); } gpr_mu_unlock(&g_init_mu); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index a71d12291e..8ef121dc48 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -234,9 +234,10 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, new_args_from_connector != NULL ? new_args_from_connector : args, &connector_arg); filters[n++] = &grpc_client_surface_filter; + /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; - } + } */ filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 60606c75e4..7e69ec0221 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -122,6 +122,8 @@ struct channel_data { channel_registered_method *registered_methods; gpr_uint32 registered_method_slots; gpr_uint32 registered_method_max_probes; + grpc_iomgr_closure finish_shutdown_channel_closure; + grpc_iomgr_closure finish_destroy_channel_closure; }; struct grpc_server { @@ -178,6 +180,8 @@ struct call_data { void (*on_done_recv)(void *user_data, int success); void *recv_user_data; + grpc_iomgr_closure kill_zombie_closure; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -304,7 +308,9 @@ static void destroy_channel(channel_data *chand) { GPR_ASSERT(chand->server != NULL); orphan_channel(chand); server_ref(chand->server); - grpc_iomgr_add_callback(finish_destroy_channel, chand); + chand->finish_destroy_channel_closure.cb = finish_destroy_channel; + chand->finish_destroy_channel_closure.cb_arg = chand; + grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure); } static void finish_start_new_rpc_and_unlock(grpc_server *server, @@ -416,7 +422,8 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); } gpr_mu_unlock(&chand->server->mu); break; @@ -424,11 +431,14 @@ static void server_on_recv(void *ptr, int success) { gpr_mu_lock(&chand->server->mu); if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); + grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } gpr_mu_unlock(&chand->server->mu); break; @@ -502,7 +512,9 @@ static void finish_shutdown_channel(void *cd, int success) { static void shutdown_channel(channel_data *chand) { grpc_channel_internal_ref(chand->channel); - grpc_iomgr_add_callback(finish_shutdown_channel, chand); + chand->finish_shutdown_channel_closure.cb = finish_shutdown_channel; + chand->finish_shutdown_channel_closure.cb_arg = chand; + grpc_iomgr_add_callback(&chand->finish_shutdown_channel_closure); } static void init_call_elem(grpc_call_element *elem, @@ -594,9 +606,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, - destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, "server", + server_start_transport_op, + channel_op, + sizeof(call_data), + init_call_elem, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + "server", }; void grpc_server_register_completion_queue(grpc_server *server, @@ -616,7 +634,9 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, size_t filter_count, const grpc_channel_args *args) { size_t i; - int census_enabled = grpc_channel_args_is_census_enabled(args); + /* TODO(census): restore this once we finalize census filter etc. + int census_enabled = grpc_channel_args_is_census_enabled(args); */ + int census_enabled = 0; grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -642,9 +662,10 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, server->channel_filters = gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); server->channel_filters[0] = &server_surface_filter; + /* TODO(census): restore this once we rework census filter if (census_enabled) { server->channel_filters[1] = &grpc_server_census_filter; - } + } */ for (i = 0; i < filter_count; i++) { server->channel_filters[i + 1 + census_enabled] = filters[i]; } @@ -940,11 +961,15 @@ void grpc_server_destroy(grpc_server *server) { while ((calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START)) != NULL) { + /* TODO(dgq): If we knew the size of the call list (or an upper bound), we + * could allocate all the memory for the closures in advance in a single + * chunk */ gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); calld->state = ZOMBIED; - grpc_iomgr_add_callback( - kill_zombie, + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); } for (c = server->root_channel_data.next; c != &server->root_channel_data; diff --git a/src/core/transport/chttp2/gen_hpack_tables.c b/src/core/transport/chttp2/gen_hpack_tables.c index 86b593129b..bdaa3cf094 100644 --- a/src/core/transport/chttp2/gen_hpack_tables.c +++ b/src/core/transport/chttp2/gen_hpack_tables.c @@ -219,10 +219,10 @@ static int state_index(int bitofs, symset syms, int *isnew) { emit - the symbol to emit on this nibble (or -1 if no symbol has been found) syms - the set of symbols that could be matched */ -static void build_dec_tbl(int state, int nibble, int nibbits, int bitofs, +static void build_dec_tbl(int state, int nibble, int nibbits, unsigned bitofs, int emit, symset syms) { int i; - int bit; + unsigned bit; /* If we have four bits in the nibble we're looking at, then we can fill in a slot in the lookup tables. */ @@ -338,7 +338,7 @@ static void generate_base64_inverse_table(void) { static const char alphabet[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="; unsigned char inverse[256]; - int i; + unsigned i; memset(inverse, 255, sizeof(inverse)); for (i = 0; i < strlen(alphabet); i++) { |