aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-18 13:56:51 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-18 13:56:51 -0700
commit50448beab3d24bf4e097967d53aed27b90fc4858 (patch)
treee79f698063749e881b8a1eae61af8a190f6d37bb /src/core
parent1c3fd1a4bf68417b0118e369e78c4db267f3191e (diff)
parent63f31e8cd16b185fd735bd753afbc0375cd573aa (diff)
Merge github.com:grpc/grpc into grpc_millis
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/channel_connectivity.c10
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c5
-rw-r--r--src/core/ext/filters/client_channel/client_channel_factory.c7
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c184
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c6
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c8
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c3
-rw-r--r--src/core/ext/filters/client_channel/lb_policy_factory.c9
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c2
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c37
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c7
-rw-r--r--src/core/ext/filters/client_channel/retry_throttle.c9
-rw-r--r--src/core/ext/filters/client_channel/subchannel.c2
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.c34
-rw-r--r--src/core/ext/filters/client_channel/subchannel_index.h7
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.c6
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.c2
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c2
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c18
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c239
-rw-r--r--src/core/ext/transport/chttp2/transport/flow_control.c16
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c17
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h37
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_map.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c5
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.c101
-rw-r--r--src/core/lib/debug/stats_data.c51
-rw-r--r--src/core/lib/debug/stats_data.h123
-rw-r--r--src/core/lib/debug/stats_data.yaml50
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql25
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c54
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c202
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.c14
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h4
-rw-r--r--src/core/lib/security/transport/security_handshaker.c2
-rw-r--r--src/core/lib/surface/call.c64
-rw-r--r--src/core/lib/surface/call.h2
-rw-r--r--src/core/lib/surface/channel.c8
-rw-r--r--src/core/lib/surface/completion_queue.c33
-rw-r--r--src/core/lib/surface/init.c9
-rw-r--r--src/core/lib/surface/server.c35
-rw-r--r--src/core/lib/transport/metadata_batch.c32
-rw-r--r--src/core/lib/transport/transport.c5
-rw-r--r--src/core/tsi/fake_transport_security.c3
-rw-r--r--src/core/tsi/transport_security.h10
-rw-r--r--src/core/tsi/transport_security_grpc.c8
-rw-r--r--src/core/tsi/transport_security_grpc.h3
53 files changed, 1020 insertions, 526 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c
index 416bdefa08..f79d3bdb56 100644
--- a/src/core/ext/filters/client_channel/channel_connectivity.c
+++ b/src/core/ext/filters/client_channel/channel_connectivity.c
@@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
grpc_cq_completion *ignored) {
- int delete = 0;
+ bool should_delete = false;
state_watcher *w = (state_watcher *)pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
@@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED:
- delete = 1;
+ should_delete = true;
break;
}
gpr_mu_unlock(&w->mu);
- if (delete) {
+ if (should_delete) {
delete_state_watcher(exec_ctx, w);
}
}
@@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
- partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error));
+ partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
}
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) {
- partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error));
+ partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
}
int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 93ba8684ed..c1daaf28c9 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) {
}
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
- method_parameters_unref(value);
+ method_parameters_unref((method_parameters *)value);
}
static bool parse_wait_for_ready(grpc_json *field,
@@ -719,7 +719,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"client channel factory arg must be a pointer");
}
- grpc_client_channel_factory_ref(arg->value.pointer.p);
+ grpc_client_channel_factory_ref(
+ (grpc_client_channel_factory *)arg->value.pointer.p);
chand->client_channel_factory =
(grpc_client_channel_factory *)arg->value.pointer.p;
// Get server name to resolve, using proxy mapper if needed.
diff --git a/src/core/ext/filters/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c
index 7220a8639e..e8aa4cda29 100644
--- a/src/core/ext/filters/client_channel/client_channel_factory.c
+++ b/src/core/ext/filters/client_channel/client_channel_factory.c
@@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel(
}
static void* factory_arg_copy(void* factory) {
- grpc_client_channel_factory_ref(factory);
+ grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory);
return factory;
}
static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
- // TODO(roth): Remove local exec_ctx when
- // https://github.com/grpc/grpc/pull/8705 is merged.
- grpc_client_channel_factory_unref(exec_ctx, factory);
+ grpc_client_channel_factory_unref(exec_ctx,
+ (grpc_client_channel_factory*)factory);
}
static int factory_arg_cmp(void* factory1, void* factory2) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index bd290464c8..7ad322902b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->context != NULL);
GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL);
calld->client_stats = grpc_grpclb_client_stats_ref(
- args->context[GRPC_GRPCLB_CLIENT_STATS].value);
+ (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS]
+ .value);
// Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats);
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 137642613c..cd2bd02bd2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -101,6 +101,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -137,7 +138,7 @@ static grpc_error *initial_metadata_add_lb_token(
}
static void destroy_client_stats(void *arg) {
- grpc_grpclb_client_stats_unref(arg);
+ grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
}
typedef struct wrapped_rr_closure_arg {
@@ -285,7 +286,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
* glb_lb_policy
*/
typedef struct rr_connectivity_data rr_connectivity_data;
-static const grpc_lb_policy_vtable glb_lb_policy_vtable;
+
typedef struct glb_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
@@ -727,7 +728,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* Allocate the data for the tracking of the new RR policy's connectivity.
* It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity =
- gpr_zalloc(sizeof(rr_connectivity_data));
+ (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -869,7 +870,8 @@ static grpc_channel_args *build_lb_channel_args(
grpc_lb_addresses *lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries =
- gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
+ (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
+ num_grpclb_addrs);
size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -911,92 +913,6 @@ static grpc_channel_args *build_lb_channel_args(
return result;
}
-static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error);
-static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy_factory *factory,
- grpc_lb_policy_args *args) {
- /* Count the number of gRPC-LB addresses. There must be at least one.
- * TODO(roth): For now, we ignore non-balancer addresses, but in the
- * future, we may change the behavior such that we fall back to using
- * the non-balancer addresses if we cannot reach any balancers. In the
- * fallback case, we should use the LB policy indicated by
- * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
- * unset, we should default to pick_first). */
- const grpc_arg *arg =
- grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
- if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
- return NULL;
- }
- grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
- size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; ++i) {
- if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
- }
- if (num_grpclb_addrs == 0) return NULL;
-
- glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
-
- /* Get server name. */
- arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(arg != NULL);
- GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
- GPR_ASSERT(uri->path[0] != '\0');
- glb_policy->server_name =
- gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
- if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
- gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
- glb_policy->server_name);
- }
- grpc_uri_destroy(uri);
-
- glb_policy->cc_factory = args->client_channel_factory;
- GPR_ASSERT(glb_policy->cc_factory != NULL);
-
- arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
- glb_policy->lb_call_timeout_ms =
- grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
-
- // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
- // since we use this to trigger the client_load_reporting filter.
- grpc_arg new_arg =
- grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
- static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
- glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
- args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
-
- /* Create a client channel over them to communicate with a LB service */
- glb_policy->response_generator =
- grpc_fake_resolver_response_generator_create();
- grpc_channel_args *lb_channel_args = build_lb_channel_args(
- exec_ctx, addresses, glb_policy->response_generator, args->args);
- char *uri_str;
- gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
- glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
- exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
-
- /* Propagate initial resolution */
- grpc_fake_resolver_response_generator_set_response(
- exec_ctx, glb_policy->response_generator, lb_channel_args);
- grpc_channel_args_destroy(exec_ctx, lb_channel_args);
- gpr_free(uri_str);
- if (glb_policy->lb_channel == NULL) {
- gpr_free((void *)glb_policy->server_name);
- grpc_channel_args_destroy(exec_ctx, glb_policy->args);
- gpr_free(glb_policy);
- return NULL;
- }
- GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
- glb_lb_channel_on_connectivity_changed_cb, glb_policy,
- grpc_combiner_scheduler(args->combiner));
- grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
- grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
- "grpclb");
- return &glb_policy->base;
-}
-
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL);
@@ -1011,6 +927,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
+ grpc_subchannel_index_unref();
if (glb_policy->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
gpr_free(glb_policy->pending_update_args);
@@ -1302,7 +1219,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
grpc_grpclb_dropped_call_counts *drop_entries =
- request->client_stats.calls_finished_with_drop.arg;
+ (grpc_grpclb_dropped_call_counts *)
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
@@ -1859,6 +1777,90 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_notify_on_state_change_locked,
glb_update_locked};
+static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args) {
+ /* Count the number of gRPC-LB addresses. There must be at least one.
+ * TODO(roth): For now, we ignore non-balancer addresses, but in the
+ * future, we may change the behavior such that we fall back to using
+ * the non-balancer addresses if we cannot reach any balancers. In the
+ * fallback case, we should use the LB policy indicated by
+ * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
+ * unset, we should default to pick_first). */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+ return NULL;
+ }
+ grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
+ glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
+
+ /* Get server name. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(arg != NULL);
+ GPR_ASSERT(arg->type == GRPC_ARG_STRING);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
+ GPR_ASSERT(uri->path[0] != '\0');
+ glb_policy->server_name =
+ gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+ if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+ gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
+ glb_policy->server_name);
+ }
+ grpc_uri_destroy(uri);
+
+ glb_policy->cc_factory = args->client_channel_factory;
+ GPR_ASSERT(glb_policy->cc_factory != NULL);
+
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
+ glb_policy->lb_call_timeout_ms =
+ grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
+
+ // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+ // since we use this to trigger the client_load_reporting filter.
+ grpc_arg new_arg =
+ grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
+ static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+ glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+
+ /* Create a client channel over them to communicate with a LB service */
+ glb_policy->response_generator =
+ grpc_fake_resolver_response_generator_create();
+ grpc_channel_args *lb_channel_args = build_lb_channel_args(
+ exec_ctx, addresses, glb_policy->response_generator, args->args);
+ char *uri_str;
+ gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
+ glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
+ exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
+
+ /* Propagate initial resolution */
+ grpc_fake_resolver_response_generator_set_response(
+ exec_ctx, glb_policy->response_generator, lb_channel_args);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ gpr_free(uri_str);
+ if (glb_policy->lb_channel == NULL) {
+ gpr_free((void *)glb_policy->server_name);
+ grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+ gpr_free(glb_policy);
+ return NULL;
+ }
+ grpc_subchannel_index_ref();
+ GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
+ glb_lb_channel_on_connectivity_changed_cb, glb_policy,
+ grpc_combiner_scheduler(args->combiner));
+ grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
+ grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
+ "grpclb");
+ return &glb_policy->base;
+}
+
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index 393f6256bf..9c6fb94bf1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
if (request->has_client_stats) {
grpc_grpclb_dropped_call_counts *drop_entries =
- request->client_stats.calls_finished_with_drop.arg;
+ (grpc_grpclb_dropped_call_counts *)
+ request->client_stats.calls_finished_with_drop.arg;
grpc_grpclb_dropped_call_counts_destroy(drop_entries);
}
gpr_free(request);
@@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
if (!res.has_initial_response) return NULL;
grpc_grpclb_initial_response *initial_res =
- gpr_malloc(sizeof(grpc_grpclb_initial_response));
+ (grpc_grpclb_initial_response *)gpr_malloc(
+ sizeof(grpc_grpclb_initial_response));
memcpy(initial_res, &res.initial_response,
sizeof(grpc_grpclb_initial_response));
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fab3073eb9..d20cbb8388 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
"picked_first_destroy");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
if (p->pending_update_args != NULL) {
grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
gpr_free(p->pending_update_args);
@@ -330,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
(void *)p, (unsigned long)addresses->num_addresses);
}
- grpc_subchannel_args *sc_args =
- gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses);
+ grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc(
+ sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@@ -403,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
/* Create the subchannels for the new subchannel args/addresses. */
grpc_subchannel **new_subchannels =
- gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
+ (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
size_t num_new_subchannels = 0;
for (size_t i = 0; i < sc_args_count; i++) {
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
@@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
pf_update_locked(exec_ctx, &p->base, args);
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p,
grpc_combiner_scheduler(args->combiner));
return &p->base;
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index be91d3d651..8ac1a46abd 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -30,6 +30,7 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
(void *)pol, (void *)pol);
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
+ grpc_subchannel_index_unref();
gpr_free(p);
}
@@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->client_channel_factory != NULL);
round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p));
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner);
+ grpc_subchannel_index_ref();
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
rr_update_locked(exec_ctx, &p->base, args);
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index cdcaf17544..acf5929746 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx,
}
static void* lb_addresses_copy(void* addresses) {
- return grpc_lb_addresses_copy(addresses);
+ return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses);
}
static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) {
- grpc_lb_addresses_destroy(exec_ctx, addresses);
+ grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses);
}
static int lb_addresses_cmp(void* addresses1, void* addresses2) {
- return grpc_lb_addresses_cmp(addresses1, addresses2);
+ return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1,
+ (grpc_lb_addresses*)addresses2);
}
static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
@@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER)
return NULL;
- return lb_addresses_arg->value.pointer.p;
+ return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p;
}
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
index c32b5c5a75..075c275e42 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c
@@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) {
int random_pct = rand() % 100;
int percentage;
if (sscanf(field->value, "%d", &percentage) != 1 ||
- random_pct > percentage) {
+ random_pct > percentage || percentage == 0) {
service_config_json = NULL;
break;
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 9747d39a16..7f1f57259a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -38,7 +38,7 @@ typedef struct fd_node {
/** the owner of this fd node */
grpc_ares_ev_driver *ev_driver;
/** the grpc_fd owned by this fd node */
- grpc_fd *grpc_fd;
+ grpc_fd *fd;
/** a closure wrapping on_readable_cb, which should be invoked when the
grpc_fd in this node becomes readable. */
grpc_closure read_closure;
@@ -96,15 +96,15 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) {
}
static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
- gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered);
gpr_mu_destroy(&fdn->mu);
- grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
+ grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */
- grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */,
+ grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */,
"c-ares query finished");
gpr_free(fdn);
}
@@ -150,9 +150,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx,
ev_driver->shutting_down = true;
fd_node *fn = ev_driver->fds;
while (fn != NULL) {
- grpc_fd_shutdown(
- exec_ctx, fn->grpc_fd,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown"));
+ grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "grpc_ares_ev_driver_shutdown"));
fn = fn->next;
}
gpr_mu_unlock(&ev_driver->mu);
@@ -165,7 +164,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) {
dummy_head.next = *head;
fd_node *node = &dummy_head;
while (node->next != NULL) {
- if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) {
+ if (grpc_fd_wrapped_fd(node->next->fd) == fd) {
fd_node *ret = node->next;
node->next = node->next->next;
*head = dummy_head.next;
@@ -184,9 +183,9 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->readable_registered = false;
gpr_mu_unlock(&fdn->mu);
- gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) {
- ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd),
+ ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd),
ARES_SOCKET_BAD);
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
@@ -211,10 +210,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->writable_registered = false;
gpr_mu_unlock(&fdn->mu);
- gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd));
+ gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD,
- grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_fd_wrapped_fd(fdn->fd));
} else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
@@ -253,7 +252,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = (fd_node *)gpr_malloc(sizeof(fd_node));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
- fdn->grpc_fd = grpc_fd_create(socks[i], fd_name);
+ fdn->fd = grpc_fd_create(socks[i], fd_name);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
@@ -262,8 +261,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn,
grpc_schedule_on_exec_ctx);
- grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set,
- fdn->grpc_fd);
+ grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name);
}
fdn->next = new_list;
@@ -274,9 +272,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver);
- gpr_log(GPR_DEBUG, "notify read on: %d",
- grpc_fd_wrapped_fd(fdn->grpc_fd));
- grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
+ gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd));
+ grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure);
fdn->readable_registered = true;
}
// Register write_closure if the socket is writable and write_closure
@@ -284,9 +281,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d",
- grpc_fd_wrapped_fd(fdn->grpc_fd));
+ grpc_fd_wrapped_fd(fdn->fd));
grpc_ares_ev_driver_ref(ev_driver);
- grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure);
+ grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure);
fdn->writable_registered = true;
}
gpr_mu_unlock(&fdn->mu);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
index 2e2b411ab8..0ffb38518a 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
@@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
static grpc_ares_hostbyname_request *create_hostbyname_request(
grpc_ares_request *parent_request, char *host, uint16_t port,
bool is_balancer) {
- grpc_ares_hostbyname_request *hr =
- gpr_zalloc(sizeof(grpc_ares_hostbyname_request));
+ grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc(
+ sizeof(grpc_ares_hostbyname_request));
hr->parent_request = parent_request;
hr->host = gpr_strdup(host);
hr->port = port;
@@ -527,7 +527,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
grpc_resolve_address_ares_request *r =
- gpr_zalloc(sizeof(grpc_resolve_address_ares_request));
+ (grpc_resolve_address_ares_request *)gpr_zalloc(
+ sizeof(grpc_resolve_address_ares_request));
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c
index 6cd6654b6f..09dcade089 100644
--- a/src/core/ext/filters/client_channel/retry_throttle.c
+++ b/src/core/ext/filters/client_channel/retry_throttle.c
@@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
int max_milli_tokens, int milli_token_ratio,
grpc_server_retry_throttle_data* old_throttle_data) {
grpc_server_retry_throttle_data* throttle_data =
- gpr_malloc(sizeof(*throttle_data));
+ (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data));
memset(throttle_data, 0, sizeof(*throttle_data));
gpr_ref_init(&throttle_data->refs, 1);
throttle_data->max_milli_tokens = max_milli_tokens;
@@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
//
static void* copy_server_name(void* key, void* unused) {
- return gpr_strdup(key);
+ return gpr_strdup((const char*)key);
}
static long compare_server_name(void* key1, void* key2, void* unused) {
- return strcmp(key1, key2);
+ return strcmp((const char*)key1, (const char*)key2);
}
static void destroy_server_retry_throttle_data(void* value, void* unused) {
@@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
const char* server_name, int max_milli_tokens, int milli_token_ratio) {
gpr_mu_lock(&g_mu);
grpc_server_retry_throttle_data* throttle_data =
- gpr_avl_get(g_avl, (char*)server_name, NULL);
+ (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name,
+ NULL);
if (throttle_data == NULL) {
// Entry not found. Create a new one.
throttle_data = grpc_server_retry_throttle_data_create(
diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c
index 2c466f595a..9bd35f83e8 100644
--- a/src/core/ext/filters/client_channel/subchannel.c
+++ b/src/core/ext/filters/client_channel/subchannel.c
@@ -33,6 +33,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@@ -290,6 +291,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return c;
}
+ GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx);
c = (grpc_subchannel *)gpr_zalloc(sizeof(*c));
c->key = key;
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c
index f57b631c41..d7a51f3899 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.c
+++ b/src/core/ext/filters/client_channel/subchannel_index.c
@@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index;
static gpr_mu g_mu;
+static gpr_refcount g_refcount;
+
struct grpc_subchannel_key {
grpc_subchannel_args args;
};
@@ -88,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
static void sck_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
- grpc_subchannel_key_destroy(exec_ctx, p);
+ grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p);
}
static void *sck_avl_copy(void *p, void *unused) {
- return subchannel_key_copy(p);
+ return subchannel_key_copy((grpc_subchannel_key *)p);
}
static long sck_avl_compare(void *a, void *b, void *unused) {
- return grpc_subchannel_key_compare(a, b);
+ return grpc_subchannel_key_compare((grpc_subchannel_key *)a,
+ (grpc_subchannel_key *)b);
}
static void scv_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
- GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index");
+ GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p,
+ "subchannel_index");
}
static void *scv_avl_copy(void *p, void *unused) {
- GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index");
+ GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index");
return p;
}
@@ -119,15 +123,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = {
void grpc_subchannel_index_init(void) {
g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable);
gpr_mu_init(&g_mu);
+ gpr_ref_init(&g_refcount, 1);
}
void grpc_subchannel_index_shutdown(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_mu_destroy(&g_mu);
- gpr_avl_unref(g_subchannel_index, &exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
+ // To solve that, we should force polling to flush any pending callbacks, then
+ // shutdown safely.
+ grpc_subchannel_index_unref();
+}
+
+void grpc_subchannel_index_unref(void) {
+ if (gpr_unref(&g_refcount)) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_destroy(&g_mu);
+ gpr_avl_unref(g_subchannel_index, &exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
}
+void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
+
grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx,
grpc_subchannel_key *key) {
// Lock, and take a reference to the subchannel index.
diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h
index 98d882a453..92e36d5283 100644
--- a/src/core/ext/filters/client_channel/subchannel_index.h
+++ b/src/core/ext/filters/client_channel/subchannel_index.h
@@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void);
/** Shutdown the subchannel index (global) */
void grpc_subchannel_index_shutdown(void);
+/** Increment the refcount (non-zero) of subchannel index (global). */
+void grpc_subchannel_index_ref(void);
+
+/** Decrement the refcount of subchannel index (global). If the refcount drops
+ to zero, unref the subchannel index and destroy its mutex. */
+void grpc_subchannel_index_unref(void);
+
/** \em TEST ONLY.
* If \a force_creation is true, all key comparisons will be false, resulting in
* new subchannels always being created. Otherwise, the keys will be compared as
diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c
index 554a7f530d..03958136b4 100644
--- a/src/core/ext/filters/http/server/http_server_filter.c
+++ b/src/core/ext/filters/http/server/http_server_filter.c
@@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_error(const char *error_name, grpc_error **cumulative,
- grpc_error *new) {
- if (new == GRPC_ERROR_NONE) return;
+ grpc_error *new_err) {
+ if (new_err == GRPC_ERROR_NONE) return;
if (*cumulative == GRPC_ERROR_NONE) {
*cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name);
}
- *cumulative = grpc_error_add_child(*cumulative, new);
+ *cumulative = grpc_error_add_child(*cumulative, new_err);
}
static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c
index 1580efa063..b7032f288d 100644
--- a/src/core/ext/filters/max_age/max_age_filter.c
+++ b/src/core/ext/filters/max_age/max_age_filter.c
@@ -393,7 +393,7 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx,
bool enable =
grpc_channel_arg_get_integer(
grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS),
- MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX &&
+ MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX ||
grpc_channel_arg_get_integer(
grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS),
MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX;
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 0ec9353c04..202bcd47f5 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -161,7 +161,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(&c->mu);
- chttp2_connector_unref(exec_ctx, arg);
+ chttp2_connector_unref(exec_ctx, (grpc_connector *)arg);
} else {
GPR_ASSERT(c->endpoint != NULL);
start_handshake_locked(exec_ctx, c);
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
index c2b2fc03aa..b7ffd16eac 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.c
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -52,7 +52,7 @@ typedef struct {
} server_state;
typedef struct {
- server_state *server_state;
+ server_state *svr_state;
grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor;
grpc_handshake_manager *handshake_mgr;
@@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_handshaker_args *args = (grpc_handshaker_args *)arg;
server_connection_state *connection_state =
(server_connection_state *)args->user_data;
- gpr_mu_lock(&connection_state->server_state->mu);
- if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) {
+ gpr_mu_lock(&connection_state->svr_state->mu);
+ if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char *error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
@@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
grpc_server_setup_transport(
- exec_ctx, connection_state->server_state->server, transport,
+ exec_ctx, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args);
grpc_chttp2_transport_start_reading(exec_ctx, transport,
args->read_buffer);
@@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
}
grpc_handshake_manager_pending_list_remove(
- &connection_state->server_state->pending_handshake_mgrs,
+ &connection_state->svr_state->pending_handshake_mgrs,
connection_state->handshake_mgr);
- gpr_mu_unlock(&connection_state->server_state->mu);
+ gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
+ grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor);
gpr_free(connection_state);
}
@@ -124,8 +124,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_mu_unlock(&state->mu);
grpc_tcp_server_ref(state->tcp_server);
server_connection_state *connection_state =
- gpr_malloc(sizeof(*connection_state));
- connection_state->server_state = state;
+ (server_connection_state *)gpr_malloc(sizeof(*connection_state));
+ connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index d77047cb9e..30c2494185 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -144,10 +144,11 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
-static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_ping_type ping_type,
- grpc_closure *on_initiate,
- grpc_closure *on_complete);
+static void send_ping_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate,
+ grpc_closure *on_complete,
+ grpc_chttp2_initiate_write_reason initiate_write_reason);
static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error);
@@ -346,7 +347,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, t, "initial_write");
}
/* configure http2 the way we like it */
@@ -562,7 +562,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
}
- grpc_chttp2_initiate_write(exec_ctx, t, "init");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
post_benign_reclaimer(exec_ctx, t);
}
@@ -830,13 +831,91 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+static void inc_initiate_write_reason(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) {
+ switch (reason) {
+ case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED(
+ exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx);
+ break;
+ case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
+ GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx);
+ break;
+ }
+}
+
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, const char *reason) {
+ grpc_chttp2_transport *t,
+ grpc_chttp2_initiate_write_reason reason) {
GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
- set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
+ inc_initiate_write_reason(exec_ctx, reason);
+ set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
+ grpc_chttp2_initiate_write_reason_string(reason));
t->is_first_write_in_batch = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
GRPC_CLOSURE_SCHED(
@@ -848,7 +927,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
- reason);
+ grpc_chttp2_initiate_write_reason_string(reason));
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
break;
@@ -856,16 +935,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- bool also_initiate_write, const char *reason) {
+void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
}
- if (also_initiate_write) {
- grpc_chttp2_initiate_write(exec_ctx, t, reason);
- }
}
static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t,
@@ -1087,7 +1162,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -1184,7 +1261,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}
}
@@ -1385,14 +1464,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- bool initiate_write = true;
- if (op->send_message &&
- (op->payload->send_message.send_message->flags &
- GRPC_WRITE_BUFFER_HINT)) {
- initiate_write = false;
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ if (!(op->send_message &&
+ (op->payload->send_message.send_message->flags &
+ GRPC_WRITE_BUFFER_HINT))) {
+ grpc_chttp2_initiate_write(
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
- grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write,
- "op.send_initial_metadata");
}
} else {
s->send_initial_metadata = NULL;
@@ -1500,8 +1578,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
- "op.send_trailing_metadata");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
}
}
@@ -1611,15 +1690,17 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
-static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_ping_type ping_type,
- grpc_closure *on_initiate, grpc_closure *on_ack) {
+static void send_ping_locked(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate,
+ grpc_closure *on_ack,
+ grpc_chttp2_initiate_write_reason initiate_write_reason) {
grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
GRPC_ERROR_NONE);
if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
GRPC_ERROR_NONE)) {
- grpc_chttp2_initiate_write(exec_ctx, t, "send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason);
}
}
@@ -1627,7 +1708,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp;
t->ping_state.is_delayed_ping_timer_set = false;
- grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1642,7 +1724,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
- grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
}
}
@@ -1655,7 +1738,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&http_error);
grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error,
grpc_slice_ref_internal(slice), &t->qbuf);
- grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
GRPC_ERROR_UNREF(error);
}
@@ -1706,7 +1790,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (op->send_ping) {
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL,
- op->send_ping);
+ op->send_ping,
+ GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
}
if (op->on_connectivity_state_change != NULL) {
@@ -1952,7 +2037,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
}
}
if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) {
@@ -2274,7 +2360,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&s->stats.outgoing));
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error);
- grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
}
typedef struct {
@@ -2309,19 +2396,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
- "immediate stream flowctl");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL);
break;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE:
- grpc_chttp2_become_writable(exec_ctx, t, s, false,
- "queue stream flowctl");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
break;
}
switch (action.send_transport_update) {
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED:
break;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY:
- grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl");
+ grpc_chttp2_initiate_write(
+ exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL);
break;
// this is the same as no action b/c every time the transport enters the
// writing path it will maybe do an update
@@ -2339,7 +2427,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
(uint32_t)action.max_frame_size);
}
if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) {
- grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS);
}
}
if (action.need_ping) {
@@ -2347,7 +2436,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator);
send_ping_locked(exec_ctx, t,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
- &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+ &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked,
+ GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING);
}
}
@@ -2426,7 +2516,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (t->flow_control.initial_window_update > 0) {
grpc_chttp2_stream *s;
while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(
+ exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
}
}
t->flow_control.initial_window_update = 0;
@@ -2541,7 +2634,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end");
send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE,
&t->start_keepalive_ping_locked,
- &t->finish_keepalive_ping_locked);
+ &t->finish_keepalive_ping_locked,
+ GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
} else {
GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
grpc_timer_init(exec_ctx, &t->keepalive_ping_timer,
@@ -2893,7 +2987,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
- gpr_malloc(sizeof(*incoming_byte_stream));
+ (grpc_chttp2_incoming_byte_stream *)gpr_malloc(
+ sizeof(*incoming_byte_stream));
incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
@@ -2997,6 +3092,56 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
/*******************************************************************************
* MONITORING
*/
+
+const char *grpc_chttp2_initiate_write_reason_string(
+ grpc_chttp2_initiate_write_reason reason) {
+ switch (reason) {
+ case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
+ return "INITIAL_WRITE";
+ case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
+ return "START_NEW_STREAM";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
+ return "SEND_MESSAGE";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
+ return "SEND_INITIAL_METADATA";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
+ return "SEND_TRAILING_METADATA";
+ case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
+ return "RETRY_SEND_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
+ return "CONTINUE_PINGS";
+ case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
+ return "GOAWAY_SENT";
+ case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
+ return "RST_STREAM";
+ case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
+ return "CLOSE_FROM_API";
+ case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
+ return "STREAM_FLOW_CONTROL";
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
+ return "TRANSPORT_FLOW_CONTROL";
+ case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
+ return "SEND_SETTINGS";
+ case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING:
+ return "BDP_ESTIMATOR_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
+ return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
+ case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
+ return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
+ case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
+ return "APPLICATION_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
+ return "KEEPALIVE_PING";
+ case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
+ return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
+ case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
+ return "PING_RESPONSE";
+ case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
+ return "FORCE_RST_STREAM";
+ }
+ GPR_UNREACHABLE_CODE(return "unknown");
+}
+
static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *t) {
return ((grpc_chttp2_transport *)t)->ep;
diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c
index 0f078e79e9..569a6349d3 100644
--- a/src/core/ext/transport/chttp2/transport/flow_control.c
+++ b/src/core/ext/transport/chttp2/transport/flow_control.c
@@ -60,24 +60,24 @@ static void pretrace(shadow_flow_control* shadow_fc,
#define TRACE_PADDING 30
-static char* fmt_int64_diff_str(int64_t old, int64_t new) {
+static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str;
- if (old != new) {
- gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new);
+ if (old_val != new_val) {
+ gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val);
} else {
- gpr_asprintf(&str, "%" PRId64 "", old);
+ gpr_asprintf(&str, "%" PRId64 "", old_val);
}
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
return str_lp;
}
-static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) {
+static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
char* str;
- if (new > 0 && old != new) {
- gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new);
+ if (new_val > 0 && old_val != new_val) {
+ gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
} else {
- gpr_asprintf(&str, "%" PRIu32 "", old);
+ gpr_asprintf(&str, "%" PRIu32 "", old_val);
}
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str);
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index e99bc2eb4e..7fc2ce1bd9 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -115,7 +115,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
}
t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
- grpc_chttp2_initiate_write(exec_ctx, t, "ping response");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE);
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index 806100adaa..2995bf7310 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -44,7 +44,8 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) {
return out;
}
-grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new,
+grpc_slice grpc_chttp2_settings_create(uint32_t *old_settings,
+ const uint32_t *new_settings,
uint32_t force_mask, size_t count) {
size_t i;
uint32_t n = 0;
@@ -52,21 +53,21 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new,
uint8_t *p;
for (i = 0; i < count; i++) {
- n += (new[i] != old[i] || (force_mask & (1u << i)) != 0);
+ n += (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0);
}
output = GRPC_SLICE_MALLOC(9 + 6 * n);
p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0);
for (i = 0; i < count; i++) {
- if (new[i] != old[i] || (force_mask & (1u << i)) != 0) {
+ if (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0) {
*p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8);
*p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]);
- *p++ = (uint8_t)(new[i] >> 24);
- *p++ = (uint8_t)(new[i] >> 16);
- *p++ = (uint8_t)(new[i] >> 8);
- *p++ = (uint8_t)(new[i]);
- old[i] = new[i];
+ *p++ = (uint8_t)(new_settings[i] >> 24);
+ *p++ = (uint8_t)(new_settings[i] >> 16);
+ *p++ = (uint8_t)(new_settings[i] >> 8);
+ *p++ = (uint8_t)(new_settings[i]);
+ old_settings[i] = new_settings[i];
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index c94f7725bf..c9ab8d1b50 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -99,8 +99,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
grpc_chttp2_flowctl_recv_stream_update(
&t->flow_control, &s->flow_control, received_update);
if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
- "stream.read_flow_control");
+ grpc_chttp2_mark_stream_writable(exec_ctx, t, s);
+ grpc_chttp2_initiate_write(
+ exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE);
}
}
} else {
@@ -109,7 +111,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
received_update);
bool is_zero = t->flow_control.remote_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control");
+ grpc_chttp2_initiate_write(
+ exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED);
}
}
}
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 82ff2c8e2c..901e7413c0 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -1649,7 +1649,8 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_add(
&t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing));
- grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream");
+ grpc_chttp2_initiate_write(exec_ctx, t,
+ GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM);
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst");
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index 089b360ecf..187ce0ea87 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -42,8 +42,9 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem);
return grpc_metadata_batch_add_tail(
- exec_ctx, &buffer->batch,
- gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem);
+ exec_ctx, &buffer->batch, (grpc_linked_mdelem *)gpr_arena_alloc(
+ buffer->arena, sizeof(grpc_linked_mdelem)),
+ elem);
}
grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 46930dd22c..4382e30fcd 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -79,6 +79,33 @@ typedef enum {
GRPC_CHTTP2_PCL_COUNT /* must be last */
} grpc_chttp2_ping_closure_list;
+typedef enum {
+ GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
+ GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
+ GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
+ GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
+ GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
+ GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
+ GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
+ GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
+ GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
+ GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING,
+ GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
+ GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
+ GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
+ GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
+ GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
+ GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
+ GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
+} grpc_chttp2_initiate_write_reason;
+
+const char *grpc_chttp2_initiate_write_reason_string(
+ grpc_chttp2_initiate_write_reason reason);
+
typedef struct {
grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
uint64_t inflight_id;
@@ -599,7 +626,8 @@ struct grpc_chttp2_stream {
The actual call chain is documented in the implementation of this function.
*/
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, const char *reason);
+ grpc_chttp2_transport *t,
+ grpc_chttp2_initiate_write_reason reason);
typedef struct {
/** are we writing? */
@@ -851,10 +879,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
-void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- bool also_initiate_write, const char *reason);
+void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c
index 650090d8f0..d6079a9a33 100644
--- a/src/core/ext/transport/chttp2/transport/stream_map.c
+++ b/src/core/ext/transport/chttp2/transport/stream_map.c
@@ -72,8 +72,10 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key,
/* resize when less than 25% of the table is free, because compaction
won't help much */
map->capacity = capacity = 3 * capacity / 2;
- map->keys = keys = gpr_realloc(keys, capacity * sizeof(uint32_t));
- map->values = values = gpr_realloc(values, capacity * sizeof(void *));
+ map->keys = keys =
+ (uint32_t *)gpr_realloc(keys, capacity * sizeof(uint32_t));
+ map->values = values =
+ (void **)gpr_realloc(values, capacity * sizeof(void *));
}
}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index eb818fab2f..c413905f5c 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -200,9 +200,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (t->flow_control.remote_window > 0) {
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) {
- if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) &&
- stream_ref_if_not_destroyed(&s->refcount->refs)) {
- grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control");
+ if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
+ stream_ref_if_not_destroyed(&s->refcount->refs);
}
}
}
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 49e507cc78..72406aee2c 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -37,7 +37,6 @@
if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \
} while (0)
-static const grpc_transport_vtable inproc_vtable;
static grpc_slice g_empty_slice;
static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value;
@@ -1167,6 +1166,55 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
}
/*******************************************************************************
+ * INTEGRATION GLUE
+ */
+
+static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {
+ // Nothing to do here
+}
+
+static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset_set *pollset_set) {
+ // Nothing to do here
+}
+
+static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
+ return NULL;
+}
+
+/*******************************************************************************
+ * GLOBAL INIT AND DESTROY
+ */
+static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+
+void grpc_inproc_transport_init(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
+ grpc_schedule_on_exec_ctx);
+ g_empty_slice = grpc_slice_from_static_buffer(NULL, 0);
+
+ grpc_slice key_tmp = grpc_slice_from_static_string(":path");
+ g_fake_path_key = grpc_slice_intern(key_tmp);
+ grpc_slice_unref_internal(&exec_ctx, key_tmp);
+
+ g_fake_path_value = grpc_slice_from_static_string("/");
+
+ grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
+ g_fake_auth_key = grpc_slice_intern(auth_tmp);
+ grpc_slice_unref_internal(&exec_ctx, auth_tmp);
+
+ g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static const grpc_transport_vtable inproc_vtable = {
+ sizeof(inproc_stream), "inproc", init_stream,
+ set_pollset, set_pollset_set, perform_stream_op,
+ perform_transport_op, destroy_stream, destroy_transport,
+ get_endpoint};
+
+/*******************************************************************************
* Main inproc transport functions
*/
static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
@@ -1178,7 +1226,7 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st));
inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct));
// Share one lock between both sides since both sides get affected
- st->mu = ct->mu = gpr_malloc(sizeof(*st->mu));
+ st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu));
gpr_mu_init(&st->mu->mu);
gpr_ref_init(&st->mu->refs, 2);
st->base.vtable = &inproc_vtable;
@@ -1240,55 +1288,6 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server,
return channel;
}
-/*******************************************************************************
- * INTEGRATION GLUE
- */
-
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {
- // Nothing to do here
-}
-
-static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset_set *pollset_set) {
- // Nothing to do here
-}
-
-static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
- return NULL;
-}
-
-static const grpc_transport_vtable inproc_vtable = {
- sizeof(inproc_stream), "inproc", init_stream,
- set_pollset, set_pollset_set, perform_stream_op,
- perform_transport_op, destroy_stream, destroy_transport,
- get_endpoint};
-
-/*******************************************************************************
- * GLOBAL INIT AND DESTROY
- */
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-
-void grpc_inproc_transport_init(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
- grpc_schedule_on_exec_ctx);
- g_empty_slice = grpc_slice_from_static_buffer(NULL, 0);
-
- grpc_slice key_tmp = grpc_slice_from_static_string(":path");
- g_fake_path_key = grpc_slice_intern(key_tmp);
- grpc_slice_unref_internal(&exec_ctx, key_tmp);
-
- g_fake_path_value = grpc_slice_from_static_string("/");
-
- grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
- g_fake_auth_key = grpc_slice_intern(auth_tmp);
- grpc_slice_unref_internal(&exec_ctx, auth_tmp);
-
- g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
void grpc_inproc_transport_shutdown(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_slice_unref_internal(&exec_ctx, g_empty_slice);
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index a18efcb524..b3e1ee9b4e 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -25,6 +25,10 @@
const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"client_calls_created",
"server_calls_created",
+ "cqs_created",
+ "client_channels_created",
+ "client_subchannels_created",
+ "server_channels_created",
"syscall_poll",
"syscall_wait",
"histogram_slow_lookups",
@@ -46,6 +50,27 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"http2_writes_offloaded",
"http2_writes_continued",
"http2_partial_writes",
+ "http2_initiate_write_due_to_initial_write",
+ "http2_initiate_write_due_to_start_new_stream",
+ "http2_initiate_write_due_to_send_message",
+ "http2_initiate_write_due_to_send_initial_metadata",
+ "http2_initiate_write_due_to_send_trailing_metadata",
+ "http2_initiate_write_due_to_retry_send_ping",
+ "http2_initiate_write_due_to_continue_pings",
+ "http2_initiate_write_due_to_goaway_sent",
+ "http2_initiate_write_due_to_rst_stream",
+ "http2_initiate_write_due_to_close_from_api",
+ "http2_initiate_write_due_to_stream_flow_control",
+ "http2_initiate_write_due_to_transport_flow_control",
+ "http2_initiate_write_due_to_send_settings",
+ "http2_initiate_write_due_to_bdp_estimator_ping",
+ "http2_initiate_write_due_to_flow_control_unstalled_by_setting",
+ "http2_initiate_write_due_to_flow_control_unstalled_by_update",
+ "http2_initiate_write_due_to_application_ping",
+ "http2_initiate_write_due_to_keepalive_ping",
+ "http2_initiate_write_due_to_transport_flow_control_unstalled",
+ "http2_initiate_write_due_to_ping_response",
+ "http2_initiate_write_due_to_force_rst_stream",
"combiner_locks_initiated",
"combiner_locks_scheduled_items",
"combiner_locks_scheduled_final_items",
@@ -62,6 +87,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"Number of client side calls created by this process",
"Number of server side calls created by this process",
+ "Number of completion queues created", "Number of client channels created",
+ "Number of client subchannels created", "Number of server channels created",
"Number of polling syscalls (epoll_wait, poll, etc) made by this process",
"Number of sleeping syscalls made by this process",
"Number of times histogram increments went through the slow (binary "
@@ -86,6 +113,30 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"written",
"Number of HTTP2 writes that were made knowing there was still more data "
"to be written (we cap maximum write size to syscall_write)",
+ "Number of HTTP2 writes initiated due to 'initial_write'",
+ "Number of HTTP2 writes initiated due to 'start_new_stream'",
+ "Number of HTTP2 writes initiated due to 'send_message'",
+ "Number of HTTP2 writes initiated due to 'send_initial_metadata'",
+ "Number of HTTP2 writes initiated due to 'send_trailing_metadata'",
+ "Number of HTTP2 writes initiated due to 'retry_send_ping'",
+ "Number of HTTP2 writes initiated due to 'continue_pings'",
+ "Number of HTTP2 writes initiated due to 'goaway_sent'",
+ "Number of HTTP2 writes initiated due to 'rst_stream'",
+ "Number of HTTP2 writes initiated due to 'close_from_api'",
+ "Number of HTTP2 writes initiated due to 'stream_flow_control'",
+ "Number of HTTP2 writes initiated due to 'transport_flow_control'",
+ "Number of HTTP2 writes initiated due to 'send_settings'",
+ "Number of HTTP2 writes initiated due to 'bdp_estimator_ping'",
+ "Number of HTTP2 writes initiated due to "
+ "'flow_control_unstalled_by_setting'",
+ "Number of HTTP2 writes initiated due to "
+ "'flow_control_unstalled_by_update'",
+ "Number of HTTP2 writes initiated due to 'application_ping'",
+ "Number of HTTP2 writes initiated due to 'keepalive_ping'",
+ "Number of HTTP2 writes initiated due to "
+ "'transport_flow_control_unstalled'",
+ "Number of HTTP2 writes initiated due to 'ping_response'",
+ "Number of HTTP2 writes initiated due to 'force_rst_stream'",
"Number of combiner lock entries by process (first items queued to a "
"combiner)",
"Number of items scheduled against combiner locks",
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index 479c9520b6..c9871c4a56 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -27,6 +27,10 @@
typedef enum {
GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED,
GRPC_STATS_COUNTER_SERVER_CALLS_CREATED,
+ GRPC_STATS_COUNTER_CQS_CREATED,
+ GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED,
+ GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED,
+ GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED,
GRPC_STATS_COUNTER_SYSCALL_POLL,
GRPC_STATS_COUNTER_SYSCALL_WAIT,
GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS,
@@ -48,6 +52,27 @@ typedef enum {
GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED,
GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED,
GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE,
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM,
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS,
GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS,
@@ -109,6 +134,15 @@ typedef enum {
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED)
#define GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CALLS_CREATED)
+#define GRPC_STATS_INC_CQS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CQS_CREATED)
+#define GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED)
+#define GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), \
+ GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED)
+#define GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED)
#define GRPC_STATS_INC_SYSCALL_POLL(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_POLL)
#define GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx) \
@@ -156,6 +190,95 @@ typedef enum {
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED)
#define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( \
+ exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE)
+#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx) \
+ GRPC_STATS_INC_COUNTER( \
+ (exec_ctx), \
+ GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM)
#define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \
GRPC_STATS_INC_COUNTER((exec_ctx), \
GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED)
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index 7cf82de96c..84727fe6c4 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -20,6 +20,14 @@
doc: Number of client side calls created by this process
- counter: server_calls_created
doc: Number of server side calls created by this process
+- counter: cqs_created
+ doc: Number of completion queues created
+- counter: client_channels_created
+ doc: Number of client channels created
+- counter: client_subchannels_created
+ doc: Number of client subchannels created
+- counter: server_channels_created
+ doc: Number of server channels created
# polling
- counter: syscall_poll
doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process
@@ -109,6 +117,48 @@
- counter: http2_partial_writes
doc: Number of HTTP2 writes that were made knowing there was still more data
to be written (we cap maximum write size to syscall_write)
+- counter: http2_initiate_write_due_to_initial_write
+ doc: Number of HTTP2 writes initiated due to 'initial_write'
+- counter: http2_initiate_write_due_to_start_new_stream
+ doc: Number of HTTP2 writes initiated due to 'start_new_stream'
+- counter: http2_initiate_write_due_to_send_message
+ doc: Number of HTTP2 writes initiated due to 'send_message'
+- counter: http2_initiate_write_due_to_send_initial_metadata
+ doc: Number of HTTP2 writes initiated due to 'send_initial_metadata'
+- counter: http2_initiate_write_due_to_send_trailing_metadata
+ doc: Number of HTTP2 writes initiated due to 'send_trailing_metadata'
+- counter: http2_initiate_write_due_to_retry_send_ping
+ doc: Number of HTTP2 writes initiated due to 'retry_send_ping'
+- counter: http2_initiate_write_due_to_continue_pings
+ doc: Number of HTTP2 writes initiated due to 'continue_pings'
+- counter: http2_initiate_write_due_to_goaway_sent
+ doc: Number of HTTP2 writes initiated due to 'goaway_sent'
+- counter: http2_initiate_write_due_to_rst_stream
+ doc: Number of HTTP2 writes initiated due to 'rst_stream'
+- counter: http2_initiate_write_due_to_close_from_api
+ doc: Number of HTTP2 writes initiated due to 'close_from_api'
+- counter: http2_initiate_write_due_to_stream_flow_control
+ doc: Number of HTTP2 writes initiated due to 'stream_flow_control'
+- counter: http2_initiate_write_due_to_transport_flow_control
+ doc: Number of HTTP2 writes initiated due to 'transport_flow_control'
+- counter: http2_initiate_write_due_to_send_settings
+ doc: Number of HTTP2 writes initiated due to 'send_settings'
+- counter: http2_initiate_write_due_to_bdp_estimator_ping
+ doc: Number of HTTP2 writes initiated due to 'bdp_estimator_ping'
+- counter: http2_initiate_write_due_to_flow_control_unstalled_by_setting
+ doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_setting'
+- counter: http2_initiate_write_due_to_flow_control_unstalled_by_update
+ doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_update'
+- counter: http2_initiate_write_due_to_application_ping
+ doc: Number of HTTP2 writes initiated due to 'application_ping'
+- counter: http2_initiate_write_due_to_keepalive_ping
+ doc: Number of HTTP2 writes initiated due to 'keepalive_ping'
+- counter: http2_initiate_write_due_to_transport_flow_control_unstalled
+ doc: Number of HTTP2 writes initiated due to 'transport_flow_control_unstalled'
+- counter: http2_initiate_write_due_to_ping_response
+ doc: Number of HTTP2 writes initiated due to 'ping_response'
+- counter: http2_initiate_write_due_to_force_rst_stream
+ doc: Number of HTTP2 writes initiated due to 'force_rst_stream'
# combiner locks
- counter: combiner_locks_initiated
doc: Number of combiner lock entries by process
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index 5354769291..d21afbbfe4 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -1,5 +1,9 @@
client_calls_created_per_iteration:FLOAT,
server_calls_created_per_iteration:FLOAT,
+cqs_created_per_iteration:FLOAT,
+client_channels_created_per_iteration:FLOAT,
+client_subchannels_created_per_iteration:FLOAT,
+server_channels_created_per_iteration:FLOAT,
syscall_poll_per_iteration:FLOAT,
syscall_wait_per_iteration:FLOAT,
histogram_slow_lookups_per_iteration:FLOAT,
@@ -21,6 +25,27 @@ http2_writes_begun_per_iteration:FLOAT,
http2_writes_offloaded_per_iteration:FLOAT,
http2_writes_continued_per_iteration:FLOAT,
http2_partial_writes_per_iteration:FLOAT,
+http2_initiate_write_due_to_initial_write_per_iteration:FLOAT,
+http2_initiate_write_due_to_start_new_stream_per_iteration:FLOAT,
+http2_initiate_write_due_to_send_message_per_iteration:FLOAT,
+http2_initiate_write_due_to_send_initial_metadata_per_iteration:FLOAT,
+http2_initiate_write_due_to_send_trailing_metadata_per_iteration:FLOAT,
+http2_initiate_write_due_to_retry_send_ping_per_iteration:FLOAT,
+http2_initiate_write_due_to_continue_pings_per_iteration:FLOAT,
+http2_initiate_write_due_to_goaway_sent_per_iteration:FLOAT,
+http2_initiate_write_due_to_rst_stream_per_iteration:FLOAT,
+http2_initiate_write_due_to_close_from_api_per_iteration:FLOAT,
+http2_initiate_write_due_to_stream_flow_control_per_iteration:FLOAT,
+http2_initiate_write_due_to_transport_flow_control_per_iteration:FLOAT,
+http2_initiate_write_due_to_send_settings_per_iteration:FLOAT,
+http2_initiate_write_due_to_bdp_estimator_ping_per_iteration:FLOAT,
+http2_initiate_write_due_to_flow_control_unstalled_by_setting_per_iteration:FLOAT,
+http2_initiate_write_due_to_flow_control_unstalled_by_update_per_iteration:FLOAT,
+http2_initiate_write_due_to_application_ping_per_iteration:FLOAT,
+http2_initiate_write_due_to_keepalive_ping_per_iteration:FLOAT,
+http2_initiate_write_due_to_transport_flow_control_unstalled_per_iteration:FLOAT,
+http2_initiate_write_due_to_ping_response_per_iteration:FLOAT,
+http2_initiate_write_due_to_force_rst_stream_per_iteration:FLOAT,
combiner_locks_initiated_per_iteration:FLOAT,
combiner_locks_scheduled_items_per_iteration:FLOAT,
combiner_locks_scheduled_final_items_per_iteration:FLOAT,
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 7fa2899092..4e2746ad6c 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -131,9 +131,9 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
-typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t;
+typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
-static const char *kick_state_string(kick_state_t st) {
+static const char *kick_state_string(kick_state st) {
switch (st) {
case UNKICKED:
return "UNKICKED";
@@ -146,7 +146,7 @@ static const char *kick_state_string(kick_state_t st) {
}
struct grpc_pollset_worker {
- kick_state_t kick_state;
+ kick_state state;
int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
@@ -155,9 +155,9 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
-#define SET_KICK_STATE(worker, state) \
+#define SET_KICK_STATE(worker, kick_state) \
do { \
- (worker)->kick_state = (state); \
+ (worker)->state = (kick_state); \
(worker)->kick_state_mutator = __LINE__; \
} while (false)
@@ -509,7 +509,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
- switch (worker->kick_state) {
+ switch (worker->state) {
case KICKED:
break;
case UNKICKED:
@@ -685,7 +685,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
- pollset, worker, kick_state_string(worker->kick_state),
+ pollset, worker, kick_state_string(worker->state),
is_reassigning);
}
if (pollset->seen_inactive) {
@@ -705,12 +705,12 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
at this point is if it were "kicked specifically". Since the worker has
not added itself to the pollset yet (by calling worker_insert()), it is
not visible in the "kick any" path yet */
- if (worker->kick_state == UNKICKED) {
+ if (worker->state == UNKICKED) {
pollset->seen_inactive = false;
if (neighborhood->active_root == NULL) {
neighborhood->active_root = pollset->next = pollset->prev = pollset;
/* Make this the designated poller if there isn't one already */
- if (worker->kick_state == UNKICKED &&
+ if (worker->state == UNKICKED &&
gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
@@ -730,20 +730,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker_insert(pollset, worker);
pollset->begin_refs--;
- if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
+ if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
+ while (worker->state == UNKICKED && !pollset->shutting_down) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
- pollset, worker, kick_state_string(worker->kick_state),
+ pollset, worker, kick_state_string(worker->state),
pollset->shutting_down);
}
if (gpr_cv_wait(&worker->cv, &pollset->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) &&
- worker->kick_state == UNKICKED) {
+ worker->state == UNKICKED) {
/* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
received a kick */
SET_KICK_STATE(worker, KICKED);
@@ -756,7 +756,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_log(GPR_ERROR,
"PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
"kicked_without_poller: %d",
- pollset, worker, kick_state_string(worker->kick_state),
+ pollset, worker, kick_state_string(worker->state),
pollset->shutting_down, pollset->kicked_without_poller);
}
@@ -776,7 +776,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
GPR_TIMER_END("begin_worker", 0);
- return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
+ return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighborhood_for_available_poller(
@@ -793,7 +793,7 @@ static bool check_neighborhood_for_available_poller(
grpc_pollset_worker *inspect_worker = inspect->root_worker;
if (inspect_worker != NULL) {
do {
- switch (inspect_worker->kick_state) {
+ switch (inspect_worker->state) {
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
@@ -856,7 +856,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
- if (worker->next != worker && worker->next->kick_state == UNKICKED) {
+ if (worker->next != worker && worker->next->state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
}
@@ -990,14 +990,14 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
gpr_strvec_add(&log, tmp);
if (pollset->root_worker != NULL) {
gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
- kick_state_string(pollset->root_worker->kick_state),
+ kick_state_string(pollset->root_worker->state),
pollset->root_worker->next,
- kick_state_string(pollset->root_worker->next->kick_state));
+ kick_state_string(pollset->root_worker->next->state));
gpr_strvec_add(&log, tmp);
}
if (specific_worker != NULL) {
gpr_asprintf(&tmp, " worker_kick_state=%s",
- kick_state_string(specific_worker->kick_state));
+ kick_state_string(specific_worker->state));
gpr_strvec_add(&log, tmp);
}
tmp = gpr_strvec_flatten(&log, NULL);
@@ -1017,13 +1017,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
}
grpc_pollset_worker *next_worker = root_worker->next;
- if (root_worker->kick_state == KICKED) {
+ if (root_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
goto done;
- } else if (next_worker->kick_state == KICKED) {
+ } else if (next_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
@@ -1040,7 +1040,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
SET_KICK_STATE(root_worker, KICKED);
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
- } else if (next_worker->kick_state == UNKICKED) {
+ } else if (next_worker->state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
@@ -1048,8 +1048,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
goto done;
- } else if (next_worker->kick_state == DESIGNATED_POLLER) {
- if (root_worker->kick_state != DESIGNATED_POLLER) {
+ } else if (next_worker->state == DESIGNATED_POLLER) {
+ if (root_worker->state != DESIGNATED_POLLER) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(
GPR_ERROR,
@@ -1071,7 +1071,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
}
} else {
- GPR_ASSERT(next_worker->kick_state == KICKED);
+ GPR_ASSERT(next_worker->state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
goto done;
}
@@ -1085,7 +1085,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
GPR_UNREACHABLE_CODE(goto done);
}
- if (specific_worker->kick_state == KICKED) {
+ if (specific_worker->state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 91164de13e..959a8f4b4f 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -96,12 +96,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
* pollable Declarations
*/
-typedef struct pollable_t {
+typedef struct pollable {
polling_obj po;
int epfd;
grpc_wakeup_fd wakeup;
grpc_pollset_worker *root_worker;
-} pollable_t;
+} pollable;
static const char *polling_obj_type_string(polling_obj_type t) {
switch (t) {
@@ -121,7 +121,7 @@ static const char *polling_obj_type_string(polling_obj_type t) {
return "<invalid>";
}
-static char *pollable_desc(pollable_t *p) {
+static char *pollable_desc(pollable *p) {
char *out;
gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
polling_obj_type_string(p->po.type), p->po.group, p->epfd,
@@ -129,19 +129,19 @@ static char *pollable_desc(pollable_t *p) {
return out;
}
-static pollable_t g_empty_pollable;
+static pollable g_empty_pollable;
-static void pollable_init(pollable_t *p, polling_obj_type type);
-static void pollable_destroy(pollable_t *p);
+static void pollable_init(pollable *p, polling_obj_type type);
+static void pollable_destroy(pollable *p);
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable_t *p);
+static grpc_error *pollable_materialize(pollable *p);
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
- pollable_t pollable;
+ pollable pollable_obj;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -192,15 +192,15 @@ struct grpc_pollset_worker {
pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
gpr_cv cv;
grpc_pollset *pollset;
- pollable_t *pollable;
+ pollable *pollable_obj;
};
#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
struct grpc_pollset {
- pollable_t pollable;
- pollable_t *current_pollable;
+ pollable pollable_obj;
+ pollable *current_pollable_obj;
int kick_alls_pending;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
@@ -281,7 +281,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_fd *fd = (grpc_fd *)arg;
/* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
- pollable_destroy(&fd->pollable);
+ pollable_destroy(&fd->pollable_obj);
gpr_mu_destroy(&fd->orphaned_mu);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -342,7 +342,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
}
- pollable_init(&new_fd->pollable, PO_FD);
+ pollable_init(&new_fd->pollable_obj, PO_FD);
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@@ -384,7 +384,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_lock(&fd->pollable.po.mu);
+ gpr_mu_lock(&fd->pollable_obj.po.mu);
gpr_mu_lock(&fd->orphaned_mu);
fd->on_done_closure = on_done;
@@ -410,7 +410,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->orphaned_mu);
- gpr_mu_unlock(&fd->pollable.po.mu);
+ gpr_mu_unlock(&fd->pollable_obj.po.mu);
UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
@@ -450,13 +450,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
* Pollable Definitions
*/
-static void pollable_init(pollable_t *p, polling_obj_type type) {
+static void pollable_init(pollable *p, polling_obj_type type) {
po_init(&p->po, type);
p->root_worker = NULL;
p->epfd = -1;
}
-static void pollable_destroy(pollable_t *p) {
+static void pollable_destroy(pollable *p) {
po_destroy(&p->po);
if (p->epfd != -1) {
close(p->epfd);
@@ -465,7 +465,7 @@ static void pollable_destroy(pollable_t *p) {
}
/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable_t *p) {
+static grpc_error *pollable_materialize(pollable *p) {
if (p->epfd == -1) {
int new_epfd = epoll_create1(EPOLL_CLOEXEC);
if (new_epfd < 0) {
@@ -491,7 +491,7 @@ static grpc_error *pollable_materialize(pollable_t *p) {
}
/* pollable must be materialized */
-static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) {
+static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollable_add_fd";
const int epfd = p->epfd;
@@ -556,30 +556,33 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_unused) {
grpc_error *error = GRPC_ERROR_NONE;
grpc_pollset *pollset = (grpc_pollset *)arg;
- gpr_mu_lock(&pollset->pollable.po.mu);
+ gpr_mu_lock(&pollset->pollable_obj.po.mu);
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
- if (worker->pollable != &pollset->pollable) {
- gpr_mu_lock(&worker->pollable->po.mu);
+ if (worker->pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_lock(&worker->pollable_obj->po.mu);
}
if (worker->initialized_cv && worker != pollset->root_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable, worker->pollable);
+ pollset, worker, &pollset->pollable_obj,
+ worker->pollable_obj);
}
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable, worker->pollable);
+ pollset, worker, &pollset->pollable_obj,
+ worker->pollable_obj);
}
- append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
+ append_error(&error,
+ grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
"pollset_shutdown");
}
- if (worker->pollable != &pollset->pollable) {
- gpr_mu_unlock(&worker->pollable->po.mu);
+ if (worker->pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_unlock(&worker->pollable_obj->po.mu);
}
worker = worker->links[PWL_POLLSET].next;
@@ -587,7 +590,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
}
pollset->kick_alls_pending--;
pollset_maybe_finish_shutdown(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->pollable.po.mu);
+ gpr_mu_unlock(&pollset->pollable_obj.po.mu);
GRPC_LOG_IF_ERROR("kick_all", error);
}
@@ -598,7 +601,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_ERROR_NONE);
}
-static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p,
+static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
grpc_pollset_worker *specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
@@ -663,24 +666,24 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p,
/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- pollable_t *p = pollset->current_pollable;
- if (p != &pollset->pollable) {
+ pollable *p = pollset->current_pollable_obj;
+ if (p != &pollset->pollable_obj) {
gpr_mu_lock(&p->po.mu);
}
grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
- if (p != &pollset->pollable) {
+ if (p != &pollset->pollable_obj) {
gpr_mu_unlock(&p->po.mu);
}
return error;
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- pollable_init(&pollset->pollable, PO_POLLSET);
- pollset->current_pollable = &g_empty_pollable;
+ pollable_init(&pollset->pollable_obj, PO_POLLSET);
+ pollset->current_pollable_obj = &g_empty_pollable;
pollset->kicked_without_poller = false;
pollset->shutdown_closure = NULL;
pollset->root_worker = NULL;
- *mu = &pollset->pollable.po.mu;
+ *mu = &pollset->pollable_obj.po.mu;
}
static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
@@ -715,8 +718,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "fd_become_pollable";
- if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) {
- append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc);
+ if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) {
+ append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc);
}
return error;
}
@@ -730,8 +733,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
-static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) {
- return p != &g_empty_pollable && p != &pollset->pollable;
+static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
+ return p != &g_empty_pollable && p != &pollset->pollable_obj;
}
static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
@@ -777,9 +780,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- pollable_destroy(&pollset->pollable);
- if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) {
- UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2,
+ pollable_destroy(&pollset->pollable_obj);
+ if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) {
+ UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2,
"pollset_pollable");
}
GRPC_LOG_IF_ERROR("pollset_process_events",
@@ -787,7 +790,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- pollable_t *p, grpc_millis deadline) {
+ pollable *p, grpc_millis deadline) {
int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -869,69 +872,70 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker->initialized_cv = false;
worker->kicked = false;
worker->pollset = pollset;
- worker->pollable = pollset->current_pollable;
+ worker->pollable_obj = pollset->current_pollable_obj;
- if (pollset_is_pollable_fd(pollset, worker->pollable)) {
- REF_BY((grpc_fd *)worker->pollable, 2, "one_poll");
+ if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
+ REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll");
}
worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
- if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) {
+ if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE,
+ worker)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- if (worker->pollable != &pollset->pollable) {
- gpr_mu_unlock(&pollset->pollable.po.mu);
+ if (worker->pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_unlock(&pollset->pollable_obj.po.mu);
}
if (GRPC_TRACER_ON(grpc_polling_trace) &&
- worker->pollable->root_worker != worker) {
+ worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
- worker->pollable, worker,
+ worker->pollable_obj, worker,
poll_deadline_to_millis_timeout(exec_ctx, deadline));
}
- while (do_poll && worker->pollable->root_worker != worker) {
- if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu,
+ while (do_poll && worker->pollable_obj->root_worker != worker) {
+ if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
- worker->pollable, worker);
+ worker->pollable_obj, worker);
}
do_poll = false;
} else if (worker->kicked) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable,
- worker);
+ gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
+ worker->pollable_obj, worker);
}
do_poll = false;
} else if (GRPC_TRACER_ON(grpc_polling_trace) &&
- worker->pollable->root_worker != worker) {
+ worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
- worker->pollable, worker);
+ worker->pollable_obj, worker);
}
}
- if (worker->pollable != &pollset->pollable) {
- gpr_mu_unlock(&worker->pollable->po.mu);
- gpr_mu_lock(&pollset->pollable.po.mu);
- gpr_mu_lock(&worker->pollable->po.mu);
+ if (worker->pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_unlock(&worker->pollable_obj->po.mu);
+ gpr_mu_lock(&pollset->pollable_obj.po.mu);
+ gpr_mu_lock(&worker->pollable_obj->po.mu);
}
grpc_exec_ctx_invalidate_now(exec_ctx);
}
return do_poll && pollset->shutdown_closure == NULL &&
- pollset->current_pollable == worker->pollable;
+ pollset->current_pollable_obj == worker->pollable_obj;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
if (NEW_ROOT ==
- worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) {
- gpr_cv_signal(&worker->pollable->root_worker->cv);
+ worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) {
+ gpr_cv_signal(&worker->pollable_obj->root_worker->cv);
}
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
- if (pollset_is_pollable_fd(pollset, worker->pollable)) {
- UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll");
+ if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
+ UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll");
}
if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
@@ -958,41 +962,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
- if (pollset->current_pollable != &pollset->pollable) {
- gpr_mu_lock(&pollset->current_pollable->po.mu);
+ if (pollset->current_pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_lock(&pollset->current_pollable_obj->po.mu);
}
if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
- append_error(&error, pollable_materialize(worker.pollable), err_desc);
- if (worker.pollable != &pollset->pollable) {
- gpr_mu_unlock(&worker.pollable->po.mu);
+ append_error(&error, pollable_materialize(worker.pollable_obj), err_desc);
+ if (worker.pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_unlock(&worker.pollable_obj->po.mu);
}
- gpr_mu_unlock(&pollset->pollable.po.mu);
+ gpr_mu_unlock(&pollset->pollable_obj.po.mu);
if (pollset->event_cursor == pollset->event_count) {
- append_error(&error,
- pollset_epoll(exec_ctx, pollset, worker.pollable, deadline),
+ append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
+ deadline),
err_desc);
}
append_error(&error, pollset_process_events(exec_ctx, pollset, false),
err_desc);
- gpr_mu_lock(&pollset->pollable.po.mu);
- if (worker.pollable != &pollset->pollable) {
- gpr_mu_lock(&worker.pollable->po.mu);
+ gpr_mu_lock(&pollset->pollable_obj.po.mu);
+ if (worker.pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_lock(&worker.pollable_obj->po.mu);
}
gpr_tls_set(&g_current_thread_pollset, 0);
gpr_tls_set(&g_current_thread_worker, 0);
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
- if (worker.pollable != &pollset->pollable) {
- gpr_mu_unlock(&worker.pollable->po.mu);
+ if (worker.pollable_obj != &pollset->pollable_obj) {
+ gpr_mu_unlock(&worker.pollable_obj->po.mu);
}
if (grpc_exec_ctx_has_work(exec_ctx)) {
- gpr_mu_unlock(&pollset->pollable.po.mu);
+ gpr_mu_unlock(&pollset->pollable_obj.po.mu);
grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->pollable.po.mu);
+ gpr_mu_lock(&pollset->pollable_obj.po.mu);
}
return error;
}
@@ -1009,27 +1013,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
bool fd_locked) {
static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
- if (pollset->current_pollable == &g_empty_pollable) {
+ if (pollset->current_pollable_obj == &g_empty_pollable) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
}
- /* empty pollable --> single fd pollable_t */
+ /* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable = &fd->pollable;
- if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu);
+ pollset->current_pollable_obj = &fd->pollable_obj;
+ if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu);
append_error(&error, fd_become_pollable_locked(fd), err_desc);
- if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
+ if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu);
REF_BY(fd, 2, "pollset_pollable");
- } else if (pollset->current_pollable == &pollset->pollable) {
+ } else if (pollset->current_pollable_obj == &pollset->pollable_obj) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
}
- append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
+ append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd),
err_desc);
- } else if (pollset->current_pollable != &fd->pollable) {
- grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
+ } else if (pollset->current_pollable_obj != &fd->pollable_obj) {
+ grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
@@ -1041,11 +1045,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable = &pollset->pollable;
- if (append_error(&error, pollable_materialize(&pollset->pollable),
+ pollset->current_pollable_obj = &pollset->pollable_obj;
+ if (append_error(&error, pollable_materialize(&pollset->pollable_obj),
err_desc)) {
- pollable_add_fd(&pollset->pollable, had_fd);
- pollable_add_fd(&pollset->pollable, fd);
+ pollable_add_fd(&pollset->pollable_obj, had_fd);
+ pollable_add_fd(&pollset->pollable_obj, fd);
}
GRPC_CLOSURE_SCHED(exec_ctx,
GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
@@ -1057,9 +1061,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- gpr_mu_lock(&pollset->pollable.po.mu);
+ gpr_mu_lock(&pollset->pollable_obj.po.mu);
grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false);
- gpr_mu_unlock(&pollset->pollable.po.mu);
+ gpr_mu_unlock(&pollset->pollable_obj.po.mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
@@ -1081,7 +1085,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
- po_join(exec_ctx, &pss->po, &fd->pollable.po);
+ po_join(exec_ctx, &pss->po, &fd->pollable_obj.po);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
@@ -1089,7 +1093,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pss, grpc_pollset *ps) {
- po_join(exec_ctx, &pss->po, &ps->pollable.po);
+ po_join(exec_ctx, &pss->po, &ps->pollable_obj.po);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index bf5193de5c..7a5abf4650 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -1528,7 +1528,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- idx = FD_TO_IDX(fds[i].fd);
+ idx = GRPC_FD_TO_IDX(fds[i].fd);
fd_cvs[i].cv = &pollcv_cv;
fd_cvs[i].prev = NULL;
fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
@@ -1591,8 +1591,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
- if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
+ remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
+ if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c
index 5e0b1d1704..268e0175dd 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.c
+++ b/src/core/lib/iomgr/wakeup_fd_cv.c
@@ -57,7 +57,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
g_cvfds.free_fds = g_cvfds.free_fds->next_free;
g_cvfds.cvfds[idx].cvs = NULL;
g_cvfds.cvfds[idx].is_set = 0;
- fd_info->read_fd = IDX_TO_FD(idx);
+ fd_info->read_fd = GRPC_IDX_TO_FD(idx);
fd_info->write_fd = -1;
gpr_mu_unlock(&g_cvfds.mu);
return GRPC_ERROR_NONE;
@@ -66,8 +66,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
cv_node* cvn;
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1;
- cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs;
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1;
+ cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs;
while (cvn) {
gpr_cv_signal(cvn->cv);
cvn = cvn->next;
@@ -78,7 +78,7 @@ static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0;
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 0;
gpr_mu_unlock(&g_cvfds.mu);
return GRPC_ERROR_NONE;
}
@@ -89,9 +89,9 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) {
}
gpr_mu_lock(&g_cvfds.mu);
// Assert that there are no active pollers
- GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs);
- g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
- g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)];
+ GPR_ASSERT(!g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs);
+ g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds;
+ g_cvfds.free_fds = &g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)];
gpr_mu_unlock(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 46e84f5843..dc170ad5b4 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -37,8 +37,8 @@
#include "src/core/lib/iomgr/ev_posix.h"
-#define FD_TO_IDX(fd) (-(fd)-1)
-#define IDX_TO_FD(idx) (-(idx)-1)
+#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
+#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
typedef struct cv_node {
gpr_cv* cv;
diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c
index 975d599523..3d19605617 100644
--- a/src/core/lib/security/transport/security_handshaker.c
+++ b/src/core/lib/security/transport/security_handshaker.c
@@ -137,7 +137,7 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx,
// Create zero-copy frame protector, if implemented.
tsi_zero_copy_grpc_protector *zero_copy_protector = NULL;
tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector(
- h->handshaker_result, NULL, &zero_copy_protector);
+ exec_ctx, h->handshaker_result, NULL, &zero_copy_protector);
if (result != TSI_OK && result != TSI_UNIMPLEMENTED) {
error = grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index ed5138006e..b2146cf211 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -135,7 +135,7 @@ typedef struct batch_control {
typedef struct {
gpr_mu child_list_mu;
grpc_call *first_child;
-} parent_call_t;
+} parent_call;
typedef struct {
grpc_call *parent;
@@ -144,7 +144,7 @@ typedef struct {
parent->mu */
grpc_call *sibling_next;
grpc_call *sibling_prev;
-} child_call_t;
+} child_call;
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
@@ -157,8 +157,8 @@ struct grpc_call {
grpc_polling_entity pollent;
grpc_channel *channel;
gpr_timespec start_time;
- /* parent_call_t* */ gpr_atm parent_call_atm;
- child_call_t *child_call;
+ /* parent_call* */ gpr_atm parent_call_atm;
+ child_call *child;
/* client or server call */
bool is_client;
@@ -304,21 +304,21 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
return gpr_arena_alloc(call->arena, size);
}
-static parent_call_t *get_or_create_parent_call(grpc_call *call) {
- parent_call_t *p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
+static parent_call *get_or_create_parent_call(grpc_call *call) {
+ parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
if (p == NULL) {
- p = (parent_call_t *)gpr_arena_alloc(call->arena, sizeof(*p));
+ p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
- p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
+ p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
}
}
return p;
}
-static parent_call_t *get_parent_call(grpc_call *call) {
- return (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm);
+static parent_call *get_parent_call(grpc_call *call) {
+ return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
}
grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
@@ -376,21 +376,21 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
bool immediately_cancel = false;
- if (args->parent_call != NULL) {
- child_call_t *cc = call->child_call =
- (child_call_t *)gpr_arena_alloc(arena, sizeof(child_call_t));
- call->child_call->parent = args->parent_call;
+ if (args->parent != NULL) {
+ child_call *cc = call->child =
+ (child_call *)gpr_arena_alloc(arena, sizeof(child_call));
+ call->child->parent = args->parent;
- GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
+ GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
- GPR_ASSERT(!args->parent_call->is_client);
+ GPR_ASSERT(!args->parent->is_client);
- parent_call_t *pc = get_or_create_parent_call(args->parent_call);
+ parent_call *pc = get_or_create_parent_call(args->parent);
gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
- send_deadline = GPR_MIN(send_deadline, args->parent_call->send_deadline);
+ send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
@@ -402,9 +402,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
"Census tracing propagation requested "
"without Census context propagation"));
}
- grpc_call_context_set(
- call, GRPC_CONTEXT_TRACING,
- args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
+ args->parent->context[GRPC_CONTEXT_TRACING].value,
+ NULL);
} else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Census context propagation requested "
@@ -412,7 +412,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
- if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
+ if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
immediately_cancel = true;
}
}
@@ -422,9 +422,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
cc->sibling_next = cc->sibling_prev = call;
} else {
cc->sibling_next = pc->first_child;
- cc->sibling_prev = pc->first_child->child_call->sibling_prev;
- cc->sibling_next->child_call->sibling_prev =
- cc->sibling_prev->child_call->sibling_next = call;
+ cc->sibling_prev = pc->first_child->child->sibling_prev;
+ cc->sibling_next->child->sibling_prev =
+ cc->sibling_prev->child->sibling_next = call;
}
gpr_mu_unlock(&pc->child_list_mu);
@@ -529,7 +529,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- parent_call_t *pc = get_parent_call(c);
+ parent_call *pc = get_parent_call(c);
if (pc != NULL) {
gpr_mu_destroy(&pc->child_list_mu);
}
@@ -566,14 +566,14 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); }
void grpc_call_unref(grpc_call *c) {
if (!gpr_unref(&c->ext_ref)) return;
- child_call_t *cc = c->child_call;
+ child_call *cc = c->child;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
if (cc) {
- parent_call_t *pc = get_parent_call(cc->parent);
+ parent_call *pc = get_parent_call(cc->parent);
gpr_mu_lock(&pc->child_list_mu);
if (c == pc->first_child) {
pc->first_child = cc->sibling_next;
@@ -581,8 +581,8 @@ void grpc_call_unref(grpc_call *c) {
pc->first_child = NULL;
}
}
- cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
- cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
+ cc->sibling_prev->child->sibling_next = cc->sibling_next;
+ cc->sibling_next->child->sibling_prev = cc->sibling_prev;
gpr_mu_unlock(&pc->child_list_mu);
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
@@ -1310,14 +1310,14 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
- parent_call_t *pc = get_parent_call(call);
+ parent_call *pc = get_parent_call(call);
if (pc != NULL) {
grpc_call *child;
gpr_mu_lock(&pc->child_list_mu);
child = pc->first_child;
if (child != NULL) {
do {
- next_child_call = child->child_call->sibling_next;
+ next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index be362c8000..27c2f5243c 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
typedef struct grpc_call_create_args {
grpc_channel *channel;
- grpc_call *parent_call;
+ grpc_call *parent;
uint32_t propagation_mask;
grpc_completion_queue *cq;
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 89760291c2..0b5ca17cc4 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -27,6 +27,7 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
@@ -77,6 +78,11 @@ grpc_channel *grpc_channel_create_with_builder(
grpc_channel_args *args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel *channel;
+ if (channel_stack_type == GRPC_SERVER_CHANNEL) {
+ GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx);
+ } else {
+ GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx);
+ }
grpc_error *error = grpc_channel_stack_builder_finish(
exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL,
(void **)&channel);
@@ -276,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_call_create_args args;
memset(&args, 0, sizeof(args));
args.channel = channel;
- args.parent_call = parent_call;
+ args.parent = parent_call;
args.propagation_mask = propagation_mask;
args.cq = cq;
args.pollset_set_alternative = pollset_set_alternative;
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b0da5f2364..5335cf111e 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -26,6 +26,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@@ -421,6 +422,10 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+
cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) +
vtable->data_size +
poller_vtable->size());
@@ -576,12 +581,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) {
}
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
@@ -626,7 +631,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
}
}
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -687,7 +692,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -770,7 +775,7 @@ typedef struct {
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -819,7 +824,7 @@ static void dump_pending_tags(grpc_completion_queue *cq) {}
static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -950,7 +955,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
this function */
static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
@@ -961,7 +966,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_next() below, that would call pollset shutdown.
@@ -991,7 +996,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -1003,7 +1008,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag,
static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -1017,7 +1022,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -1052,7 +1057,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1174,7 +1179,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
@@ -1188,7 +1193,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
* merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 280315036f..b089da2c54 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/alarm_internal.h"
@@ -179,14 +180,16 @@ void grpc_shutdown(void) {
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_iomgr_shutdown(&exec_ctx);
- gpr_timers_global_destroy();
- grpc_tracer_shutdown();
+ grpc_executor_shutdown(&exec_ctx);
+ grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != NULL) {
g_all_of_the_plugins[i].destroy();
}
}
+ grpc_iomgr_shutdown(&exec_ctx);
+ gpr_timers_global_destroy();
+ grpc_tracer_shutdown();
grpc_mdctx_global_shutdown(&exec_ctx);
grpc_handshaker_factory_registry_shutdown(&exec_ctx);
grpc_slice_intern_shutdown();
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 64e5ce78e7..1d3980eb23 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -76,7 +76,7 @@ typedef struct requested_call {
grpc_call_details *details;
} batch;
struct {
- registered_method *registered_method;
+ registered_method *method;
gpr_timespec *deadline;
grpc_byte_buffer **optional_payload;
} registered;
@@ -145,7 +145,7 @@ struct call_data {
uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata;
- request_matcher *request_matcher;
+ request_matcher *matcher;
grpc_byte_buffer *payload;
grpc_closure got_initial_metadata;
@@ -171,7 +171,7 @@ struct registered_method {
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
/* one request matcher per method */
- request_matcher request_matcher;
+ request_matcher matcher;
registered_method *next;
};
@@ -334,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) {
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem,
grpc_error *error) {
- grpc_call_unref(grpc_call_from_top_element(elem));
+ grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem));
}
static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
@@ -387,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
- request_matcher_destroy(&rm->request_matcher);
+ request_matcher_destroy(&rm->matcher);
}
gpr_free(rm->method);
gpr_free(rm->host);
@@ -521,7 +521,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *call_elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)call_elem->call_data;
channel_data *chand = (channel_data *)call_elem->channel_data;
- request_matcher *rm = calld->request_matcher;
+ request_matcher *rm = calld->matcher;
grpc_server *server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
@@ -585,7 +585,7 @@ static void finish_start_new_rpc(
return;
}
- calld->request_matcher = rm;
+ calld->matcher = rm;
switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
@@ -631,7 +631,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
+ &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
@@ -649,7 +649,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
+ &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
@@ -670,7 +670,7 @@ static int num_listeners(grpc_server *server) {
static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
grpc_cq_completion *completion) {
- server_unref(exec_ctx, server);
+ server_unref(exec_ctx, (grpc_server *)server);
}
static int num_channels(grpc_server *server) {
@@ -693,9 +693,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, &server->unregistered_request_matcher);
for (registered_method *rm = server->registered_methods; rm;
rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher,
+ request_matcher_kill_requests(exec_ctx, server, &rm->matcher,
GRPC_ERROR_REF(error));
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher);
}
}
GRPC_ERROR_UNREF(error);
@@ -1116,7 +1116,7 @@ void grpc_server_start(grpc_server *server) {
request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher,
+ request_matcher_init(&rm->matcher,
(size_t)server->max_requested_calls_per_cq, server);
}
@@ -1269,8 +1269,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* stay locked, and gather up some stuff to do */
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
- grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
- NULL, gpr_malloc(sizeof(grpc_cq_completion)));
+ grpc_cq_end_op(
+ &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
+ (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
gpr_mu_unlock(&server->mu_global);
goto done;
}
@@ -1392,7 +1393,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &server->unregistered_request_matcher;
break;
case REGISTERED_CALL:
- rm = &rc->data.registered.registered_method->request_matcher;
+ rm = &rc->data.registered.method->matcher;
break;
}
server->requested_calls_per_cq[cq_idx][request_id] = *rc;
@@ -1521,7 +1522,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call;
- rc->data.registered.registered_method = rm;
+ rc->data.registered.method = rm;
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index b7c46e09bb..2df9c9189c 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -233,32 +233,32 @@ void grpc_metadata_batch_remove(grpc_exec_ctx *exec_ctx,
void grpc_metadata_batch_set_value(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *storage,
grpc_slice value) {
- grpc_mdelem old = storage->md;
- grpc_mdelem new = grpc_mdelem_from_slices(
- exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old)), value);
- storage->md = new;
- GRPC_MDELEM_UNREF(exec_ctx, old);
+ grpc_mdelem old_mdelem = storage->md;
+ grpc_mdelem new_mdelem = grpc_mdelem_from_slices(
+ exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value);
+ storage->md = new_mdelem;
+ GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
}
grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx,
grpc_metadata_batch *batch,
grpc_linked_mdelem *storage,
- grpc_mdelem new) {
+ grpc_mdelem new_mdelem) {
assert_valid_callouts(exec_ctx, batch);
grpc_error *error = GRPC_ERROR_NONE;
- grpc_mdelem old = storage->md;
- if (!grpc_slice_eq(GRPC_MDKEY(new), GRPC_MDKEY(old))) {
+ grpc_mdelem old_mdelem = storage->md;
+ if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) {
maybe_unlink_callout(batch, storage);
- storage->md = new;
+ storage->md = new_mdelem;
error = maybe_link_callout(batch, storage);
if (error != GRPC_ERROR_NONE) {
unlink_storage(&batch->list, storage);
GRPC_MDELEM_UNREF(exec_ctx, storage->md);
}
} else {
- storage->md = new;
+ storage->md = new_mdelem;
}
- GRPC_MDELEM_UNREF(exec_ctx, old);
+ GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
assert_valid_callouts(exec_ctx, batch);
return error;
}
@@ -300,12 +300,12 @@ grpc_error *grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
grpc_error *error = GRPC_ERROR_NONE;
while (l) {
grpc_linked_mdelem *next = l->next;
- grpc_filtered_mdelem new = func(exec_ctx, user_data, l->md);
- add_error(&error, new.error, composite_error_string);
- if (GRPC_MDISNULL(new.md)) {
+ grpc_filtered_mdelem new_mdelem = func(exec_ctx, user_data, l->md);
+ add_error(&error, new_mdelem.error, composite_error_string);
+ if (GRPC_MDISNULL(new_mdelem.md)) {
grpc_metadata_batch_remove(exec_ctx, batch, l);
- } else if (new.md.payload != l->md.payload) {
- grpc_metadata_batch_substitute(exec_ctx, batch, l, new.md);
+ } else if (new_mdelem.md.payload != l->md.payload) {
+ grpc_metadata_batch_substitute(exec_ctx, batch, l, new_mdelem.md);
}
l = next;
}
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index caa11a956e..ae705195f3 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -102,8 +102,9 @@ static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
void *buffer, size_t length) {
slice_stream_ref(&refcount->slice_refcount);
- return (grpc_slice){.refcount = &refcount->slice_refcount,
- .data.refcounted = {.bytes = buffer, .length = length}};
+ return (grpc_slice){
+ .refcount = &refcount->slice_refcount,
+ .data.refcounted = {.bytes = (uint8_t *)buffer, .length = length}};
}
static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c
index e7b3be3d86..64043fea08 100644
--- a/src/core/tsi/fake_transport_security.c
+++ b/src/core/tsi/fake_transport_security.c
@@ -493,7 +493,8 @@ static tsi_result fake_handshaker_result_extract_peer(
}
static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector(
- const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ void *exec_ctx, const tsi_handshaker_result *self,
+ size_t *max_output_protected_frame_size,
tsi_zero_copy_grpc_protector **protector) {
*protector =
tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size);
diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h
index b0d7039850..3bba38149c 100644
--- a/src/core/tsi/transport_security.h
+++ b/src/core/tsi/transport_security.h
@@ -84,11 +84,17 @@ struct tsi_handshaker {
};
/* Base for tsi_handshaker_result implementations.
- See transport_security_interface.h for documentation. */
+ See transport_security_interface.h for documentation.
+ The exec_ctx parameter in create_zero_copy_grpc_protector is supposed to be
+ of type grpc_exec_ctx*, but we're using void* instead to avoid making the TSI
+ API depend on grpc. The create_zero_copy_grpc_protector() method is only used
+ in grpc, where we do need the exec_ctx passed through, but the API still
+ needs to compile in other applications, where grpc_exec_ctx is not defined.
+*/
typedef struct {
tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer);
tsi_result (*create_zero_copy_grpc_protector)(
- const tsi_handshaker_result *self,
+ void *exec_ctx, const tsi_handshaker_result *self,
size_t *max_output_protected_frame_size,
tsi_zero_copy_grpc_protector **protector);
tsi_result (*create_frame_protector)(const tsi_handshaker_result *self,
diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c
index 773b35e717..affd995230 100644
--- a/src/core/tsi/transport_security_grpc.c
+++ b/src/core/tsi/transport_security_grpc.c
@@ -20,16 +20,18 @@
/* This method creates a tsi_zero_copy_grpc_protector object. */
tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
- const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self,
+ size_t *max_output_protected_frame_size,
tsi_zero_copy_grpc_protector **protector) {
- if (self == NULL || self->vtable == NULL || protector == NULL) {
+ if (exec_ctx == NULL || self == NULL || self->vtable == NULL ||
+ protector == NULL) {
return TSI_INVALID_ARGUMENT;
}
if (self->vtable->create_zero_copy_grpc_protector == NULL) {
return TSI_UNIMPLEMENTED;
}
return self->vtable->create_zero_copy_grpc_protector(
- self, max_output_protected_frame_size, protector);
+ exec_ctx, self, max_output_protected_frame_size, protector);
}
/* --- tsi_zero_copy_grpc_protector common implementation. ---
diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h
index 375a758888..ca6755c12f 100644
--- a/src/core/tsi/transport_security_grpc.h
+++ b/src/core/tsi/transport_security_grpc.h
@@ -30,7 +30,8 @@ extern "C" {
assuming there is no fatal error.
The caller is responsible for destroying the protector. */
tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector(
- const tsi_handshaker_result *self, size_t *max_output_protected_frame_size,
+ grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self,
+ size_t *max_output_protected_frame_size,
tsi_zero_copy_grpc_protector **protector);
/* -- tsi_zero_copy_grpc_protector object -- */