aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/client_config/client_channel.c24
-rw-r--r--src/core/ext/client_config/lb_policy.c10
-rw-r--r--src/core/ext/client_config/lb_policy.h60
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h13
-rw-r--r--src/core/ext/client_config/subchannel.c18
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c392
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c5
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h1
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c10
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h32
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c31
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c100
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c10
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c766
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h54
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c21
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c111
20 files changed, 951 insertions, 738 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 76c2f38a5d..3ed66a3313 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -399,13 +399,15 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
- grpc_transport_stream_op *waiting_ops;
+ grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} call_data;
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
@@ -416,7 +418,7 @@ static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
gpr_realloc(calld->waiting_ops,
calld->waiting_ops_capacity * sizeof(*calld->waiting_ops));
}
- calld->waiting_ops[calld->waiting_ops_count++] = *op;
+ calld->waiting_ops[calld->waiting_ops_count++] = op;
GPR_TIMER_END("add_waiting_locked", 0);
}
@@ -425,14 +427,14 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
size_t i;
for (i = 0; i < calld->waiting_ops_count; i++) {
grpc_transport_stream_op_finish_with_failure(
- exec_ctx, &calld->waiting_ops[i], GRPC_ERROR_REF(error));
+ exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error));
}
calld->waiting_ops_count = 0;
GRPC_ERROR_UNREF(error);
}
typedef struct {
- grpc_transport_stream_op *ops;
+ grpc_transport_stream_op **ops;
size_t nops;
grpc_subchannel_call *call;
} retry_ops_args;
@@ -441,7 +443,7 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
}
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
gpr_free(a->ops);
@@ -449,6 +451,10 @@ static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
}
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
+ if (calld->waiting_ops_count == 0) {
+ return;
+ }
+
retry_ops_args *a = gpr_malloc(sizeof(*a));
a->ops = calld->waiting_ops;
a->nops = calld->waiting_ops_count;
@@ -582,9 +588,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
return r;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 3e498f5f56..46391272a6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,13 +100,11 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data,
+ on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index eb07ad3360..7fb3e08cb3 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,38 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties;
};
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Parties interested in the pick's progress */
+ grpc_polling_entity *pollent;
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** implement grpc_lb_policy_pick */
+ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target, grpc_closure *on_complete);
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
+ grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target, grpc_error *error);
+
+ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, grpc_error *error);
+ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -83,8 +98,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription.
- */
+ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -124,27 +138,35 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Given initial metadata in \a initial_metadata, find an appropriate
- target for this rpc, and 'return' it by calling \a on_complete after setting
- \a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+ Picking can be synchronous or asynchronous. In the synchronous case, when a
+ pick is readily available, it'll be returned in \a target and a non-zero
+ value will be returned.
+ In the asynchronous case, zero is returned and \a on_complete will be called
+ once \a target and \a user_data have been set. Any IO should be done under
+ \a pick_args->pollent. The opaque \a user_data output argument corresponds
+ to information that may need be propagated from the LB policy. It may be
+ NULL. Errors are signaled by receiving a NULL \a *target. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete);
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target,
grpc_error *error);
-/** Cancel all pending picks which have:
- (initial_metadata_flags & initial_metadata_flags_mask) ==
- initial_metadata_flags_eq */
+/** Cancel all pending picks for which their \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index da1de3579a..7191ca7d89 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -47,8 +47,19 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
+/** A resolved address alongside any LB related information associated with it.
+ * \a user_data, if not NULL, contains opaque data meant to be consumed by the
+ * gRPC LB policy. Note that no all LB policies support \a user_data as input.
+ * Those who don't will simply ignore it and will correspondingly return NULL in
+ * their namesake pick() output argument. */
+typedef struct grpc_lb_address {
+ grpc_resolved_address *resolved_address;
+ void *user_data;
+} grpc_lb_address;
+
typedef struct grpc_lb_policy_args {
- grpc_resolved_addresses *addresses;
+ grpc_lb_address *addresses;
+ size_t num_addresses;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 8f4a2f9e3e..53ba8aeca6 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -504,14 +504,13 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.connectivity_state = state;
- op.on_connectivity_state_change = closure;
- op.bind_pollset_set = interested_parties;
+ op->connectivity_state = state;
+ op->on_connectivity_state_change = closure;
+ op->bind_pollset_set = interested_parties;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
void grpc_connected_subchannel_notify_on_state_change(
@@ -525,12 +524,11 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
grpc_connected_subchannel *con,
grpc_closure *closure) {
- grpc_transport_op op;
+ grpc_transport_op *op = grpc_make_transport_op(NULL);
grpc_channel_element *elem;
- memset(&op, 0, sizeof(op));
- op.send_ping = closure;
+ op->send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
}
static void publish_transport_locked(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 550f5c5972..874e26834a 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -76,9 +76,9 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
- * At this point we can transition to a new RR instance safely, which is done
- * once again via \a rr_handover().
+ * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * state. At this point we can transition to a new RR instance safely, which
+ * is done once again via \a rr_handover().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -96,6 +96,8 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
+#include <errno.h>
+
#include <string.h>
#include <grpc/byte_buffer_reader.h>
@@ -109,18 +111,57 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/static_metadata.h"
int grpc_lb_glb_trace = 0;
+static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
+ size_t num_addresses) {
+ /* free "resolved" addresses memblock */
+ gpr_free(lb_addresses->resolved_address);
+ for (size_t i = 0; i < num_addresses; ++i) {
+ if (lb_addresses[i].user_data != NULL) {
+ GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
+ }
+ }
+ gpr_free(lb_addresses);
+}
+
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
+ GPR_ASSERT(lb_token_mdelem_storage != NULL);
+ GPR_ASSERT(lb_token != NULL);
+ grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+ lb_token);
+}
+
typedef struct wrapped_rr_closure_arg {
/* the original closure. Usually a on_complete/notify cb for pick() and ping()
* calls against the internal RR instance, respectively. */
grpc_closure *wrapped_closure;
+ /* the pick's initial metadata, kept in order to append the LB token for the
+ * pick */
+ grpc_metadata_batch *initial_metadata;
+
+ /* the picked target, used to determine which LB token to add to the pick's
+ * initial metadata */
+ grpc_connected_subchannel **target;
+
+ /* the LB token associated with the pick */
+ grpc_mdelem *lb_token;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* The RR instance related to the closure */
grpc_lb_policy *rr_policy;
@@ -141,9 +182,20 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
(intptr_t)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
+
+ /* if target is NULL, no pick has been made by the RR policy (eg, all
+ * addresses failed to connect). There won't be any user_data/token
+ * available */
+ if (wc_arg->target != NULL) {
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+ }
}
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
- grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
+
+ grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
+ NULL);
gpr_free(wc_arg->owning_pending_node);
}
@@ -164,6 +216,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -180,20 +235,24 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+static void add_pending_pick(pending_pick **root,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata = initial_metadata;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
+ pp->wrapped_on_complete_arg.target = target;
+ pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
+ pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
*root = pp;
@@ -252,6 +311,12 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
+ /** total number of valid addresses received in \a serverlist */
+ size_t num_ok_serverlist_addresses;
+
+ /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
+ grpc_lb_address *lb_addresses;
+
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -279,58 +344,142 @@ struct rr_connectivity_data {
glb_lb_policy *glb_policy;
};
-static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
- const grpc_grpclb_serverlist *serverlist,
- glb_lb_policy *glb_policy) {
- /* TODO(dgq): support mixed ip version */
- GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
- char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
- serverlist->servers[i]->port);
+static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
+ bool log) {
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ if (server->port >> 16 != 0) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Invalid port '%d' at index %zu of serverlist. Ignoring.",
+ server->port, idx);
+ }
+ return false;
}
- size_t uri_path_len;
- char *concat_ipports = gpr_strjoin_sep(
- (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
+ if (ip->size != 4 && ip->size != 16) {
+ if (log) {
+ gpr_log(GPR_ERROR,
+ "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
+ "serverlist. Ignoring",
+ ip->size, idx);
+ }
+ return false;
+ }
+ return true;
+}
- grpc_lb_policy_args args;
- args.client_channel_factory = glb_policy->cc_factory;
- args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- args.addresses->naddrs = serverlist->num_servers;
- args.addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
- size_t out_addrs_idx = 0;
+/* populate \a addresses according to \a serverlist. Returns the number of
+ * addresses successfully parsed and added to \a addresses */
+static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
+ grpc_lb_address **lb_addresses) {
+ size_t num_valid = 0;
+ /* first pass: count how many are valid in order to allocate the necessary
+ * memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_uri uri;
- struct sockaddr_storage sa;
- size_t sa_len;
- uri.path = host_ports[i];
- if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
- memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
- args.addresses->addrs[out_addrs_idx].len = sa_len;
- ++out_addrs_idx;
+ if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
+ }
+ if (num_valid == 0) {
+ return 0;
+ }
+
+ /* allocate the memory block for the "resolved" addresses. */
+ grpc_resolved_address *r_addrs_memblock =
+ gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
+ memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
+ grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
+ memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
+
+ /* second pass: actually populate the addresses and LB tokens (aka user data
+ * to the outside world) to be read by the RR policy during its creation.
+ * Given that the validity tests are very cheap, they are performed again
+ * instead of marking the valid ones during the first pass, as this would
+ * incurr in an allocation due to the arbitrary number of server */
+ size_t addr_idx = 0;
+ for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
+ GPR_ASSERT(addr_idx < num_valid);
+ const grpc_grpclb_server *server = serverlist->servers[sl_idx];
+ if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+ grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
+
+ /* address processing */
+ const uint16_t netorder_port = htons((uint16_t)server->port);
+ /* the addresses are given in binary format (a in(6)_addr struct) in
+ * server->ip_address.bytes. */
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+
+ lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
+ struct sockaddr_storage *sa =
+ (struct sockaddr_storage *)lb_addr->resolved_address->addr;
+ size_t *sa_len = &lb_addr->resolved_address->len;
+ *sa_len = 0;
+ if (ip->size == 4) {
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
+ *sa_len = sizeof(struct sockaddr_in);
+ memset(addr4, 0, *sa_len);
+ addr4->sin_family = AF_INET;
+ memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+ addr4->sin_port = netorder_port;
+ } else if (ip->size == 16) {
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
+ *sa_len = sizeof(struct sockaddr_in6);
+ memset(addr6, 0, *sa_len);
+ addr6->sin6_family = AF_INET;
+ memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+ addr6->sin6_port = netorder_port;
+ }
+ GPR_ASSERT(*sa_len > 0);
+
+ /* lb token processing */
+ if (server->has_load_balance_token) {
+ const size_t lb_token_size =
+ GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+ grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
+ (uint8_t *)server->load_balance_token, lb_token_size);
+ lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
- gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
- host_ports[i]);
+ gpr_log(GPR_ERROR,
+ "Missing LB token for backend address '%s'. The empty token will "
+ "be used instead",
+ grpc_sockaddr_to_uri((struct sockaddr *)sa));
+ lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
+ ++addr_idx;
}
+ GPR_ASSERT(addr_idx == num_valid);
+ *lb_addresses = lb_addrs;
+ return num_valid;
+}
+
+static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
+ const grpc_grpclb_serverlist *serverlist,
+ glb_lb_policy *glb_policy) {
+ GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+
+ grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
+ args.client_channel_factory = glb_policy->cc_factory;
+ const size_t num_ok_addresses =
+ process_serverlist(serverlist, &args.addresses);
+ args.num_addresses = num_ok_addresses;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- gpr_free(concat_ipports);
- for (size_t i = 0; i < serverlist->num_servers; i++) {
- gpr_free(host_ports[i]);
+ if (glb_policy->lb_addresses != NULL) {
+ /* dispose of the previous version */
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
}
- gpr_free(host_ports);
- gpr_free(args.addresses->addrs);
- gpr_free(args.addresses);
+ glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
+ glb_policy->lb_addresses = args.addresses;
+
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
- GRPC_ERROR_REF(error);
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -345,8 +494,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
&glb_policy->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- glb_policy->rr_connectivity->state, error,
- "rr_handover");
+ glb_policy->rr_connectivity->state,
+ GRPC_ERROR_REF(error), "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
/* flush pending ops */
@@ -359,9 +508,12 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
- pp->initial_metadata, pp->initial_metadata_flags,
- pp->target, &pp->wrapped_on_complete);
+ const grpc_lb_policy_pick_args pick_args = {
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ (void **)&pp->wrapped_on_complete_arg.lb_token,
+ &pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -378,13 +530,13 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
&pping->wrapped_notify);
pping->wrapped_notify_arg.owning_pending_node = pping;
}
- GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+
if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
@@ -398,8 +550,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (error == GRPC_ERROR_NONE) {
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, error,
- "rr_connectivity_changed");
+ rr_conn_data->state, GRPC_ERROR_REF(error),
+ "glb_rr_connectivity_changed");
/* resubscribe */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
@@ -408,7 +560,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(rr_conn_data);
}
}
- GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -418,31 +569,43 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
memset(glb_policy, 0, sizeof(*glb_policy));
/* All input addresses in args->addresses come from a resolver that claims
- * they are LB services. It's the resolver's responsibility to make sure this
+ * they are LB services. It's the resolver's responsibility to make sure
+ * this
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->addresses->naddrs == 0) {
+ if (args->num_addresses == 0) {
return NULL;
}
- /* construct a target from the args->addresses, in the form
+ if (args->addresses[0].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
+ /* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
- addr_strs[0] =
- grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
- for (size_t i = 1; i < args->addresses->naddrs; i++) {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[i],
- (const struct sockaddr *)&args->addresses->addrs[i],
- true) == 0);
+ char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
+ addr_strs[0] = grpc_sockaddr_to_uri(
+ (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
+ for (size_t i = 1; i < args->num_addresses; i++) {
+ if (args->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
+ GPR_ASSERT(
+ grpc_sockaddr_to_string(
+ &addr_strs[i],
+ (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
+ true) == 0);
}
size_t uri_path_len;
char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
+ (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -450,7 +613,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->num_addresses; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
@@ -463,7 +626,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+ grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
glb_policy->rr_connectivity = rr_connectivity;
@@ -486,6 +649,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
+
+ lb_addrs_destroy(glb_policy->lb_addresses,
+ glb_policy->num_ok_serverlist_addresses);
gpr_free(glb_policy);
}
@@ -609,12 +775,21 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ *target = NULL;
+ grpc_exec_ctx_sched(
+ exec_ctx, on_complete,
+ GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
+ "won't work without it. Failing"),
+ NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
@@ -626,29 +801,36 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
+ glb_policy->wc_arg.target = target;
glb_policy->wc_arg.wrapped_closure = on_complete;
+ glb_policy->wc_arg.lb_token_mdelem_storage =
+ pick_args->lb_token_mdelem_storage;
+ glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
+ glb_policy->wc_arg.owning_pending_node = NULL;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
- initial_metadata, initial_metadata_flags, target,
+
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
+ (void **)&glb_policy->wc_arg.lb_token,
&glb_policy->wrapped_on_complete);
if (r != 0) {
- /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
- * policy and notify the original callback */
- glb_policy->wc_arg.wrapped_closure = NULL;
+ /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
(intptr_t)glb_policy->wc_arg.rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
- grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
- GRPC_ERROR_NONE, NULL);
+
+ /* add the load reporting initial metadata */
+ initial_metadata_add_lb_token(
+ pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
}
} else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
- add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+ on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
@@ -708,9 +890,6 @@ typedef struct lb_client_data {
/* called once initial metadata's been sent */
grpc_closure md_sent;
- /* called once initial metadata's been received */
- grpc_closure md_rcvd;
-
/* called once the LoadBalanceRequest has been sent to the LB server. See
* src/proto/grpc/.../load_balancer.proto */
grpc_closure req_sent;
@@ -747,7 +926,6 @@ typedef struct lb_client_data {
} lb_client_data;
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@@ -762,7 +940,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
gpr_mu_init(&lb_client->mu);
grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
- grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@@ -861,23 +1038,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
- op->flags = 0;
- op->reserved = NULL;
- op++;
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
- &lb_client->md_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- lb_client_data *lb_client = arg;
- GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
- memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = lb_client->request_payload;
@@ -892,11 +1052,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
+ grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &lb_client->response_payload;
op->flags = 0;
@@ -915,8 +1082,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_op *op = ops;
if (lb_client->response_payload != NULL) {
/* Received data from the LB server. Look inside
- * lb_client->response_payload, for
- * a serverlist. */
+ * lb_client->response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@@ -953,7 +1119,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
- * rr_connectivity_changed) */
+ * glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
@@ -1016,8 +1182,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
lb_client->status, lb_client->status_details,
lb_client->status_details_capacity);
}
- /* TODO(dgq): deal with stream termination properly (fire up another one? fail
- * the original call?) */
+ /* TODO(dgq): deal with stream termination properly (fire up another one?
+ * fail the original call?) */
}
/* Code wiring the policy with the rest of the core */
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index f4720a1345..a8881004a0 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -57,6 +57,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->num_servers++;
@@ -69,6 +70,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->servers[dec_arg->decoding_idx++] = server;
@@ -118,6 +120,7 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
grpc_grpclb_initial_response *initial_res =
@@ -145,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = true;
status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
@@ -152,6 +156,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
status =
pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 9726c87a37..c1e73d08ef 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -45,6 +45,7 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
+typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 52e11c40bb..afecb716fb 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -31,10 +31,11 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -72,8 +73,8 @@ const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
};
const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@@ -84,7 +85,7 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
};
const pb_field_t grpc_lb_v1_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@@ -116,3 +117,4 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
#endif
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 46fe588f72..53fed22bae 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -31,11 +31,12 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -52,6 +53,7 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t client_rpc_errors;
bool has_dropped_requests;
int64_t dropped_requests;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
typedef struct _grpc_lb_v1_Duration {
@@ -59,22 +61,26 @@ typedef struct _grpc_lb_v1_Duration {
int64_t seconds;
bool has_nanos;
int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
} grpc_lb_v1_Duration;
typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
bool has_name;
char name[128];
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
} grpc_lb_v1_InitialLoadBalanceRequest;
+typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
typedef struct _grpc_lb_v1_Server {
bool has_ip_address;
- char ip_address[46];
+ grpc_lb_v1_Server_ip_address_t ip_address;
bool has_port;
int32_t port;
bool has_load_balance_token;
- char load_balance_token[64];
+ char load_balance_token[65];
bool has_drop_request;
bool drop_request;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@@ -82,6 +88,7 @@ typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
char load_balancer_delegate[64];
bool has_client_stats_report_interval;
grpc_lb_v1_Duration client_stats_report_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct _grpc_lb_v1_LoadBalanceRequest {
@@ -89,12 +96,14 @@ typedef struct _grpc_lb_v1_LoadBalanceRequest {
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
bool has_client_stats;
grpc_lb_v1_ClientStats client_stats;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
typedef struct _grpc_lb_v1_ServerList {
pb_callback_t servers;
bool has_expiration_interval;
grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
} grpc_lb_v1_ServerList;
typedef struct _grpc_lb_v1_LoadBalanceResponse {
@@ -102,6 +111,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
bool has_server_list;
grpc_lb_v1_ServerList server_list;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
} grpc_lb_v1_LoadBalanceResponse;
/* Default values for struct fields */
@@ -114,7 +124,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@@ -122,7 +132,7 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStats_total_requests_tag 1
@@ -135,8 +145,8 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_request_tag 4
-#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
-#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
+#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@@ -161,7 +171,8 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#define grpc_lb_v1_ClientStats_size 33
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
-#define grpc_lb_v1_Server_size 127
+/* grpc_lb_v1_ServerList_size depends on runtime parameters */
+#define grpc_lb_v1_Server_size 98
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -174,5 +185,6 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5];
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
#endif
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 5962ec3327..3c863affb3 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -205,10 +205,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -231,13 +229,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
@@ -449,20 +447,25 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->addresses->naddrs == 0) return NULL;
+ if (args->num_addresses == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < args->num_addresses; i++) {
+ if (args->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr =
+ (struct sockaddr *)(args->addresses[i].resolved_address->addr);
+ sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 654fcf2d2a..ad89491167 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,15 +77,32 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
+
+ /* output argument where to store the pick()ed user_data. It'll be NULL if no
+ * such data is present or there's an error (the definite test for errors is
+ * \a target being NULL). */
+ void **user_data;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ void *user_data;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -102,12 +120,21 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target user data */
+ void *user_data;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** array holding the borrowed and opaque pointers to incoming user data, one
+ * per incoming address. These individual pointers will be returned as-is in
+ * successful picks. */
+ void **user_data_pointers;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -166,16 +193,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->user_data = sd->user_data;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -189,7 +219,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -216,8 +247,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", (void *)node,
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -229,9 +260,8 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
- size_t i;
ready_list *elem;
- for (i = 0; i < p->num_subchannels; i++) {
+ for (size_t i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin");
gpr_free(sd);
@@ -251,6 +281,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
+
+ gpr_free(p->user_data_pointers);
gpr_free(p);
}
@@ -343,7 +375,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -367,38 +399,43 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
- grpc_connected_subchannel **target,
+ const grpc_lb_policy_pick_args *pick_args,
+ grpc_connected_subchannel **target, void **user_data,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+
+ if (user_data != NULL) {
+ *user_data = selected->user_data;
+ }
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->user_data = user_data;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -427,7 +464,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -439,12 +476,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ if (pp->user_data != NULL) {
+ *pp->user_data = selected->user_data;
+ }
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
@@ -576,20 +617,25 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ if (args->num_addresses == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->num_addresses = args->num_addresses;
+ p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
+ p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses);
+ memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
- sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+ sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr;
+ sc_args.addr_len = args->addresses[i].resolved_address->len;
+
+ p->user_data_pointers[i] = args->addresses[i].user_data;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -601,12 +647,14 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ sd->user_data = p->user_data_pointers[i];
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 79682e78b5..32e9de69a6 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -175,10 +175,18 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_args lb_policy_args;
result = grpc_resolver_result_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = addresses;
+ lb_policy_args.num_addresses = addresses->naddrs;
+ lb_policy_args.addresses =
+ gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+ memset(lb_policy_args.addresses, 0,
+ sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+ for (size_t i = 0; i < addresses->naddrs; ++i) {
+ lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
+ }
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ gpr_free(lb_policy_args.addresses);
if (lb_policy != NULL) {
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 3807522d2b..425285287c 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -125,10 +125,18 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_resolver_result *result = grpc_resolver_result_create();
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = r->addresses;
+ lb_policy_args.num_addresses = r->addresses->naddrs;
+ lb_policy_args.addresses =
+ gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+ memset(lb_policy_args.addresses, 0,
+ sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
+ for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) {
+ lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
+ }
lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+ gpr_free(lb_policy_args.addresses);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 00999e3b94..53da0e5d70 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -89,13 +89,20 @@ static const grpc_transport_vtable vtable;
static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *t,
+ grpc_error *error);
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error,
- const char *reason);
+ grpc_chttp2_transport *t, grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -105,11 +112,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
-/** Perform a transport_op */
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *transport_op);
-
/** Cancel a stream: coming from the transport API */
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -121,22 +123,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
-/** Add endpoint from this transport to pollset */
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored, void *pollset);
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *pollset_set);
-
/** Start new streams that have been created if we can */
static void maybe_start_some_streams(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t);
-
static void connectivity_state_set(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_connectivity_state state, grpc_error *error, const char *reason);
@@ -149,14 +139,16 @@ static void incoming_byte_stream_update_flow_control(
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream);
+ void *byte_stream,
+ grpc_error *error_ignored);
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_error *error);
+static void set_write_state(grpc_chttp2_transport *t,
+ grpc_chttp2_write_state state, const char *reason);
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -165,9 +157,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
size_t i;
- gpr_mu_lock(&t->executor.mu);
-
- GPR_ASSERT(t->ep == NULL);
+ grpc_endpoint_destroy(exec_ctx, t->ep);
gpr_slice_buffer_destroy(&t->global.qbuf);
@@ -191,8 +181,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->new_stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- gpr_mu_unlock(&t->executor.mu);
- gpr_mu_destroy(&t->executor.mu);
+ grpc_combiner_destroy(exec_ctx, t->executor.combiner);
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
@@ -250,12 +239,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
memset(t, 0, sizeof(*t));
t->base.vtable = &vtable;
+ t->executor.write_state = GRPC_CHTTP2_WRITES_CORKED;
t->ep = ep;
- /* one ref is for destroy, the other for when ep becomes NULL */
- gpr_ref_init(&t->refs, 2);
+ /* one ref is for destroy */
+ gpr_ref_init(&t->refs, 1);
/* ref is dropped at transport close() */
gpr_ref_init(&t->shutdown_ep_refs, 1);
- gpr_mu_init(&t->executor.mu);
+ t->executor.combiner = grpc_combiner_create(grpc_endpoint_get_workqueue(ep));
t->peer_string = grpc_endpoint_get_peer(ep);
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@@ -281,23 +271,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
grpc_closure_init(&t->writing_action, writing_action, t);
grpc_closure_init(&t->reading_action, reading_action, t);
+ grpc_closure_init(&t->reading_action_locked, reading_action_locked, t);
grpc_closure_init(&t->parsing_action, parsing_action, t);
- grpc_closure_init(&t->initiate_writing, initiate_writing, t);
+ grpc_closure_init(&t->post_parse_locked, post_parse_locked, t);
+ grpc_closure_init(&t->initiate_writing, initiate_writing_locked, t);
+ grpc_closure_init(&t->terminate_writing, terminate_writing_with_lock, t);
+ grpc_closure_init(&t->initiate_read_flush_locked, initiate_read_flush_locked,
+ t);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
- grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
gpr_slice_buffer_init(&t->read_buffer);
- if (is_client) {
- gpr_slice_buffer_add(
- &t->global.qbuf,
- gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
- grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
- }
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn't waste memory for infrequently used connections, yet
large enough that the exponential growth should happen nicely when it's
@@ -320,6 +309,13 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->global.sent_local_settings = 0;
+ if (is_client) {
+ gpr_slice_buffer_add(
+ &t->writing.outbuf,
+ gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write");
+ }
+
/* configure http2 the way we like it */
if (is_client) {
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
@@ -424,47 +420,39 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
}
+
+ set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "uncork");
+ grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "init");
}
-static void destroy_transport_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *arg_ignored) {
+static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
t->destroying = 1;
drop_connection(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
+ UNREF_TRANSPORT(exec_ctx, t, "destroy");
}
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, destroy_transport_locked,
- NULL, 0);
- UNREF_TRANSPORT(exec_ctx, t, "destroy");
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ grpc_closure_create(destroy_transport_locked, t),
+ GRPC_ERROR_NONE);
}
/** block grpc_endpoint_shutdown being called until a paired
allow_endpoint_shutdown is made */
static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
- GPR_ASSERT(t->ep);
gpr_ref(&t->shutdown_ep_refs);
}
static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
if (gpr_unref(&t->shutdown_ep_refs)) {
- if (t->ep) {
- grpc_endpoint_shutdown(exec_ctx, t->ep);
- }
+ grpc_endpoint_shutdown(exec_ctx, t->ep);
}
}
-static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_endpoint_destroy(exec_ctx, t->ep);
- t->ep = NULL;
- /* safe because we'll still have the ref for write */
- UNREF_TRANSPORT(exec_ctx, t, "disconnect");
-}
-
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
@@ -475,9 +463,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
t->closed = 1;
connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
- if (t->ep) {
- allow_endpoint_shutdown_locked(exec_ctx, t);
- }
+ allow_endpoint_shutdown_locked(exec_ctx, t);
/* flush writable stream list to avoid dangling references */
grpc_chttp2_stream_global *stream_global;
@@ -511,21 +497,23 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
}
#endif
-static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *arg_ignored) {
- grpc_chttp2_register_stream(t, s);
+static void finish_init_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_register_stream(s->t, s);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "init");
}
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
+ GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
memset(s, 0, sizeof(*s));
+ s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
@@ -560,16 +548,21 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.in_stream_map = true;
}
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
- NULL, 0);
+ grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
+ GRPC_CHTTP2_STREAM_REF(&s->global, "init");
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
+ GRPC_ERROR_NONE);
+
+ GPR_TIMER_END("init_stream", 0);
return 0;
}
-static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg) {
+static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
+ grpc_error *error) {
grpc_byte_stream *bs;
+ grpc_chttp2_stream *s = sp;
+ grpc_chttp2_transport *t = s->t;
GPR_TIMER_BEGIN("destroy_stream", 0);
@@ -588,7 +581,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
@@ -625,16 +618,20 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("destroy_stream", 0);
- gpr_free(arg);
+ gpr_free(s->destroy_stream_arg);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) {
+ GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, destroy_stream_locked,
- and_free_memory, 0);
+ s->destroy_stream_arg = and_free_memory;
+ grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->destroy_stream,
+ GRPC_ERROR_NONE);
+ GPR_TIMER_END("destroy_stream", 0);
}
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
@@ -665,12 +662,10 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static const char *write_state_name(grpc_chttp2_write_state state) {
switch (state) {
+ case GRPC_CHTTP2_WRITES_CORKED:
+ return "CORKED";
case GRPC_CHTTP2_WRITING_INACTIVE:
return "INACTIVE";
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- return "REQUESTED[p=0]";
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- return "REQUESTED[p=1]";
case GRPC_CHTTP2_WRITE_SCHEDULED:
return "SCHEDULED";
case GRPC_CHTTP2_WRITING:
@@ -693,120 +688,18 @@ static void set_write_state(grpc_chttp2_transport *t,
t->executor.write_state = state;
}
-static void finish_global_actions(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_chttp2_executor_action_header *hdr;
- grpc_chttp2_executor_action_header *next;
-
- GPR_TIMER_BEGIN("finish_global_actions", 0);
-
- for (;;) {
- check_read_ops(exec_ctx, &t->global);
-
- gpr_mu_lock(&t->executor.mu);
- if (t->executor.pending_actions_head != NULL) {
- hdr = t->executor.pending_actions_head;
- t->executor.pending_actions_head = t->executor.pending_actions_tail =
- NULL;
- gpr_mu_unlock(&t->executor.mu);
- while (hdr != NULL) {
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
- next = hdr->next;
- gpr_free(hdr);
- UNREF_TRANSPORT(exec_ctx, t, "pending_action");
- hdr = next;
- }
- continue;
- } else {
- t->executor.global_active = false;
- switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking");
- REF_TRANSPORT(t, "initiate_writing");
- gpr_mu_unlock(&t->executor.mu);
- grpc_exec_ctx_sched(
- exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE,
- t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL);
- break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
- start_writing(exec_ctx, t);
- gpr_mu_unlock(&t->executor.mu);
- break;
- case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITING:
- case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- gpr_mu_unlock(&t->executor.mu);
- break;
- }
- }
- break;
- }
-
- GPR_TIMER_END("finish_global_actions", 0);
+static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
+ start_writing(exec_ctx, t);
}
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg) {
- grpc_chttp2_executor_action_header *hdr;
-
- GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
-
- REF_TRANSPORT(t, "run_global");
- gpr_mu_lock(&t->executor.mu);
-
- for (;;) {
- if (!t->executor.global_active) {
- t->executor.global_active = 1;
- gpr_mu_unlock(&t->executor.mu);
-
- GPR_TIMER_BEGIN("chttp2:locked_action", 0);
- action(exec_ctx, t, optional_stream, arg);
- GPR_TIMER_END("chttp2:locked_action", 0);
-
- finish_global_actions(exec_ctx, t);
- } else {
- gpr_mu_unlock(&t->executor.mu);
-
- hdr = gpr_malloc(sizeof(*hdr) + sizeof_arg);
- hdr->stream = optional_stream;
- hdr->action = action;
- if (sizeof_arg == 0) {
- hdr->arg = arg;
- } else {
- hdr->arg = hdr + 1;
- memcpy(hdr->arg, arg, sizeof_arg);
- }
-
- gpr_mu_lock(&t->executor.mu);
- if (!t->executor.global_active) {
- /* global lock was released while allocating memory: release & retry */
- gpr_free(hdr);
- continue;
- }
- hdr->next = NULL;
- if (t->executor.pending_actions_head != NULL) {
- t->executor.pending_actions_tail =
- t->executor.pending_actions_tail->next = hdr;
- } else {
- t->executor.pending_actions_tail = t->executor.pending_actions_head =
- hdr;
- }
- REF_TRANSPORT(t, "pending_action");
- gpr_mu_unlock(&t->executor.mu);
- }
- break;
- }
-
- UNREF_TRANSPORT(exec_ctx, t, "run_global");
-
- GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
+static void initiate_read_flush_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ t->executor.check_read_ops_scheduled = false;
+ check_read_ops(exec_ctx, &t->global);
}
/*******************************************************************************
@@ -816,11 +709,12 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
bool covered_by_poller, const char *reason) {
+ GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0);
+
/* Perform state checks, and transition to a scheduled state if appropriate.
- Each time we finish the global lock execution, we check if we need to
- write. If we do:
- - (if there is a poller surrounding the write) schedule
- initiate_writing, which locks and calls initiate_writing_locked to...
+ If we are inactive, schedule a write chain to begin once the transport
+ combiner finishes any executions in its current batch (which may be
+ scheduled AFTER this code executes). The write chain will:
- call start_writing, which verifies (under the global lock) that there
are things that need to be written by calling
grpc_chttp2_unlocking_check_writes, and if so schedules writing_action
@@ -830,31 +724,28 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
to do *another* write immediately, and if so loops back to
start_writing.
- Current problems:
+ Current problems:
- too much lock entry/exiting
- the writing thread can become stuck indefinitely (punt through the
workqueue periodically to fix) */
grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
switch (t->executor.write_state) {
- case GRPC_CHTTP2_WRITING_INACTIVE:
- set_write_state(t, covered_by_poller
- ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
- : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- reason);
+ case GRPC_CHTTP2_WRITES_CORKED:
break;
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- /* nothing to do: write already requested */
+ case GRPC_CHTTP2_WRITING_INACTIVE:
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, reason);
+ REF_TRANSPORT(t, "writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ covered_by_poller);
break;
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
+ case GRPC_CHTTP2_WRITE_SCHEDULED:
if (covered_by_poller) {
/* upgrade to note poller is available to cover the write */
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason);
+ grpc_combiner_force_async_finally(t->executor.combiner);
}
break;
- case GRPC_CHTTP2_WRITE_SCHEDULED:
- /* nothing to do: write already scheduled */
- break;
case GRPC_CHTTP2_WRITING:
set_write_state(t,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
@@ -871,15 +762,15 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
}
break;
}
+ GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
- GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED ||
- t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER);
+ GPR_TIMER_BEGIN("start_writing", 0);
+ GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED);
if (!t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing");
- REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
} else {
@@ -890,25 +781,10 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE,
"start_writing:nothing_to_write");
}
- end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write");
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE);
+ UNREF_TRANSPORT(exec_ctx, t, "writing");
}
-}
-
-static void initiate_writing_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg_ignored) {
- start_writing(exec_ctx, t);
- UNREF_TRANSPORT(exec_ctx, t, "initiate_writing");
-}
-
-static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked,
- NULL, 0);
+ GPR_TIMER_END("start_writing", 0);
}
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
@@ -942,15 +818,10 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* error may be GRPC_ERROR_NONE if there is no error allocated yet.
In that case, use "reason" as the text for a new error. */
static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_error *error,
- const char *reason) {
+ grpc_chttp2_transport *t, grpc_error *error) {
grpc_chttp2_stream_global *stream_global;
while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global,
&stream_global)) {
- if (error == GRPC_ERROR_NONE && reason != NULL) {
- /* create error object. */
- error = GRPC_ERROR_CREATE(reason);
- }
fail_pending_writes(exec_ctx, &t->global, stream_global,
GRPC_ERROR_REF(error));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes");
@@ -958,12 +829,10 @@ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(error);
}
-static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_ignored,
- void *a) {
- grpc_error *error = a;
-
+static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
+ grpc_chttp2_transport *t = tp;
allow_endpoint_shutdown_locked(exec_ctx, t);
if (error != GRPC_ERROR_NONE) {
@@ -972,39 +841,46 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
- end_waiting_for_write(exec_ctx, t, error, NULL);
+ end_waiting_for_write(exec_ctx, t, GRPC_ERROR_REF(error));
switch (t->executor.write_state) {
+ case GRPC_CHTTP2_WRITES_CORKED:
case GRPC_CHTTP2_WRITING_INACTIVE:
- case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER:
- case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER:
case GRPC_CHTTP2_WRITE_SCHEDULED:
GPR_UNREACHABLE_CODE(break);
case GRPC_CHTTP2_WRITING:
+ GPR_TIMER_MARK("state=writing", 0);
set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing");
break;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- "terminate_writing");
+ GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ REF_TRANSPORT(t, "writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ true);
break;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER:
- set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
- "terminate_writing");
+ GPR_TIMER_MARK("state=writing_stale_no_poller", 0);
+ set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "terminate_writing");
+ REF_TRANSPORT(t, "writing");
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_writing, GRPC_ERROR_NONE,
+ false);
break;
}
- if (t->ep && !t->endpoint_reading) {
- destroy_endpoint(exec_ctx, t);
- }
-
UNREF_TRANSPORT(exec_ctx, t, "writing");
+ GPR_TIMER_END("terminate_writing_with_lock", 0);
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
void *transport_writing, grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, terminate_writing_with_lock, GRPC_ERROR_REF(error), 0);
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->terminate_writing,
+ GRPC_ERROR_REF(error));
+ GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
@@ -1148,15 +1024,22 @@ static int contains_non_ok_status(
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
-static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *stream_op) {
+static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
+ grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
grpc_transport_stream_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
+ grpc_chttp2_stream *s = op->transport_private.args[1];
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_stream_global *stream_global = &s->global;
+ if (grpc_http_trace) {
+ char *str = grpc_transport_stream_op_string(op);
+ gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s", str);
+ gpr_free(str);
+ }
+
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete = grpc_closure_create(do_nothing, NULL);
@@ -1211,7 +1094,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (!stream_global->write_closed) {
if (transport_global->is_client) {
@@ -1278,7 +1162,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (stream_global->write_closed) {
stream_global->send_trailing_metadata = NULL;
@@ -1303,7 +1188,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->recv_initial_metadata_ready =
op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (op->recv_message != NULL) {
@@ -1317,7 +1203,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, transport_global, stream_global,
transport_global->stream_lookahead, 0);
}
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (op->recv_trailing_metadata != NULL) {
@@ -1326,21 +1213,30 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
add_closure_barrier(on_complete);
stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
stream_global->final_metadata_requested = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_complete_closure_step(exec_ctx, transport_global, stream_global,
&on_complete, GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op_locked", 0);
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "perform_stream_op");
}
static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("perform_stream_op", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
- grpc_chttp2_run_with_global_lock(exec_ctx, t, s, perform_stream_op_locked, op,
- sizeof(*op));
+ grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
+ op);
+ op->transport_private.args[0] = gt;
+ op->transport_private.args[1] = gs;
+ GRPC_CHTTP2_STREAM_REF(&s->global, "perform_stream_op");
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
+ GPR_TIMER_END("perform_stream_op", 0);
}
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1362,13 +1258,20 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping");
}
-static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *opaque_8bytes) {
+typedef struct ack_ping_args {
+ grpc_closure closure;
+ grpc_chttp2_transport *t;
+ uint8_t opaque_8bytes[8];
+} ack_ping_args;
+
+static void ack_ping_locked(grpc_exec_ctx *exec_ctx, void *a,
+ grpc_error *error_ignored) {
+ ack_ping_args *args = a;
grpc_chttp2_outstanding_ping *ping;
- grpc_chttp2_transport_global *transport_global = &t->global;
+ grpc_chttp2_transport_global *transport_global = &args->t->global;
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
+ if (0 == memcmp(args->opaque_8bytes, ping->id, 8)) {
grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
@@ -1376,21 +1279,27 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
break;
}
}
+ UNREF_TRANSPORT(exec_ctx, args->t, "ack_ping");
+ gpr_free(args);
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *transport_parsing,
const uint8_t *opaque_8bytes) {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing), NULL,
- ack_ping_locked, (void *)opaque_8bytes, 8);
+ ack_ping_args *args = gpr_malloc(sizeof(*args));
+ args->t = TRANSPORT_FROM_PARSING(transport_parsing);
+ memcpy(args->opaque_8bytes, opaque_8bytes, sizeof(args->opaque_8bytes));
+ grpc_closure_init(&args->closure, ack_ping_locked, args);
+ REF_TRANSPORT(args->t, "ack_ping");
+ grpc_combiner_execute(exec_ctx, args->t->executor.combiner, &args->closure,
+ GRPC_ERROR_NONE);
}
static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *stream_op) {
+ void *stream_op,
+ grpc_error *error_ignored) {
grpc_transport_op *op = stream_op;
+ grpc_chttp2_transport *t = op->transport_private.args[0];
grpc_error *close_transport = op->disconnect_with_error;
/* If there's a set_accept_stream ensure that we're not parsing
@@ -1402,8 +1311,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
- grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
-
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1429,11 +1336,11 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->bind_pollset) {
- add_to_pollset_locked(exec_ctx, t, NULL, op->bind_pollset);
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, op->bind_pollset);
}
if (op->bind_pollset_set) {
- add_to_pollset_set_locked(exec_ctx, t, NULL, op->bind_pollset_set);
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
}
if (op->send_ping) {
@@ -1443,13 +1350,21 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
}
+
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+
+ UNREF_TRANSPORT(exec_ctx, t, "transport_op");
}
static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_chttp2_run_with_global_lock(
- exec_ctx, t, NULL, perform_transport_op_locked, op, sizeof(*op));
+ op->transport_private.args[0] = gt;
+ grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
+ op);
+ REF_TRANSPORT(t, "transport_op");
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &op->transport_private.closure, GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1458,6 +1373,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global) {
+ GPR_TIMER_BEGIN("check_read_ops", 0);
grpc_chttp2_stream_global *stream_global;
grpc_byte_stream *bs;
while (
@@ -1467,7 +1383,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1490,7 +1406,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
@@ -1511,7 +1427,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
if (stream_global->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(
@@ -1532,6 +1448,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
}
}
+ GPR_TIMER_END("check_read_ops", 0);
}
static void decrement_active_streams_locked(
@@ -1539,7 +1456,8 @@ static void decrement_active_streams_locked(
grpc_chttp2_stream_global *stream_global) {
if ((stream_global->all_incoming_byte_streams_finished =
gpr_unref(&stream_global->active_streams))) {
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
}
@@ -1643,7 +1561,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
}
if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1, due_to_error);
@@ -1655,7 +1574,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_status_code status, gpr_slice *slice) {
if (status != GRPC_STATUS_OK) {
stream_global->seen_error = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
/* stream_global->recv_trailing_metadata_finished gives us a
last chance replacement: we've received trailing metadata,
@@ -1679,7 +1599,8 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
stream_global->published_trailing_metadata = true;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (slice) {
gpr_slice_unref(*slice);
@@ -1739,7 +1660,8 @@ void grpc_chttp2_mark_stream_closed(
GRPC_ERROR_UNREF(error);
return;
}
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
if (close_reads && !stream_global->read_closed) {
stream_global->read_closed_error = GRPC_ERROR_REF(error);
stream_global->read_closed = true;
@@ -1920,6 +1842,10 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
end_all_the_calls(exec_ctx, t, error);
}
@@ -1955,16 +1881,12 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg);
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error);
static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
@@ -1972,16 +1894,20 @@ static void reading_action(grpc_exec_ctx *exec_ctx, void *tp,
reading_action_locked ->
(parse_unlocked -> post_parse_locked)? ->
post_reading_action_locked */
- grpc_chttp2_run_with_global_lock(exec_ctx, tp, NULL, reading_action_locked,
- GRPC_ERROR_REF(error), 0);
+ GPR_TIMER_BEGIN("reading_action", 0);
+ grpc_chttp2_transport *t = tp;
+ grpc_combiner_execute(exec_ctx, t->executor.combiner,
+ &t->reading_action_locked, GRPC_ERROR_REF(error));
+ GPR_TIMER_END("reading_action", 0);
}
-static void reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("reading_action_locked", 0);
+
+ grpc_chttp2_transport *t = tp;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
- grpc_error *error = arg;
GPR_ASSERT(!t->executor.parsing_active);
if (!t->closed) {
@@ -1990,10 +1916,13 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, GRPC_ERROR_REF(error),
+ NULL);
} else {
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, error);
}
+
+ GPR_TIMER_END("reading_action_locked", 0);
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
@@ -2045,13 +1974,15 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
+ grpc_combiner_execute(exec_ctx, t->executor.combiner, &t->post_parse_locked,
+ err);
GPR_TIMER_END("reading_action.parse", 0);
- grpc_chttp2_run_with_global_lock(exec_ctx, t, NULL, post_parse_locked, err,
- 0);
}
-static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *arg) {
+static void post_parse_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("post_parse_locked", 0);
+ grpc_chttp2_transport *t = arg;
grpc_chttp2_transport_global *transport_global = &t->global;
grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
/* copy parsing qbuf to global qbuf */
@@ -2077,7 +2008,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (t->post_parsing_op) {
grpc_transport_op *op = t->post_parsing_op;
t->post_parsing_op = NULL;
- perform_transport_op_locked(exec_ctx, t, NULL, op);
+ perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
gpr_free(op);
}
/* if a stream is in the stream map, and gets cancelled, we need to
@@ -2094,39 +2025,32 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
- post_reading_action_locked(exec_ctx, t, s_unused, arg);
+ post_reading_action_locked(exec_ctx, t, error);
+ GPR_TIMER_END("post_parse_locked", 0);
}
-static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *arg) {
- grpc_error *error = arg;
+static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("post_reading_action_locked", 0);
+ grpc_chttp2_transport *t = arg;
bool keep_reading = false;
+ GRPC_ERROR_REF(error);
if (error == GRPC_ERROR_NONE && t->closed) {
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE);
- }
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (grpc_http_write_state_trace) {
gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t,
write_state_name(t->executor.write_state));
}
- if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) {
- destroy_endpoint(exec_ctx, t);
- }
} else if (!t->closed) {
keep_reading = true;
REF_TRANSPORT(t, "keep_reading");
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -2135,6 +2059,9 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
+ GRPC_ERROR_UNREF(error);
+
+ GPR_TIMER_END("post_reading_action_locked", 0);
}
/*******************************************************************************
@@ -2156,36 +2083,16 @@ static void connectivity_state_set(
* POLLSET STUFF
*/
-static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused, void *pollset) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
- }
-}
-
-static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s_unused,
- void *pollset_set) {
- if (t->ep) {
- grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
- }
-}
-
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
- /* TODO(ctiller): keep pollset alive */
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_locked, pollset, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
- grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
- (grpc_chttp2_stream *)gs,
- add_to_pollset_set_locked, pollset_set, 0);
+ grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
+ grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
}
/*******************************************************************************
@@ -2197,6 +2104,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
gpr_slice_buffer_destroy(&bs->slices);
+ gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
}
@@ -2242,38 +2150,34 @@ static void incoming_byte_stream_update_flow_control(
}
}
-typedef struct {
- grpc_chttp2_incoming_byte_stream *byte_stream;
- gpr_slice *slice;
- size_t max_size_hint;
- grpc_closure *on_complete;
-} incoming_byte_stream_next_arg;
-
static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_next_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs =
- (grpc_chttp2_incoming_byte_stream *)arg->byte_stream;
+ void *argp,
+ grpc_error *error_ignored) {
+ grpc_chttp2_incoming_byte_stream *bs = argp;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
if (bs->is_tail) {
- incoming_byte_stream_update_flow_control(exec_ctx, transport_global,
- stream_global, arg->max_size_hint,
- bs->slices.length);
- }
+ gpr_mu_lock(&bs->slice_mu);
+ size_t cur_length = bs->slices.length;
+ gpr_mu_unlock(&bs->slice_mu);
+ incoming_byte_stream_update_flow_control(
+ exec_ctx, transport_global, stream_global,
+ bs->next_action.max_size_hint, cur_length);
+ }
+ gpr_mu_lock(&bs->slice_mu);
if (bs->slices.count > 0) {
- *arg->slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
- } else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ *bs->next_action.slice = gpr_slice_buffer_take_first(&bs->slices);
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE,
NULL);
+ } else if (bs->error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->next_action.on_complete,
+ GRPC_ERROR_REF(bs->error), NULL);
} else {
- bs->on_next = arg->on_complete;
- bs->next = arg->slice;
+ bs->on_next = bs->next_action.on_complete;
+ bs->next = bs->next_action.slice;
}
+ gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
@@ -2281,13 +2185,18 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) {
+ GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- incoming_byte_stream_next_arg arg = {bs, slice, max_size_hint, on_complete};
gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_next_locked, &arg,
- sizeof(arg));
+ bs->next_action.slice = slice;
+ bs->next_action.max_size_hint = max_size_hint;
+ bs->next_action.on_complete = on_complete;
+ grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->next_action.closure, GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@@ -2295,9 +2204,8 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *byte_stream) {
+ void *byte_stream,
+ grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, &bs->transport->global,
@@ -2307,10 +2215,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
+ GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_destroy_locked, bs, 0);
+ grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
+ bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->destroy_action, GRPC_ERROR_NONE);
+ GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
typedef struct {
@@ -2318,90 +2230,45 @@ typedef struct {
gpr_slice slice;
} incoming_byte_stream_push_arg;
-static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- incoming_byte_stream_push_arg *arg = argp;
- grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
+void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_incoming_byte_stream *bs,
+ gpr_slice slice) {
+ gpr_mu_lock(&bs->slice_mu);
if (bs->on_next != NULL) {
- *bs->next = arg->slice;
+ *bs->next = slice;
grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
- gpr_slice_buffer_add(&bs->slices, arg->slice);
+ gpr_slice_buffer_add(&bs->slices, slice);
}
- incoming_byte_stream_unref(exec_ctx, bs);
+ gpr_mu_unlock(&bs->slice_mu);
}
-void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_incoming_byte_stream *bs,
- gpr_slice slice) {
- incoming_byte_stream_push_arg arg = {bs, slice};
- gpr_ref(&bs->refs);
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_push_locked, &arg,
- sizeof(arg));
-}
-
-typedef struct {
- grpc_chttp2_incoming_byte_stream *bs;
- grpc_error *error;
-} bs_fail_args;
-
-static bs_fail_args *make_bs_fail_args(grpc_chttp2_incoming_byte_stream *bs,
- grpc_error *error) {
- bs_fail_args *a = gpr_malloc(sizeof(*a));
- a->bs = bs;
- a->error = error;
- return a;
-}
-
-static void incoming_byte_stream_finished_failed_locked(
- grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- void *argp) {
- bs_fail_args *a = argp;
- grpc_chttp2_incoming_byte_stream *bs = a->bs;
- grpc_error *error = a->error;
- gpr_free(a);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
- bs->on_next = NULL;
- GRPC_ERROR_UNREF(bs->error);
- bs->error = error;
- incoming_byte_stream_unref(exec_ctx, bs);
-}
-
-static void incoming_byte_stream_finished_ok_locked(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s,
- void *argp) {
- grpc_chttp2_incoming_byte_stream *bs = argp;
+static void incoming_byte_stream_finished_locked(grpc_exec_ctx *exec_ctx,
+ void *bsp, grpc_error *error) {
+ grpc_chttp2_incoming_byte_stream *bs = bsp;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ bs->on_next = NULL;
+ GRPC_ERROR_UNREF(bs->error);
+ bs->error = error;
+ }
incoming_byte_stream_unref(exec_ctx, bs);
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, int from_parsing_thread) {
+ GPR_TIMER_BEGIN("grpc_chttp2_incoming_byte_stream_finished", 0);
if (from_parsing_thread) {
- if (error == GRPC_ERROR_NONE) {
- grpc_chttp2_run_with_global_lock(exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_ok_locked,
- bs, 0);
- } else {
- grpc_chttp2_run_with_global_lock(
- exec_ctx, bs->transport, bs->stream,
- incoming_byte_stream_finished_failed_locked,
- make_bs_fail_args(bs, error), 0);
- }
+ grpc_closure_init(&bs->finished_action,
+ incoming_byte_stream_finished_locked, bs);
+ grpc_combiner_execute(exec_ctx, bs->transport->executor.combiner,
+ &bs->finished_action, GRPC_ERROR_REF(error));
} else {
- if (error == GRPC_ERROR_NONE) {
- incoming_byte_stream_finished_ok_locked(exec_ctx, bs->transport,
- bs->stream, bs);
- } else {
- incoming_byte_stream_finished_failed_locked(
- exec_ctx, bs->transport, bs->stream, make_bs_fail_args(bs, error));
- }
+ incoming_byte_stream_finished_locked(exec_ctx, bs, error);
}
+ GPR_TIMER_END("grpc_chttp2_incoming_byte_stream_finished", 0);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@@ -2414,6 +2281,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
+ gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d67c014e54..04b788b702 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
+#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -161,9 +162,20 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream;
int is_tail;
+
+ gpr_mu slice_mu; // protects slices, on_next
gpr_slice_buffer slices;
grpc_closure *on_next;
gpr_slice *next;
+
+ struct {
+ grpc_closure closure;
+ gpr_slice *slice;
+ size_t max_size_hint;
+ grpc_closure *on_complete;
+ } next_action;
+ grpc_closure destroy_action;
+ grpc_closure finished_action;
};
typedef struct {
@@ -296,23 +308,11 @@ struct grpc_chttp2_transport_parsing {
int64_t outgoing_window;
};
-typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, void *arg);
-
-typedef struct grpc_chttp2_executor_action_header {
- grpc_chttp2_stream *stream;
- grpc_chttp2_locked_action action;
- struct grpc_chttp2_executor_action_header *next;
- void *arg;
-} grpc_chttp2_executor_action_header;
-
typedef enum {
+ /** no writing activity allowed */
+ GRPC_CHTTP2_WRITES_CORKED,
/** no writing activity */
GRPC_CHTTP2_WRITING_INACTIVE,
- /** write has been requested, but not scheduled yet */
- GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER,
- GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER,
/** write has been requested and scheduled against the workqueue */
GRPC_CHTTP2_WRITE_SCHEDULED,
/** write has been initiated after being reaped from the workqueue */
@@ -333,7 +333,7 @@ struct grpc_chttp2_transport {
gpr_refcount shutdown_ep_refs;
struct {
- gpr_mu mu;
+ grpc_combiner *combiner;
/** is a thread currently in the global lock */
bool global_active;
@@ -341,9 +341,8 @@ struct grpc_chttp2_transport {
bool parsing_active;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
-
- grpc_chttp2_executor_action_header *pending_actions_head;
- grpc_chttp2_executor_action_header *pending_actions_tail;
+ /** has a check_read_ops been scheduled */
+ bool check_read_ops_scheduled;
} executor;
/** is the transport destroying itself? */
@@ -380,10 +379,16 @@ struct grpc_chttp2_transport {
grpc_closure writing_action;
/** closure to start reading from the endpoint */
grpc_closure reading_action;
+ grpc_closure reading_action_locked;
+ grpc_closure post_parse_locked;
/** closure to actually do parsing */
grpc_closure parsing_action;
/** closure to initiate writing */
grpc_closure initiate_writing;
+ /** closure to finish writing */
+ grpc_closure terminate_writing;
+ /** closure to flush read state up the stack */
+ grpc_closure initiate_read_flush_locked;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -527,11 +532,16 @@ struct grpc_chttp2_stream_parsing {
};
struct grpc_chttp2_stream {
+ grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
grpc_chttp2_stream_global global;
grpc_chttp2_stream_writing writing;
grpc_chttp2_stream_parsing parsing;
+ grpc_closure init_stream;
+ grpc_closure destroy_stream;
+ void *destroy_stream_arg;
+
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT];
};
@@ -626,7 +636,7 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_stream_global **stream_global);
void grpc_chttp2_list_add_check_read_ops(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
bool grpc_chttp2_list_remove_check_read_ops(
grpc_chttp2_transport_global *transport_global,
@@ -706,12 +716,6 @@ void grpc_chttp2_complete_closure_step(
grpc_chttp2_stream_global *stream_global, grpc_closure **pclosure,
grpc_error *error);
-void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *transport,
- grpc_chttp2_stream *optional_stream,
- grpc_chttp2_locked_action action,
- void *arg, size_t sizeof_arg);
-
#define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
(sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 482cd55c44..0e6d579ba9 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -177,7 +177,8 @@ void grpc_chttp2_publish_reads(
stream_global->seen_error = true;
stream_global->exceeded_metadata_size =
stream_parsing->exceeded_metadata_size;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
/* flush stats to global stream state */
@@ -203,7 +204,8 @@ void grpc_chttp2_publish_reads(
stream_global->incoming_frames.tail->is_tail = 0;
}
if (stream_parsing->data_parser.incoming_frames.head != NULL) {
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
grpc_chttp2_incoming_frame_queue_merge(
&stream_global->incoming_frames,
@@ -219,7 +221,8 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[0],
stream_global->received_initial_metadata);
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (!stream_global->published_trailing_metadata &&
stream_parsing->got_metadata_on_parse[1]) {
@@ -228,7 +231,8 @@ void grpc_chttp2_publish_reads(
GPR_SWAP(grpc_chttp2_incoming_metadata_buffer,
stream_parsing->metadata_buffer[1],
stream_global->received_trailing_metadata);
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ grpc_chttp2_list_add_check_read_ops(exec_ctx, transport_global,
+ stream_global);
}
if (stream_parsing->forced_close_error != GRPC_ERROR_NONE) {
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 2eb5f5f632..4dc4968248 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -298,8 +298,15 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
}
void grpc_chttp2_list_add_check_read_ops(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
+ if (!t->executor.check_read_ops_scheduled) {
+ grpc_combiner_execute_finally(exec_ctx, t->executor.combiner,
+ &t->initiate_read_flush_locked,
+ GRPC_ERROR_NONE, false);
+ t->executor.check_read_ops_scheduled = true;
+ }
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_CHECK_READ_OPS);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 311b26e354..979515bd54 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -55,15 +55,6 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
- /* simple writes are queued to qbuf, and flushed here */
- gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
- GPR_ASSERT(transport_global->qbuf.count == 0);
-
- grpc_chttp2_hpack_compressor_set_max_table_size(
- &transport_writing->hpack_compressor,
- transport_global->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
-
if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings) {
gpr_slice_buffer_add(
@@ -77,6 +68,16 @@ int grpc_chttp2_unlocking_check_writes(
transport_global->sent_local_settings = 1;
}
+ /* simple writes are queued to qbuf, and flushed here */
+ gpr_slice_buffer_move_into(&transport_global->qbuf,
+ &transport_writing->outbuf);
+ GPR_ASSERT(transport_global->qbuf.count == 0);
+
+ grpc_chttp2_hpack_compressor_set_max_table_size(
+ &transport_writing->hpack_compressor,
+ transport_global->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
if (transport_writing->outgoing_window > 0) {
@@ -344,6 +345,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
+ GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
@@ -382,4 +384,5 @@ void grpc_chttp2_cleanup_writing(
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
+ GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 029c15014e..366690acf2 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -294,7 +294,7 @@ static void remove_from_storage(struct stream_obj *s,
/*
Cycle through ops and try to take next action. Break when either
an action with callback is taken, or no action is possible.
- This can be executed from the Cronet network thread via cronet callback
+ This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
static void execute_from_storage(stream_obj *s) {
@@ -329,6 +329,7 @@ static void execute_from_storage(stream_obj *s) {
static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
@@ -340,6 +341,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -349,6 +351,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
static void on_canceled(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL;
@@ -360,6 +363,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -369,9 +373,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -381,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
/* Free the memory allocated for headers */
@@ -388,6 +395,7 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
gpr_free(s->header_array.headers);
s->header_array.headers = NULL;
}
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -401,6 +409,7 @@ static void on_response_headers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
memset(&s->state.rs.initial_metadata, 0,
sizeof(s->state.rs.initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
@@ -412,6 +421,7 @@ static void on_response_headers_received(
grpc_mdstr_from_string(headers->headers[i].value)));
}
s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -422,11 +432,13 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
const char *data) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
+ gpr_mu_lock(&s->mu);
if (s->state.ws.write_buffer) {
gpr_free(s->state.ws.write_buffer);
s->state.ws.write_buffer = NULL;
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -438,6 +450,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
+ gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0) {
s->state.rs.received_bytes += count;
@@ -448,11 +461,14 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
cronet_bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
+ gpr_mu_unlock(&s->mu);
} else {
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
} else {
s->state.rs.read_stream_closed = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
}
@@ -466,6 +482,7 @@ static void on_response_trailers_received(
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
stream_obj *s = (stream_obj *)stream->annotation;
+ gpr_mu_lock(&s->mu);
memset(&s->state.rs.trailing_metadata, 0,
sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
@@ -481,6 +498,7 @@ static void on_response_trailers_received(
s->state.rs.trailing_metadata_valid = true;
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
+ gpr_mu_unlock(&s->mu);
execute_from_storage(s);
}
@@ -757,14 +775,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
- if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
+ } else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
GRPC_ERROR_NONE, NULL);
- } else {
- grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
- GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@@ -772,32 +791,40 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
- gpr_slice_buffer write_slice_buffer;
- gpr_slice slice;
- gpr_slice_buffer_init(&write_slice_buffer);
- grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
- stream_op->send_message->length, NULL);
- /* Check that compression flag is OFF. We don't support compression yet. */
- if (stream_op->send_message->flags != 0) {
- gpr_log(GPR_ERROR, "Compression is not supported");
- GPR_ASSERT(stream_op->send_message->flags == 0);
- }
- gpr_slice_buffer_add(&write_slice_buffer, slice);
- if (write_slice_buffer.count != 1) {
- /* Empty request not handled yet */
- gpr_log(GPR_ERROR, "Empty request is not supported");
- GPR_ASSERT(write_slice_buffer.count == 1);
- }
- if (write_slice_buffer.count > 0) {
- size_t write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
- &write_buffer_size);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
- s->cbs, stream_state->ws.write_buffer);
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
- (int)write_buffer_size, false);
- result = ACTION_TAKEN_WITH_CALLBACK;
+ if (stream_state->state_callback_received[OP_FAILED]) {
+ result = NO_ACTION_POSSIBLE;
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ } else {
+ gpr_slice_buffer write_slice_buffer;
+ gpr_slice slice;
+ gpr_slice_buffer_init(&write_slice_buffer);
+ grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
+ stream_op->send_message->length, NULL);
+ /* Check that compression flag is OFF. We don't support compression yet.
+ */
+ if (stream_op->send_message->flags != 0) {
+ gpr_log(GPR_ERROR, "Compression is not supported");
+ GPR_ASSERT(stream_op->send_message->flags == 0);
+ }
+ gpr_slice_buffer_add(&write_slice_buffer, slice);
+ if (write_slice_buffer.count != 1) {
+ /* Empty request not handled yet */
+ gpr_log(GPR_ERROR, "Empty request is not supported");
+ GPR_ASSERT(write_slice_buffer.count == 1);
+ }
+ if (write_slice_buffer.count > 0) {
+ size_t write_buffer_size;
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
+ s->cbs, stream_state->ws.write_buffer);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = NO_ACTION_POSSIBLE;
+ }
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
@@ -805,7 +832,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
- if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED]) {
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
@@ -861,8 +890,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
true; /* Indicates that at least one read request has been made */
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ result = NO_ACTION_POSSIBLE;
}
- result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_state->rs.remaining_bytes == 0) {
CRONET_LOG(GPR_DEBUG, "read operation complete");
gpr_slice read_data_slice =
@@ -903,11 +934,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_SEND_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ if (stream_state->state_callback_received[OP_FAILED]) {
+ result = NO_ACTION_POSSIBLE;
+ CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
+ } else {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
+ s->cbs);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ }
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
- result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->cancel_error &&
op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) {