aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/README.md76
-rw-r--r--src/core/census/context.c59
-rw-r--r--src/core/census/context.h49
-rw-r--r--src/core/census/grpc_context.c45
-rw-r--r--src/core/census/grpc_context.h42
-rw-r--r--src/core/census/initialize.c50
-rw-r--r--src/core/channel/child_channel.c11
-rw-r--r--src/core/iomgr/endpoint_pair.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c17
-rw-r--r--src/core/iomgr/endpoint_pair_windows.c6
-rw-r--r--src/core/iomgr/fd_posix.c60
-rw-r--r--src/core/iomgr/fd_posix.h16
-rw-r--r--src/core/iomgr/iomgr.c125
-rw-r--r--src/core/iomgr/iomgr.h37
-rw-r--r--src/core/iomgr/iomgr_internal.h14
-rw-r--r--src/core/iomgr/pollset_posix.c7
-rw-r--r--src/core/iomgr/pollset_windows.c10
-rw-r--r--src/core/iomgr/resolve_address_posix.c14
-rw-r--r--src/core/iomgr/resolve_address_windows.c8
-rw-r--r--src/core/iomgr/socket_windows.c17
-rw-r--r--src/core/iomgr/socket_windows.h9
-rw-r--r--src/core/iomgr/tcp_client_posix.c22
-rw-r--r--src/core/iomgr/tcp_client_windows.c2
-rw-r--r--src/core/iomgr/tcp_posix.c7
-rw-r--r--src/core/iomgr/tcp_server_posix.c23
-rw-r--r--src/core/iomgr/tcp_server_windows.c5
-rw-r--r--src/core/security/credentials.c12
-rw-r--r--src/core/surface/call.c43
-rw-r--r--src/core/surface/channel.c5
-rw-r--r--src/core/surface/channel_create.c3
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/secure_channel_create.c3
-rw-r--r--src/core/surface/server.c46
-rw-r--r--src/core/transport/chttp2/gen_hpack_tables.c6
34 files changed, 670 insertions, 188 deletions
diff --git a/src/core/census/README.md b/src/core/census/README.md
new file mode 100644
index 0000000000..fb615a2194
--- /dev/null
+++ b/src/core/census/README.md
@@ -0,0 +1,76 @@
+<!---
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+# Census - a resource measurement and tracing system
+
+This directory contains code for Census, which will ultimately provide the
+following features for any gRPC-using system:
+* A [dapper](http://research.google.com/pubs/pub36356.html)-like tracing
+ system, enabling tracing across a distributed infrastructure.
+* RPC statistics and measurements for key metrics, such as latency, bytes
+ transferred, number of errors etc.
+* Resource measurement framework which can be used for measuring custom
+ metrics. Through the use of [tags](#Tags), these can be broken down across
+ the entire distributed stack.
+* Easy integration of the above with
+ [Google Cloud Trace](https://cloud.google.com/tools/cloud-trace) and
+ [Google Cloud Monitoring](https://cloud.google.com/monitoring/).
+
+## Concepts
+
+### Context
+
+### Operations
+
+### Tags
+
+### Metrics
+
+## API
+
+### Internal/RPC API
+
+### External/Client API
+
+### RPC API
+
+## Files in this directory
+
+Note that files and functions in this directory can be split into two
+categories:
+* Files that define core census library functions. Functions etc. in these
+ files are named census\_\*, and constitute the core census library
+ functionality. At some time in the future, these will become a standalone
+ library.
+* Files that define functions etc. that provide a convenient interface between
+ grpc and the core census functionality. These files are all named
+ grpc\_\*.{c,h}, and define function names beginning with grpc\_census\_\*.
+
diff --git a/src/core/census/context.c b/src/core/census/context.c
new file mode 100644
index 0000000000..1358c5127b
--- /dev/null
+++ b/src/core/census/context.c
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "context.h"
+
+#include <string.h>
+#include <grpc/census.h>
+#include <grpc/support/alloc.h>
+
+/* Placeholder implementation only. */
+
+size_t census_context_serialize(const census_context *context, char *buffer,
+ size_t buf_size) {
+ /* TODO(aveitch): implement serialization */
+ return 0;
+}
+
+int census_context_deserialize(const char *buffer, census_context **context) {
+ int ret = 0;
+ if (buffer != NULL) {
+ /* TODO(aveitch): implement deserialization. */
+ ret = 1;
+ }
+ *context = gpr_malloc(sizeof(census_context));
+ memset(*context, 0, sizeof(census_context));
+ return ret;
+}
+
+void census_context_destroy(census_context *context) { gpr_free(context); }
diff --git a/src/core/census/context.h b/src/core/census/context.h
new file mode 100644
index 0000000000..d43a69f7e5
--- /dev/null
+++ b/src/core/census/context.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
+#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
+
+#include <grpc/census.h>
+
+/* census_context is the in-memory representation of information needed to
+ * maintain tracing, RPC statistics and resource usage information. */
+struct census_context {
+ gpr_uint64 op_id; /* Operation identifier - unique per-context */
+ gpr_uint64 trace_id; /* Globally unique trace identifier */
+ /* TODO(aveitch) Add census tags:
+ const census_tag_set *tags;
+ */
+};
+
+#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */
diff --git a/src/core/census/grpc_context.c b/src/core/census/grpc_context.c
new file mode 100644
index 0000000000..cf2353199f
--- /dev/null
+++ b/src/core/census/grpc_context.c
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+#include "src/core/census/grpc_context.h"
+
+void *grpc_census_context_create() {
+ census_context *context;
+ census_context_deserialize(NULL, &context);
+ return (void *)context;
+}
+
+void grpc_census_context_destroy(void *context) {
+ census_context_destroy((census_context *)context);
+}
diff --git a/src/core/census/grpc_context.h b/src/core/census/grpc_context.h
new file mode 100644
index 0000000000..f610f6ce21
--- /dev/null
+++ b/src/core/census/grpc_context.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* GRPC <--> CENSUS context interface */
+
+#ifndef CENSUS_GRPC_CONTEXT_H
+#define CENSUS_GRPC_CONTEXT_H
+
+void *grpc_census_context_create();
+void grpc_census_context_destroy(void *context);
+
+#endif /* CENSUS_GRPC_CONTEXT_H */
diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c
new file mode 100644
index 0000000000..057ac78ee7
--- /dev/null
+++ b/src/core/census/initialize.c
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+static int census_fns_enabled = CENSUS_NONE;
+
+int census_initialize(int functions) {
+ if (census_fns_enabled != CENSUS_NONE) {
+ return 1;
+ }
+ if (functions != CENSUS_NONE) {
+ return 1;
+ } else {
+ census_fns_enabled = functions;
+ return 0;
+ }
+}
+
+void census_shutdown() { census_fns_enabled = CENSUS_NONE; }
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index a2f3c54290..600f7df1bf 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -58,6 +58,9 @@ typedef struct {
gpr_uint8 sending_farewell;
/* have we sent farewell (goaway + disconnect) */
gpr_uint8 sent_farewell;
+
+ grpc_iomgr_closure finally_destroy_channel_closure;
+ grpc_iomgr_closure send_farewells_closure;
} lb_channel_data;
typedef struct { grpc_child_channel *channel; } lb_call_data;
@@ -213,12 +216,16 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
!chand->sending_farewell && !chand->calling_back) {
- grpc_iomgr_add_callback(finally_destroy_channel, channel);
+ chand->finally_destroy_channel_closure.cb = finally_destroy_channel;
+ chand->finally_destroy_channel_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
!chand->sent_farewell) {
chand->sending_farewell = 1;
- grpc_iomgr_add_callback(send_farewells, channel);
+ chand->send_farewells_closure.cb = send_farewells;
+ chand->send_farewells_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&chand->send_farewells_closure);
}
}
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index dffbd36d4c..25087be0c7 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -41,6 +41,7 @@ typedef struct {
grpc_endpoint *server;
} grpc_endpoint_pair;
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size);
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index ac511b97b2..9b3b63f1e7 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -44,6 +44,8 @@
#include <sys/socket.h>
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
static void create_sockets(int sv[2]) {
@@ -55,12 +57,21 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
+ size_t read_slice_size) {
int sv[2];
grpc_endpoint_pair p;
+ char *final_name;
create_sockets(sv);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1]), read_slice_size);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0]), read_slice_size);
+
+ gpr_asprintf(&final_name, "%s:client", name);
+ p.client =
+ grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
+ gpr_free(final_name);
+ gpr_asprintf(&final_name, "%s:server", name);
+ p.server =
+ grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
+ gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c
index 988d622d01..c6790b2937 100644
--- a/src/core/iomgr/endpoint_pair_windows.c
+++ b/src/core/iomgr/endpoint_pair_windows.c
@@ -77,12 +77,12 @@ static void create_sockets(SOCKET sv[2]) {
sv[0] = svr_sock;
}
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read_slice_size) {
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
- p.client = grpc_tcp_create(grpc_winsocket_create(sv[1]));
- p.server = grpc_tcp_create(grpc_winsocket_create(sv[0]));
+ p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"));
+ p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"));
return p;
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b697fcc64a..28ed7708f7 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <unistd.h>
-#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -91,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
+
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
@@ -116,10 +116,9 @@ static void ref_by(grpc_fd *fd, int n) {
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- close(fd->fd);
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ grpc_iomgr_add_callback(&fd->on_done_closure);
freelist_fd(fd);
- grpc_iomgr_unref();
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
GPR_ASSERT(old > n);
}
@@ -138,9 +137,9 @@ void grpc_fd_global_shutdown(void) {
static void do_nothing(void *ignored, int success) {}
-grpc_fd *grpc_fd_create(int fd) {
+grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
- grpc_iomgr_ref();
+ grpc_iomgr_register_object(&r->iomgr_object, name);
grpc_pollset_add_fd(grpc_backup_pollset(), r);
return r;
}
@@ -180,8 +179,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- fd->on_done = on_done ? on_done : do_nothing;
- fd->on_done_user_data = user_data;
+ grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing,
+ user_data);
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
@@ -195,21 +194,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
+static void process_callback(grpc_iomgr_closure *closure, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
- cb(arg, success);
+ closure->cb(closure->cb_arg, success);
} else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
+ grpc_iomgr_add_delayed_callback(closure, success);
}
}
-static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
+static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
+ int success, int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback);
+ process_callback(callbacks + i, success, allow_synchronous_callback);
}
}
@@ -234,10 +232,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
- assert(gpr_atm_no_barrier_load(st) == READY);
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
- make_callback(closure->cb, closure->cb_arg,
- !gpr_atm_acq_load(&fd->shutdown),
+ process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
return;
default: /* WAITING */
@@ -251,7 +248,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@@ -269,9 +266,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
Fall through to the WAITING code below */
state = gpr_atm_acq_load(st);
default: /* waiting */
- assert(gpr_atm_no_barrier_load(st) != READY &&
- gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
+ gpr_atm_no_barrier_load(st) != NOT_READY);
+ callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@@ -282,25 +279,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure cb;
+ grpc_iomgr_closure* closure;
size_t ncb = 0;
+
gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
+ set_ready_locked(st, &closure, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+ GPR_ASSERT(ncb <= 1);
+ if (ncb > 0) {
+ process_callbacks(closure, ncb, success, allow_synchronous_callback);
+ }
}
void grpc_fd_shutdown(grpc_fd *fd) {
- grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
+ set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb);
+ set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb);
gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
+ GPR_ASSERT(ncb <= 2);
+ process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */,
+ 0 /* GPR_FALSE */);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index cfc533b7f5..0fa71850e3 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -34,17 +34,12 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_FD_POSIX_H
-#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
-} grpc_iomgr_closure;
-
typedef struct grpc_fd grpc_fd;
typedef struct grpc_fd_watcher {
@@ -96,15 +91,18 @@ struct grpc_fd {
gpr_atm readst;
gpr_atm writest;
- grpc_iomgr_cb_func on_done;
- void *on_done_user_data;
struct grpc_fd *freelist_next;
+
+ grpc_iomgr_closure on_done_closure;
+ grpc_iomgr_closure *shutdown_closures[2];
+
+ grpc_iomgr_object iomgr_object;
};
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd);
+grpc_fd *grpc_fd_create(int fd, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index d22542fc91..249228a214 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -37,25 +37,19 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/alarm_internal.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/sync.h>
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
+static grpc_iomgr_closure *g_cbs_head = NULL;
+static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
-static int g_refs;
static gpr_event g_background_callback_executor_done;
+static grpc_iomgr_object g_root_object;
/* Execute followup callbacks continuously.
Other threads may check in and help during pollset_work() */
@@ -66,12 +60,11 @@ static void background_callback_executor(void *ignored) {
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
+ grpc_iomgr_closure *closure = g_cbs_head;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
+ closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
@@ -96,34 +89,48 @@ void grpc_iomgr_init(void) {
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_alarm_list_init(gpr_now());
- g_refs = 0;
+ g_root_object.next = g_root_object.prev = &g_root_object;
+ g_root_object.name = "root";
grpc_iomgr_platform_init();
gpr_event_init(&g_background_callback_executor_done);
gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
+static size_t count_objects(void) {
+ grpc_iomgr_object *obj;
+ size_t n = 0;
+ for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
+ n++;
+ }
+ return n;
+}
+
void grpc_iomgr_shutdown(void) {
- delayed_callback *cb;
+ grpc_iomgr_object *obj;
+ grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
gpr_mu_lock(&g_mu);
g_shutdown = 1;
- while (g_cbs_head || g_refs) {
- gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
+ while (g_cbs_head || g_root_object.next != &g_root_object) {
+ size_t nobjs = count_objects();
+ gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", nobjs,
g_cbs_head ? " and executing final callbacks" : "");
- while (g_cbs_head) {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
-
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
- gpr_mu_lock(&g_mu);
+ if (g_cbs_head) {
+ do {
+ closure = g_cbs_head;
+ g_cbs_head = closure->next;
+ if (!g_cbs_head) g_cbs_tail = NULL;
+ gpr_mu_unlock(&g_mu);
+
+ closure->cb(closure->cb_arg, 0);
+ gpr_mu_lock(&g_mu);
+ } while (g_cbs_head);
+ continue;
}
- if (g_refs) {
+ if (nobjs > 0) {
int timeout = 0;
gpr_timespec short_deadline = gpr_time_add(gpr_now(),
gpr_time_from_millis(100));
@@ -137,7 +144,10 @@ void grpc_iomgr_shutdown(void) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
- g_refs);
+ count_objects());
+ for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) {
+ gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s", obj->name);
+ }
break;
}
}
@@ -153,56 +163,66 @@ void grpc_iomgr_shutdown(void) {
gpr_cv_destroy(&g_rcv);
}
-void grpc_iomgr_ref(void) {
+void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) {
+ obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
- ++g_refs;
+ obj->next = &g_root_object;
+ obj->prev = obj->next->prev;
+ obj->next->prev = obj->prev->next = obj;
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_unref(void) {
+void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
+ gpr_free(obj->name);
gpr_mu_lock(&g_mu);
- if (0 == --g_refs) {
- gpr_cv_signal(&g_rcv);
- }
+ obj->next->prev = obj->prev;
+ obj->prev->next = obj->next;
+ gpr_cv_signal(&g_rcv);
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
+
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg) {
+ closure->cb = cb;
+ closure->cb_arg = cb_arg;
+ closure->next = NULL;
+}
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
+ closure->success = success;
gpr_mu_lock(&g_mu);
- dcb->next = NULL;
+ closure->next = NULL;
if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
+ g_cbs_head = g_cbs_tail = closure;
} else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
+ g_cbs_tail->next = closure;
+ g_cbs_tail = closure;
}
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
+
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
+ grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
}
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
+ grpc_iomgr_closure *closure;
for (;;) {
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
- cb = g_cbs_head;
- if (!cb) {
+ closure = g_cbs_head;
+ if (!closure) {
gpr_mu_unlock(&g_mu);
break;
}
- g_cbs_head = cb->next;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
@@ -211,8 +231,7 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
retake_mu = drop_mu;
drop_mu = NULL;
}
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
+ closure->cb(closure->cb_arg, success && closure->success);
n++;
}
if (retake_mu) {
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 1f5d23fdda..a10e481e48 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,14 +34,43 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
-/* gRPC Callback definition */
+/** gRPC Callback definition.
+ *
+ * \param arg Arbitrary input.
+ * \param success An indication on the state of the iomgr. On false, cleanup
+ * actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+/** A closure over a grpc_iomgr_cb_func. */
+typedef struct grpc_iomgr_closure {
+ /** Bound callback. */
+ grpc_iomgr_cb_func cb;
+
+ /** Arguments to be passed to "cb". */
+ void *cb_arg;
+
+ /** Internal. A boolean indication to "cb" on the state of the iomgr.
+ * For instance, closures created during a shutdown would have this field set
+ * to false. */
+ int success;
+
+ /**< Internal. Do not touch */
+ struct grpc_iomgr_closure *next;
+} grpc_iomgr_closure;
+
+/** Initializes \a closure with \a cb and \a cb_arg. */
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg);
+
+/** Initializes the iomgr. */
void grpc_iomgr_init(void);
+
+/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);
-/* This function is called from within a callback or from anywhere else
- and causes the invocation of a callback at some point in the future */
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg);
+/** Registers a closure to be invoked at some point in the future.
+ *
+ * Can be called from within a callback or from anywhere else */
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 07923258b9..6c1e0e1799 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -35,15 +35,19 @@
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
+typedef struct grpc_iomgr_object {
+ char *name;
+ struct grpc_iomgr_object *next;
+ struct grpc_iomgr_object *prev;
+} grpc_iomgr_object;
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success);
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-void grpc_iomgr_ref(void);
-void grpc_iomgr_unref(void);
+void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
+void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
void grpc_iomgr_platform_init(void);
void grpc_iomgr_platform_shutdown(void);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 6da7daabc8..d2f615271e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -272,6 +272,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
+ grpc_iomgr_closure promotion_closure;
} grpc_unary_promote_args;
static void unary_poll_do_promote(void *args, int success) {
@@ -294,7 +295,7 @@ static void unary_poll_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
grpc_pollset_kick(pollset);
- grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
}
@@ -378,7 +379,9 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
- grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ up_args->promotion_closure.cb = unary_poll_do_promote;
+ up_args->promotion_closure.cb_arg = up_args;
+ grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_pollset_kick(pollset);
}
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 5af0685f9d..b1f4c09a2c 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -66,15 +66,15 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
now = gpr_now();
if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
+ return 0 /* GPR_FALSE */;
}
- if (grpc_maybe_call_delayed_callbacks(NULL, 1)) {
- return 1;
+ if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) {
+ return 1 /* GPR_TRUE */;
}
if (grpc_alarm_check(NULL, now, &deadline)) {
- return 1;
+ return 1 /* GPR_TRUE */;
}
- return 0;
+ return 0 /* GPR_FALSE */;
}
void grpc_pollset_kick(grpc_pollset *p) { }
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 43fd704a6d..fcf48fe0d7 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -55,6 +55,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
+ grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@@ -153,9 +154,9 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
+ grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unref();
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -166,14 +167,17 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
- /*gpr_thd_id id;*/
- grpc_iomgr_ref();
+ gpr_thd_id id;
+ char *tmp;
+ gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name,
+ default_port);
+ grpc_iomgr_register_object(&r->iomgr_object, tmp);
+ gpr_free(tmp);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
- /*gpr_thd_new(&id, do_request, r, NULL);*/
- do_request(r);
+ gpr_thd_new(&id, do_request, r, NULL);
}
#endif
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 9b416dfe8a..7d0d2f9e7a 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -54,6 +54,7 @@ typedef struct {
char *default_port;
grpc_resolve_cb cb;
void *arg;
+ grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@@ -135,7 +136,7 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_unref();
+ grpc_iomgr_unregister_object(&r->iomgr_object);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -147,7 +148,10 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
- grpc_iomgr_ref();
+ const char *label;
+ gpr_asprintf(&label, "resolve:%s", name);
+ grpc_iomgr_register_object(&r->iomgr_object, label);
+ gpr_free(label);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index ee5150a696..e4ba0a2b66 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -39,18 +39,17 @@
#include <grpc/support/log.h>
#include "src/core/iomgr/iocp_windows.h"
-#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
#include "src/core/iomgr/socket_windows.h"
-grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
+grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) {
grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket));
memset(r, 0, sizeof(grpc_winsocket));
r->socket = socket;
gpr_mu_init(&r->state_mu);
- grpc_iomgr_ref();
+ grpc_iomgr_register_object(&r->iomgr_object, name);
grpc_iocp_add_socket(r);
return r;
}
@@ -64,13 +63,15 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
gpr_mu_lock(&socket->state_mu);
if (socket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->read_info.cb,
- socket->read_info.opaque, 0);
+ grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb,
+ socket->read_info.opaque);
+ grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
}
if (socket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->write_info.cb,
- socket->write_info.opaque, 0);
+ grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb,
+ socket->write_info.opaque);
+ grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
}
gpr_mu_unlock(&socket->state_mu);
return callbacks_set;
@@ -90,7 +91,7 @@ void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
- grpc_iomgr_unref();
+ grpc_iomgr_unregister_object(&winsocket->iomgr_object);
}
void grpc_winsocket_destroy(grpc_winsocket *winsocket) {
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index b27eb14219..7080919af0 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -39,6 +39,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/atm.h>
+#include "src/core/iomgr/iomgr_internal.h"
+
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
@@ -93,11 +95,16 @@ typedef struct grpc_winsocket {
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
+
+ grpc_iomgr_closure shutdown_closure;
+
+ /* A label for iomgr to track outstanding objects */
+ grpc_iomgr_object iomgr_object;
} grpc_winsocket;
/* Create a wrapped windows handle. This takes ownership of it, meaning that
it will be responsible for closing it. */
-grpc_winsocket *grpc_winsocket_create(SOCKET socket);
+grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name);
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. Returns the number of callbacks that got setup. */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 2401fe00e4..aa21ba9b9e 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -48,6 +48,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -185,6 +186,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
async_connect *ac;
struct sockaddr_in6 addr6_v4mapped;
struct sockaddr_in addr4_copy;
+ char *name;
+ char *addr_str;
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
@@ -211,24 +214,27 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
err = connect(fd, addr, addr_len);
} while (err < 0 && errno == EINTR);
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_asprintf(&name, "tcp-client:%s", addr_str);
+
if (err >= 0) {
gpr_log(GPR_DEBUG, "instant connect");
- cb(arg,
- grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- return;
+ cb(arg, grpc_tcp_create(grpc_fd_create(fd, name),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
+ gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
close(fd);
cb(arg, NULL);
- return;
+ goto done;
}
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
- ac->fd = grpc_fd_create(fd);
+ ac->fd = grpc_fd_create(fd, name);
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->write_closure.cb = on_writable;
@@ -236,6 +242,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+
+done:
+ gpr_free(name);
+ gpr_free(addr_str);
}
#endif
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index cf5174327d..2a040ffc4a 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -193,7 +193,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
goto failure;
}
- socket = grpc_winsocket_create(sock);
+ socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
info->outstanding = 1;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index cd6b2ecae6..2f19f9d442 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -280,6 +280,8 @@ typedef struct {
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
+
+ grpc_iomgr_closure handle_read_closure;
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+ tcp->handle_read_closure.cb_arg = tcp;
+ grpc_iomgr_add_callback(&tcp->handle_read_closure);
}
}
@@ -592,6 +595,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = grpc_tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+
+ tcp->handle_read_closure.cb = grpc_tcp_handle_read;
return &tcp->base;
}
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index d1cd8a769c..3cd40faafc 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -60,6 +60,7 @@
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
@@ -281,6 +282,8 @@ static void on_read(void *arg, int success) {
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
+ char *addr_str;
+ char *name;
/* Note: If we ever decide to return this address to the user, remember to
strip off the ::ffff:0.0.0.0/96 prefix first. */
int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
@@ -299,9 +302,15 @@ static void on_read(void *arg, int success) {
grpc_set_socket_no_sigpipe_if_possible(fd);
- sp->server->cb(
- sp->server->cb_arg,
- grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
+
+ sp->server->cb(sp->server->cb_arg,
+ grpc_tcp_create(grpc_fd_create(fd, name),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+
+ gpr_free(addr_str);
+ gpr_free(name);
}
abort();
@@ -318,9 +327,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr, int addr_len) {
server_port *sp;
int port;
+ char *addr_str;
+ char *name;
port = prepare_socket(fd, addr, addr_len);
if (port >= 0) {
+ grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
+ gpr_asprintf(&name, "tcp-server-listener:%s", addr_str);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb && "must add ports before starting server");
/* append it to the list under a lock */
@@ -331,11 +344,13 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd);
+ sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
+ gpr_free(addr_str);
+ gpr_free(name);
}
return port;
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d22acc7453..9ef369dfd8 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -270,7 +270,8 @@ static void on_accept(void *arg, int from_iocp) {
gpr_free(utf8_message);
closesocket(sock);
} else {
- ep = grpc_tcp_create(grpc_winsocket_create(sock));
+ /* TODO(ctiller): add sockaddr address to label */
+ ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
}
} else {
/* If we're not notified from the IOCP, it means we are asked to shutdown.
@@ -336,7 +337,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
}
sp = &s->ports[s->nports++];
sp->server = s;
- sp->socket = grpc_winsocket_create(sock);
+ sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index ae22bf47a0..9bf5c32e74 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -54,6 +54,7 @@
typedef struct {
grpc_credentials *creds;
grpc_credentials_metadata_cb cb;
+ grpc_iomgr_closure *on_simulated_token_fetch_done_closure;
void *user_data;
} grpc_credentials_metadata_request;
@@ -65,6 +66,8 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
gpr_malloc(sizeof(grpc_credentials_metadata_request));
r->creds = grpc_credentials_ref(creds);
r->cb = cb;
+ r->on_simulated_token_fetch_done_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
r->user_data = user_data;
return r;
}
@@ -72,6 +75,7 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
static void grpc_credentials_metadata_request_destroy(
grpc_credentials_metadata_request *r) {
grpc_credentials_unref(r->creds);
+ gpr_free(r->on_simulated_token_fetch_done_closure);
gpr_free(r);
}
@@ -831,9 +835,11 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
if (c->is_async) {
- grpc_iomgr_add_callback(
- on_simulated_token_fetch_done,
- grpc_credentials_metadata_request_create(creds, cb, user_data));
+ grpc_credentials_metadata_request *cb_arg =
+ grpc_credentials_metadata_request_create(creds, cb, user_data);
+ grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure,
+ on_simulated_token_fetch_done, cb_arg);
+ grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure);
} else {
cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e3995a407b..88ff5cfbce 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -31,6 +31,7 @@
*
*/
+#include "src/core/census/grpc_context.h"
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
@@ -226,6 +227,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ grpc_iomgr_closure destroy_closure;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -242,9 +244,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
-static grpc_call_error cancel_with_status(
- grpc_call *c, grpc_status_code status, const char *description,
- gpr_uint8 locked);
+static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+ const char *description,
+ gpr_uint8 locked);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -268,6 +270,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (call->is_client) {
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
+ call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create();
+ call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
@@ -367,7 +371,9 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
if (allow_immediate_deletion) {
destroy_call(c, 1);
} else {
- grpc_iomgr_add_callback(destroy_call, c);
+ c->destroy_closure.cb = destroy_call;
+ c->destroy_closure.cb_arg = c;
+ grpc_iomgr_add_callback(&c->destroy_closure);
}
}
}
@@ -403,7 +409,8 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
- (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
+ (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
+ grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
@@ -556,13 +563,13 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
break;
case GRPC_IOREQ_RECV_INITIAL_METADATA:
GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
- *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
- .recv_metadata);
+ *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
+ .recv_metadata);
break;
case GRPC_IOREQ_RECV_TRAILING_METADATA:
GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
- *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
- .recv_metadata);
+ *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .recv_metadata);
break;
case GRPC_IOREQ_OP_COUNT:
abort();
@@ -676,9 +683,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
}
/* we have to be reading a message to know what to do here */
if (!call->reading_message) {
- cancel_with_status(
- call, GRPC_STATUS_INVALID_ARGUMENT,
- "Received payload data while not reading a message", 1);
+ cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message", 1);
return 0;
}
/* append the slice to the incoming buffer */
@@ -1025,9 +1031,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return cancel_with_status(c, status, description, 0);
}
-static grpc_call_error cancel_with_status(
- grpc_call *c, grpc_status_code status, const char *description,
- gpr_uint8 locked) {
+static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+ const char *description,
+ gpr_uint8 locked) {
grpc_transport_op op;
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
@@ -1294,12 +1300,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
- tag);
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
-void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value,
- void (*destroy)(void *value)) {
+void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
+ void *value, void (*destroy)(void *value)) {
if (call->context[elem].destroy) {
call->context[elem].destroy(call->context[elem].value);
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index be9da2b7f9..947011c613 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -61,6 +61,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
+ grpc_iomgr_closure destroy_closure;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -193,7 +194,9 @@ static void destroy_channel(void *p, int ok) {
void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
- grpc_iomgr_add_callback(destroy_channel, channel);
+ channel->destroy_closure.cb = destroy_channel;
+ channel->destroy_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&channel->destroy_closure);
}
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index daa8d3a7c6..9fa6696bf6 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -195,9 +195,10 @@ grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_filter *filters[MAX_FILTERS];
int n = 0;
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index d6eb9b2c24..ac6871c6f2 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -31,11 +31,11 @@
*
*/
+#include <grpc/census.h>
#include <grpc/grpc.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/statistics/census_interface.h"
#include "src/core/profiling/timers.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
@@ -64,7 +64,9 @@ void grpc_init(void) {
grpc_security_pre_init();
grpc_iomgr_init();
grpc_tracer_init("GRPC_TRACE");
- census_init();
+ if (census_initialize(CENSUS_NONE)) {
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
grpc_timers_global_init();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index a71d12291e..8ef121dc48 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -234,9 +234,10 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg);
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 71b1dd0630..f4aa6d5b2a 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -122,6 +122,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
+ grpc_iomgr_closure finish_shutdown_channel_closure;
+ grpc_iomgr_closure finish_destroy_channel_closure;
};
typedef struct shutdown_tag {
@@ -183,6 +185,8 @@ struct call_data {
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
+ grpc_iomgr_closure kill_zombie_closure;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -312,7 +316,9 @@ static void destroy_channel(channel_data *chand) {
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- grpc_iomgr_add_callback(finish_destroy_channel, chand);
+ chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
+ chand->finish_destroy_channel_closure.cb_arg = chand;
+ grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
static void finish_start_new_rpc_and_unlock(grpc_server *server,
@@ -444,7 +450,8 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -452,11 +459,14 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+
}
if (call_list_remove(calld, ALL_CALLS)) {
maybe_finish_shutdown(chand->server);
@@ -533,7 +543,9 @@ static void finish_shutdown_channel(void *cd, int success) {
static void shutdown_channel(channel_data *chand) {
grpc_channel_internal_ref(chand->channel);
- grpc_iomgr_add_callback(finish_shutdown_channel, chand);
+ chand->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
+ chand->finish_shutdown_channel_closure.cb_arg = chand;
+ grpc_iomgr_add_callback(&chand->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
@@ -621,9 +633,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "server",
+ server_start_transport_op,
+ channel_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "server",
};
void grpc_server_register_completion_queue(grpc_server *server,
@@ -643,7 +661,9 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
- int census_enabled = grpc_channel_args_is_census_enabled(args);
+ /* TODO(census): restore this once we finalize census filter etc.
+ int census_enabled = grpc_channel_args_is_census_enabled(args); */
+ int census_enabled = 0;
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -668,9 +688,10 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server->channel_filters =
gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
server->channel_filters[0] = &server_surface_filter;
+ /* TODO(census): restore this once we rework census filter
if (census_enabled) {
server->channel_filters[1] = &grpc_server_census_filter;
- }
+ } */
for (i = 0; i < filter_count; i++) {
server->channel_filters[i + 1 + census_enabled] = filters[i];
}
@@ -969,9 +990,10 @@ void grpc_server_destroy(grpc_server *server) {
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- kill_zombie,
+ grpc_iomgr_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
for (c = server->root_channel_data.next; c != &server->root_channel_data;
diff --git a/src/core/transport/chttp2/gen_hpack_tables.c b/src/core/transport/chttp2/gen_hpack_tables.c
index 86b593129b..bdaa3cf094 100644
--- a/src/core/transport/chttp2/gen_hpack_tables.c
+++ b/src/core/transport/chttp2/gen_hpack_tables.c
@@ -219,10 +219,10 @@ static int state_index(int bitofs, symset syms, int *isnew) {
emit - the symbol to emit on this nibble (or -1 if no symbol has been
found)
syms - the set of symbols that could be matched */
-static void build_dec_tbl(int state, int nibble, int nibbits, int bitofs,
+static void build_dec_tbl(int state, int nibble, int nibbits, unsigned bitofs,
int emit, symset syms) {
int i;
- int bit;
+ unsigned bit;
/* If we have four bits in the nibble we're looking at, then we can fill in
a slot in the lookup tables. */
@@ -338,7 +338,7 @@ static void generate_base64_inverse_table(void) {
static const char alphabet[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
unsigned char inverse[256];
- int i;
+ unsigned i;
memset(inverse, 255, sizeof(inverse));
for (i = 0; i < strlen(alphabet); i++) {