aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-16 15:25:00 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-11-16 15:25:00 -0800
commitc586666cbdfc5b5f2a3306892f8b4491862c5aba (patch)
tree924f3f760a489ccf7c6c70307b72d1753aedbdf3 /src
parent7cdad96fc49090eb5e3a12a7cca5a9f257d3f301 (diff)
parent1dc9ad33273e090a1c7ffa05991dc8ccc2badee6 (diff)
Merge github.com:grpc/grpc into slice_with_exec_ctx
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/census/census_log.h2
-rw-r--r--src/core/ext/census/mlog.h2
-rw-r--r--src/core/ext/client_channel/client_channel.c4
-rw-r--r--src/core/ext/client_channel/subchannel.c6
-rw-r--r--src/core/ext/client_channel/subchannel.h4
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c38
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c2
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c313
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c19
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c3
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c9
-rw-r--r--src/core/lib/channel/channel_args.c6
-rw-r--r--src/core/lib/channel/channel_args.h8
-rw-r--r--src/core/lib/channel/channel_stack.c5
-rw-r--r--src/core/lib/channel/channel_stack.h3
-rw-r--r--src/core/lib/channel/compress_filter.c3
-rw-r--r--src/core/lib/channel/http_client_filter.c75
-rw-r--r--src/core/lib/channel/http_server_filter.c47
-rw-r--r--src/core/lib/iomgr/endpoint.c2
-rw-r--r--src/core/lib/iomgr/endpoint.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c2
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2076
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.h41
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
-rw-r--r--src/core/lib/iomgr/socket_mutator.c98
-rw-r--r--src/core/lib/iomgr/socket_mutator.h80
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c9
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h5
-rw-r--r--src/core/lib/iomgr/tcp_client.h1
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_posix.c8
-rw-r--r--src/core/lib/iomgr/tcp_uv.c99
-rw-r--r--src/core/lib/iomgr/tcp_windows.c5
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_credentials.c45
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c33
-rw-r--r--src/core/lib/security/credentials/plugin/plugin_credentials.c2
-rw-r--r--src/core/lib/security/transport/handshake.c2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c13
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c3
-rw-r--r--src/core/lib/slice/slice.c8
-rw-r--r--src/core/lib/surface/call.c50
-rw-r--r--src/core/lib/surface/server.c14
-rw-r--r--src/core/lib/transport/connectivity_state.c3
-rw-r--r--src/core/lib/transport/metadata.c7
-rw-r--r--src/core/lib/transport/metadata.h4
-rw-r--r--src/core/lib/transport/metadata_batch.c8
-rw-r--r--src/core/lib/transport/metadata_batch.h3
-rw-r--r--src/core/lib/transport/transport.c5
-rw-r--r--src/core/lib/transport/transport.h5
-rw-r--r--src/core/lib/transport/transport_impl.h3
-rw-r--r--src/cpp/common/channel_arguments.cc25
-rw-r--r--src/cpp/common/channel_filter.cc8
-rw-r--r--src/cpp/common/channel_filter.h3
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs97
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs13
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs22
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs23
-rw-r--r--src/csharp/Grpc.Core/Profiling/Profilers.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs28
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c6
-rw-r--r--src/node/ext/byte_buffer.cc2
-rw-r--r--src/node/src/common.js16
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h7
69 files changed, 963 insertions, 2558 deletions
diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h
index 534ecc5705..1b185a53b9 100644
--- a/src/core/ext/census/census_log.h
+++ b/src/core/ext/census/census_log.h
@@ -84,7 +84,7 @@ const void *census_log_read_next(size_t *bytes_available);
*/
size_t census_log_remaining_space(void);
-/* Returns the number of times gprc_stats_log_start_write() failed due to
+/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int census_log_out_of_space_count(void);
diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h
index a256426f91..18805ad994 100644
--- a/src/core/ext/census/mlog.h
+++ b/src/core/ext/census/mlog.h
@@ -88,7 +88,7 @@ const void* census_log_read_next(size_t* bytes_available);
*/
size_t census_log_remaining_space(void);
-/* Returns the number of times gprc_stats_log_start_write() failed due to
+/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int64_t census_log_out_of_space_count(void);
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 7954fcfb8b..8f0c856afe 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -645,7 +645,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->deadline, &subchannel_call);
+ calld->call_start_time, calld->deadline, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -898,7 +898,7 @@ retry:
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
- calld->deadline, &subchannel_call);
+ calld->call_start_time, calld->deadline, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index bcb3082267..6ba6254785 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -703,15 +703,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
- grpc_subchannel_call **call) {
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time,
+ gpr_timespec deadline, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, path, deadline, callstk);
+ NULL, NULL, path, start_time, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 93bd72d20d..10bae620df 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -111,8 +111,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
- grpc_subchannel_call **subchannel_call);
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec start_time,
+ gpr_timespec deadline, grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index a85c16b881..4bca8def39 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -187,6 +187,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* addresses failed to connect). There won't be any user_data/token
* available */
if (wc_arg->target != NULL) {
+ GPR_ASSERT(wc_arg->lb_token != NULL);
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
@@ -606,10 +607,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
* right grpclb status. */
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+ gpr_mu_lock(&glb_policy->mu);
if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
!glb_policy->shutting_down) {
- gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error),
@@ -618,12 +619,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
&rr_conn_data->on_change);
- gpr_mu_unlock(&glb_policy->mu);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
gpr_free(rr_conn_data);
}
+ gpr_mu_unlock(&glb_policy->mu);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -761,17 +762,24 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
if (glb_policy->rr_policy) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
}
- if (glb_policy->started_picking) {
- if (glb_policy->lb_call != NULL) {
- grpc_call_cancel(glb_policy->lb_call, NULL);
- /* lb_on_server_status_received will pick up the cancel and clean up */
- }
- }
grpc_connectivity_state_set(
exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
+ /* We need a copy of the lb_call pointer because we can't cancell the call
+ * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
+ * the cancel, needs to acquire that same lock */
+ grpc_call *lb_call = glb_policy->lb_call;
+ glb_policy->lb_call = NULL;
gpr_mu_unlock(&glb_policy->mu);
+ /* glb_policy->lb_call and this local lb_call must be consistent at this point
+ * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
+ * of query_for_backends_locked, which can only be invoked while
+ * glb_policy->shutting_down is false. */
+ if (lb_call != NULL) {
+ grpc_call_cancel(lb_call, NULL);
+ /* lb_on_server_status_received will pick up the cancel and clean up */
+ }
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
@@ -955,9 +963,11 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void lb_call_init(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
+static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->server_name != NULL);
GPR_ASSERT(glb_policy->server_name[0] != '\0');
+ GPR_ASSERT(!glb_policy->shutting_down);
/* Note the following LB call progresses every time there's activity in \a
* glb_policy->base.interested_parties, which is comprised of the polling
@@ -1010,7 +1020,9 @@ static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->lb_channel != NULL);
- lb_call_init(exec_ctx, glb_policy);
+ if (glb_policy->shutting_down) return;
+
+ lb_call_init_locked(exec_ctx, glb_policy);
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
@@ -1082,6 +1094,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_response_payload != NULL) {
gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside
@@ -1110,7 +1123,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
/* update serverlist */
if (serverlist->num_servers > 0) {
- gpr_mu_lock(&glb_policy->mu);
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@@ -1126,7 +1138,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
rr_handover_locked(exec_ctx, glb_policy, error);
}
- gpr_mu_unlock(&glb_policy->mu);
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@@ -1154,9 +1165,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+ gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
+ gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");
}
@@ -1176,7 +1189,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
query_for_backends_locked(exec_ctx, glb_policy);
}
gpr_mu_unlock(&glb_policy->mu);
-
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_on_retry_timer");
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index ac3c6a305a..c69f773e78 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -292,6 +292,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
} else {
loop:
switch (p->checking_connectivity) {
+ case GRPC_CHANNEL_INIT:
+ GPR_UNREACHABLE_CODE(return );
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index e101c0369c..f93dd078ce 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -116,8 +116,13 @@ typedef struct {
grpc_closure connectivity_changed_closure;
/** this subchannels current position in subchannel->ready_list */
ready_list *ready_list_node;
- /** last observed connectivity */
- grpc_connectivity_state connectivity_state;
+ /** last observed connectivity. Not updated by
+ * \a grpc_subchannel_notify_on_state_change. Used to determine the previous
+ * state while processing the new state in \a rr_connectivity_changed */
+ grpc_connectivity_state prev_connectivity_state;
+ /** current connectivity state. Updated by \a
+ * grpc_subchannel_notify_on_state_change */
+ grpc_connectivity_state curr_connectivity_state;
/** the subchannel's target user data */
void *user_data;
/** vtable to operate over \a user_data */
@@ -127,6 +132,7 @@ typedef struct {
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ gpr_mu mu;
/** total number of addresses received at creation time */
size_t num_addresses;
@@ -135,8 +141,11 @@ struct round_robin_lb_policy {
size_t num_subchannels;
subchannel_data **subchannels;
- /** mutex protecting remaining members */
- gpr_mu mu;
+ /** how many subchannels are in TRANSIENT_FAILURE */
+ size_t num_transient_failures;
+ /** how many subchannels are IDLE */
+ size_t num_idle;
+
/** have we started picking? */
int started_picking;
/** are we shutting down? */
@@ -258,6 +267,10 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
+static bool is_ready_list_empty(round_robin_lb_policy *p) {
+ return p->ready_list.prev == NULL;
+}
+
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
ready_list *elem;
@@ -268,7 +281,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy");
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_destroy");
if (sd->user_data != NULL) {
GPR_ASSERT(sd->user_data_vtable != NULL);
sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
@@ -381,18 +394,18 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
- if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
- p->num_subchannels);
- }
-
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
- sd->connectivity_state = GRPC_CHANNEL_IDLE;
+ /* use some sentinel value outside of the range of grpc_connectivity_state
+ * to signal an undefined previous state. We won't be referring to this
+ * value again and it'll be overwritten after the first call to
+ * rr_connectivity_changed */
+ sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
+ sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
+ GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
}
}
@@ -422,7 +435,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
/* readily available, report right away */
*target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
+ "rr_picked");
if (user_data != NULL) {
*user_data = selected->user_data;
@@ -453,125 +466,184 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
+static void update_state_counters(subchannel_data *sd) {
+ round_robin_lb_policy *p = sd->policy;
+
+ /* update p->num_transient_failures (resp. p->num_idle): if the previous
+ * state was TRANSIENT_FAILURE (resp. IDLE), decrement
+ * p->num_transient_failures (resp. p->num_idle). */
+ if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ GPR_ASSERT(p->num_transient_failures > 0);
+ --p->num_transient_failures;
+ } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) {
+ GPR_ASSERT(p->num_idle > 0);
+ --p->num_idle;
+ }
+}
+
+/* sd is the subchannel_data associted with the updated subchannel.
+ * shutdown_error will only be used upon policy transition to TRANSIENT_FAILURE
+ * or SHUTDOWN */
+static grpc_connectivity_state update_lb_connectivity_status(
+ grpc_exec_ctx *exec_ctx, subchannel_data *sd, grpc_error *error) {
+ /* In priority order. The first rule to match terminates the search (ie, if we
+ * are on rule n, all previous rules were unfulfilled).
+ *
+ * 1) RULE: ANY subchannel is READY => policy is READY.
+ * CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+ *
+ * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
+ * CHECK: sd->curr_connectivity_state == CONNECTING.
+ *
+ * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
+ * CHECK: p->num_subchannels = 0.
+ *
+ * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
+ * TRANSIENT_FAILURE.
+ * CHECK: p->num_transient_failures == p->num_subchannels.
+ *
+ * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
+ * CHECK: p->num_idle == p->num_subchannels.
+ */
+ round_robin_lb_policy *p = sd->policy;
+ if (!is_ready_list_empty(p)) { /* 1) READY */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
+ GRPC_ERROR_NONE, "rr_ready");
+ return GRPC_CHANNEL_READY;
+ } else if (sd->curr_connectivity_state ==
+ GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
+ "rr_connecting");
+ return GRPC_CHANNEL_CONNECTING;
+ } else if (p->num_subchannels == 0) { /* 3) SHUTDOWN */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+ "rr_shutdown");
+ return GRPC_CHANNEL_SHUTDOWN;
+ } else if (p->num_transient_failures ==
+ p->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+ GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "rr_transient_failure");
+ return GRPC_CHANNEL_TRANSIENT_FAILURE;
+ } else if (p->num_idle == p->num_subchannels) { /* 5) IDLE */
+ grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+ GRPC_ERROR_NONE, "rr_idle");
+ return GRPC_CHANNEL_IDLE;
+ }
+ /* no change */
+ return sd->curr_connectivity_state;
+}
+
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
- int unref = 0;
-
GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) {
- unref = 1;
- } else {
- switch (sd->connectivity_state) {
- case GRPC_CHANNEL_READY:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, GRPC_ERROR_REF(error),
- "connecting_ready");
- /* add the newly connected subchannel to the list of connected ones.
- * Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd);
- /* at this point we know there's at least one suitable subchannel. Go
- * ahead and pick one and notify the pending suitors in
- * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
- ready_list *selected = peek_next_connected_locked(p);
- GPR_ASSERT(selected != NULL);
- if (p->pending_picks != NULL) {
- /* if the selected subchannel is going to be used for the pending
- * picks, update the last picked pointer */
- advance_last_picked_locked(p);
+ gpr_mu_unlock(&p->mu);
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ switch (sd->curr_connectivity_state) {
+ case GRPC_CHANNEL_INIT:
+ GPR_UNREACHABLE_CODE(return );
+ case GRPC_CHANNEL_READY:
+ /* add the newly connected subchannel to the list of connected ones.
+ * Note that it goes to the "end of the line". */
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
+ /* at this point we know there's at least one suitable subchannel. Go
+ * ahead and pick one and notify the pending suitors in
+ * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+ ready_list *selected = peek_next_connected_locked(p);
+ GPR_ASSERT(selected != NULL);
+ if (p->pending_picks != NULL) {
+ /* if the selected subchannel is going to be used for the pending
+ * picks, update the last picked pointer */
+ advance_last_picked_locked(p);
+ }
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
+ grpc_subchannel_get_connected_subchannel(selected->subchannel),
+ "rr_picked");
+ if (pp->user_data != NULL) {
+ *pp->user_data = selected->user_data;
}
-
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG,
+ "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
+ (void *)selected->subchannel, (void *)selected);
+ }
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
+ gpr_free(pp);
+ }
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_IDLE:
+ ++p->num_idle;
+ /* fallthrough */
+ case GRPC_CHANNEL_CONNECTING:
+ update_state_counters(sd);
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ ++p->num_transient_failures;
+ /* remove from ready list if still present */
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
+ }
+ update_lb_connectivity_status(exec_ctx, sd, error);
+ sd->prev_connectivity_state = sd->curr_connectivity_state;
+ /* renew notification: reuses the "rr_connectivity" weak ref */
+ grpc_subchannel_notify_on_state_change(
+ exec_ctx, sd->subchannel, p->base.interested_parties,
+ &sd->curr_connectivity_state, &sd->connectivity_changed_closure);
+ break;
+ case GRPC_CHANNEL_SHUTDOWN:
+ update_state_counters(sd);
+ if (sd->ready_list_node != NULL) {
+ remove_disconnected_sc_locked(p, sd->ready_list_node);
+ sd->ready_list_node = NULL;
+ }
+ --p->num_subchannels;
+ GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
+ p->subchannels[p->num_subchannels]);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "rr_subchannel_shutdown");
+ p->subchannels[sd->index]->index = sd->index;
+ if (update_lb_connectivity_status(exec_ctx, sd, error) ==
+ GRPC_CHANNEL_SHUTDOWN) {
+ /* the policy is shutting down. Flush all the pending picks... */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
-
- *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(
- grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
- if (pp->user_data != NULL) {
- *pp->user_data = selected->user_data;
- }
- if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG,
- "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- (void *)selected->subchannel, (void *)selected);
- }
+ *pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, sd->connectivity_state,
- GRPC_ERROR_REF(error), "connecting_changed");
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
- break;
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- /* renew state notification */
- grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, p->base.interested_parties,
- &sd->connectivity_state, &sd->connectivity_changed_closure);
-
- /* remove from ready list if still present */
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "connecting_transient_failure");
- break;
- case GRPC_CHANNEL_SHUTDOWN:
- if (sd->ready_list_node != NULL) {
- remove_disconnected_sc_locked(p, sd->ready_list_node);
- sd->ready_list_node = NULL;
- }
-
- p->num_subchannels--;
- GPR_SWAP(subchannel_data *, p->subchannels[sd->index],
- p->subchannels[p->num_subchannels]);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
- p->subchannels[sd->index]->index = sd->index;
- gpr_free(sd);
-
- unref = 1;
- if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
- GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted",
- &error, 1),
- "no_more_channels");
- while ((pp = p->pending_picks)) {
- p->pending_picks = pp->next;
- *pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
- NULL);
- gpr_free(pp);
- }
- } else {
- grpc_connectivity_state_set(
- exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_REF(error), "subchannel_failed");
- }
- } /* switch */
- } /* !unref */
-
- gpr_mu_unlock(&p->mu);
-
- if (unref) {
- GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
+ }
+ gpr_free(sd);
+ /* unref the "rr_connectivity" weak ref from start_picking */
+ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity");
+ break;
}
-
+ gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
@@ -607,9 +679,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
target = GRPC_CONNECTED_SUBCHANNEL_REF(
grpc_subchannel_get_connected_subchannel(selected->subchannel),
- "picked");
+ "rr_picked");
grpc_connected_subchannel_ping(exec_ctx, target, closure);
- GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked");
} else {
gpr_mu_unlock(&p->mu);
grpc_exec_ctx_sched(exec_ctx, closure,
@@ -705,6 +777,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"round_robin");
+
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels",
+ (void *)p, (unsigned long)p->num_subchannels);
+ }
gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 2b08c4efec..f50e6520cd 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -68,7 +68,8 @@ typedef struct {
grpc_exec_ctx *exec_ctx;
} recv_md_filter_args;
-static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *recv_md_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
recv_md_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
call_data *calld = elem->call_data;
@@ -189,7 +190,8 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
*/
}
-static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *lr_trailing_md_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 96d916e414..7baeddc7b8 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1040,7 +1040,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
"op.send_initial_metadata");
}
} else {
- s->send_trailing_metadata = NULL;
+ s->send_initial_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE(
@@ -1527,13 +1527,17 @@ static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
error =
removal_error(error, s, "Pending writes failed due to stream closure");
- s->fetching_send_message = NULL;
+ s->send_initial_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_REF(error),
"send_initial_metadata_finished");
+
+ s->send_trailing_metadata = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_trailing_metadata_finished,
GRPC_ERROR_REF(error), "send_trailing_metadata_finished");
+
+ s->fetching_send_message = NULL;
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error),
"fetching_send_message_finished");
@@ -2298,6 +2302,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
+/*******************************************************************************
+ * MONITORING
+ */
+static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *t) {
+ return ((grpc_chttp2_transport *)t)->ep;
+}
+
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
@@ -2307,7 +2319,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
perform_transport_op,
destroy_stream,
destroy_transport,
- chttp2_get_peer};
+ chttp2_get_peer,
+ chttp2_get_endpoint};
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e1202e2ca5..4fb5dc7bd2 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -471,7 +471,8 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_mdstr_as_c_string(md->value));
*cached_timeout = gpr_inf_future(GPR_TIMESPAN);
}
- grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ cached_timeout =
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
}
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&s->metadata_buffer[0],
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 2ab9a8c19d..a4c110101e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
@@ -1095,6 +1096,11 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
return NULL;
}
+static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt) {
+ return NULL;
+}
+
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {}
@@ -1107,4 +1113,5 @@ const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
perform_op,
destroy_stream,
destroy_transport,
- get_peer};
+ get_peer,
+ get_endpoint};
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index c956b0febc..1a099ac437 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -300,6 +300,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator) {
+ grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 5933133413..5c7d31f8bb 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -36,6 +36,7 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include "src/core/lib/iomgr/socket_mutator.h"
// Channel args are intentionally immutable, to avoid the need for locking.
@@ -101,6 +102,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns a channel arg instance with socket mutator added. The socket mutator
+ * will perform its mutate_fd method on all file descriptors used by the
+ * channel.
+ * If \a a is non-MULL, its args are copied. */
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator);
+
/** Returns the value of argument \a name from \a args, or NULL if not found. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 947dff4cb3..285ac178af 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -162,7 +162,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -179,7 +180,7 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- args.start_time = gpr_now(GPR_CLOCK_MONOTONIC);
+ args.start_time = start_time;
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index c3b662c969..004643d45f 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -231,7 +231,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index dd496ee095..96188f01c4 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -83,7 +83,8 @@ typedef struct channel_data {
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
-static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *compression_md_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 63afa4bf09..27064e0fb8 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -36,6 +36,7 @@
#include <grpc/support/string_util.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -57,6 +58,7 @@ typedef struct call_data {
grpc_linked_mdelem payload_bin;
grpc_metadata_batch *recv_initial_metadata;
+ grpc_metadata_batch *recv_trailing_metadata;
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
@@ -70,14 +72,16 @@ typedef struct call_data {
bool send_message_blocked;
/** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv;
+ grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure *on_done_recv_trailing_metadata;
grpc_closure *on_complete;
grpc_closure *post_send;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
- grpc_closure hc_on_recv;
+ grpc_closure hc_on_recv_initial_metadata;
+ grpc_closure hc_on_recv_trailing_metadata;
grpc_closure hc_on_complete;
grpc_closure got_slice;
grpc_closure send_done;
@@ -89,13 +93,9 @@ typedef struct channel_data {
size_t max_payload_size_for_get;
} channel_data;
-typedef struct {
- grpc_call_element *elem;
- grpc_exec_ctx *exec_ctx;
-} client_recv_filter_args;
-
-static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
- client_recv_filter_args *a = user_data;
+static grpc_mdelem *client_recv_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
if (md == GRPC_MDELEM_STATUS_200) {
return NULL;
} else if (md->key == GRPC_MDSTR_STATUS) {
@@ -104,9 +104,20 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
grpc_slice message = grpc_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
+ grpc_call_element_send_close_with_message(exec_ctx, elem,
GRPC_STATUS_CANCELLED, &message);
return NULL;
+ } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_decoded_msg =
+ grpc_permissive_percent_decode_slice(md->value->slice);
+ if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) {
+ grpc_slice_unref(pct_decoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx, pct_decoded_msg));
+ }
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
@@ -130,16 +141,24 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
+static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- client_recv_filter_args a;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
- client_recv_filter, &a);
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
+ client_recv_filter, elem);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata,
+ GRPC_ERROR_REF(error));
+}
+
+static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ grpc_metadata_batch_filter(exec_ctx, calld->recv_trailing_metadata,
+ client_recv_filter, elem);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata,
+ GRPC_ERROR_REF(error));
}
static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -160,7 +179,8 @@ static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
}
-static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *client_strip_filter(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *md) {
/* eat the things we'd like to set ourselves */
if (md->key == GRPC_MDSTR_METHOD) return NULL;
if (md->key == GRPC_MDSTR_SCHEME) return NULL;
@@ -282,8 +302,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hc_on_recv;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata;
+ }
+
+ if (op->recv_trailing_metadata != NULL) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata = op->recv_trailing_metadata;
+ calld->on_done_recv_trailing_metadata = op->on_complete;
+ op->on_complete = &calld->hc_on_recv_trailing_metadata;
}
}
@@ -309,11 +336,15 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->on_done_recv = NULL;
+ calld->on_done_recv_initial_metadata = NULL;
+ calld->on_done_recv_trailing_metadata = NULL;
calld->on_complete = NULL;
calld->payload_bytes = NULL;
grpc_slice_buffer_init(&calld->slices);
- grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ grpc_closure_init(&calld->hc_on_recv_initial_metadata,
+ hc_on_recv_initial_metadata, elem);
+ grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
+ hc_on_recv_trailing_metadata, elem);
grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index cb0fe230cf..db39020dca 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -82,14 +83,28 @@ typedef struct call_data {
typedef struct channel_data { uint8_t unused; } channel_data;
-typedef struct {
- grpc_call_element *elem;
- grpc_exec_ctx *exec_ctx;
-} server_filter_args;
+static grpc_mdelem *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data,
+ grpc_mdelem *md) {
+ if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_encoded_msg = grpc_percent_encode_slice(
+ md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes);
+ if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) {
+ grpc_slice_unref_internal(exec_ctx, pct_encoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx, pct_encoded_msg));
+ }
+ } else {
+ return md;
+ }
+}
-static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
- server_filter_args *a = user_data;
- grpc_call_element *elem = a->elem;
+static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
/* Check if it is one of the headers we care about. */
@@ -140,7 +155,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* swallow it and error everything out. */
/* TODO(klempner): We ought to generate more descriptive error messages
on the wire here. */
- grpc_call_element_send_cancel(a->exec_ctx, elem);
+ grpc_call_element_send_cancel(exec_ctx, elem);
return NULL;
} else if (md->key == GRPC_MDSTR_PATH) {
if (calld->seen_path) {
@@ -156,7 +171,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* translate host to :authority since :authority may be
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- a->exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
+ exec_ctx, GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value));
calld->seen_authority = 1;
return authority;
} else if (md->key == GRPC_MDSTR_GRPC_PAYLOAD_BIN) {
@@ -178,11 +193,8 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (err == GRPC_ERROR_NONE) {
- server_filter_args a;
- a.elem = elem;
- a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata,
- server_filter, &a);
+ server_filter, elem);
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
@@ -256,7 +268,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-static void hs_mutate_op(grpc_call_element *elem,
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -292,6 +304,11 @@ static void hs_mutate_op(grpc_call_element *elem,
op->on_complete = &calld->hs_on_complete;
}
}
+
+ if (op->send_trailing_metadata) {
+ grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata,
+ server_filter_outgoing_metadata, elem);
+ }
}
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -299,7 +316,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(elem, op);
+ hs_mutate_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, op);
GPR_TIMER_END("hs_start_transport_op", 0);
}
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index a4b6668276..2d300f4560 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -66,6 +66,8 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
+int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
+
grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index bf211ca16a..1609b64f2b 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -61,6 +61,7 @@ struct grpc_endpoint_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
+ int (*get_fd)(grpc_endpoint *ep);
};
/* When data is available on the connection, calls the callback with slices.
@@ -73,6 +74,10 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+/* Get the file descriptor used by \a ep. Return -1 if \a ep is not using an fd.
+ */
+int grpc_endpoint_get_fd(grpc_endpoint *ep);
+
/* Retrieve a reference to the workqueue associated with this endpoint */
grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index db51ec4939..91041a7c28 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -163,7 +163,7 @@ static void fd_global_shutdown(void);
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
-#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
+#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
deleted file mode 100644
index bf51404203..0000000000
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ /dev/null
@@ -1,2076 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/* This file will be removed shortly: it's here to keep refactoring
- * steps simple and auditable.
- * It's the combination of the old files:
- * - fd_posix.{h,c}
- * - pollset_posix.{h,c}
- * - pullset_multipoller_with_{poll,epoll}.{h,c}
- * The new version will be split into:
- * - ev_poll_posix.{h,c}
- * - ev_epoll_posix.{h,c}
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_POSIX_SOCKET
-
-#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
-
-#include <assert.h>
-#include <errno.h>
-#include <poll.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/tls.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/iomgr_internal.h"
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
-
-/*******************************************************************************
- * FD declarations
- */
-
-typedef struct grpc_fd_watcher {
- struct grpc_fd_watcher *next;
- struct grpc_fd_watcher *prev;
- grpc_pollset *pollset;
- grpc_pollset_worker *worker;
- grpc_fd *fd;
-} grpc_fd_watcher;
-
-struct grpc_fd {
- int fd;
- /* refst format:
- bit0: 1=active/0=orphaned
- bit1-n: refcount
- meaning that mostly we ref by two to avoid altering the orphaned bit,
- and just unref by 1 when we're ready to flag the object as orphaned */
- gpr_atm refst;
-
- gpr_mu mu;
- int shutdown;
- int closed;
- int released;
-
- /* The watcher list.
-
- The following watcher related fields are protected by watcher_mu.
-
- An fd_watcher is an ephemeral object created when an fd wants to
- begin polling, and destroyed after the poll.
-
- It denotes the fd's interest in whether to read poll or write poll
- or both or neither on this fd.
-
- If a watcher is asked to poll for reads or writes, the read_watcher
- or write_watcher fields are set respectively. A watcher may be asked
- to poll for both, in which case both fields will be set.
-
- read_watcher and write_watcher may be NULL if no watcher has been
- asked to poll for reads or writes.
-
- If an fd_watcher is not asked to poll for reads or writes, it's added
- to a linked list of inactive watchers, rooted at inactive_watcher_root.
- If at a later time there becomes need of a poller to poll, one of
- the inactive pollers may be kicked out of their poll loops to take
- that responsibility. */
- grpc_fd_watcher inactive_watcher_root;
- grpc_fd_watcher *read_watcher;
- grpc_fd_watcher *write_watcher;
-
- grpc_closure *read_closure;
- grpc_closure *write_closure;
-
- struct grpc_fd *freelist_next;
-
- grpc_closure *on_done_closure;
-
- grpc_iomgr_object iomgr_object;
-
- /* The pollset that last noticed and notified that the fd is readable */
- grpc_pollset *read_notifier_pollset;
-};
-
-/* Begin polling on an fd.
- Registers that the given pollset is interested in this fd - so that if read
- or writability interest changes, the pollset can be kicked to pick up that
- new interest.
- Return value is:
- (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
- i.e. a combination of read_mask and write_mask determined by the fd's current
- interest in said events.
- Polling strategies that do not need to alter their behavior depending on the
- fd's current interest (such as epoll) do not need to call this function.
- MUST NOT be called with a pollset lock taken */
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- grpc_pollset_worker *worker, uint32_t read_mask,
- uint32_t write_mask, grpc_fd_watcher *rec);
-/* Complete polling previously started with fd_begin_poll
- MUST NOT be called with a pollset lock taken
- if got_read or got_write are 1, also does the become_{readable,writable} as
- appropriate. */
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
- int got_read, int got_write,
- grpc_pollset *read_notifier_pollset);
-
-/* Return 1 if this fd is orphaned, 0 otherwise */
-static bool fd_is_orphaned(grpc_fd *fd);
-
-/* Reference counting for fds */
-/*#define GRPC_FD_REF_COUNT_DEBUG*/
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
-static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
- int line);
-#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
-#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
-#else
-static void fd_ref(grpc_fd *fd);
-static void fd_unref(grpc_fd *fd);
-#define GRPC_FD_REF(fd, reason) fd_ref(fd)
-#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
-#endif
-
-static void fd_global_init(void);
-static void fd_global_shutdown(void);
-
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
-
-/*******************************************************************************
- * pollset declarations
- */
-
-typedef struct grpc_pollset_vtable grpc_pollset_vtable;
-
-typedef struct grpc_cached_wakeup_fd {
- grpc_wakeup_fd fd;
- struct grpc_cached_wakeup_fd *next;
-} grpc_cached_wakeup_fd;
-
-struct grpc_pollset_worker {
- grpc_cached_wakeup_fd *wakeup_fd;
- int reevaluate_polling_on_wakeup;
- int kicked_specifically;
- struct grpc_pollset_worker *next;
- struct grpc_pollset_worker *prev;
-};
-
-struct grpc_pollset {
- /* pollsets under posix can mutate representation as fds are added and
- removed.
- For example, we may choose a poll() based implementation on linux for
- few fds, and an epoll() based implementation for many fds */
- const grpc_pollset_vtable *vtable;
- gpr_mu mu;
- grpc_pollset_worker root_worker;
- int in_flight_cbs;
- int shutting_down;
- int called_shutdown;
- int kicked_without_pollers;
- grpc_closure *shutdown_done;
- grpc_closure_list idle_jobs;
- union {
- int fd;
- void *ptr;
- } data;
- /* Local cache of eventfds for workers */
- grpc_cached_wakeup_fd *local_wakeup_cache;
-};
-
-struct grpc_pollset_vtable {
- void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- struct grpc_fd *fd, int and_unlock_pollset);
- grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now);
- void (*finish_shutdown)(grpc_pollset *pollset);
- void (*destroy)(grpc_pollset *pollset);
-};
-
-/* Add an fd to a pollset */
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- struct grpc_fd *fd);
-
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd);
-
-/* Convert a timespec to milliseconds:
- - very small or negative poll times are clamped to zero to do a
- non-blocking poll (which becomes spin polling)
- - other small values are rounded up to one millisecond
- - longer than a millisecond polls are rounded up to the next nearest
- millisecond to avoid spinning
- - infinite timeouts are converted to -1 */
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now);
-
-/* Allow kick to wakeup the currently polling worker */
-#define GRPC_POLLSET_CAN_KICK_SELF 1
-/* Force the wakee to repoll when awoken */
-#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
-/* As per pollset_kick, with an extended set of flags (defined above)
- -- mostly for fd_posix's use. */
-static grpc_error *pollset_kick_ext(grpc_pollset *p,
- grpc_pollset_worker *specific_worker,
- uint32_t flags) GRPC_MUST_USE_RESULT;
-
-/* turn a pollset into a multipoller: platform specific */
-typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- struct grpc_fd **fds,
- size_t fd_count);
-static platform_become_multipoller_type platform_become_multipoller;
-
-/* Return 1 if the pollset has active threads in pollset_work (pollset must
- * be locked) */
-static int pollset_has_workers(grpc_pollset *pollset);
-
-static void remove_fd_from_all_epoll_sets(int fd);
-
-/*******************************************************************************
- * pollset_set definitions
- */
-
-struct grpc_pollset_set {
- gpr_mu mu;
-
- size_t pollset_count;
- size_t pollset_capacity;
- grpc_pollset **pollsets;
-
- size_t pollset_set_count;
- size_t pollset_set_capacity;
- struct grpc_pollset_set **pollset_sets;
-
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
-};
-
-/*******************************************************************************
- * fd_posix.c
- */
-
-/* We need to keep a freelist not because of any concerns of malloc performance
- * but instead so that implementations with multiple threads in (for example)
- * epoll_wait deal with the race between pollset removal and incoming poll
- * notifications.
- *
- * The problem is that the poller ultimately holds a reference to this
- * object, so it is very difficult to know when is safe to free it, at least
- * without some expensive synchronization.
- *
- * If we keep the object freelisted, in the worst case losing this race just
- * becomes a spurious read notification on a reused fd.
- */
-/* TODO(klempner): We could use some form of polling generation count to know
- * when these are safe to free. */
-/* TODO(klempner): Consider disabling freelisting if we don't have multiple
- * threads in poll on the same fd */
-/* TODO(klempner): Batch these allocations to reduce fragmentation */
-static grpc_fd *fd_freelist = NULL;
-static gpr_mu fd_freelist_mu;
-
-static void freelist_fd(grpc_fd *fd) {
- gpr_mu_lock(&fd_freelist_mu);
- fd->freelist_next = fd_freelist;
- fd_freelist = fd;
- grpc_iomgr_unregister_object(&fd->iomgr_object);
- gpr_mu_unlock(&fd_freelist_mu);
-}
-
-static grpc_fd *alloc_fd(int fd) {
- grpc_fd *r = NULL;
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- r = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- }
- gpr_mu_unlock(&fd_freelist_mu);
- if (r == NULL) {
- r = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&r->mu);
- }
-
- gpr_mu_lock(&r->mu);
- gpr_atm_rel_store(&r->refst, 1);
- r->shutdown = 0;
- r->read_closure = CLOSURE_NOT_READY;
- r->write_closure = CLOSURE_NOT_READY;
- r->fd = fd;
- r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
- &r->inactive_watcher_root;
- r->freelist_next = NULL;
- r->read_watcher = r->write_watcher = NULL;
- r->on_done_closure = NULL;
- r->closed = 0;
- r->released = 0;
- r->read_notifier_pollset = NULL;
- gpr_mu_unlock(&r->mu);
- return r;
-}
-
-static void destroy(grpc_fd *fd) {
- gpr_mu_destroy(&fd->mu);
- gpr_free(fd);
-}
-
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
-static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
- int line) {
- gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
- gpr_atm_no_barrier_load(&fd->refst),
- gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
-#else
-#define REF_BY(fd, n, reason) ref_by(fd, n)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n)
-static void ref_by(grpc_fd *fd, int n) {
-#endif
- GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
-}
-
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
- int line) {
- gpr_atm old;
- gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
- gpr_atm_no_barrier_load(&fd->refst),
- gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
-#else
-static void unref_by(grpc_fd *fd, int n) {
- gpr_atm old;
-#endif
- old = gpr_atm_full_fetch_add(&fd->refst, -n);
- if (old == n) {
- freelist_fd(fd);
- } else {
- GPR_ASSERT(old > n);
- }
-}
-
-static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
-
-static void fd_global_shutdown(void) {
- gpr_mu_lock(&fd_freelist_mu);
- gpr_mu_unlock(&fd_freelist_mu);
- while (fd_freelist != NULL) {
- grpc_fd *fd = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- destroy(fd);
- }
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-static grpc_fd *fd_create(int fd, const char *name) {
- grpc_fd *r = alloc_fd(fd);
- char *name2;
- gpr_asprintf(&name2, "%s fd=%d", name, fd);
- grpc_iomgr_register_object(&r->iomgr_object, name2);
- gpr_free(name2);
-#ifdef GRPC_FD_REF_COUNT_DEBUG
- gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
-#endif
- return r;
-}
-
-static bool fd_is_orphaned(grpc_fd *fd) {
- return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
-}
-
-static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->pollset->mu);
- GPR_ASSERT(watcher->worker);
- grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
- gpr_mu_unlock(&watcher->pollset->mu);
- return err;
-}
-
-static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
- if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next);
- } else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
- } else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher);
- }
-}
-
-static void wake_all_watchers_locked(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
- for (watcher = fd->inactive_watcher_root.next;
- watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher);
- }
- if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
- }
- if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher);
- }
-}
-
-static int has_watchers(grpc_fd *fd) {
- return fd->read_watcher != NULL || fd->write_watcher != NULL ||
- fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
-}
-
-static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- fd->closed = 1;
- if (!fd->released) {
- close(fd->fd);
- } else {
- remove_fd_from_all_epoll_sets(fd->fd);
- }
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
-}
-
-static int fd_wrapped_fd(grpc_fd *fd) {
- if (fd->released || fd->closed) {
- return -1;
- } else {
- return fd->fd;
- }
-}
-
-static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *on_done, int *release_fd,
- const char *reason) {
- fd->on_done_closure = on_done;
- fd->released = release_fd != NULL;
- if (!fd->released) {
- shutdown(fd->fd, SHUT_RDWR);
- } else {
- *release_fd = fd->fd;
- }
- gpr_mu_lock(&fd->mu);
- REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
- if (!has_watchers(fd)) {
- close_fd_locked(exec_ctx, fd);
- } else {
- wake_all_watchers_locked(fd);
- }
- gpr_mu_unlock(&fd->mu);
- UNREF_BY(fd, 2, reason); /* drop the reference */
-}
-
-/* increment refcount by two to avoid changing the orphan bit */
-#ifdef GRPC_FD_REF_COUNT_DEBUG
-static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
- int line) {
- ref_by(fd, 2, reason, file, line);
-}
-
-static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
- int line) {
- unref_by(fd, 2, reason, file, line);
-}
-#else
-static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-
-static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-#endif
-
-static grpc_error *fd_shutdown_error(bool shutdown) {
- if (!shutdown) {
- return GRPC_ERROR_NONE;
- } else {
- return GRPC_ERROR_CREATE("FD shutdown");
- }
-}
-
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st, grpc_closure *closure) {
- if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready ==> switch to a waiting state by setting the closure */
- *st = closure;
- } else if (*st == CLOSURE_READY) {
- /* already ready ==> queue the closure to run immediately */
- *st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
- maybe_wake_one_watcher_locked(fd);
- } else {
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
- }
-}
-
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st) {
- if (*st == CLOSURE_READY) {
- /* duplicate ready ==> ignore */
- return 0;
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready, and not waiting ==> flag ready */
- *st = CLOSURE_READY;
- return 0;
- } else {
- /* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
- *st = CLOSURE_NOT_READY;
- return 1;
- }
-}
-
-static void set_read_notifier_pollset_locked(
- grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
- fd->read_notifier_pollset = read_notifier_pollset;
-}
-
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
- /* only shutdown once */
- if (!fd->shutdown) {
- fd->shutdown = 1;
- /* signal read/write closed to OS so that future operations fail */
- shutdown(fd->fd, SHUT_RDWR);
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
- }
- gpr_mu_unlock(&fd->mu);
-}
-
-static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->mu);
- bool r = fd->shutdown;
- gpr_mu_unlock(&fd->mu);
- return r;
-}
-
-static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
- notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->mu);
-}
-
-static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- gpr_mu_lock(&fd->mu);
- notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->mu);
-}
-
-/* Return the read-notifier pollset */
-static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
- grpc_fd *fd) {
- grpc_pollset *notifier = NULL;
-
- gpr_mu_lock(&fd->mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->mu);
-
- return notifier;
-}
-
-static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- grpc_pollset_worker *worker, uint32_t read_mask,
- uint32_t write_mask, grpc_fd_watcher *watcher) {
- uint32_t mask = 0;
- grpc_closure *cur;
- int requested;
- /* keep track of pollers that have requested our events, in case they change
- */
- GRPC_FD_REF(fd, "poll");
-
- gpr_mu_lock(&fd->mu);
-
- /* if we are shutdown, then don't add to the watcher set */
- if (fd->shutdown) {
- watcher->fd = NULL;
- watcher->pollset = NULL;
- watcher->worker = NULL;
- gpr_mu_unlock(&fd->mu);
- GRPC_FD_UNREF(fd, "poll");
- return 0;
- }
-
- /* if there is nobody polling for read, but we need to, then start doing so */
- cur = fd->read_closure;
- requested = cur != CLOSURE_READY;
- if (read_mask && fd->read_watcher == NULL && requested) {
- fd->read_watcher = watcher;
- mask |= read_mask;
- }
- /* if there is nobody polling for write, but we need to, then start doing so
- */
- cur = fd->write_closure;
- requested = cur != CLOSURE_READY;
- if (write_mask && fd->write_watcher == NULL && requested) {
- fd->write_watcher = watcher;
- mask |= write_mask;
- }
- /* if not polling, remember this watcher in case we need someone to later */
- if (mask == 0 && worker != NULL) {
- watcher->next = &fd->inactive_watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
- }
- watcher->pollset = pollset;
- watcher->worker = worker;
- watcher->fd = fd;
- gpr_mu_unlock(&fd->mu);
-
- return mask;
-}
-
-static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
- int got_read, int got_write,
- grpc_pollset *read_notifier_pollset) {
- int was_polling = 0;
- int kick = 0;
- grpc_fd *fd = watcher->fd;
-
- if (fd == NULL) {
- return;
- }
-
- gpr_mu_lock(&fd->mu);
-
- if (watcher == fd->read_watcher) {
- /* remove read watcher, kick if we still need a read */
- was_polling = 1;
- if (!got_read) {
- kick = 1;
- }
- fd->read_watcher = NULL;
- }
- if (watcher == fd->write_watcher) {
- /* remove write watcher, kick if we still need a write */
- was_polling = 1;
- if (!got_write) {
- kick = 1;
- }
- fd->write_watcher = NULL;
- }
- if (!was_polling && watcher->worker != NULL) {
- /* remove from inactive list */
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- }
- if (got_read) {
- if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
- kick = 1;
- }
-
- if (read_notifier_pollset != NULL) {
- set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
- }
- }
- if (got_write) {
- if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
- kick = 1;
- }
- }
- if (kick) {
- maybe_wake_one_watcher_locked(fd);
- }
- if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
- close_fd_locked(exec_ctx, fd);
- }
- gpr_mu_unlock(&fd->mu);
-
- GRPC_FD_UNREF(fd, "poll");
-}
-
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
-/*******************************************************************************
- * pollset_posix.c
- */
-
-GPR_TLS_DECL(g_current_thread_poller);
-GPR_TLS_DECL(g_current_thread_worker);
-
-/** The alarm system needs to be able to wakeup 'some poller' sometimes
- * (specifically when a new alarm needs to be triggered earlier than the next
- * alarm 'epoch').
- * This wakeup_fd gives us something to alert on when such a case occurs. */
-grpc_wakeup_fd grpc_global_wakeup_fd;
-
-static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev->next = worker->next;
- worker->next->prev = worker->prev;
-}
-
-static int pollset_has_workers(grpc_pollset *p) {
- return p->root_worker.next != &p->root_worker;
-}
-
-static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
- if (pollset_has_workers(p)) {
- grpc_pollset_worker *w = p->root_worker.next;
- remove_worker(p, w);
- return w;
- } else {
- return NULL;
- }
-}
-
-static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->next = &p->root_worker;
- worker->prev = worker->next->prev;
- worker->prev->next = worker->next->prev = worker;
-}
-
-static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev = &p->root_worker;
- worker->next = worker->prev->next;
- worker->prev->next = worker->next->prev = worker;
-}
-
-static void kick_append_error(grpc_error **composite, grpc_error *error) {
- if (error == GRPC_ERROR_NONE) return;
- if (*composite == GRPC_ERROR_NONE) {
- *composite = GRPC_ERROR_CREATE("Kick Failure");
- }
- *composite = grpc_error_add_child(*composite, error);
-}
-
-static grpc_error *pollset_kick_ext(grpc_pollset *p,
- grpc_pollset_worker *specific_worker,
- uint32_t flags) {
- GPR_TIMER_BEGIN("pollset_kick_ext", 0);
- grpc_error *error = GRPC_ERROR_NONE;
-
- /* pollset->mu already held */
- if (specific_worker != NULL) {
- if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
- GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
- GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
- for (specific_worker = p->root_worker.next;
- specific_worker != &p->root_worker;
- specific_worker = specific_worker->next) {
- kick_append_error(
- &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
- }
- p->kicked_without_pollers = true;
- GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
- } else if (gpr_tls_get(&g_current_thread_worker) !=
- (intptr_t)specific_worker) {
- GPR_TIMER_MARK("different_thread_worker", 0);
- if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = true;
- }
- specific_worker->kicked_specifically = true;
- kick_append_error(&error,
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
- } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
- GPR_TIMER_MARK("kick_yoself", 0);
- if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
- specific_worker->reevaluate_polling_on_wakeup = true;
- }
- specific_worker->kicked_specifically = true;
- kick_append_error(&error,
- grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
- }
- } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
- GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
- GPR_TIMER_MARK("kick_anonymous", 0);
- specific_worker = pop_front_worker(p);
- if (specific_worker != NULL) {
- if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
- GPR_TIMER_MARK("kick_anonymous_not_self", 0);
- push_back_worker(p, specific_worker);
- specific_worker = pop_front_worker(p);
- if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
- gpr_tls_get(&g_current_thread_worker) ==
- (intptr_t)specific_worker) {
- push_back_worker(p, specific_worker);
- specific_worker = NULL;
- }
- }
- if (specific_worker != NULL) {
- GPR_TIMER_MARK("finally_kick", 0);
- push_back_worker(p, specific_worker);
- kick_append_error(
- &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
- }
- } else {
- GPR_TIMER_MARK("kicked_no_pollers", 0);
- p->kicked_without_pollers = true;
- }
- }
-
- GPR_TIMER_END("pollset_kick_ext", 0);
- return error;
-}
-
-static grpc_error *pollset_kick(grpc_pollset *p,
- grpc_pollset_worker *specific_worker) {
- return pollset_kick_ext(p, specific_worker, 0);
-}
-
-/* global state management */
-
-static grpc_error *pollset_global_init(void) {
- gpr_tls_init(&g_current_thread_poller);
- gpr_tls_init(&g_current_thread_worker);
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
-}
-
-static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
- gpr_tls_destroy(&g_current_thread_poller);
- gpr_tls_destroy(&g_current_thread_worker);
-}
-
-static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
-}
-
-/* main interface */
-
-static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
-
-static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->mu);
- *mu = &pollset->mu;
- pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
- pollset->in_flight_cbs = 0;
- pollset->shutting_down = 0;
- pollset->called_shutdown = 0;
- pollset->kicked_without_pollers = 0;
- pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
- pollset->local_wakeup_cache = NULL;
- pollset->kicked_without_pollers = 0;
- become_basic_pollset(pollset, NULL);
-}
-
-static void pollset_destroy(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->in_flight_cbs == 0);
- GPR_ASSERT(!pollset_has_workers(pollset));
- GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
- pollset->vtable->destroy(pollset);
- while (pollset->local_wakeup_cache) {
- grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
- grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
- gpr_free(pollset->local_wakeup_cache);
- pollset->local_wakeup_cache = next;
- }
- gpr_mu_destroy(&pollset->mu);
-}
-
-static void pollset_reset(grpc_pollset *pollset) {
- GPR_ASSERT(pollset->shutting_down);
- GPR_ASSERT(pollset->in_flight_cbs == 0);
- GPR_ASSERT(!pollset_has_workers(pollset));
- GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
- pollset->vtable->destroy(pollset);
- pollset->shutting_down = 0;
- pollset->called_shutdown = 0;
- pollset->kicked_without_pollers = 0;
- become_basic_pollset(pollset, NULL);
-}
-
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
-/* the following (enabled only in debug) will reacquire and then release
- our lock - meaning that if the unlocking flag passed to add_fd above is
- not respected, the code will deadlock (in a way that we have a chance of
- debugging) */
-#ifndef NDEBUG
- gpr_mu_lock(&pollset->mu);
- gpr_mu_unlock(&pollset->mu);
-#endif
-}
-
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
- pollset->vtable->finish_shutdown(pollset);
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
-}
-
-static void work_combine_error(grpc_error **composite, grpc_error *error) {
- if (error == GRPC_ERROR_NONE) return;
- if (*composite == GRPC_ERROR_NONE) {
- *composite = GRPC_ERROR_CREATE("pollset_work");
- }
- *composite = grpc_error_add_child(*composite, error);
-}
-
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
- grpc_pollset_worker worker;
- *worker_hdl = &worker;
- grpc_error *error = GRPC_ERROR_NONE;
-
- /* pollset->mu already held */
- int added_worker = 0;
- int locked = 1;
- int queued_work = 0;
- int keep_polling = 0;
- GPR_TIMER_BEGIN("pollset_work", 0);
- /* this must happen before we (potentially) drop pollset->mu */
- worker.next = worker.prev = NULL;
- worker.reevaluate_polling_on_wakeup = 0;
- if (pollset->local_wakeup_cache != NULL) {
- worker.wakeup_fd = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker.wakeup_fd->next;
- } else {
- worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
- error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
- if (error != GRPC_ERROR_NONE) {
- return error;
- }
- }
- worker.kicked_specifically = 0;
- /* If there's work waiting for the pollset to be idle, and the
- pollset is idle, then do that work */
- if (!pollset_has_workers(pollset) &&
- !grpc_closure_list_empty(pollset->idle_jobs)) {
- GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
- goto done;
- }
- /* If we're shutting down then we don't execute any extended work */
- if (pollset->shutting_down) {
- GPR_TIMER_MARK("pollset_work.shutting_down", 0);
- goto done;
- }
- /* Give do_promote priority so we don't starve it out */
- if (pollset->in_flight_cbs) {
- GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
- gpr_mu_unlock(&pollset->mu);
- locked = 0;
- goto done;
- }
- /* Start polling, and keep doing so while we're being asked to
- re-evaluate our pollers (this allows poll() based pollers to
- ensure they don't miss wakeups) */
- keep_polling = 1;
- while (keep_polling) {
- keep_polling = 0;
- if (!pollset->kicked_without_pollers) {
- if (!added_worker) {
- push_front_worker(pollset, &worker);
- added_worker = 1;
- gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- }
- gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
- GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
- work_combine_error(&error,
- pollset->vtable->maybe_work_and_unlock(
- exec_ctx, pollset, &worker, deadline, now));
- GPR_TIMER_END("maybe_work_and_unlock", 0);
- locked = 0;
- gpr_tls_set(&g_current_thread_poller, 0);
- } else {
- GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
- pollset->kicked_without_pollers = 0;
- }
- /* Finished execution - start cleaning up.
- Note that we may arrive here from outside the enclosing while() loop.
- In that case we won't loop though as we haven't added worker to the
- worker list, which means nobody could ask us to re-evaluate polling). */
- done:
- if (!locked) {
- queued_work |= grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
- locked = 1;
- }
- /* If we're forced to re-evaluate polling (via pollset_kick with
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
- a loop */
- if (worker.reevaluate_polling_on_wakeup) {
- worker.reevaluate_polling_on_wakeup = 0;
- pollset->kicked_without_pollers = 0;
- if (queued_work || worker.kicked_specifically) {
- /* If there's queued work on the list, then set the deadline to be
- immediate so we get back out of the polling loop quickly */
- deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
- }
- keep_polling = 1;
- }
- }
- if (added_worker) {
- remove_worker(pollset, &worker);
- gpr_tls_set(&g_current_thread_worker, 0);
- }
- /* release wakeup fd to the local pool */
- worker.wakeup_fd->next = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker.wakeup_fd;
- /* check shutdown conditions */
- if (pollset->shutting_down) {
- if (pollset_has_workers(pollset)) {
- pollset_kick(pollset, NULL);
- } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
- pollset->called_shutdown = 1;
- gpr_mu_unlock(&pollset->mu);
- finish_shutdown(exec_ctx, pollset);
- grpc_exec_ctx_flush(exec_ctx);
- /* Continuing to access pollset here is safe -- it is the caller's
- * responsibility to not destroy when it has outstanding calls to
- * pollset_work.
- * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
- gpr_mu_lock(&pollset->mu);
- } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
- gpr_mu_unlock(&pollset->mu);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
- }
- }
- *worker_hdl = NULL;
- GPR_TIMER_END("pollset_work", 0);
- return error;
-}
-
-static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure) {
- GPR_ASSERT(!pollset->shutting_down);
- pollset->shutting_down = 1;
- pollset->shutdown_done = closure;
- pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- if (!pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
- }
- if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
- !pollset_has_workers(pollset)) {
- pollset->called_shutdown = 1;
- finish_shutdown(exec_ctx, pollset);
- }
-}
-
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- static const int64_t max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
- if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
- max_spin_polling_us,
- GPR_TIMESPAN))) <= 0) {
- return 0;
- }
- timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
-}
-
-/*
- * basic_pollset - a vtable that provides polling for zero or one file
- * descriptor via poll()
- */
-
-typedef struct grpc_unary_promote_args {
- const grpc_pollset_vtable *original_vtable;
- grpc_pollset *pollset;
- grpc_fd *fd;
- grpc_closure promotion_closure;
-} grpc_unary_promote_args;
-
-static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
- grpc_error *error) {
- grpc_unary_promote_args *up_args = args;
- const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
- grpc_pollset *pollset = up_args->pollset;
- grpc_fd *fd = up_args->fd;
-
- /*
- * This is quite tricky. There are a number of cases to keep in mind here:
- * 1. fd may have been orphaned
- * 2. The pollset may no longer be a unary poller (and we can't let case #1
- * leak to other pollset types!)
- * 3. pollset's fd (which may have changed) may have been orphaned
- * 4. The pollset may be shutting down.
- */
-
- gpr_mu_lock(&pollset->mu);
- /* First we need to ensure that nobody is polling concurrently */
- GPR_ASSERT(!pollset_has_workers(pollset));
-
- gpr_free(up_args);
- /* At this point the pollset may no longer be a unary poller. In that case
- * we should just call the right add function and be done. */
- /* TODO(klempner): If we're not careful this could cause infinite recursion.
- * That's not a problem for now because empty_pollset has a trivial poller
- * and we don't have any mechanism to unbecome multipoller. */
- pollset->in_flight_cbs--;
- if (pollset->shutting_down) {
- /* We don't care about this pollset anymore. */
- if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
- pollset->called_shutdown = 1;
- finish_shutdown(exec_ctx, pollset);
- }
- } else if (fd_is_orphaned(fd)) {
- /* Don't try to add it to anything, we'll drop our ref on it below */
- } else if (pollset->vtable != original_vtable) {
- pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
- } else if (fd != pollset->data.ptr) {
- grpc_fd *fds[2];
- fds[0] = pollset->data.ptr;
- fds[1] = fd;
-
- if (fds[0] && !fd_is_orphaned(fds[0])) {
- platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
- GRPC_FD_UNREF(fds[0], "basicpoll");
- } else {
- /* old fd is orphaned and we haven't cleaned it up until now, so remain a
- * unary poller */
- /* Note that it is possible that fds[1] is also orphaned at this point.
- * That's okay, we'll correct it at the next add or poll. */
- if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
- pollset->data.ptr = fd;
- GRPC_FD_REF(fd, "basicpoll");
- }
- }
-
- gpr_mu_unlock(&pollset->mu);
-
- /* Matching ref in basic_pollset_add_fd */
- GRPC_FD_UNREF(fd, "basicpoll_add");
-}
-
-static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd, int and_unlock_pollset) {
- grpc_unary_promote_args *up_args;
- GPR_ASSERT(fd);
- if (fd == pollset->data.ptr) goto exit;
-
- if (!pollset_has_workers(pollset)) {
- /* Fast path -- no in flight cbs */
- /* TODO(klempner): Comment this out and fix any test failures or establish
- * they are due to timing issues */
- grpc_fd *fds[2];
- fds[0] = pollset->data.ptr;
- fds[1] = fd;
-
- if (fds[0] == NULL) {
- pollset->data.ptr = fd;
- GRPC_FD_REF(fd, "basicpoll");
- } else if (!fd_is_orphaned(fds[0])) {
- platform_become_multipoller(exec_ctx, pollset, fds, GPR_ARRAY_SIZE(fds));
- GRPC_FD_UNREF(fds[0], "basicpoll");
- } else {
- /* old fd is orphaned and we haven't cleaned it up until now, so remain a
- * unary poller */
- GRPC_FD_UNREF(fds[0], "basicpoll");
- pollset->data.ptr = fd;
- GRPC_FD_REF(fd, "basicpoll");
- }
- goto exit;
- }
-
- /* Now we need to promote. This needs to happen when we're not polling. Since
- * this may be called from poll, the wait needs to happen asynchronously. */
- GRPC_FD_REF(fd, "basicpoll_add");
- pollset->in_flight_cbs++;
- up_args = gpr_malloc(sizeof(*up_args));
- up_args->fd = fd;
- up_args->original_vtable = pollset->vtable;
- up_args->pollset = pollset;
- up_args->promotion_closure.cb = basic_do_promote;
- up_args->promotion_closure.cb_arg = up_args;
-
- grpc_closure_list_append(&pollset->idle_jobs, &up_args->promotion_closure,
- GRPC_ERROR_NONE);
- pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
-
-exit:
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- }
-}
-
-static grpc_error *basic_pollset_maybe_work_and_unlock(
- grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now) {
-#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
-#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
-
- struct pollfd pfd[3];
- grpc_fd *fd;
- grpc_fd_watcher fd_watcher;
- int timeout;
- int r;
- nfds_t nfds;
- grpc_error *error = GRPC_ERROR_NONE;
-
- fd = pollset->data.ptr;
- if (fd && fd_is_orphaned(fd)) {
- GRPC_FD_UNREF(fd, "basicpoll");
- fd = pollset->data.ptr = NULL;
- }
- timeout = poll_deadline_to_millis_timeout(deadline, now);
- pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
- pfd[0].events = POLLIN;
- pfd[0].revents = 0;
- pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
- pfd[1].events = POLLIN;
- pfd[1].revents = 0;
- nfds = 2;
- if (fd) {
- pfd[2].fd = fd->fd;
- pfd[2].revents = 0;
- GRPC_FD_REF(fd, "basicpoll_begin");
- gpr_mu_unlock(&pollset->mu);
- pfd[2].events =
- (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher);
- if (pfd[2].events != 0) {
- nfds++;
- }
- } else {
- gpr_mu_unlock(&pollset->mu);
- }
-
- /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
- even going into the blocking annotation if possible */
- /* poll fd count (argument 2) is shortened by one if we have no events
- to poll on - such that it only includes the kicker */
- GPR_TIMER_BEGIN("poll", 0);
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- r = grpc_poll_function(pfd, nfds, timeout);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- GPR_TIMER_END("poll", 0);
-
- if (r < 0) {
- if (errno != EINTR) {
- work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
- }
- if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
- }
- } else if (r == 0) {
- if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
- }
- } else {
- if (pfd[0].revents & POLLIN_CHECK) {
- work_combine_error(&error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
- }
- if (pfd[1].revents & POLLIN_CHECK) {
- work_combine_error(&error,
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
- }
- if (nfds > 2) {
- fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
- pfd[2].revents & POLLOUT_CHECK, pollset);
- } else if (fd) {
- fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
- }
- }
-
- if (fd) {
- GRPC_FD_UNREF(fd, "basicpoll_begin");
- }
-
- return error;
-}
-
-static void basic_pollset_destroy(grpc_pollset *pollset) {
- if (pollset->data.ptr != NULL) {
- GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
- pollset->data.ptr = NULL;
- }
-}
-
-static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_maybe_work_and_unlock,
- basic_pollset_destroy, basic_pollset_destroy};
-
-static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
- pollset->vtable = &basic_pollset;
- pollset->data.ptr = fd_or_null;
- if (fd_or_null != NULL) {
- GRPC_FD_REF(fd_or_null, "basicpoll");
- }
-}
-
-/*******************************************************************************
- * pollset_multipoller_with_poll_posix.c
- */
-
-#ifndef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
-
-typedef struct {
- /* all polled fds */
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
- /* fds that have been removed from the pollset explicitly */
- size_t del_count;
- size_t del_capacity;
- grpc_fd **dels;
-} poll_hdr;
-
-static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_fd *fd,
- int and_unlock_pollset) {
- size_t i;
- poll_hdr *h = pollset->data.ptr;
- /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
- for (i = 0; i < h->fd_count; i++) {
- if (h->fds[i] == fd) goto exit;
- }
- if (h->fd_count == h->fd_capacity) {
- h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
- h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
- }
- h->fds[h->fd_count++] = fd;
- GRPC_FD_REF(fd, "multipoller");
-exit:
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- }
-}
-
-static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock(
- grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now) {
-#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
-#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
-
- int timeout;
- int r;
- size_t i, j, fd_count;
- nfds_t pfd_count;
- poll_hdr *h;
- /* TODO(ctiller): inline some elements to avoid an allocation */
- grpc_fd_watcher *watchers;
- struct pollfd *pfds;
- grpc_error *error = GRPC_ERROR_NONE;
-
- h = pollset->data.ptr;
- timeout = poll_deadline_to_millis_timeout(deadline, now);
- /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
- pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
- watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
- fd_count = 0;
- pfd_count = 2;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
- pfds[0].events = POLLIN;
- pfds[0].revents = 0;
- pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
- pfds[1].events = POLLIN;
- pfds[1].revents = 0;
- for (i = 0; i < h->fd_count; i++) {
- int remove = fd_is_orphaned(h->fds[i]);
- for (j = 0; !remove && j < h->del_count; j++) {
- if (h->fds[i] == h->dels[j]) remove = 1;
- }
- if (remove) {
- GRPC_FD_UNREF(h->fds[i], "multipoller");
- } else {
- h->fds[fd_count++] = h->fds[i];
- watchers[pfd_count].fd = h->fds[i];
- GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
- pfds[pfd_count].fd = h->fds[i]->fd;
- pfds[pfd_count].revents = 0;
- pfd_count++;
- }
- }
- for (j = 0; j < h->del_count; j++) {
- GRPC_FD_UNREF(h->dels[j], "multipoller_del");
- }
- h->del_count = 0;
- h->fd_count = fd_count;
- gpr_mu_unlock(&pollset->mu);
-
- for (i = 2; i < pfd_count; i++) {
- grpc_fd *fd = watchers[i].fd;
- pfds[i].events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT,
- &watchers[i]);
- GRPC_FD_UNREF(fd, "multipoller_start");
- }
-
- /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
- even going into the blocking annotation if possible */
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- r = grpc_poll_function(pfds, pfd_count, timeout);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
-
- if (r < 0) {
- if (errno != EINTR) {
- work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
- }
- for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
- }
- } else if (r == 0) {
- for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
- }
- } else {
- if (pfds[0].revents & POLLIN_CHECK) {
- work_combine_error(&error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
- }
- if (pfds[1].revents & POLLIN_CHECK) {
- work_combine_error(&error,
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
- }
- for (i = 2; i < pfd_count; i++) {
- if (watchers[i].fd == NULL) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
- continue;
- }
- fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
- pfds[i].revents & POLLOUT_CHECK, pollset);
- }
- }
-
- gpr_free(pfds);
- gpr_free(watchers);
-
- return error;
-}
-
-static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
- size_t i;
- poll_hdr *h = pollset->data.ptr;
- for (i = 0; i < h->fd_count; i++) {
- GRPC_FD_UNREF(h->fds[i], "multipoller");
- }
- for (i = 0; i < h->del_count; i++) {
- GRPC_FD_UNREF(h->dels[i], "multipoller_del");
- }
- h->fd_count = 0;
- h->del_count = 0;
-}
-
-static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
- poll_hdr *h = pollset->data.ptr;
- multipoll_with_poll_pollset_finish_shutdown(pollset);
- gpr_free(h->fds);
- gpr_free(h->dels);
- gpr_free(h);
-}
-
-static const grpc_pollset_vtable multipoll_with_poll_pollset = {
- multipoll_with_poll_pollset_add_fd,
- multipoll_with_poll_pollset_maybe_work_and_unlock,
- multipoll_with_poll_pollset_finish_shutdown,
- multipoll_with_poll_pollset_destroy};
-
-static void poll_become_multipoller(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
- size_t i;
- poll_hdr *h = gpr_malloc(sizeof(poll_hdr));
- pollset->vtable = &multipoll_with_poll_pollset;
- pollset->data.ptr = h;
- h->fd_count = nfds;
- h->fd_capacity = nfds;
- h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
- h->del_count = 0;
- h->del_capacity = 0;
- h->dels = NULL;
- for (i = 0; i < nfds; i++) {
- h->fds[i] = fds[i];
- GRPC_FD_REF(fds[i], "multipoller");
- }
-}
-
-#endif /* !GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
-
-/*******************************************************************************
- * pollset_multipoller_with_epoll_posix.c
- */
-
-#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
-
-#include <errno.h>
-#include <poll.h>
-#include <string.h>
-#include <sys/epoll.h>
-#include <unistd.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
-
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
- grpc_pollset *read_notifier_pollset) {
- /* only one set_ready can be active at once (but there may be a racing
- notify_on) */
- gpr_mu_lock(&fd->mu);
- set_ready_locked(exec_ctx, fd, st);
-
- /* A non-NULL read_notifier_pollset means that the fd is readable. */
- if (read_notifier_pollset != NULL) {
- /* Note: Since the fd might be a part of multiple pollsets, this might be
- * called multiple times (for each time the fd becomes readable) and it is
- * okay to set the fd's read-notifier pollset to anyone of these pollsets */
- set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
- }
-
- gpr_mu_unlock(&fd->mu);
-}
-
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_pollset *notifier_pollset) {
- set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
-}
-
-static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- set_ready(exec_ctx, fd, &fd->write_closure, NULL);
-}
-
-struct epoll_fd_list {
- int *epoll_fds;
- size_t count;
- size_t capacity;
-};
-
-static struct epoll_fd_list epoll_fd_global_list;
-static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
-static gpr_mu epoll_fd_list_mu;
-
-static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }
-
-static void add_epoll_fd_to_global_list(int epoll_fd) {
- gpr_once_init(&init_epoll_fd_list_mu, init_mu);
-
- gpr_mu_lock(&epoll_fd_list_mu);
- if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
- epoll_fd_global_list.capacity =
- GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
- epoll_fd_global_list.epoll_fds =
- gpr_realloc(epoll_fd_global_list.epoll_fds,
- epoll_fd_global_list.capacity * sizeof(int));
- }
- epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
- gpr_mu_unlock(&epoll_fd_list_mu);
-}
-
-static void remove_epoll_fd_from_global_list(int epoll_fd) {
- gpr_mu_lock(&epoll_fd_list_mu);
- GPR_ASSERT(epoll_fd_global_list.count > 0);
- for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
- if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
- epoll_fd_global_list.epoll_fds[i] =
- epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
- break;
- }
- }
- gpr_mu_unlock(&epoll_fd_list_mu);
-}
-
-static void remove_fd_from_all_epoll_sets(int fd) {
- int err;
- gpr_once_init(&init_epoll_fd_list_mu, init_mu);
- gpr_mu_lock(&epoll_fd_list_mu);
- if (epoll_fd_global_list.count == 0) {
- gpr_mu_unlock(&epoll_fd_list_mu);
- return;
- }
- for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
- err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
- if (err < 0 && errno != ENOENT) {
- gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
- strerror(errno));
- }
- }
- gpr_mu_unlock(&epoll_fd_list_mu);
-}
-
-typedef struct {
- grpc_pollset *pollset;
- grpc_fd *fd;
- grpc_closure closure;
-} delayed_add;
-
-typedef struct { int epoll_fd; } epoll_hdr;
-
-static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- epoll_hdr *h = pollset->data.ptr;
- struct epoll_event ev;
- int err;
- grpc_fd_watcher watcher;
-
- /* We pretend to be polling whilst adding an fd to keep the fd from being
- closed during the add. This may result in a spurious wakeup being assigned
- to this pollset whilst adding, but that should be benign. */
- GPR_ASSERT(fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
- if (watcher.fd != NULL) {
- ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fd;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0) {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
- strerror(errno));
- }
- }
- }
- fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
-}
-
-static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- delayed_add *da = arg;
-
- if (!fd_is_orphaned(da->fd)) {
- finally_add_fd(exec_ctx, da->pollset, da->fd);
- }
-
- gpr_mu_lock(&da->pollset->mu);
- da->pollset->in_flight_cbs--;
- if (da->pollset->shutting_down) {
- /* We don't care about this pollset anymore. */
- if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
- da->pollset->called_shutdown = 1;
- grpc_exec_ctx_sched(exec_ctx, da->pollset->shutdown_done, GRPC_ERROR_NONE,
- NULL);
- }
- }
- gpr_mu_unlock(&da->pollset->mu);
-
- GRPC_FD_UNREF(da->fd, "delayed_add");
-
- gpr_free(da);
-}
-
-static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_fd *fd,
- int and_unlock_pollset) {
- if (and_unlock_pollset) {
- gpr_mu_unlock(&pollset->mu);
- finally_add_fd(exec_ctx, pollset, fd);
- } else {
- delayed_add *da = gpr_malloc(sizeof(*da));
- da->pollset = pollset;
- da->fd = fd;
- GRPC_FD_REF(fd, "delayed_add");
- grpc_closure_init(&da->closure, perform_delayed_add, da);
- pollset->in_flight_cbs++;
- grpc_exec_ctx_sched(exec_ctx, &da->closure, GRPC_ERROR_NONE, NULL);
- }
-}
-
-/* TODO(klempner): We probably want to turn this down a bit */
-#define GRPC_EPOLL_MAX_EVENTS 1000
-
-static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock(
- grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now) {
- struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
- int ep_rv;
- int poll_rv;
- epoll_hdr *h = pollset->data.ptr;
- int timeout_ms;
- struct pollfd pfds[2];
- grpc_error *error = GRPC_ERROR_NONE;
-
- /* If you want to ignore epoll's ability to sanely handle parallel pollers,
- * for a more apples-to-apples performance comparison with poll, add a
- * if (pollset->counter != 0) { return 0; }
- * here.
- */
-
- gpr_mu_unlock(&pollset->mu);
-
- timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
-
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
- pfds[0].events = POLLIN;
- pfds[0].revents = 0;
- pfds[1].fd = h->epoll_fd;
- pfds[1].events = POLLIN;
- pfds[1].revents = 0;
-
- /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
- even going into the blocking annotation if possible */
- GPR_TIMER_BEGIN("poll", 0);
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- GPR_TIMER_END("poll", 0);
-
- if (poll_rv < 0) {
- if (errno != EINTR) {
- work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
- }
- } else if (poll_rv == 0) {
- /* do nothing */
- } else {
- if (pfds[0].revents) {
- work_combine_error(&error,
- grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd));
- }
- if (pfds[1].revents) {
- do {
- /* The following epoll_wait never blocks; it has a timeout of 0 */
- ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- if (ep_rv < 0) {
- if (errno != EINTR) {
- work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_wait"));
- }
- } else {
- int i;
- for (i = 0; i < ep_rv; ++i) {
- grpc_fd *fd = ep_ev[i].data.ptr;
- /* TODO(klempner): We might want to consider making err and pri
- * separate events */
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (fd == NULL) {
- work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(
- &grpc_global_wakeup_fd));
- } else {
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
- }
- }
- } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
- }
- }
- return error;
-}
-
-static void multipoll_with_epoll_pollset_finish_shutdown(
- grpc_pollset *pollset) {}
-
-static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
- epoll_hdr *h = pollset->data.ptr;
- close(h->epoll_fd);
- remove_epoll_fd_from_global_list(h->epoll_fd);
- gpr_free(h);
-}
-
-static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
- multipoll_with_epoll_pollset_add_fd,
- multipoll_with_epoll_pollset_maybe_work_and_unlock,
- multipoll_with_epoll_pollset_finish_shutdown,
- multipoll_with_epoll_pollset_destroy};
-
-static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, grpc_fd **fds,
- size_t nfds) {
- size_t i;
- epoll_hdr *h = gpr_malloc(sizeof(epoll_hdr));
- struct epoll_event ev;
- int err;
-
- pollset->vtable = &multipoll_with_epoll_pollset;
- pollset->data.ptr = h;
- h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- if (h->epoll_fd < 0) {
- /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
- gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
- abort();
- }
- add_epoll_fd_to_global_list(h->epoll_fd);
-
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = NULL;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
- if (err < 0) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
- strerror(errno));
- }
-
- for (i = 0; i < nfds; i++) {
- multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
- }
-}
-
-#else /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
-
-static void remove_fd_from_all_epoll_sets(int fd) {}
-
-#endif /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */
-
-/*******************************************************************************
- * pollset_set_posix.c
- */
-
-static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
- memset(pollset_set, 0, sizeof(*pollset_set));
- gpr_mu_init(&pollset_set->mu);
- return pollset_set;
-}
-
-static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
- size_t i;
- gpr_mu_destroy(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
- }
- gpr_free(pollset_set->pollsets);
- gpr_free(pollset_set->pollset_sets);
- gpr_free(pollset_set->fds);
- gpr_free(pollset_set);
-}
-
-static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i, j;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
- pollset_set->pollset_capacity =
- GPR_MAX(8, 2 * pollset_set->pollset_capacity);
- pollset_set->pollsets =
- gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
- sizeof(*pollset_set->pollsets));
- }
- pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
- for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
- if (fd_is_orphaned(pollset_set->fds[i])) {
- GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
- } else {
- pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
- pollset_set->fds[j++] = pollset_set->fds[i];
- }
- }
- pollset_set->fd_count = j;
- gpr_mu_unlock(&pollset_set->mu);
-}
-
-static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->pollset_count; i++) {
- if (pollset_set->pollsets[i] == pollset) {
- pollset_set->pollset_count--;
- GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
- pollset_set->pollsets[pollset_set->pollset_count]);
- break;
- }
- }
- gpr_mu_unlock(&pollset_set->mu);
-}
-
-static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- size_t i, j;
- gpr_mu_lock(&bag->mu);
- if (bag->pollset_set_count == bag->pollset_set_capacity) {
- bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
- bag->pollset_sets =
- gpr_realloc(bag->pollset_sets,
- bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
- }
- bag->pollset_sets[bag->pollset_set_count++] = item;
- for (i = 0, j = 0; i < bag->fd_count; i++) {
- if (fd_is_orphaned(bag->fds[i])) {
- GRPC_FD_UNREF(bag->fds[i], "pollset_set");
- } else {
- pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
- bag->fds[j++] = bag->fds[i];
- }
- }
- bag->fd_count = j;
- gpr_mu_unlock(&bag->mu);
-}
-
-static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- size_t i;
- gpr_mu_lock(&bag->mu);
- for (i = 0; i < bag->pollset_set_count; i++) {
- if (bag->pollset_sets[i] == item) {
- bag->pollset_set_count--;
- GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
- bag->pollset_sets[bag->pollset_set_count]);
- break;
- }
- }
- gpr_mu_unlock(&bag->mu);
-}
-
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- if (pollset_set->fd_count == pollset_set->fd_capacity) {
- pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
- pollset_set->fds = gpr_realloc(
- pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
- }
- GRPC_FD_REF(fd, "pollset_set");
- pollset_set->fds[pollset_set->fd_count++] = fd;
- for (i = 0; i < pollset_set->pollset_count; i++) {
- pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
-}
-
-static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pollset_set, grpc_fd *fd) {
- size_t i;
- gpr_mu_lock(&pollset_set->mu);
- for (i = 0; i < pollset_set->fd_count; i++) {
- if (pollset_set->fds[i] == fd) {
- pollset_set->fd_count--;
- GPR_SWAP(grpc_fd *, pollset_set->fds[i],
- pollset_set->fds[pollset_set->fd_count]);
- GRPC_FD_UNREF(fd, "pollset_set");
- break;
- }
- }
- for (i = 0; i < pollset_set->pollset_set_count; i++) {
- pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
- }
- gpr_mu_unlock(&pollset_set->mu);
-}
-
-/*******************************************************************************
- * workqueue stubs
- */
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- return workqueue;
-}
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {}
-#endif
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
- grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
-}
-
-/*******************************************************************************
- * event engine binding
- */
-
-static void shutdown_engine(void) {
- fd_global_shutdown();
- pollset_global_shutdown();
-}
-
-static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_reset = pollset_reset,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .kick_poller = kick_poller,
-
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
-
- .shutdown_engine = shutdown_engine,
-};
-
-const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) {
-#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
- platform_become_multipoller = epoll_become_multipoller;
-#else
- platform_become_multipoller = poll_become_multipoller;
-#endif
- fd_global_init();
- pollset_global_init();
- return &vtable;
-}
-
-#endif
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.h b/src/core/lib/iomgr/ev_poll_and_epoll_posix.h
deleted file mode 100644
index 06d6dbf29d..0000000000
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H
-#define GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H
-
-#include "src/core/lib/iomgr/ev_posix.h"
-
-const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void);
-
-#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_AND_EPOLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index ef36ba89b2..ab139895fd 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -45,7 +45,6 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_epoll_linux.h"
-#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/support/env.h"
@@ -67,7 +66,6 @@ static const event_engine_factory g_factories[] = {
{"epoll", grpc_init_epoll_linux},
{"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix},
- {"legacy", grpc_init_poll_and_epoll_posix},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
new file mode 100644
index 0000000000..34b61dcfe5
--- /dev/null
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -0,0 +1,98 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/socket_mutator.h"
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
+ const grpc_socket_mutator_vtable *vtable) {
+ mutator->vtable = vtable;
+ gpr_ref_init(&mutator->refcount, 1);
+}
+
+grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator) {
+ gpr_ref(&mutator->refcount);
+ return mutator;
+}
+
+bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd) {
+ return mutator->vtable->mutate_fd(fd, mutator);
+}
+
+int grpc_socket_mutator_compare(grpc_socket_mutator *a,
+ grpc_socket_mutator *b) {
+ int c = GPR_ICMP(a, b);
+ if (c != 0) {
+ grpc_socket_mutator *sma = a;
+ grpc_socket_mutator *smb = b;
+ c = GPR_ICMP(sma->vtable, smb->vtable);
+ if (c == 0) {
+ c = sma->vtable->compare(sma, smb);
+ }
+ }
+ return c;
+}
+
+void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) {
+ if (gpr_unref(&mutator->refcount)) {
+ mutator->vtable->destory(mutator);
+ }
+}
+
+static void *socket_mutator_arg_copy(void *p) {
+ return grpc_socket_mutator_ref(p);
+}
+
+static void socket_mutator_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
+ grpc_socket_mutator_unref(p);
+}
+
+static int socket_mutator_cmp(void *a, void *b) {
+ return grpc_socket_mutator_compare((grpc_socket_mutator *)a,
+ (grpc_socket_mutator *)b);
+}
+
+static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = {
+ socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp};
+
+grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_SOCKET_MUTATOR;
+ arg.value.pointer.vtable = &socket_mutator_arg_vtable;
+ arg.value.pointer.p = mutator;
+ return arg;
+}
diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h
new file mode 100644
index 0000000000..2f5b6c248e
--- /dev/null
+++ b/src/core/lib/iomgr/socket_mutator.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
+#define GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/sync.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/** The virtual table of grpc_socket_mutator */
+typedef struct {
+ /** Mutates the socket opitons of \a fd */
+ bool (*mutate_fd)(int fd, grpc_socket_mutator *mutator);
+ /** Compare socket mutator \a a and \a b */
+ int (*compare)(grpc_socket_mutator *a, grpc_socket_mutator *b);
+ /** Destroys the socket mutator instance */
+ void (*destory)(grpc_socket_mutator *mutator);
+} grpc_socket_mutator_vtable;
+
+/** The Socket Mutator interface allows changes on socket options */
+struct grpc_socket_mutator {
+ const grpc_socket_mutator_vtable *vtable;
+ gpr_refcount refcount;
+};
+
+/** called by concrete implementations to initialize the base struct */
+void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
+ const grpc_socket_mutator_vtable *vtable);
+
+/** Wrap \a mutator as a grpc_arg */
+grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator);
+
+/** Perform the file descriptor mutation operation of \a mutator on \a fd */
+bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd);
+
+/** Compare if \a a and \a b are the same mutator or have same settings */
+int grpc_socket_mutator_compare(grpc_socket_mutator *a, grpc_socket_mutator *b);
+
+grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator);
+void grpc_socket_mutator_unref(grpc_socket_mutator *mutator);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index bc28bbe316..88e9ade253 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -209,6 +209,15 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
return GRPC_ERROR_NONE;
}
+/* set a socket using a grpc_socket_mutator */
+grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) {
+ GPR_ASSERT(mutator);
+ if (!grpc_socket_mutator_mutate_fd(mutator, fd)) {
+ return GRPC_ERROR_CREATE("grpc_socket_mutator failed.");
+ }
+ return GRPC_ERROR_NONE;
+}
+
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
static int g_ipv6_loopback_available;
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 175fb2b717..e84d3781a1 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -39,7 +39,9 @@
#include <sys/socket.h>
#include <unistd.h>
+#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
@@ -88,6 +90,9 @@ grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes);
/* Tries to set the socket's receive buffer to given size. */
grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes);
+/* Tries to set the socket using a grpc_socket_mutator */
+grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator);
+
/* An enum to keep track of IPv4/IPv6 socket modes.
Currently, this information is only used when a socket is first created, but
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index 18e6e60ebc..0485661316 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/pollset_set.h"
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 3b0fe3bc15..7e49d89aa8 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -51,6 +51,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/timer.h"
@@ -73,7 +74,8 @@ typedef struct {
grpc_channel_args *channel_args;
} async_connect;
-static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
+static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd,
+ const grpc_channel_args *channel_args) {
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@@ -88,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
+ if (channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
+ GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
+ grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p;
+ err = grpc_set_socket_with_mutator(fd, mutator);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+ }
+ }
goto done;
error:
@@ -287,7 +299,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
addr = &addr4_copy;
}
- if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
+ if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
return;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index e27a762f20..ba288c768e 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -497,6 +497,11 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
+static int tcp_get_fd(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return tcp->fd;
+}
+
static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return grpc_fd_get_workqueue(tcp->em_fd);
@@ -515,7 +520,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_shutdown,
tcp_destroy,
tcp_get_resource_user,
- tcp_get_peer};
+ tcp_get_peer,
+ tcp_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 8b58c04ff5..6e2ad1dbe9 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -38,14 +38,17 @@
#include <limits.h>
#include <string.h>
+#include <grpc/slice_buffer.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
int grpc_tcp_trace = 0;
@@ -62,15 +65,14 @@ typedef struct {
grpc_closure *read_cb;
grpc_closure *write_cb;
- GRPC_SLICE read_slice;
- GRPC_SLICE_buffer *read_slices;
- GRPC_SLICE_buffer *write_slices;
+ grpc_slice read_slice;
+ grpc_slice_buffer *read_slices;
+ grpc_slice_buffer *write_slices;
uv_buf_t *write_buffers;
- grpc_resource_user resource_user;
+ grpc_resource_user *resource_user;
bool shutting_down;
- bool resource_user_shutting_down;
char *peer_string;
grpc_pollset *pollset;
@@ -78,23 +80,23 @@ typedef struct {
static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
-static void tcp_free(grpc_tcp *tcp) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
+static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp);
- grpc_exec_ctx_finish(&exec_ctx);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
+#define TCP_UNREF(exec_ctx, tcp, reason) \
+ tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) \
+ tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -105,11 +107,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -122,7 +124,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
grpc_tcp *tcp = handle->data;
(void)suggested_size;
tcp->read_slice = grpc_resource_user_slice_malloc(
- &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice);
buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx);
@@ -130,7 +132,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
static void read_callback(uv_stream_t *stream, ssize_t nread,
const uv_buf_t *buf) {
- GRPC_SLICE sub;
+ grpc_slice sub;
grpc_error *error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = stream->data;
@@ -139,7 +141,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// Nothing happened. Wait for the next callback
return;
}
- TCP_UNREF(tcp, "read");
+ TCP_UNREF(&exec_ctx, tcp, "read");
tcp->read_cb = NULL;
// TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream);
@@ -147,8 +149,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
error = GRPC_ERROR_CREATE("EOF");
} else if (nread > 0) {
// Successful read
- sub = GRPC_SLICE_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
- GRPC_SLICE_buffer_add(tcp->read_slices, sub);
+ sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
+ grpc_slice_buffer_add(tcp->read_slices, sub);
error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
size_t i;
@@ -156,8 +158,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
gpr_log(GPR_DEBUG, "read: error=%s", str);
grpc_error_free_string(str);
for (i = 0; i < tcp->read_slices->count; i++) {
- char *dump = gpr_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
dump);
gpr_free(dump);
@@ -172,14 +174,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
}
static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- GRPC_SLICE_buffer *read_slices, grpc_closure *cb) {
+ grpc_slice_buffer *read_slices, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
int status;
grpc_error *error = GRPC_ERROR_NONE;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- GRPC_SLICE_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref(read_slices);
TCP_REF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
status =
@@ -202,7 +204,7 @@ static void write_callback(uv_write_t *req, int status) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure *cb = tcp->write_cb;
tcp->write_cb = NULL;
- TCP_UNREF(tcp, "write");
+ TCP_UNREF(&exec_ctx, tcp, "write");
if (status == 0) {
error = GRPC_ERROR_NONE;
} else {
@@ -213,28 +215,28 @@ static void write_callback(uv_write_t *req, int status) {
gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
}
gpr_free(tcp->write_buffers);
- grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
+ grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- GRPC_SLICE_buffer *write_slices,
+ grpc_slice_buffer *write_slices,
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
uv_buf_t *buffers;
unsigned int buffer_count;
unsigned int i;
- GRPC_SLICE *slice;
+ grpc_slice *slice;
uv_write_t *write_req;
if (grpc_tcp_trace) {
size_t j;
for (j = 0; j < write_slices->count; j++) {
- char *data = gpr_dump_slice(write_slices->slices[j],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *data = grpc_dump_slice(write_slices->slices[j],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
@@ -259,7 +261,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb;
buffer_count = (unsigned int)tcp->write_slices->count;
buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
- grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
+ grpc_resource_user_alloc(exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * buffer_count, NULL);
for (i = 0; i < buffer_count; i++) {
slice = &tcp->write_slices->slices[i];
@@ -295,22 +297,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void shutdown_callback(uv_shutdown_t *req, int status) {}
-static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- TCP_UNREF(arg, "resource_user");
-}
-
-static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_tcp *tcp) {
- if (!tcp->resource_user_shutting_down) {
- tcp->resource_user_shutting_down = true;
- TCP_REF(tcp, "resource_user");
- grpc_resource_user_shutdown(
- exec_ctx, &tcp->resource_user,
- grpc_closure_create(resource_user_shutdown_done, tcp));
- }
-}
-
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) {
@@ -324,8 +310,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
- uv_resource_user_maybe_shutdown(exec_ctx, tcp);
- TCP_UNREF(tcp, "destroy");
+ TCP_UNREF(exec_ctx, tcp, "destroy");
}
static char *uv_get_peer(grpc_endpoint *ep) {
@@ -335,15 +320,18 @@ static char *uv_get_peer(grpc_endpoint *ep) {
static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- return &tcp->resource_user;
+ return tcp->resource_user;
}
static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
+static int uv_get_fd(grpc_endpoint *ep) { return -1; }
+
static grpc_endpoint_vtable vtable = {
uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
- uv_destroy, uv_get_resource_user, uv_get_peer};
+ uv_destroy, uv_get_resource_user, uv_get_peer,
+ uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
@@ -364,8 +352,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false;
- tcp->resource_user_shutting_down = false;
- grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index e5fa360094..34afe51df0 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -402,6 +402,8 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
+static int win_get_fd(grpc_endpoint *ep) { return -1; }
+
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
@@ -410,7 +412,8 @@ static grpc_endpoint_vtable vtable = {win_read,
win_shutdown,
win_destroy,
win_get_resource_user,
- win_get_peer};
+ win_get_peer,
+ win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c
index 4ce5675fba..616be64a54 100644
--- a/src/core/lib/security/credentials/jwt/jwt_credentials.c
+++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c
@@ -147,17 +147,44 @@ grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
return &c->base;
}
+static char *redact_private_key(const char *json_key) {
+ char *json_copy = gpr_strdup(json_key);
+ grpc_json *json = grpc_json_parse_string(json_copy);
+ if (!json) {
+ gpr_free(json_copy);
+ return gpr_strdup("<Json failed to parse.>");
+ }
+ const char *redacted = "<redacted>";
+ grpc_json *current = json->child;
+ while (current) {
+ if (current->type == GRPC_JSON_STRING &&
+ strcmp(current->key, "private_key") == 0) {
+ current->value = (char *)redacted;
+ break;
+ }
+ current = current->next;
+ }
+ char *clean_json = grpc_json_dump_to_string(json, 2);
+ gpr_free(json_copy);
+ grpc_json_destroy(json);
+ return clean_json;
+}
+
grpc_call_credentials *grpc_service_account_jwt_access_credentials_create(
const char *json_key, gpr_timespec token_lifetime, void *reserved) {
- GRPC_API_TRACE(
- "grpc_service_account_jwt_access_credentials_create("
- "json_key=%s, "
- "token_lifetime="
- "gpr_timespec { tv_sec: %" PRId64
- ", tv_nsec: %d, clock_type: %d }, "
- "reserved=%p)",
- 5, (json_key, token_lifetime.tv_sec, token_lifetime.tv_nsec,
- (int)token_lifetime.clock_type, reserved));
+ if (grpc_api_trace) {
+ char *clean_json = redact_private_key(json_key);
+ gpr_log(GPR_INFO,
+ "grpc_service_account_jwt_access_credentials_create("
+ "json_key=%s, "
+ "token_lifetime="
+ "gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
+ "reserved=%p)",
+ clean_json, token_lifetime.tv_sec, token_lifetime.tv_nsec,
+ (int)token_lifetime.clock_type, reserved);
+ gpr_free(clean_json);
+ }
GPR_ASSERT(reserved == NULL);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_call_credentials *creds =
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index b7bdc53a35..cbcd74958c 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -394,15 +394,32 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
return &c->base.base;
}
+static char *create_loggable_refresh_token(grpc_auth_refresh_token *token) {
+ if (strcmp(token->type, GRPC_AUTH_JSON_TYPE_INVALID) == 0) {
+ return gpr_strdup("<Invalid json token>");
+ }
+ char *loggable_token = NULL;
+ gpr_asprintf(&loggable_token,
+ "{\n type: %s\n client_id: %s\n client_secret: "
+ "<redacted>\n refresh_token: <redacted>\n}",
+ token->type, token->client_id);
+ return loggable_token;
+}
+
grpc_call_credentials *grpc_google_refresh_token_credentials_create(
const char *json_refresh_token, void *reserved) {
- GRPC_API_TRACE(
- "grpc_refresh_token_credentials_create(json_refresh_token=%s, "
- "reserved=%p)",
- 2, (json_refresh_token, reserved));
+ grpc_auth_refresh_token token =
+ grpc_auth_refresh_token_create_from_string(json_refresh_token);
+ if (grpc_api_trace) {
+ char *loggable_token = create_loggable_refresh_token(&token);
+ gpr_log(GPR_INFO,
+ "grpc_refresh_token_credentials_create(json_refresh_token=%s, "
+ "reserved=%p)",
+ loggable_token, reserved);
+ gpr_free(loggable_token);
+ }
GPR_ASSERT(reserved == NULL);
- return grpc_refresh_token_credentials_create_from_auth_refresh_token(
- grpc_auth_refresh_token_create_from_string(json_refresh_token));
+ return grpc_refresh_token_credentials_create_from_auth_refresh_token(token);
}
//
@@ -433,9 +450,9 @@ grpc_call_credentials *grpc_access_token_credentials_create(
gpr_malloc(sizeof(grpc_access_token_credentials));
char *token_md_value;
GRPC_API_TRACE(
- "grpc_access_token_credentials_create(access_token=%s, "
+ "grpc_access_token_credentials_create(access_token=<redacted>, "
"reserved=%p)",
- 2, (access_token, reserved));
+ 1, (reserved));
GPR_ASSERT(reserved == NULL);
memset(c, 0, sizeof(grpc_access_token_credentials));
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c
index 29f28024f6..f90d7dce83 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.c
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c
@@ -106,6 +106,8 @@ static void plugin_md_request_metadata_ready(void *request,
grpc_slice_unref_internal(&exec_ctx, md_array[i].value);
}
gpr_free(md_array);
+ } else if (num_md == 0) {
+ r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL);
}
}
gpr_free(r);
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
index 26e63a71e9..16d758d7a1 100644
--- a/src/core/lib/security/transport/handshake.c
+++ b/src/core/lib/security/transport/handshake.c
@@ -127,7 +127,7 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx,
h->auth_context);
} else {
const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Security handshake failed: %s", msg);
+ gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
grpc_error_free_string(msg);
if (h->secure_endpoint != NULL) {
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 594aa6161d..0e9a392c60 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -31,7 +31,12 @@
*
*/
-#include "src/core/lib/security/transport/secure_endpoint.h"
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+ using that endpoint. Because of various transitive includes in uv.h,
+ including windows.h on Windows, uv.h must be included before other system
+ headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
@@ -39,6 +44,7 @@
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -367,6 +373,8 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
+static int endpoint_get_fd(grpc_endpoint *secure_ep) { return -1; }
+
static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_workqueue(ep->wrapped_ep);
@@ -386,7 +394,8 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_resource_user,
- endpoint_get_peer};
+ endpoint_get_peer,
+ endpoint_get_fd};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 246ca35bc6..7a0069710e 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -83,7 +83,8 @@ static grpc_metadata_array metadata_batch_to_md_array(
return result;
}
-static grpc_mdelem *remove_consumed_md(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *remove_consumed_md(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
size_t i;
diff --git a/src/core/lib/slice/slice.c b/src/core/lib/slice/slice.c
index 5b8f71a778..76118102ec 100644
--- a/src/core/lib/slice/slice.c
+++ b/src/core/lib/slice/slice.c
@@ -364,3 +364,11 @@ int grpc_slice_str_cmp(grpc_slice a, const char *b) {
if (d != 0) return d;
return memcmp(GRPC_SLICE_START_PTR(a), b, b_length);
}
+
+int grpc_slice_is_equivalent(grpc_slice a, grpc_slice b) {
+ if (a.refcount == NULL || b.refcount == NULL) {
+ return grpc_slice_cmp(a, b) == 0;
+ }
+ return a.data.refcounted.length == b.data.refcounted.length &&
+ a.data.refcounted.bytes == b.data.refcounted.bytes;
+}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9be13d84fe..023b0411c7 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -124,6 +124,7 @@ struct grpc_call {
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
+ gpr_timespec start_time;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@@ -241,6 +242,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
call->channel = args->channel;
call->cq = args->cq;
call->parent = args->parent_call;
+ call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
@@ -313,10 +315,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
- grpc_error *error =
- grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call,
- call->context, args->server_transport_data, path,
- send_deadline, CALL_STACK_FROM_CALL(call));
+ grpc_error *error = grpc_call_stack_init(
+ exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ args->server_transport_data, path, call->start_time, send_deadline,
+ CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *error_str;
@@ -428,6 +430,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
get_final_status(call, set_status_value_directly,
&c->final_info.final_status);
+ c->final_info.stats.latency =
+ gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
@@ -632,9 +636,6 @@ static int prepare_application_metadata(
if (call->send_extra_metadata_count == 0) {
prepend_extra_metadata = 0;
} else {
- for (i = 0; i < call->send_extra_metadata_count; i++) {
- GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
- }
for (i = 1; i < call->send_extra_metadata_count; i++) {
call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
}
@@ -680,6 +681,7 @@ static int prepare_application_metadata(
&call->send_extra_metadata[call->send_extra_metadata_count - 1];
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
case 3: {
/* prepend AND md */
@@ -695,6 +697,7 @@ static int prepare_application_metadata(
batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
}
default:
@@ -961,38 +964,35 @@ static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem,
return elem;
}
-typedef struct {
- grpc_exec_ctx *exec_ctx;
- grpc_call *call;
-} recv_filter_args;
-
-static grpc_mdelem *recv_initial_filter(void *args, grpc_mdelem *elem) {
- recv_filter_args *a = args;
- elem = recv_common_filter(a->exec_ctx, a->call, elem);
+static grpc_mdelem *recv_initial_filter(grpc_exec_ctx *exec_ctx, void *args,
+ grpc_mdelem *elem) {
+ grpc_call *call = args;
+ elem = recv_common_filter(exec_ctx, call, elem);
if (elem == NULL) {
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
- set_incoming_compression_algorithm(a->call, decode_compression(elem));
+ set_incoming_compression_algorithm(call, decode_compression(elem));
GPR_TIMER_END("incoming_compression_algorithm", 0);
return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
- set_encodings_accepted_by_peer(a->exec_ctx, a->call, elem);
+ set_encodings_accepted_by_peer(exec_ctx, call, elem);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
return NULL;
} else {
- return publish_app_metadata(a->call, elem, 0);
+ return publish_app_metadata(call, elem, 0);
}
}
-static grpc_mdelem *recv_trailing_filter(void *args, grpc_mdelem *elem) {
- recv_filter_args *a = args;
- elem = recv_common_filter(a->exec_ctx, a->call, elem);
+static grpc_mdelem *recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
+ grpc_mdelem *elem) {
+ grpc_call *call = args;
+ elem = recv_common_filter(exec_ctx, call, elem);
if (elem == NULL) {
return NULL;
} else {
- return publish_app_metadata(a->call, elem, 1);
+ return publish_app_metadata(call, elem, 1);
}
}
@@ -1238,8 +1238,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- recv_filter_args args = {exec_ctx, call};
- grpc_metadata_batch_filter(exec_ctx, md, recv_initial_filter, &args);
+ grpc_metadata_batch_filter(exec_ctx, md, recv_initial_filter, call);
GPR_TIMER_BEGIN("validate_filtered_metadata", 0);
validate_filtered_metadata(exec_ctx, bctl);
@@ -1305,8 +1304,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_filter_args args = {exec_ctx, call};
- grpc_metadata_batch_filter(exec_ctx, md, recv_trailing_filter, &args);
+ grpc_metadata_batch_filter(exec_ctx, md, recv_trailing_filter, call);
call->received_final_op = true;
/* propagate cancellation to any interested children */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 530e5ed46c..03e0913a91 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -196,6 +196,7 @@ struct grpc_server {
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
size_t cq_count;
+ size_t pollset_count;
bool started;
/* The two following mutexes control access to server-state
@@ -740,7 +741,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
}
}
-static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+static grpc_mdelem *server_filter(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (md->key == GRPC_MDSTR_PATH) {
@@ -1087,7 +1089,7 @@ void grpc_server_start(grpc_server *server) {
GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
server->started = true;
- size_t pollset_count = 0;
+ server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
server->request_freelist_per_cq =
gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
@@ -1095,7 +1097,8 @@ void grpc_server_start(grpc_server *server) {
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
- server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
+ server->pollsets[server->pollset_count++] =
+ grpc_cq_pollset(server->cqs[i]);
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
@@ -1114,7 +1117,8 @@ void grpc_server_start(grpc_server *server) {
}
for (l = server->listeners; l; l = l->next) {
- l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count);
+ l->start(&exec_ctx, server, l->arg, server->pollsets,
+ server->pollset_count);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -1122,7 +1126,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
size_t *pollset_count) {
- *pollset_count = server->cq_count;
+ *pollset_count = server->pollset_count;
*pollsets = server->pollsets;
}
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index fdb5307814..89072879d9 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -43,6 +43,8 @@ int grpc_connectivity_state_trace = 0;
const char *grpc_connectivity_state_name(grpc_connectivity_state state) {
switch (state) {
+ case GRPC_CHANNEL_INIT:
+ return "INIT";
case GRPC_CHANNEL_IDLE:
return "IDLE";
case GRPC_CHANNEL_CONNECTING:
@@ -159,6 +161,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
grpc_error_free_string(error_string);
}
switch (state) {
+ case GRPC_CHANNEL_INIT:
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
case GRPC_CHANNEL_READY:
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c
index 9b5d8099c7..54c39df6a7 100644
--- a/src/core/lib/transport/metadata.c
+++ b/src/core/lib/transport/metadata.c
@@ -736,8 +736,8 @@ void *grpc_mdelem_get_user_data(grpc_mdelem *md, void (*destroy_func)(void *)) {
return result;
}
-void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
- void *user_data) {
+void *grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
+ void *user_data) {
internal_metadata *im = (internal_metadata *)md;
GPR_ASSERT(!is_mdelem_static(md));
GPR_ASSERT((user_data == NULL) == (destroy_func == NULL));
@@ -748,11 +748,12 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
if (destroy_func != NULL) {
destroy_func(user_data);
}
- return;
+ return (void *)gpr_atm_no_barrier_load(&im->user_data);
}
gpr_atm_no_barrier_store(&im->user_data, (gpr_atm)user_data);
gpr_atm_rel_store(&im->destroy_user_data, (gpr_atm)destroy_func);
gpr_mu_unlock(&im->mu_user_data);
+ return user_data;
}
grpc_slice grpc_mdstr_as_base64_encoded_and_huffman_compressed(grpc_mdstr *gs) {
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 8a64be7025..991eee96f1 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -126,8 +126,8 @@ size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem *elem);
is used as a type tag and is checked during user_data fetch. */
void *grpc_mdelem_get_user_data(grpc_mdelem *md,
void (*if_destroy_func)(void *));
-void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
- void *user_data);
+void *grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
+ void *user_data);
/* Reference counting */
//#define GRPC_METADATA_REFCOUNT_DEBUG
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 4430224e70..b62ecc3aa6 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -143,7 +143,8 @@ void grpc_metadata_batch_move(grpc_metadata_batch *dst,
void grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
grpc_metadata_batch *batch,
- grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *(*filter)(grpc_exec_ctx *exec_ctx,
+ void *user_data,
grpc_mdelem *elem),
void *user_data) {
grpc_linked_mdelem *l;
@@ -154,7 +155,7 @@ void grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
assert_valid_list(&batch->list);
for (l = batch->list.head; l; l = next) {
grpc_mdelem *orig = l->md;
- grpc_mdelem *filt = filter(user_data, orig);
+ grpc_mdelem *filt = filter(exec_ctx, user_data, orig);
next = l->next;
if (filt == NULL) {
if (l->prev) {
@@ -181,7 +182,8 @@ void grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_metadata_batch_filter", 0);
}
-static grpc_mdelem *no_metadata_for_you(void *user_data, grpc_mdelem *elem) {
+static grpc_mdelem *no_metadata_for_you(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_mdelem *elem) {
return NULL;
}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 862c21b45b..c0bd5174ab 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -122,7 +122,8 @@ void grpc_metadata_batch_add_tail(grpc_metadata_batch *batch,
the element will be moved to the garbage list. */
void grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
grpc_metadata_batch *batch,
- grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *(*filter)(grpc_exec_ctx *exec_ctx,
+ void *user_data,
grpc_mdelem *elem),
void *user_data);
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 1b79520e68..9bc278c133 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -161,6 +161,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_peer(exec_ctx, transport);
}
+grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport) {
+ return transport->vtable->get_endpoint(exec_ctx, transport);
+}
+
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 3e38d98f28..d1281830aa 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -37,6 +37,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
@@ -296,6 +297,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
+/* Get the endpoint used by \a transport */
+grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport);
+
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index fc7140671b..8553148c35 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -74,6 +74,9 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_get_peer */
char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
+
+ /* implementation of grpc_transport_get_endpoint */
+ grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
/* an instance of a grpc transport */
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index d86ba9dd4e..bc0b68b3e0 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -38,8 +38,11 @@
#include <grpc++/resource_quota.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
+extern "C" {
#include "src/core/lib/channel/channel_args.h"
-
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
+}
namespace grpc {
ChannelArguments::ChannelArguments() {
@@ -88,6 +91,26 @@ void ChannelArguments::SetCompressionAlgorithm(
SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
+void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
+ if (!mutator) {
+ return;
+ }
+ grpc_arg mutator_arg = grpc_socket_mutator_to_arg(mutator);
+ bool replaced = false;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ for (auto it = args_.begin(); it != args_.end(); ++it) {
+ if (it->type == mutator_arg.type &&
+ grpc::string(it->key) == grpc::string(mutator_arg.key)) {
+ it->value.pointer.vtable->destroy(&exec_ctx, it->value.pointer.p);
+ it->value.pointer = mutator_arg.value.pointer;
+ }
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ if (!replaced) {
+ args_.push_back(mutator_arg);
+ }
+}
+
// Note: a second call to this will add in front the result of the first call.
// An example is calling this on a copy of ChannelArguments which already has a
// prefix. The user can build up a prefix string by calling this multiple times,
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc
index ad2c0f2295..c0dc9dd63e 100644
--- a/src/cpp/common/channel_filter.cc
+++ b/src/cpp/common/channel_filter.cc
@@ -40,11 +40,12 @@ namespace grpc {
// MetadataBatch
-grpc_linked_mdelem *MetadataBatch::AddMetadata(const string &key,
+grpc_linked_mdelem *MetadataBatch::AddMetadata(grpc_exec_ctx *exec_ctx,
+ const string &key,
const string &value) {
grpc_linked_mdelem *storage = new grpc_linked_mdelem;
memset(storage, 0, sizeof(grpc_linked_mdelem));
- storage->md = grpc_mdelem_from_strings(key.c_str(), value.c_str());
+ storage->md = grpc_mdelem_from_strings(exec_ctx, key.c_str(), value.c_str());
grpc_metadata_batch_link_head(batch_, storage);
return storage;
}
@@ -89,7 +90,8 @@ std::vector<FilterRecord> *channel_filters;
namespace {
-bool MaybeAddFilter(grpc_channel_stack_builder *builder, void *arg) {
+bool MaybeAddFilter(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack_builder *builder, void *arg) {
const FilterRecord &filter = *(FilterRecord *)arg;
if (filter.include_filter) {
const grpc_channel_args *args =
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index e420efc71c..65f44660d2 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -70,7 +70,8 @@ class MetadataBatch {
/// Adds metadata and returns the newly allocated storage.
/// The caller takes ownership of the result, which must exist for the
/// lifetime of the gRPC call.
- grpc_linked_mdelem *AddMetadata(const string &key, const string &value);
+ grpc_linked_mdelem *AddMetadata(grpc_exec_ctx *exec_ctx, const string &key,
+ const string &value);
class const_iterator : public std::iterator<std::bidirectional_iterator_tag,
const grpc_mdelem> {
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 5e61e9ec12..da45c4829d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -388,35 +388,29 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
- using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
- {
- var call = CreateNativeCall(cq);
+ var call = CreateNativeCall(cq);
- details.Channel.AddCallReference(this);
- InitializeInternal(call);
- RegisterCancellationCallback();
- }
+ details.Channel.AddCallReference(this);
+ InitializeInternal(call);
+ RegisterCancellationCallback();
}
private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
{
- using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall"))
- {
- if (injectedNativeCall != null)
- {
- return injectedNativeCall; // allows injecting a mock INativeCall in tests.
- }
+ if (injectedNativeCall != null)
+ {
+ return injectedNativeCall; // allows injecting a mock INativeCall in tests.
+ }
- var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
+ var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
- var credentials = details.Options.Credentials;
- using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
- {
- var result = details.Channel.Handle.CreateCall(
- parentCall, ContextPropagationToken.DefaultMask, cq,
- details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
- return result;
- }
+ var credentials = details.Options.Credentials;
+ using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
+ {
+ var result = details.Channel.Handle.CreateCall(
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
+ return result;
}
}
@@ -456,47 +450,44 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
- using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
+ TaskCompletionSource<object> delayedStreamingWriteTcs = null;
+ TResponse msg = default(TResponse);
+ var deserializeException = TryDeserialize(receivedMessage, out msg);
+
+ lock (myLock)
{
- TaskCompletionSource<object> delayedStreamingWriteTcs = null;
- TResponse msg = default(TResponse);
- var deserializeException = TryDeserialize(receivedMessage, out msg);
+ finished = true;
- lock (myLock)
+ if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
- finished = true;
-
- if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
- {
- receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
- }
- finishedStatus = receivedStatus;
-
- if (isStreamingWriteCompletionDelayed)
- {
- delayedStreamingWriteTcs = streamingWriteTcs;
- streamingWriteTcs = null;
- }
-
- ReleaseResourcesIfPossible();
+ receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
+ finishedStatus = receivedStatus;
- responseHeadersTcs.SetResult(responseHeaders);
-
- if (delayedStreamingWriteTcs != null)
+ if (isStreamingWriteCompletionDelayed)
{
- delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
+ delayedStreamingWriteTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
}
- var status = receivedStatus.Status;
- if (status.StatusCode != StatusCode.OK)
- {
- unaryResponseTcs.SetException(new RpcException(status));
- return;
- }
+ ReleaseResourcesIfPossible();
+ }
+
+ responseHeadersTcs.SetResult(responseHeaders);
- unaryResponseTcs.SetResult(msg);
+ if (delayedStreamingWriteTcs != null)
+ {
+ delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
+ }
+
+ var status = receivedStatus.Status;
+ if (status.StatusCode != StatusCode.OK)
+ {
+ unaryResponseTcs.SetException(new RpcException(status));
+ return;
}
+
+ unaryResponseTcs.SetResult(msg);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 9f9d260e7e..8668903f6e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -181,19 +181,16 @@ namespace Grpc.Core.Internal
/// </summary>
protected bool ReleaseResourcesIfPossible()
{
- using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible"))
+ if (!disposed && call != null)
{
- if (!disposed && call != null)
+ bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
+ if (noMoreSendCompletions && readingDone && finished)
{
- bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
- if (noMoreSendCompletions && readingDone && finished)
- {
- ReleaseResources();
- return true;
- }
+ ReleaseResources();
+ return true;
}
- return false;
}
+ return false;
}
protected abstract bool IsClient
@@ -229,28 +226,20 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg)
{
- using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize"))
- {
- return serializer(msg);
- }
+ return serializer(msg);
}
protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
- using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize"))
+ try
{
- try
- {
-
- msg = deserializer(payload);
- return null;
-
- }
- catch (Exception e)
- {
- msg = default(TRead);
- return e;
- }
+ msg = deserializer(payload);
+ return null;
+ }
+ catch (Exception e)
+ {
+ msg = default(TRead);
+ return e;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 82361f5797..f817a61bce 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -76,11 +76,8 @@ namespace Grpc.Core.Internal
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- using (Profilers.ForCurrentThread().NewScope("CallSafeHandle.StartUnary"))
- {
- Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
- .CheckOk();
- }
+ Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
+ .CheckOk();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 62864dff0c..0fb6360a23 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -65,16 +65,13 @@ namespace Grpc.Core.Internal
public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
- using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
+ var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
+ if (credentials != null)
{
- var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
- if (credentials != null)
- {
- result.SetCredentials(credentials);
- }
- result.Initialize(cq);
- return result;
+ result.SetCredentials(credentials);
}
+ result.Initialize(cq);
+ return result;
}
public ChannelState CheckConnectivityState(bool tryToConnect)
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 46f5624223..6c9a31921e 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -70,10 +70,7 @@ namespace Grpc.Core.Internal
public CompletionQueueEvent Pluck(IntPtr tag)
{
- using (Profilers.ForCurrentThread().NewScope("CompletionQueueSafeHandle.Pluck"))
- {
- return Native.grpcsharp_completion_queue_pluck(this, tag);
- }
+ return Native.grpcsharp_completion_queue_pluck(this, tag);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index a446c1f99f..25a6589f11 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -37,6 +37,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
+using Grpc.Core.Profiling;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
@@ -54,6 +55,8 @@ namespace Grpc.Core.Internal
readonly int poolSize;
readonly int completionQueueCount;
+ readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
+
bool stopRequested;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
@@ -82,7 +85,8 @@ namespace Grpc.Core.Internal
for (int i = 0; i < poolSize; i++)
{
- threads.Add(CreateAndStartThread(i));
+ var optionalProfiler = i < threadProfilers.Count ? threadProfilers[i] : null;
+ threads.Add(CreateAndStartThread(i, optionalProfiler));
}
}
}
@@ -111,6 +115,11 @@ namespace Grpc.Core.Internal
{
cq.Dispose();
}
+
+ for (int i = 0; i < threadProfilers.Count; i++)
+ {
+ threadProfilers[i].Dump(string.Format("grpc_trace_thread_{0}.txt", i));
+ }
});
}
@@ -137,12 +146,12 @@ namespace Grpc.Core.Internal
}
}
- private Thread CreateAndStartThread(int threadIndex)
+ private Thread CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)
{
var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex);
- var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
+ var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq, optionalProfiler)));
thread.IsBackground = true;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
@@ -153,8 +162,13 @@ namespace Grpc.Core.Internal
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void RunHandlerLoop(CompletionQueueSafeHandle cq)
+ private void RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)
{
+ if (optionalProfiler != null)
+ {
+ Profilers.SetForCurrentThread(optionalProfiler);
+ }
+
CompletionQueueEvent ev;
do
{
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index dc9f62fdab..05dda5b148 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -48,22 +48,19 @@ namespace Grpc.Core.Internal
public static MetadataArraySafeHandle Create(Metadata metadata)
{
- using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
+ if (metadata.Count == 0)
{
- if (metadata.Count == 0)
- {
- return new MetadataArraySafeHandle();
- }
+ return new MetadataArraySafeHandle();
+ }
- // TODO(jtattermusch): we might wanna check that the metadata is readonly
- var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
- for (int i = 0; i < metadata.Count; i++)
- {
- var valueBytes = metadata[i].GetSerializedValueUnsafe();
- Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
- }
- return metadataArray;
+ // TODO(jtattermusch): we might wanna check that the metadata is readonly
+ var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
+ for (int i = 0; i < metadata.Count; i++)
+ {
+ var valueBytes = metadata[i].GetSerializedValueUnsafe();
+ Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
+ return metadataArray;
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Profiling/Profilers.cs b/src/csharp/Grpc.Core/Profiling/Profilers.cs
index aa0d96c0e0..6afabff6a7 100644
--- a/src/csharp/Grpc.Core/Profiling/Profilers.cs
+++ b/src/csharp/Grpc.Core/Profiling/Profilers.cs
@@ -80,7 +80,7 @@ namespace Grpc.Core.Profiling
ProfilerEntry[] entries;
int count;
- public BasicProfiler() : this(1024*1024)
+ public BasicProfiler() : this(20*1024*1024)
{
}
diff --git a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
index eb3bb8a28b..d55e658d94 100644
--- a/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/MetadataCredentialsTest.cs
@@ -103,6 +103,34 @@ namespace Grpc.IntegrationTesting
client.UnaryCall(new SimpleRequest { }, new CallOptions(credentials: callCredentials));
}
+ [Test]
+ public void MetadataCredentials_InterceptorLeavesMetadataEmpty()
+ {
+ var channelCredentials = ChannelCredentials.Create(TestCredentials.CreateSslCredentials(),
+ CallCredentials.FromInterceptor(new AsyncAuthInterceptor((context, metadata) => TaskUtils.CompletedTask)));
+ channel = new Channel(Host, server.Ports.Single().BoundPort, channelCredentials, options);
+ client = new TestService.TestServiceClient(channel);
+
+ var ex = Assert.Throws<RpcException>(() => client.UnaryCall(new SimpleRequest { }));
+ // StatusCode.Unknown as the server-side handler throws an exception after not receiving the authorization header.
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void MetadataCredentials_InterceptorThrows()
+ {
+ var callCredentials = CallCredentials.FromInterceptor(new AsyncAuthInterceptor((context, metadata) =>
+ {
+ throw new Exception("Auth interceptor throws");
+ }));
+ var channelCredentials = ChannelCredentials.Create(TestCredentials.CreateSslCredentials(), callCredentials);
+ channel = new Channel(Host, server.Ports.Single().BoundPort, channelCredentials, options);
+ client = new TestService.TestServiceClient(channel);
+
+ var ex = Assert.Throws<RpcException>(() => client.UnaryCall(new SimpleRequest { }));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+ }
+
private class FakeTestService : TestService.TestServiceBase
{
public override Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 0bf5b0acf3..37d04eb100 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -991,7 +991,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_metadata_credentials_notify_from_plugin(
grpc_credentials_plugin_metadata_cb cb,
void *user_data, grpc_metadata_array *metadata,
grpc_status_code status, const char *error_details) {
- cb(user_data, metadata->metadata, metadata->count, status, error_details);
+ if (metadata) {
+ cb(user_data, metadata->metadata, metadata->count, status, error_details);
+ } else {
+ cb(user_data, NULL, 0, status, error_details);
+ }
}
typedef void(GPR_CALLTYPE *grpcsharp_metadata_interceptor_func)(
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index 017d7962c3..fc339fc462 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -37,7 +37,7 @@
#include <nan.h>
#include "grpc/grpc.h"
#include "grpc/byte_buffer_reader.h"
-#include "grpc/support/slice.h"
+#include "grpc/slice.h"
#include "byte_buffer.h"
diff --git a/src/node/src/common.js b/src/node/src/common.js
index c6c6d597a8..98eabf5c0b 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -141,8 +141,14 @@ exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
binaryAsBase64 = options.binaryAsBase64;
longsAsStrings = options.longsAsStrings;
}
- return _.fromPairs(_.map(service.children, function(method) {
- return [_.camelCase(method.name), {
+ /* This slightly awkward construction is used to make sure we only use
+ lodash@3.10.1-compatible functions. A previous version used
+ _.fromPairs, which would be cleaner, but was introduced in lodash
+ version 4 */
+ return _.zipObject(_.map(service.children, function(method) {
+ return _.camelCase(method.name);
+ }), _.map(service.children, function(method) {
+ return {
path: prefix + method.name,
requestStream: method.requestStream,
responseStream: method.responseStream,
@@ -150,11 +156,11 @@ exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service,
responseType: method.resolvedResponseType,
requestSerialize: serializeCls(method.resolvedRequestType.build()),
requestDeserialize: deserializeCls(method.resolvedRequestType.build(),
- binaryAsBase64, longsAsStrings),
+ binaryAsBase64, longsAsStrings),
responseSerialize: serializeCls(method.resolvedResponseType.build()),
responseDeserialize: deserializeCls(method.resolvedResponseType.build(),
- binaryAsBase64, longsAsStrings)
- }];
+ binaryAsBase64, longsAsStrings)
+ };
}));
};
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index b0041871d4..5affff10f5 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -99,7 +99,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_epoll_linux.c',
- 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
@@ -121,6 +120,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/resource_quota.c',
'src/core/lib/iomgr/sockaddr_utils.c',
+ 'src/core/lib/iomgr/socket_mutator.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index e214216ece..6c36df9113 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -189,6 +189,7 @@ grpc_slice_split_head_type grpc_slice_split_head_import;
gpr_empty_slice_type gpr_empty_slice_import;
grpc_slice_cmp_type grpc_slice_cmp_import;
grpc_slice_str_cmp_type grpc_slice_str_cmp_import;
+grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import;
grpc_slice_buffer_init_type grpc_slice_buffer_init_import;
grpc_slice_buffer_destroy_type grpc_slice_buffer_destroy_import;
grpc_slice_buffer_add_type grpc_slice_buffer_add_import;
@@ -464,6 +465,7 @@ void grpc_rb_load_imports(HMODULE library) {
gpr_empty_slice_import = (gpr_empty_slice_type) GetProcAddress(library, "gpr_empty_slice");
grpc_slice_cmp_import = (grpc_slice_cmp_type) GetProcAddress(library, "grpc_slice_cmp");
grpc_slice_str_cmp_import = (grpc_slice_str_cmp_type) GetProcAddress(library, "grpc_slice_str_cmp");
+ grpc_slice_is_equivalent_import = (grpc_slice_is_equivalent_type) GetProcAddress(library, "grpc_slice_is_equivalent");
grpc_slice_buffer_init_import = (grpc_slice_buffer_init_type) GetProcAddress(library, "grpc_slice_buffer_init");
grpc_slice_buffer_destroy_import = (grpc_slice_buffer_destroy_type) GetProcAddress(library, "grpc_slice_buffer_destroy");
grpc_slice_buffer_add_import = (grpc_slice_buffer_add_type) GetProcAddress(library, "grpc_slice_buffer_add");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 7a64c1431d..5745686adf 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -518,6 +518,9 @@ extern grpc_slice_cmp_type grpc_slice_cmp_import;
typedef int(*grpc_slice_str_cmp_type)(grpc_slice a, const char *b);
extern grpc_slice_str_cmp_type grpc_slice_str_cmp_import;
#define grpc_slice_str_cmp grpc_slice_str_cmp_import
+typedef int(*grpc_slice_is_equivalent_type)(grpc_slice a, grpc_slice b);
+extern grpc_slice_is_equivalent_type grpc_slice_is_equivalent_import;
+#define grpc_slice_is_equivalent grpc_slice_is_equivalent_import
typedef void(*grpc_slice_buffer_init_type)(grpc_slice_buffer *sb);
extern grpc_slice_buffer_init_type grpc_slice_buffer_init_import;
#define grpc_slice_buffer_init grpc_slice_buffer_init_import
@@ -686,7 +689,7 @@ extern gpr_join_host_port_type gpr_join_host_port_import;
typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port);
extern gpr_split_host_port_type gpr_split_host_port_import;
#define gpr_split_host_port gpr_split_host_port_import
-typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(4, 5);
+typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
typedef void(*gpr_log_message_type)(const char *file, int line, gpr_log_severity severity, const char *message);
@@ -707,7 +710,7 @@ extern gpr_format_message_type gpr_format_message_import;
typedef char *(*gpr_strdup_type)(const char *src);
extern gpr_strdup_type gpr_strdup_import;
#define gpr_strdup gpr_strdup_import
-typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(2, 3);
+typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPR_PRINT_FORMAT_CHECK(2, 3);
extern gpr_asprintf_type gpr_asprintf_import;
#define gpr_asprintf gpr_asprintf_import
typedef const char *(*gpr_subprocess_binary_extension_type)();