aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/client_channel.c8
-rw-r--r--src/core/ext/client_config/lb_policy.c7
-rw-r--r--src/core/ext/client_config/lb_policy.h52
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h8
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c63
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c10
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c115
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c1303
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c1
10 files changed, 1023 insertions, 546 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 2c0c4abffc..c1c5e38cb0 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -401,9 +401,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
int r;
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
GPR_TIMER_END("cc_pick_subchannel", 0);
return r;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2cca..71170f5655 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,13 +100,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446fc6..6f133a2948 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,37 @@ struct grpc_lb_policy {
grpc_pollset_set *interested_parties;
};
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Parties interested in the pick's progress */
+ grpc_polling_entity *pollent;
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** implement grpc_lb_policy_pick */
+ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+
+ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
+ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -83,8 +97,7 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription.
- */
+ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -124,26 +137,31 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Given initial metadata in \a initial_metadata, find an appropriate
- target for this rpc, and 'return' it by calling \a on_complete after setting
- \a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+ Upon completion \a on_complete will be called, with \a *target set to an
+ appropriate connected subchannel if the pick was successful or NULL
+ otherwise.
+ Picking can be asynchronous. Any IO should be done under \a
+ pick_args->pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
-/** Cancel all pending picks which have:
- (initial_metadata_flags & initial_metadata_flags_mask) ==
- initial_metadata_flags_eq */
+/** Cancel all pending picks for which their \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 1c89b28b59..635efb6e6a 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -49,8 +49,16 @@ struct grpc_lb_policy_factory {
const grpc_lb_policy_factory_vtable *vtable;
};
+typedef struct grpc_lb_policy_address_token {
+ uint8_t *token;
+ size_t token_size;
+} grpc_lb_policy_address_token;
+
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
+ /* It not NULL, array of load balancing tokens associated with \a addresses,
+ * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */
+ grpc_lb_policy_address_token *tokens;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 8d2deb02f3..40a0681a3b 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -81,6 +81,8 @@ typedef struct grpc_subchannel_call_holder {
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 1ef475d086..5e97d4c6ac 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -164,6 +164,9 @@ typedef struct pending_pick {
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -180,19 +183,19 @@ typedef struct pending_pick {
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+static void add_pending_pick(pending_pick **root,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pollent;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata = initial_metadata;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
@@ -295,7 +298,10 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
(const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
+ args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) *
+ serverlist->num_servers);
args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
args.addresses->naddrs = serverlist->num_servers;
args.addresses->addrs =
@@ -310,6 +316,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
args.addresses->addrs[out_addrs_idx].len = sa_len;
++out_addrs_idx;
+ const size_t token_max_size =
+ GPR_ARRAY_SIZE(serverlist->servers[i]->load_balance_token);
+ serverlist->servers[i]->load_balance_token[token_max_size - 1] = '\0';
+ args.tokens[i].token_size =
+ strlen(serverlist->servers[i]->load_balance_token);
+ args.tokens[i].token = gpr_malloc(args.tokens[i].token_size);
+ memcpy(args.tokens[i].token, serverlist->servers[i]->load_balance_token,
+ args.tokens[i].token_size);
} else {
gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
host_ports[i]);
@@ -325,11 +339,14 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
gpr_free(host_ports);
gpr_free(args.addresses->addrs);
gpr_free(args.addresses);
+ gpr_free(args.tokens);
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
GRPC_ERROR_REF(error);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -359,9 +376,11 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
- pp->initial_metadata, pp->initial_metadata_flags,
- pp->target, &pp->wrapped_on_complete);
+ const grpc_lb_policy_pick_args pick_args = {
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ &pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -601,12 +620,22 @@ static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ /* TODO(dgq): should this be an assert? If storage is NULL, something has
+ * gone very wrong at the client channel filter */
+ gpr_log(GPR_ERROR,
+ "No mdelem storage for the LB token. Load reporting won't work "
+ "without it. Failing");
+ *target = NULL;
+ grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
@@ -621,8 +650,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
glb_policy->wc_arg.wrapped_closure = on_complete;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
- initial_metadata, initial_metadata_flags, target,
+
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
&glb_policy->wrapped_on_complete);
if (r != 0) {
/* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
@@ -637,10 +666,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
GRPC_ERROR_NONE, NULL);
}
} else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
- add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+ on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70692..e1277b353f 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -199,9 +199,7 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -225,13 +223,13 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608ab9..8fda405fb8 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,15 +77,33 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
+
+ /* the initial metadata for the pick. See grpc_lb_policy_pick() */
+ grpc_metadata_batch *initial_metadata;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ grpc_lb_policy_address_token *lb_token;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -102,12 +121,19 @@ typedef struct {
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target LB token */
+ grpc_lb_policy_address_token *lb_token;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** load balancing tokens, one per incoming address */
+ grpc_lb_policy_address_token *lb_tokens;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -166,16 +192,19 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->lb_token = sd->lb_token;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -189,7 +218,8 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -217,7 +247,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -251,6 +281,13 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_free(elem);
elem = tmp;
}
+
+ if (p->lb_tokens != NULL) {
+ for (i = 0; i < p->num_addresses; i++) {
+ gpr_free(p->lb_tokens[i].token);
+ }
+ gpr_free(p->lb_tokens);
+ }
gpr_free(p);
}
@@ -337,7 +374,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -360,10 +397,25 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
gpr_mu_unlock(&p->mu);
}
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage,
+ grpc_lb_policy_address_token *lb_token) {
+ if (lb_token != NULL && lb_token->token_size > 0) {
+ GPR_ASSERT(lb_token->token != NULL);
+ grpc_mdstr *lb_token_mdstr =
+ grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
+ grpc_metadata_batch_add_tail(
+ initial_metadata, lb_token_mdelem_storage,
+ grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
+ lb_token_mdstr));
+ }
+}
+
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -371,28 +423,35 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
+ pick_args->lb_token_mdelem_storage,
+ selected->lb_token);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -421,7 +480,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -433,12 +492,16 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
+ initial_metadata_add_lb_token(pp->initial_metadata,
+ pp->lb_token_mdelem_storage,
+ selected->lb_token);
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
@@ -574,13 +637,21 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->num_addresses = args->addresses->naddrs;
+ if (args->tokens != NULL) {
+ /* we need to copy because args contents aren't owned */
+ p->lb_tokens =
+ gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ memcpy(p->lb_tokens, args->tokens,
+ sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ }
+
+ p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
@@ -595,12 +666,16 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ if (p->lb_tokens != NULL) {
+ sd->lb_token = &p->lb_tokens[i];
+ }
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 25d8aca250..029c15014e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -46,617 +46,964 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
-// Global flag that gets set with GRPC_TRACE env variable
-int grpc_cronet_trace = 1;
+#define CRONET_LOG(...) \
+ do { \
+ if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
+ } while (0)
-// Cronet transport object
+/* TODO (makdharma): Hook up into the wider tracing mechanism */
+int grpc_cronet_trace = 0;
+
+enum e_op_result {
+ ACTION_TAKEN_WITH_CALLBACK,
+ ACTION_TAKEN_NO_CALLBACK,
+ NO_ACTION_POSSIBLE
+};
+
+enum e_op_id {
+ OP_SEND_INITIAL_METADATA = 0,
+ OP_SEND_MESSAGE,
+ OP_SEND_TRAILING_METADATA,
+ OP_RECV_MESSAGE,
+ OP_RECV_INITIAL_METADATA,
+ OP_RECV_TRAILING_METADATA,
+ OP_CANCEL_ERROR,
+ OP_ON_COMPLETE,
+ OP_FAILED,
+ OP_SUCCEEDED,
+ OP_CANCELED,
+ OP_RECV_MESSAGE_AND_ON_COMPLETE,
+ OP_READ_REQ_MADE,
+ OP_NUM_OPS
+};
+
+/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
+
+static void on_request_headers_sent(cronet_bidirectional_stream *);
+static void on_response_headers_received(
+ cronet_bidirectional_stream *,
+ const cronet_bidirectional_stream_header_array *, const char *);
+static void on_write_completed(cronet_bidirectional_stream *, const char *);
+static void on_read_completed(cronet_bidirectional_stream *, char *, int);
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *,
+ const cronet_bidirectional_stream_header_array *);
+static void on_succeeded(cronet_bidirectional_stream *);
+static void on_failed(cronet_bidirectional_stream *, int);
+static void on_canceled(cronet_bidirectional_stream *);
+static cronet_bidirectional_stream_callback cronet_callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+/* Cronet transport object */
struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */
cronet_engine *engine;
char *host;
};
-
typedef struct grpc_cronet_transport grpc_cronet_transport;
-enum send_state {
- CRONET_SEND_IDLE = 0,
- CRONET_REQ_STARTED,
- CRONET_SEND_HEADER,
- CRONET_WRITE,
- CRONET_WRITE_COMPLETED,
+/* TODO (makdharma): reorder structure for memory efficiency per
+ http://www.catb.org/esr/structure-packing/#_structure_reordering: */
+struct read_state {
+ /* vars to store data coming from server */
+ char *read_buffer;
+ bool length_field_received;
+ int received_bytes;
+ int remaining_bytes;
+ int length_field;
+ char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
+ char *payload_field;
+ bool read_stream_closed;
+
+ /* vars for holding data destined for the application */
+ struct grpc_slice_buffer_stream sbs;
+ gpr_slice_buffer read_slice_buffer;
+
+ /* vars for trailing metadata */
+ grpc_chttp2_incoming_metadata_buffer trailing_metadata;
+ bool trailing_metadata_valid;
+
+ /* vars for initial metadata */
+ grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
-enum recv_state {
- CRONET_RECV_IDLE = 0,
- CRONET_RECV_READ_LENGTH,
- CRONET_RECV_READ_DATA,
- CRONET_RECV_CLOSED,
+struct write_state {
+ char *write_buffer;
};
-static const char *recv_state_name[] = {
- "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
- "CRONET_RECV_CLOSED"};
+/* track state of one stream op */
+struct op_state {
+ bool state_op_done[OP_NUM_OPS];
+ bool state_callback_received[OP_NUM_OPS];
+ /* data structure for storing data coming from server */
+ struct read_state rs;
+ /* data structure for storing data going to the server */
+ struct write_state ws;
+};
-// Enum that identifies calling function.
-enum e_caller {
- PERFORM_STREAM_OP,
- ON_READ_COMPLETE,
- ON_RESPONSE_HEADERS_RECEIVED,
- ON_RESPONSE_TRAILERS_RECEIVED
+struct op_and_state {
+ grpc_transport_stream_op op;
+ struct op_state state;
+ bool done;
+ struct stream_obj *s; /* Pointer back to the stream object */
+ struct op_and_state *next; /* next op_and_state in the linked list */
};
-enum callback_id {
- CB_SEND_INITIAL_METADATA = 0,
- CB_SEND_MESSAGE,
- CB_SEND_TRAILING_METADATA,
- CB_RECV_MESSAGE,
- CB_RECV_INITIAL_METADATA,
- CB_RECV_TRAILING_METADATA,
- CB_NUM_CALLBACKS
+struct op_storage {
+ int num_pending_ops;
+ struct op_and_state *head;
};
struct stream_obj {
- // we store received bytes here as they trickle in.
- gpr_slice_buffer write_slice_buffer;
+ struct op_and_state *oas;
+ grpc_transport_stream_op *curr_op;
+ grpc_cronet_transport curr_ct;
+ grpc_stream *curr_gs;
cronet_bidirectional_stream *cbs;
- gpr_slice slice;
- gpr_slice_buffer read_slice_buffer;
- struct grpc_slice_buffer_stream sbs;
- char *read_buffer;
- int remaining_read_bytes;
- int total_read_bytes;
-
- char *write_buffer;
- size_t write_buffer_size;
-
- // Hold the URL
- char *url;
-
- bool response_headers_received;
- bool read_requested;
- bool response_trailers_received;
- bool read_closed;
-
- // Recv message stuff
- grpc_byte_buffer **recv_message;
- // Initial metadata stuff
- grpc_metadata_batch *recv_initial_metadata;
- // Trailing metadata stuff
- grpc_metadata_batch *recv_trailing_metadata;
- grpc_chttp2_incoming_metadata_buffer imb;
-
- // This mutex protects receive state machine execution
- gpr_mu recv_mu;
- // we can queue up up to 2 callbacks for each OP
- grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
-
- // storage for header
- cronet_bidirectional_stream_header *headers;
- uint32_t num_headers;
cronet_bidirectional_stream_header_array header_array;
- // state tracking
- enum recv_state cronet_recv_state;
- enum send_state cronet_send_state;
-};
-typedef struct stream_obj stream_obj;
+ /* Stream level state. Some state will be tracked both at stream and stream_op
+ * level */
+ struct op_state state;
-static void next_send_step(stream_obj *s);
-static void next_recv_step(stream_obj *s, enum e_caller caller);
+ /* OP storage */
+ struct op_storage storage;
-static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {}
+ /* Mutex to protect storage */
+ gpr_mu mu;
+};
+typedef struct stream_obj stream_obj;
-static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
- grpc_transport *gt, grpc_stream *gs,
- grpc_pollset_set *pollset_set) {}
+static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
+ struct op_and_state *oas);
-static void enqueue_callbacks(grpc_closure *callback_list[]) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- if (callback_list[0]) {
- grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
- callback_list[0] = NULL;
- }
- if (callback_list[1]) {
- grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
- callback_list[1] = NULL;
+/*
+ Utility function to translate enum into string for printing
+*/
+static const char *op_result_string(enum e_op_result i) {
+ switch (i) {
+ case ACTION_TAKEN_WITH_CALLBACK:
+ return "ACTION_TAKEN_WITH_CALLBACK";
+ case ACTION_TAKEN_NO_CALLBACK:
+ return "ACTION_TAKEN_NO_CALLBACK";
+ case NO_ACTION_POSSIBLE:
+ return "NO_ACTION_POSSIBLE";
}
- grpc_exec_ctx_finish(&exec_ctx);
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-static void on_canceled(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+static const char *op_id_string(enum e_op_id i) {
+ switch (i) {
+ case OP_SEND_INITIAL_METADATA:
+ return "OP_SEND_INITIAL_METADATA";
+ case OP_SEND_MESSAGE:
+ return "OP_SEND_MESSAGE";
+ case OP_SEND_TRAILING_METADATA:
+ return "OP_SEND_TRAILING_METADATA";
+ case OP_RECV_MESSAGE:
+ return "OP_RECV_MESSAGE";
+ case OP_RECV_INITIAL_METADATA:
+ return "OP_RECV_INITIAL_METADATA";
+ case OP_RECV_TRAILING_METADATA:
+ return "OP_RECV_TRAILING_METADATA";
+ case OP_CANCEL_ERROR:
+ return "OP_CANCEL_ERROR";
+ case OP_ON_COMPLETE:
+ return "OP_ON_COMPLETE";
+ case OP_FAILED:
+ return "OP_FAILED";
+ case OP_SUCCEEDED:
+ return "OP_SUCCEEDED";
+ case OP_CANCELED:
+ return "OP_CANCELED";
+ case OP_RECV_MESSAGE_AND_ON_COMPLETE:
+ return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
+ case OP_READ_REQ_MADE:
+ return "OP_READ_REQ_MADE";
+ case OP_NUM_OPS:
+ return "OP_NUM_OPS";
}
+ return "UNKNOWN";
}
-static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
- }
+/*
+ Add a new stream op to op storage.
+*/
+static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+ struct op_storage *storage = &s->storage;
+ /* add new op at the beginning of the linked list. The memory is freed
+ in remove_from_storage */
+ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memset(&new_op->state, 0, sizeof(new_op->state));
+ new_op->s = s;
+ new_op->done = false;
+ gpr_mu_lock(&s->mu);
+ new_op->next = storage->head;
+ storage->head = new_op;
+ storage->num_pending_ops++;
+ CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
+ storage->num_pending_ops);
+ gpr_mu_unlock(&s->mu);
}
-static void on_succeeded(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+/*
+ Traverse the linked list and delete op and free memory
+*/
+static void remove_from_storage(struct stream_obj *s,
+ struct op_and_state *oas) {
+ struct op_and_state *curr;
+ if (s->storage.head == NULL || oas == NULL) {
+ return;
}
-}
-
-static void on_response_trailers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *trailers) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ if (s->storage.head == oas) {
+ s->storage.head = oas->next;
+ gpr_free(oas);
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ } else {
+ for (curr = s->storage.head; curr != NULL; curr = curr->next) {
+ if (curr->next == oas) {
+ curr->next = oas->next;
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ gpr_free(oas);
+ break;
+ } else if (curr->next == NULL) {
+ CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
+ }
+ }
}
- stream_obj *s = (stream_obj *)stream->annotation;
+}
- memset(&s->imb, 0, sizeof(s->imb));
- grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
- unsigned int i = 0;
- for (i = 0; i < trailers->count; i++) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->imb, grpc_mdelem_from_metadata_strings(
- grpc_mdstr_from_string(trailers->headers[i].key),
- grpc_mdstr_from_string(trailers->headers[i].value)));
+/*
+ Cycle through ops and try to take next action. Break when either
+ an action with callback is taken, or no action is possible.
+ This can be executed from the Cronet network thread via cronet callback
+ or on the application supplied thread via the perform_stream_op function.
+*/
+static void execute_from_storage(stream_obj *s) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
+ CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
+ GPR_ASSERT(curr->done == 0);
+ enum e_op_result result = execute_stream_op(&exec_ctx, curr);
+ CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
+ op_result_string(result));
+ /* if this op is done, then remove it and free memory */
+ if (curr->done) {
+ struct op_and_state *next = curr->next;
+ remove_from_storage(s, curr);
+ curr = next;
+ }
+ /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
+ if (result == NO_ACTION_POSSIBLE) {
+ curr = curr->next;
+ } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
+ break;
+ }
}
- s->response_trailers_received = true;
- next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+ gpr_mu_unlock(&s->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void on_write_completed(cronet_bidirectional_stream *stream,
- const char *data) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: on_write_completed");
- }
+/*
+ Cronet callback
+*/
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
- s->cronet_send_state = CRONET_WRITE_COMPLETED;
- next_send_step(s);
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_FAILED] = true;
+ s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
+ }
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
+ execute_from_storage(s);
}
-static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
- gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
- uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
- if (s->total_read_bytes > 0) {
- // Only copy if there is non-zero number of bytes
- memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
- gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+/*
+ Cronet callback
+*/
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_CANCELED] = true;
+ s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
}
- grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
- *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
+ execute_from_storage(s);
}
-static int parse_grpc_header(const uint8_t *data) {
- const uint8_t *p = data + 1;
- int length = 0;
- length |= ((uint8_t)*p++) << 24;
- length |= ((uint8_t)*p++) << 16;
- length |= ((uint8_t)*p++) << 8;
- length |= ((uint8_t)*p++);
- return length;
+/*
+ Cronet callback
+*/
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_SUCCEEDED] = true;
+ s->cbs = NULL;
+ execute_from_storage(s);
}
-static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
- int count) {
+/*
+ Cronet callback
+*/
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
- count, s->total_read_bytes, s->remaining_read_bytes);
- }
- if (count > 0) {
- GPR_ASSERT(s->recv_message);
- s->remaining_read_bytes -= count;
- next_recv_step(s, ON_READ_COMPLETE);
- } else {
- s->read_closed = true;
- next_recv_step(s, ON_READ_COMPLETE);
+ s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
+ s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
+ /* Free the memory allocated for headers */
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
}
+ execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_response_headers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_response_headers_received");
- }
+ CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
+ headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
- s->response_headers_received = true;
- next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
-}
-
-static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ memset(&s->state.rs.initial_metadata, 0,
+ sizeof(s->state.rs.initial_metadata));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
+ for (size_t i = 0; i < headers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.initial_metadata,
+ grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(headers->headers[i].key),
+ grpc_mdstr_from_string(headers->headers[i].value)));
}
- stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
- s->cronet_send_state = CRONET_SEND_HEADER;
- next_send_step(s);
+ s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ execute_from_storage(s);
}
-// Callback function pointers (invoked by cronet in response to events)
-static cronet_bidirectional_stream_callback callbacks = {
- on_request_headers_sent,
- on_response_headers_received,
- on_read_completed,
- on_write_completed,
- on_response_trailers_received,
- on_succeeded,
- on_failed,
- on_canceled};
-
-static void invoke_closing_callback(stream_obj *s) {
- grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
- s->recv_trailing_metadata);
- if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
- enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+/*
+ Cronet callback
+*/
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
}
+ s->state.state_callback_received[OP_SEND_MESSAGE] = true;
+ execute_from_storage(s);
}
-static void set_recv_state(stream_obj *s, enum recv_state state) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+/*
+ Cronet callback
+*/
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
+ count);
+ s->state.state_callback_received[OP_RECV_MESSAGE] = true;
+ if (count > 0) {
+ s->state.rs.received_bytes += count;
+ s->state.rs.remaining_bytes -= count;
+ if (s->state.rs.remaining_bytes > 0) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ s->state.state_op_done[OP_READ_REQ_MADE] = true;
+ cronet_bidirectional_stream_read(
+ s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
+ s->state.rs.remaining_bytes);
+ } else {
+ execute_from_storage(s);
+ }
+ } else {
+ s->state.rs.read_stream_closed = true;
+ execute_from_storage(s);
}
- s->cronet_recv_state = state;
}
-// This is invoked from perform_stream_op, and all on_xxxx callbacks.
-static void next_recv_step(stream_obj *s, enum e_caller caller) {
- gpr_mu_lock(&s->recv_mu);
- switch (s->cronet_recv_state) {
- case CRONET_RECV_IDLE:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
- }
- if (caller == PERFORM_STREAM_OP ||
- caller == ON_RESPONSE_HEADERS_RECEIVED) {
- if (s->read_closed && s->response_trailers_received) {
- invoke_closing_callback(s);
- set_recv_state(s, CRONET_RECV_CLOSED);
- } else if (s->response_headers_received == true &&
- s->read_requested == true) {
- set_recv_state(s, CRONET_RECV_READ_LENGTH);
- s->total_read_bytes = s->remaining_read_bytes =
- GRPC_HEADER_SIZE_IN_BYTES;
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
- s->remaining_read_bytes);
- }
- }
- break;
- case CRONET_RECV_READ_LENGTH:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
- }
- if (caller == ON_READ_COMPLETE) {
- if (s->read_closed) {
- invoke_closing_callback(s);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- set_recv_state(s, CRONET_RECV_CLOSED);
- } else {
- GPR_ASSERT(s->remaining_read_bytes == 0);
- set_recv_state(s, CRONET_RECV_READ_DATA);
- s->total_read_bytes = s->remaining_read_bytes =
- parse_grpc_header((const uint8_t *)s->read_buffer);
- s->read_buffer =
- gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- if (s->remaining_read_bytes > 0) {
- cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
- s->remaining_read_bytes);
- } else {
- // Calling the closing callback directly since this is a 0 byte read
- // for an empty message.
- process_recv_message(s, NULL);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- invoke_closing_callback(s);
- set_recv_state(s, CRONET_RECV_CLOSED);
- }
- }
- }
- break;
- case CRONET_RECV_READ_DATA:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
- }
- if (caller == ON_READ_COMPLETE) {
- if (s->remaining_read_bytes > 0) {
- int offset = s->total_read_bytes - s->remaining_read_bytes;
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- cronet_bidirectional_stream_read(
- s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
- } else {
- gpr_slice_buffer_init(&s->read_slice_buffer);
- uint8_t *p = (uint8_t *)s->read_buffer;
- process_recv_message(s, p);
- set_recv_state(s, CRONET_RECV_IDLE);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- }
- }
- break;
- case CRONET_RECV_CLOSED:
- break;
- default:
- GPR_ASSERT(0); // Should not reach here
- break;
+/*
+ Cronet callback
+*/
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
+ trailers);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ memset(&s->state.rs.trailing_metadata, 0,
+ sizeof(s->state.rs.trailing_metadata));
+ s->state.rs.trailing_metadata_valid = false;
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
+ for (size_t i = 0; i < trailers->count; i++) {
+ CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
+ trailers->headers[i].value);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ s->state.rs.trailing_metadata_valid = true;
}
- gpr_mu_unlock(&s->recv_mu);
+ s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
+ execute_from_storage(s);
}
-// This function takes the data from s->write_slice_buffer and assembles into
-// a contiguous byte stream with 5 byte gRPC header prepended.
-static void create_grpc_frame(stream_obj *s) {
- gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
- uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+/*
+ Utility function that takes the data from s->write_slice_buffer and assembles
+ into a contiguous byte stream with 5 byte gRPC header prepended.
+*/
+static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
+ char **pp_write_buffer,
+ size_t *p_write_buffer_size) {
+ gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
size_t length = GPR_SLICE_LENGTH(slice);
- s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
- s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
- uint8_t *p = (uint8_t *)s->write_buffer;
- // Append 5 byte header
+ *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ /* This is freed in the on_write_completed callback */
+ char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
+ *pp_write_buffer = write_buffer;
+ uint8_t *p = (uint8_t *)write_buffer;
+ /* Append 5 byte header */
*p++ = 0;
*p++ = (uint8_t)(length >> 24);
*p++ = (uint8_t)(length >> 16);
*p++ = (uint8_t)(length >> 8);
*p++ = (uint8_t)(length);
- // append actual data
- memcpy(p, raw_data, length);
+ /* append actual data */
+ memcpy(p, GPR_SLICE_START_PTR(slice), length);
}
-static void do_write(stream_obj *s) {
- gpr_slice_buffer *sb = &s->write_slice_buffer;
- GPR_ASSERT(sb->count <= 1);
- if (sb->count > 0) {
- create_grpc_frame(s);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
- }
- cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
- (int)s->write_buffer_size, false);
- }
-}
-
-//
-static void next_send_step(stream_obj *s) {
- switch (s->cronet_send_state) {
- case CRONET_SEND_IDLE:
- GPR_ASSERT(
- s->cbs); // cronet_bidirectional_stream is not initialized yet.
- s->cronet_send_state = CRONET_REQ_STARTED;
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
- }
- cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
- &s->header_array, false);
- // we no longer need the memory that was allocated earlier.
- gpr_free(s->header_array.headers);
- break;
- case CRONET_SEND_HEADER:
- do_write(s);
- s->cronet_send_state = CRONET_WRITE;
- break;
- case CRONET_WRITE_COMPLETED:
- do_write(s);
- break;
- default:
- GPR_ASSERT(0);
- break;
- }
-}
-
-static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
- const char *host,
- stream_obj *s) {
+/*
+ Convert metadata in a format that Cronet can consume
+*/
+static void convert_metadata_to_cronet_headers(
+ grpc_linked_mdelem *head, const char *host, char **pp_url,
+ cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
grpc_linked_mdelem *curr = head;
- // Walk the linked list and get number of header fields
- uint32_t num_headers_available = 0;
+ /* Walk the linked list and get number of header fields */
+ size_t num_headers_available = 0;
while (curr != NULL) {
curr = curr->next;
num_headers_available++;
}
- // Allocate enough memory
- s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
- sizeof(cronet_bidirectional_stream_header) * num_headers_available);
-
- // Walk the linked list again, this time copying the header fields.
- // s->num_headers
- // can be less than num_headers_available, as some headers are not used for
- // cronet
+ /* Allocate enough memory. It is freed in the on_request_headers_sent callback
+ */
+ cronet_bidirectional_stream_header *headers =
+ (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+ *pp_headers = headers;
+
+ /* Walk the linked list again, this time copying the header fields.
+ s->num_headers can be less than num_headers_available, as some headers
+ are not used for cronet.
+ TODO (makdharma): Eliminate need to traverse the LL second time for perf.
+ */
curr = head;
- s->num_headers = 0;
- while (s->num_headers < num_headers_available) {
+ size_t num_headers = 0;
+ while (num_headers < num_headers_available) {
grpc_mdelem *mdelem = curr->md;
curr = curr->next;
const char *key = grpc_mdstr_as_c_string(mdelem->key);
const char *value = grpc_mdstr_as_c_string(mdelem->value);
- if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
- strcmp(key, ":authority") == 0) {
- // Cronet populates these fields on its own.
+ if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
+ mdelem->key == GRPC_MDSTR_AUTHORITY) {
+ /* Cronet populates these fields on its own */
continue;
}
- if (strcmp(key, ":path") == 0) {
- // Create URL by appending :path value to the hostname
- gpr_asprintf(&s->url, "https://%s%s", host, value);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
- }
+ if (mdelem->key == GRPC_MDSTR_PATH) {
+ /* Create URL by appending :path value to the hostname */
+ gpr_asprintf(pp_url, "https://%s%s", host, value);
continue;
}
- s->headers[s->num_headers].key = key;
- s->headers[s->num_headers].value = value;
- s->num_headers++;
+ CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
+ headers[num_headers].key = key;
+ headers[num_headers].value = value;
+ num_headers++;
if (curr == NULL) {
break;
}
}
+ *p_num_headers = (size_t)num_headers;
}
-static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
- grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
- GPR_ASSERT(ct->engine);
- stream_obj *s = (stream_obj *)gs;
- if (op->recv_trailing_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - recv_trailing_metadata: on_complete=%p",
- op->on_complete);
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+/*
+ Op Execution: Decide if one of the actions contained in the stream op can be
+ executed. This is the heart of the state machine.
+*/
+static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+ struct op_state *stream_state,
+ struct op_state *op_state, enum e_op_id op_id) {
+ bool result = true;
+ /* When call is canceled, every op can be run, except under following
+ conditions
+ */
+ bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED];
+ if (is_canceled_of_failed) {
+ if (op_id == OP_SEND_INITIAL_METADATA) result = false;
+ if (op_id == OP_SEND_MESSAGE) result = false;
+ if (op_id == OP_SEND_TRAILING_METADATA) result = false;
+ if (op_id == OP_CANCEL_ERROR) result = false;
+ /* already executed */
+ if (op_id == OP_RECV_INITIAL_METADATA &&
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ result = false;
+ if (op_id == OP_RECV_MESSAGE &&
+ stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ if (op_id == OP_RECV_TRAILING_METADATA &&
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ result = false;
+ } else if (op_id == OP_SEND_INITIAL_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
+ } else if (op_id == OP_RECV_INITIAL_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_SEND_MESSAGE) {
+ /* already executed (note we're checking op specific state, not stream
+ state) */
+ if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_RECV_MESSAGE) {
+ /* already executed */
+ if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_RECV_TRAILING_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
+ /* we have asked for but haven't received message yet. */
+ else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ /* we haven't received trailers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
+ result = false;
+ /* we haven't received on_succeeded yet. */
+ else if (!stream_state->state_callback_received[OP_SUCCEEDED])
+ result = false;
+ } else if (op_id == OP_SEND_TRAILING_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
+ /* we haven't sent initial metadata yet */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't sent message yet */
+ else if (curr_op->send_message &&
+ !stream_state->state_op_done[OP_SEND_MESSAGE])
+ result = false;
+ /* we haven't got on_write_completed for the send yet */
+ else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
+ } else if (op_id == OP_CANCEL_ERROR) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
+ } else if (op_id == OP_ON_COMPLETE) {
+ /* already executed (note we're checking op specific state, not stream
+ state) */
+ if (op_state->state_op_done[OP_ON_COMPLETE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
}
- s->recv_trailing_metadata = op->recv_trailing_metadata;
- GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
- s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
- }
- if (op->recv_message) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
- op->on_complete);
+ /* Check if every op that was asked for is done. */
+ else if (curr_op->send_initial_metadata &&
+ !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_message &&
+ !op_state->state_op_done[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_message &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_initial_metadata &&
+ !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_message &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_trailing_metadata) {
+ /* We aren't done with trailing metadata yet */
+ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ /* We've asked for actual message in an earlier op, and it hasn't been
+ delivered yet. */
+ else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
+ /* If this op is not the one asking for read, (which means some earlier
+ op has asked), and the read hasn't been delivered. */
+ if (!curr_op->recv_message &&
+ !stream_state->state_callback_received[OP_SUCCEEDED]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ }
}
- s->recv_message = (grpc_byte_buffer **)op->recv_message;
- GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
- GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
- s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
- s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
- s->read_requested = true;
- next_recv_step(s, PERFORM_STREAM_OP);
+ /* We should see at least one on_write_completed for the trailers that we
+ sent */
+ else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
}
- if (op->recv_initial_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
- op->on_complete);
+ CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
+ result ? "YES" : "NO");
+ return result;
+}
+
+/*
+ TODO (makdharma): Break down this function in smaller chunks for readability.
+*/
+static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
+ struct op_and_state *oas) {
+ grpc_transport_stream_op *stream_op = &oas->op;
+ struct stream_obj *s = oas->s;
+ struct op_state *stream_state = &s->state;
+ enum e_op_result result = NO_ACTION_POSSIBLE;
+ if (stream_op->send_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_INITIAL_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
+ /* This OP is the beginning. Reset various states */
+ memset(&s->header_array, 0, sizeof(s->header_array));
+ memset(&stream_state->rs, 0, sizeof(stream_state->rs));
+ memset(&stream_state->ws, 0, sizeof(stream_state->ws));
+ memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
+ memset(stream_state->state_callback_received, 0,
+ sizeof(stream_state->state_callback_received));
+ /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
+ * on_failed */
+ GPR_ASSERT(s->cbs == NULL);
+ s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
+ &cronet_callbacks);
+ CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
+ char *url;
+ s->header_array.headers = NULL;
+ convert_metadata_to_cronet_headers(
+ stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
+ &s->header_array.headers, &s->header_array.count);
+ s->header_array.capacity = s->header_array.count;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
+ url);
+ cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+ false);
+ stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->recv_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_INITIAL_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
+ if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
}
- s->recv_initial_metadata = op->recv_initial_metadata;
- GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
- GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
- s->callback_list[CB_RECV_INITIAL_METADATA][0] =
- op->recv_initial_metadata_ready;
- s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
- }
- if (op->send_initial_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - send_initial_metadata: on_complete=%p",
- op->on_complete);
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_op->send_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_MESSAGE)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
+ gpr_slice_buffer write_slice_buffer;
+ gpr_slice slice;
+ gpr_slice_buffer_init(&write_slice_buffer);
+ grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
+ stream_op->send_message->length, NULL);
+ /* Check that compression flag is OFF. We don't support compression yet. */
+ if (stream_op->send_message->flags != 0) {
+ gpr_log(GPR_ERROR, "Compression is not supported");
+ GPR_ASSERT(stream_op->send_message->flags == 0);
}
- s->num_headers = 0;
- convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
- ct->host, s);
- s->header_array.count = s->num_headers;
- s->header_array.capacity = s->num_headers;
- s->header_array.headers = s->headers;
- GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
- s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
- }
- if (op->send_message) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
- op->on_complete);
+ gpr_slice_buffer_add(&write_slice_buffer, slice);
+ if (write_slice_buffer.count != 1) {
+ /* Empty request not handled yet */
+ gpr_log(GPR_ERROR, "Empty request is not supported");
+ GPR_ASSERT(write_slice_buffer.count == 1);
+ }
+ if (write_slice_buffer.count > 0) {
+ size_t write_buffer_size;
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
+ s->cbs, stream_state->ws.write_buffer);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
+ result = ACTION_TAKEN_WITH_CALLBACK;
}
- grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
- op->send_message->length, NULL);
- // Check that compression flag is not ON. We don't support compression yet.
- // TODO (makdharma): add compression support
- GPR_ASSERT(op->send_message->flags == 0);
- gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
- if (s->cbs == NULL) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ stream_state->state_op_done[OP_SEND_MESSAGE] = true;
+ oas->state.state_op_done[OP_SEND_MESSAGE] = true;
+ } else if (stream_op->recv_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_MESSAGE)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_CANCELLED, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->rs.read_stream_closed == true) {
+ /* No more data will be received */
+ CRONET_LOG(GPR_DEBUG, "read stream closed");
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->rs.length_field_received == false) {
+ if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
+ stream_state->rs.remaining_bytes == 0) {
+ /* Start a read operation for data */
+ stream_state->rs.length_field_received = true;
+ stream_state->rs.length_field = stream_state->rs.remaining_bytes =
+ parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
+ CRONET_LOG(GPR_DEBUG, "length field = %d",
+ stream_state->rs.length_field);
+ if (stream_state->rs.length_field > 0) {
+ stream_state->rs.read_buffer =
+ gpr_malloc((size_t)stream_state->rs.length_field);
+ GPR_ASSERT(stream_state->rs.read_buffer);
+ stream_state->rs.remaining_bytes = stream_state->rs.length_field;
+ stream_state->rs.received_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ stream_state->rs.remaining_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
+ gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ }
+ } else if (stream_state->rs.remaining_bytes == 0) {
+ /* Start a read operation for first 5 bytes (GRPC header) */
+ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ stream_state->rs.received_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
}
- s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
- GPR_ASSERT(s->cbs);
- s->read_closed = false;
- s->response_trailers_received = false;
- s->response_headers_received = false;
- s->cronet_send_state = CRONET_SEND_IDLE;
- s->cronet_recv_state = CRONET_RECV_IDLE;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_state->rs.remaining_bytes == 0) {
+ CRONET_LOG(GPR_DEBUG, "read operation complete");
+ gpr_slice read_data_slice =
+ gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, stream_state->rs.read_buffer,
+ (size_t)stream_state->rs.length_field);
+ gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
+ read_data_slice);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ /* Clear read state of the stream, so next read op (if it were to come)
+ * will work */
+ stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
+ stream_state->rs.length_field_received = 0;
+ result = ACTION_TAKEN_NO_CALLBACK;
}
- GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
- s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
- next_send_step(s);
- }
- if (op->send_trailing_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - send_trailing_metadata: on_complete=%p",
- op->on_complete);
+ } else if (stream_op->recv_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_TRAILING_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
+ if (oas->s->state.rs.trailing_metadata_valid) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.trailing_metadata,
+ stream_op->recv_trailing_metadata);
+ stream_state->rs.trailing_metadata_valid = false;
}
- GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
- s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_op->send_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_TRAILING_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->cancel_error &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_CANCEL_ERROR)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
+ CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
- // Send an "empty" write to the far end to signal that we're done.
- // This will induce the server to send down trailers.
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
- }
- cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
- } else {
- // We never created a stream. This was probably an empty request.
- invoke_closing_callback(s);
+ cronet_bidirectional_stream_cancel(s->cbs);
}
+ stream_state->state_op_done[OP_CANCEL_ERROR] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->on_complete &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_ON_COMPLETE)) {
+ /* All actions in this stream_op are complete. Call the on_complete callback
+ */
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
+ grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
+ NULL);
+ oas->state.state_op_done[OP_ON_COMPLETE] = true;
+ oas->done = true;
+ /* reset any send message state, only if this ON_COMPLETE is about a send.
+ */
+ if (stream_op->send_message) {
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ stream_state->state_op_done[OP_SEND_MESSAGE] = false;
+ }
+ result = ACTION_TAKEN_NO_CALLBACK;
+ /* If this is the on_complete callback being called for a received message -
+ make a note */
+ if (stream_op->recv_message)
+ stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
+ } else {
+ result = NO_ACTION_POSSIBLE;
}
+ return result;
}
+/*
+ Functions used by upper layers to access transport functionality.
+*/
+
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
stream_obj *s = (stream_obj *)gs;
- memset(s->callback_list, 0, sizeof(s->callback_list));
+ memset(&s->storage, 0, sizeof(s->storage));
+ s->storage.head = NULL;
+ memset(&s->state, 0, sizeof(s->state));
+ s->curr_op = NULL;
s->cbs = NULL;
- gpr_mu_init(&s->recv_mu);
- s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
- s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
- gpr_slice_buffer_init(&s->write_slice_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
- }
+ memset(&s->header_array, 0, sizeof(s->header_array));
+ memset(&s->state.rs, 0, sizeof(s->state.rs));
+ memset(&s->state.ws, 0, sizeof(s->state.ws));
+ memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
+ memset(s->state.state_callback_received, 0,
+ sizeof(s->state.state_callback_received));
+ gpr_mu_init(&s->mu);
return 0;
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "Destroy stream");
- }
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ CRONET_LOG(GPR_DEBUG, "perform_stream_op");
stream_obj *s = (stream_obj *)gs;
- s->cbs = NULL;
- gpr_free(s->read_buffer);
- gpr_free(s->write_buffer);
- gpr_free(s->url);
- gpr_mu_destroy(&s->recv_mu);
- if (and_free_memory) {
- gpr_free(and_free_memory);
- }
+ s->curr_gs = gs;
+ memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport));
+ add_to_storage(s, op);
+ execute_from_storage(s);
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
- grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
- gpr_free(ct->host);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "Destroy transport");
- }
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
+
+static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ return NULL;
}
+static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_transport_op *op) {}
+
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
"cronet_http",
init_stream,
set_pollset_do_nothing,
set_pollset_set_do_nothing,
perform_stream_op,
- NULL,
+ perform_op,
destroy_stream,
destroy_transport,
- NULL};
+ get_peer};
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 6a63c4d1d1..02bcbaa10f 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -42,6 +42,7 @@
#include <assert.h>
#include <errno.h>
#include <poll.h>
+#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>