diff options
author | Craig Tiller <ctiller@google.com> | 2016-03-07 20:08:24 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-03-07 20:08:24 -0800 |
commit | 41cb61de8b53d59247ff0007aadac0b7f314988f (patch) | |
tree | 31477e895e6f69317648d380f050ec73ab80a3a0 /src/core | |
parent | bd137fb1a7956cfccc1a814420fbcfc83504750c (diff) | |
parent | dba948569a90fa12b4d024555e559c1abfc75607 (diff) |
Merge github.com:grpc/grpc into filter-selection
Diffstat (limited to 'src/core')
43 files changed, 471 insertions, 384 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c index e60330de64..89b8ee0b39 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -60,10 +60,10 @@ // limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. // * Keep all tag information (keys/values/flags) in a single memory buffer, // that can be directly copied to the wire. -// * Binary tags share the same structure as, but are encoded separately from, -// non-binary tags. This is primarily because non-binary tags are far more -// likely to be repeated across multiple RPC calls, so are more efficiently -// cached and compressed in any metadata schemes. + +// min and max valid chars in tag keys and values. All printable ASCII is OK. +#define MIN_VALID_TAG_CHAR 32 // ' ' +#define MAX_VALID_TAG_CHAR 126 // '~' // Structure representing a set of tags. Essentially a count of number of tags // present, and pointer to a chunk of memory that contains the per-tag details. @@ -77,7 +77,7 @@ struct tag_set { char *kvm; // key/value memory. Consists of repeated entries of: // Offset Size Description // 0 1 Key length, including trailing 0. (K) - // 1 1 Value length. (V) + // 1 1 Value length, including trailing 0 (V) // 2 1 Flags // 3 K Key bytes // 3 + K V Value bytes @@ -108,19 +108,36 @@ struct raw_tag { #define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED #define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) -// Primary (external) representation of a context. Composed of 3 underlying -// tag_set structs, one for each of the binary/printable propagated tags, and -// one for everything else. This is to efficiently support tag -// encoding/decoding. +// Primary representation of a context. Composed of 2 underlying tag_set +// structs, one each for propagated and local (non-propagated) tags. This is +// to efficiently support tag encoding/decoding. +// TODO(aveitch): need to add tracing id's/structure. struct census_context { - struct tag_set tags[3]; + struct tag_set tags[2]; census_context_status status; }; // Indices into the tags member of census_context #define PROPAGATED_TAGS 0 -#define PROPAGATED_BINARY_TAGS 1 -#define LOCAL_TAGS 2 +#define LOCAL_TAGS 1 + +// Validate (check all characters are in range and size is less than limit) a +// key or value string. Returns 0 if the string is invalid, or the length +// (including terminator) if valid. +static size_t validate_tag(const char *kv) { + size_t len = 1; + char ch; + while ((ch = *kv++) != 0) { + if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) { + return 0; + } + len++; + } + if (len > CENSUS_MAX_TAG_KV_LEN) { + return 0; + } + return len; +} // Extract a raw tag given a pointer (raw) to the tag header. Allow for some // extra bytes in the tag header (see encode/decode functions for usage: this @@ -166,9 +183,7 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, size_t key_len) { return ( tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key, - key_len)); + tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len)); } // Add a tag to a tag_set. Return true on success, false if the tag could @@ -176,11 +191,11 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, // not be called if the tag may already exist (in a non-deleted state) in // the tag_set, as that would result in two tags with the same key. static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { return false; } - const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE; + const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE; if (tags->kvm_used + tag_size > tags->kvm_size) { // allocate new memory if needed tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; @@ -191,13 +206,12 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, } char *kvp = tags->kvm + tags->kvm_used; *kvp++ = (char)key_len; - *kvp++ = (char)tag->value_len; + *kvp++ = (char)value_len; // ensure reserved flags are not used. - *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | - CENSUS_TAG_BINARY)); + *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS)); memcpy(kvp, tag->key, key_len); kvp += key_len; - memcpy(kvp, tag->value, tag->value_len); + memcpy(kvp, tag->value, value_len); tags->kvm_used += tag_size; tags->ntags++; tags->ntags_alloc++; @@ -207,30 +221,20 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, // Add/modify/delete a tag to/in a context. Caller must validate that tag key // etc. are valid. static void context_modify_tag(census_context *context, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { // First delete the tag if it is already present. bool deleted = context_delete_tag(context, tag, key_len); - // Determine if we need to add it back. - bool call_add = tag->value != NULL && tag->value_len != 0; bool added = false; - if (call_add) { - if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { - if (CENSUS_TAG_IS_BINARY(tag->flags)) { - added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag, - key_len); - } else { - added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len); - } - } else { - added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len); - } + if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { + added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len, + value_len); + } else { + added = + tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len); } + if (deleted) { - if (call_add) { - context->status.n_modified_tags++; - } else { - context->status.n_deleted_tags++; - } + context->status.n_modified_tags++; } else { if (added) { context->status.n_added_tags++; @@ -292,8 +296,6 @@ census_context *census_context_create(const census_context *base, memset(context, 0, sizeof(census_context)); } else { tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); - tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS], - &base->tags[PROPAGATED_BINARY_TAGS]); tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); memset(&context->status, 0, sizeof(context->status)); } @@ -301,22 +303,29 @@ census_context *census_context_create(const census_context *base, // the context to add/replace/delete as required. for (int i = 0; i < ntags; i++) { const census_tag *tag = &tags[i]; - size_t key_len = strlen(tag->key) + 1; - // ignore the tag if it is too long/short. - if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN && - tag->value_len <= CENSUS_MAX_TAG_KV_LEN) { - context_modify_tag(context, tag, key_len); - } else { + size_t key_len = validate_tag(tag->key); + // ignore the tag if it is invalid or too short. + if (key_len <= 1) { context->status.n_invalid_tags++; + } else { + if (tag->value != NULL) { + size_t value_len = validate_tag(tag->value); + if (value_len != 0) { + context_modify_tag(context, tag, key_len, value_len); + } else { + context->status.n_invalid_tags++; + } + } else { + if (context_delete_tag(context, tag, key_len)) { + context->status.n_deleted_tags++; + } + } } } // Remove any deleted tags, update status if needed, and return. tag_set_flatten(&context->tags[PROPAGATED_TAGS]); - tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]); tag_set_flatten(&context->tags[LOCAL_TAGS]); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags; if (status) { *status = &context->status; @@ -331,7 +340,6 @@ const census_context_status *census_context_get_status( void census_context_destroy(census_context *context) { gpr_free(context->tags[PROPAGATED_TAGS].kvm); - gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm); gpr_free(context->tags[LOCAL_TAGS].kvm); gpr_free(context); } @@ -343,9 +351,6 @@ void census_context_initialize_iterator(const census_context *context, if (context->tags[PROPAGATED_TAGS].ntags != 0) { iterator->base = PROPAGATED_TAGS; iterator->kvm = context->tags[PROPAGATED_TAGS].kvm; - } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) { - iterator->base = PROPAGATED_BINARY_TAGS; - iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm; } else if (context->tags[LOCAL_TAGS].ntags != 0) { iterator->base = LOCAL_TAGS; iterator->kvm = context->tags[LOCAL_TAGS].kvm; @@ -363,7 +368,6 @@ int census_context_next_tag(census_context_iterator *iterator, iterator->kvm = decode_tag(&raw, iterator->kvm, 0); tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; if (++iterator->index == iterator->context->tags[iterator->base].ntags) { do { @@ -388,7 +392,6 @@ static bool tag_set_get_tag(const struct tag_set *tags, const char *key, if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; return true; } @@ -403,8 +406,6 @@ int census_context_get_tag(const census_context *context, const char *key, return 0; } if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) || - tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len, - tag) || tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) { return 1; } @@ -447,21 +448,9 @@ static size_t tag_set_encode(const struct tag_set *tags, char *buffer, return ENCODED_HEADER_SIZE + tags->kvm_used; } -char *census_context_encode(const census_context *context, char *buffer, - size_t buf_size, size_t *print_buf_size, - size_t *bin_buf_size) { - *print_buf_size = - tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); - if (*print_buf_size == 0) { - return NULL; - } - char *b_buffer = buffer + *print_buf_size; - *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS], - b_buffer, buf_size - *print_buf_size); - if (*bin_buf_size == 0) { - return NULL; - } - return b_buffer; +size_t census_context_encode(const census_context *context, char *buffer, + size_t buf_size) { + return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); } // Decode a tag set. @@ -506,8 +495,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer, } } -census_context *census_context_decode(const char *buffer, size_t size, - const char *bin_buffer, size_t bin_size) { +census_context *census_context_decode(const char *buffer, size_t size) { census_context *context = gpr_malloc(sizeof(census_context)); memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); if (buffer == NULL) { @@ -515,16 +503,7 @@ census_context *census_context_decode(const char *buffer, size_t size, } else { tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size); } - if (bin_buffer == NULL) { - memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set)); - } else { - tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer, - bin_size); - } memset(&context->status, 0, sizeof(context->status)); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; - // TODO(aveitch): check that BINARY flag is correct for each type. return context; } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 7176c01b05..d4ba950818 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,8 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; - /** interested parties */ - grpc_pollset_set interested_parties; + /** interested parties (owned) */ + grpc_pollset_set *interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, + chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - &old_lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_del_pollset_set( + exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -252,9 +251,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); } @@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - grpc_pollset_set_init(&chand->interested_parties); + chand->interested_parties = grpc_pollset_set_create(); } /* Destructor for channel_data */ @@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->interested_parties); + grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -501,7 +500,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); @@ -517,7 +516,7 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index bc997b192c..d32327206e 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -107,7 +107,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->set_accept_stream == false); GPR_ASSERT(op->bind_pollset == NULL); if (op->on_connectivity_state_change != NULL) { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 81297c8d44..8f46885a04 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -168,21 +168,23 @@ retry: static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_subchannel_call_holder *holder = arg; - grpc_subchannel_call *call; gpr_mu_lock(&holder->mu); GPR_ASSERT(holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); - call = GET_CALL(holder); - GPR_ASSERT(call == NULL || call == CANCELLED_CALL); holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { fail_locked(exec_ctx, holder); } else { - gpr_atm_rel_store( - &holder->subchannel_call, - (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( - exec_ctx, holder->connected_subchannel, holder->pollset)); - retry_waiting_locked(exec_ctx, holder); + if (!gpr_atm_rel_cas( + &holder->subchannel_call, 0, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset))) { + GPR_ASSERT(gpr_atm_acq_load(&holder->subchannel_call) == 1); + /* if this cas fails, the call was cancelled before the pick completed */ + fail_locked(exec_ctx, holder); + } else { + retry_waiting_locked(exec_ctx, holder); + } } gpr_mu_unlock(&holder->mu); GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 459bbebb68..81167b31c8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,8 +31,8 @@ * */ -#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_factory.h" #include <string.h> @@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); @@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b1171c45b0..98d9acc75b 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } @@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: @@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, sd->connectivity_state, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d4672f6b25..0d8b007336 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); - grpc_pollset_set_init(&policy->interested_parties); + policy->interested_parties = grpc_pollset_set_create(); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(&policy->interested_parties); + grpc_pollset_set_destroy(policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index db5238c8ca..3457390606 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; - grpc_pollset_set interested_parties; + /* owned pointer to interested parties in load balancing decisions */ + grpc_pollset_set *interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 86d7ef783f..a5e8ebdfb0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -109,7 +109,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -210,7 +210,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(&c->pollset_set); + grpc_pollset_set_destroy(c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -327,7 +327,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); - grpc_pollset_set_init(&c->pollset_set); + c->pollset_set = grpc_pollset_set_create(); c->addr_len = args->addr_len; grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); @@ -346,7 +346,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; + args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -380,7 +380,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); @@ -396,7 +396,6 @@ void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { - int do_connect = 0; external_state_watcher *w; if (state == NULL) { @@ -416,7 +415,7 @@ void grpc_subchannel_notify_on_state_change( w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); @@ -426,17 +425,13 @@ void grpc_subchannel_notify_on_state_change( w->next->prev = w->prev->next = w; if (grpc_connectivity_state_notify_on_state_change( exec_ctx, &c->state_tracker, state, &w->closure)) { - do_connect = 1; c->connecting = 1; /* released by connection */ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + start_connect(exec_ctx, c); } gpr_mu_unlock(&c->mu); } - - if (do_connect) { - start_connect(exec_ctx, c); - } } void grpc_connected_subchannel_process_transport_op( @@ -556,7 +551,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, + exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ @@ -617,11 +612,12 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { if (c->disconnected) { iomgr_success = 0; } - gpr_mu_unlock(&c->mu); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); + gpr_mu_unlock(&c->mu); } else { + gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 71237bb614..1219c444c7 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -31,20 +31,22 @@ * */ -#include "src/core/iomgr/sockaddr.h" #include "src/core/httpcli/httpcli.h" +#include "src/core/iomgr/sockaddr.h" #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/httpcli/format_request.h" +#include "src/core/httpcli/parser.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -84,18 +86,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", plaintext_handshake}; void grpc_httpcli_context_init(grpc_httpcli_context *context) { - grpc_pollset_set_init(&context->pollset_set); + context->pollset_set = grpc_pollset_set_create(); } void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(&context->pollset_set); + grpc_pollset_set_destroy(context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req); static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { - grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); @@ -197,7 +199,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) { addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, &req->context->pollset_set, + exec_ctx, &req->connected, &req->ep, req->context->pollset_set, (struct sockaddr *)&addr->addr, addr->len, req->deadline); } @@ -237,7 +239,7 @@ static void internal_request_begin( req->host = gpr_strdup(request->host); req->ssl_host_override = gpr_strdup(request->ssl_host_override); - grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 30875d71f1..c9cd987c79 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,6 +39,7 @@ #include <grpc/support/time.h> #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ @@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header { TODO(ctiller): allow caching and capturing multiple requests for the same content and combining them */ typedef struct grpc_httpcli_context { - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; } grpc_httpcli_context; typedef struct { diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 85eadd754b..4ba7c5df94 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,6 +46,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" + #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(&watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 04580150f3..9c89c2c08a 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -41,9 +41,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" +#include "src/core/support/env.h" #include "src/core/support/string.h" static gpr_mu g_mu; @@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } break; } @@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); gpr_free(obj->name); } + +bool grpc_iomgr_abort_on_leaks(void) { + char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); + if (env == NULL) return false; + static const char *truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) return true; + } + return false; +} diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index e372c18e8a..ac2c46ebe6 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H +#include <stdbool.h> + #include "src/core/iomgr/iomgr.h" #include <grpc/support/sync.h> @@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +bool grpc_iomgr_abort_on_leaks(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 6585326f81..92a0374ddd 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,15 +49,11 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 4acae2bb71..2e0f27fab8 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,6 +45,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 809f8f39da..92d6fb7241 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,15 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; @@ -120,6 +122,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } else { h->fds[fd_count++] = h->fds[i]; watchers[pfd_count].fd = h->fds[i]; + GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start"); pfds[pfd_count].fd = h->fds[i]->fd; pfds[pfd_count].revents = 0; pfd_count++; @@ -133,8 +136,10 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_mu_unlock(&pollset->mu); for (i = 2; i < pfd_count; i++) { - pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, - POLLIN, POLLOUT, &watchers[i]); + grpc_fd *fd = watchers[i].fd; + pfds[i].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, + POLLOUT, &watchers[i]); + GRPC_FD_UNREF(fd, "multipoller_start"); } /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index ee7e9f48f4..e895a77884 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,16 +42,16 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -186,8 +188,9 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -204,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 5868b3fa21..bbedb66b00 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,8 +37,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -53,15 +55,15 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for @@ -81,7 +83,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -93,8 +95,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 09c04438f7..dddcd8313f 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,15 +41,9 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_set_posix.h" -#endif +typedef struct grpc_pollset_set grpc_pollset_set; -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_set_windows.h" -#endif - -void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +grpc_pollset_set *grpc_pollset_set_create(void); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..9dc9aff4a8 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,11 +41,30 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" -void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +grpc_pollset_set *grpc_pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); + return pollset_set; } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { @@ -57,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); + gpr_free(pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4820a61e4b..7d1aaf4181 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,23 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" - -typedef struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -} grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 157b46ec32..3b8eca28e6 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_windows.h" -void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} +grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index cada0d2b61..9661cd2c39 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index bbce23b46a..c7f30f435f 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -89,12 +89,15 @@ static void push_front_worker(grpc_pollset_worker *root, worker->links[type].next->links[type].prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + *mu = &grpc_polling_mu; memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..dc0b7a4104 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,8 +72,4 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; -extern gpr_mu grpc_polling_mu; - -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c76c2e3b0f..15727856ab 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,17 +42,19 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..f74eb3fe51 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,11 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 73a21c80ab..a9d0489edf 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/fd_posix.h" /* Forward decl of grpc_server */ typedef struct grpc_server grpc_server; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index da11df67ef..c096dbfb30 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..68f195ee0d 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 458d0d3ac3..1f4f3e4aa5 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -52,13 +52,14 @@ static grpc_channel_credentials *default_credentials = NULL; static int compute_engine_detection_done = 0; -static gpr_mu g_mu; +static gpr_mu g_state_mu; +static gpr_mu *g_polling_mu; static gpr_once g_once = GPR_ONCE_INIT; -static void init_default_credentials(void) { gpr_mu_init(&g_mu); } +static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); } typedef struct { - grpc_pollset pollset; + grpc_pollset *pollset; int is_done; int success; } compute_engine_detector; @@ -80,10 +81,10 @@ static void on_compute_engine_detection_http_response( } } } - gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset)); + gpr_mu_lock(g_polling_mu); detector->is_done = 1; - grpc_pollset_kick(&detector->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); + grpc_pollset_kick(detector->pollset, NULL); + gpr_mu_unlock(g_polling_mu); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { @@ -101,7 +102,8 @@ static int is_stack_running_on_compute_engine(void) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - grpc_pollset_init(&detector.pollset); + detector.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(detector.pollset, &g_polling_mu); detector.is_done = 0; detector.success = 0; @@ -112,7 +114,7 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollset, &request, + &exec_ctx, &context, detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), on_compute_engine_detection_http_response, &detector); @@ -120,19 +122,22 @@ static int is_stack_running_on_compute_engine(void) { /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ - gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_lock(g_polling_mu); while (!detector.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_unlock(g_polling_mu); grpc_httpcli_context_destroy(&context); - grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); - grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure); + grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset); + grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure); grpc_exec_ctx_finish(&exec_ctx); + g_polling_mu = NULL; + + gpr_free(detector.pollset); return detector.success; } @@ -184,7 +189,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { result = grpc_channel_credentials_ref(default_credentials); @@ -230,19 +235,19 @@ end: gpr_log(GPR_ERROR, "Could not create google default credentials."); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); return result; } void grpc_flush_cached_google_default_credentials(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { grpc_channel_credentials_unref(default_credentials); default_credentials = NULL; } compute_engine_detection_done = 0; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); } /* -- Well known credentials path. -- */ diff --git a/src/core/support/sync.c b/src/core/support/sync.c index d368422d9e..69e3e39c5c 100644 --- a/src/core/support/sync.c +++ b/src/core/support/sync.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } +void gpr_ref_non_zero(gpr_refcount *r) { + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1); + GPR_ASSERT(prior > 0); +} + void gpr_refn(gpr_refcount *r, int n) { gpr_atm_no_barrier_fetch_add(&r->count, n); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index f9cb852722..f6a95ebbd3 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,18 +36,19 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/timer.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" -#include "src/core/profiling/timers.h" -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> typedef struct { grpc_pollset_worker **worker; @@ -56,6 +57,8 @@ typedef struct { /* Completion queue structure */ struct grpc_completion_queue { + /** owned by pollset */ + gpr_mu *mu; /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -63,8 +66,6 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** the set of low level i/o things that concern this cq */ - grpc_pollset pollset; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -82,6 +83,8 @@ struct grpc_completion_queue { grpc_completion_queue *next_free; }; +#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) + static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; @@ -94,7 +97,7 @@ void grpc_cq_global_shutdown(void) { gpr_mu_destroy(&g_freelist_mu); while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; - grpc_pollset_destroy(&g_freelist->pollset); + grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist)); #ifndef NDEBUG gpr_free(g_freelist->outstanding_tags); #endif @@ -124,8 +127,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { if (g_freelist == NULL) { gpr_mu_unlock(&g_freelist_mu); - cc = gpr_malloc(sizeof(grpc_completion_queue)); - grpc_pollset_init(&cc->pollset); + cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; @@ -184,7 +187,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_reset(&cc->pollset); + grpc_pollset_reset(POLLSET_FROM_CQ(cc)); gpr_mu_lock(&g_freelist_mu); cc->next_free = g_freelist; g_freelist = cc; @@ -194,7 +197,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); GPR_ASSERT(!cc->shutdown_called); if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); @@ -203,7 +206,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { cc->outstanding_tag_capacity); } cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); #endif gpr_ref(&cc->pending_events); } @@ -231,7 +234,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); #ifndef NDEBUG for (i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { @@ -256,8 +259,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(&cc->pollset, pluck_worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + gpr_mu_unlock(cc->mu); } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -265,8 +268,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); + gpr_mu_unlock(cc->mu); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -294,7 +298,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; @@ -302,7 +306,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -310,14 +314,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -330,11 +334,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); + continue; } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } } @@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != @@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (c == cc->completed_tail) { cc->completed_tail = prev; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -447,11 +452,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } del_plucker(cc, tag, &worker); @@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); if (cc->shutdown_called) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } @@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); + grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return &cc->pollset; + return POLLSET_FROM_CQ(cc); } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index c88f769eda..da93474b26 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -403,8 +403,15 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) { maybe_finish_shutdown(exec_ctx, chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; - grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true, - NULL); + + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.set_accept_stream = true; + op.on_consumed = &chand->finish_destroy_channel_closure; + grpc_channel_next_op(exec_ctx, + grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); } static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -931,7 +938,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity"); memset(&op, 0, sizeof(op)); - op.set_accept_stream = accept_stream; + op.set_accept_stream = true; + op.set_accept_stream_fn = accept_stream; op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index d76d31be23..b720d1ab3e 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -358,6 +358,9 @@ struct grpc_chttp2_transport { /** connectivity tracking */ grpc_connectivity_state_tracker state_tracker; } channel_callback; + + /** Transport op to be applied post-parsing */ + grpc_transport_op *post_parsing_op; }; typedef struct { @@ -417,7 +420,7 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; uint8_t fetching; - uint8_t sent_initial_metadata; + bool sent_initial_metadata; uint8_t sent_message; uint8_t sent_trailing_metadata; uint8_t read_closed; @@ -509,7 +512,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); /** Get a writable stream @@ -519,14 +522,13 @@ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); + grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT; -/* returns 1 if stream added, 0 if it was already present */ -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_list_pop_writing_stream( @@ -770,4 +772,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *parsing, const uint8_t *opaque_8bytes); +/** add a ref to the stream and add it to the writable list; + ref will be dropped in writing.c */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); + #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8fdebd7f13..0516f39fa9 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads( if (was_zero && !is_zero) { while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, &stream_global)) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index b284c78818..60fe735cfc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } } -static void stream_list_maybe_remove(grpc_chttp2_transport *t, +static bool stream_list_maybe_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) { if (s->included[id]) { stream_list_remove(t, s, id); + return true; + } else { + return false; } } @@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, s->included[id] = 1; } -static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { +static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { if (s->included[id]) { - return 0; + return false; } stream_list_add_tail(t, s, id); - return 1; + return true; } /* wrappers for specializations */ -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } int grpc_chttp2_list_pop_writable_stream( @@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream( return r; } -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_WRITING); + GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITING)); } int grpc_chttp2_list_have_writing_streams( @@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { if (is_window_available) { - grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); + grpc_chttp2_become_writable(&transport->global, &stream->global); } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, &stream->writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 356fd8174a..107725cbc7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes( (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_list_pop_writable_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - uint8_t sent_initial_metadata; + bool sent_initial_metadata = stream_writing->sent_initial_metadata; + bool become_writable = false; stream_writing->id = stream_global->id; stream_writing->read_closed = stream_global->read_closed; @@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes( outgoing_window, stream_global, outgoing_window); - sent_initial_metadata = stream_writing->sent_initial_metadata; if (!sent_initial_metadata && stream_global->send_initial_metadata) { stream_writing->send_initial_metadata = stream_global->send_initial_metadata; stream_global->send_initial_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } - sent_initial_metadata = 1; + become_writable = true; + sent_initial_metadata = true; } if (sent_initial_metadata) { if (stream_global->send_message != NULL) { @@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->flow_controlled_buffer.length > 0) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, stream_writing); @@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->send_trailing_metadata = stream_global->send_trailing_metadata; stream_global->send_trailing_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } } @@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, announce_window, stream_global, unannounced_incoming_window_for_writing); - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; + } + + if (become_writable) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } } @@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, (stream_writing->send_message && !stream_writing->fetching)) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - /* do nothing - already reffed */ - } + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } else { grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, stream_writing); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c49e1409a4..620c57f3bc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control( static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); -/* +/******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -432,6 +432,14 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, if (t->ep) { allow_endpoint_shutdown_locked(exec_ctx, t); } + + /* flush writable stream list to avoid dangling references */ + grpc_chttp2_stream_global *stream_global; + grpc_chttp2_stream_writing *stream_writing; + while (grpc_chttp2_list_pop_writable_stream( + &t->global, &t->writing, &stream_global, &stream_writing)) { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); + } } } @@ -521,7 +529,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.id) == NULL); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); @@ -583,7 +590,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( return &accepting->parsing; } -/* +/******************************************************************************* * LOCK MANAGEMENT */ @@ -611,10 +618,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_END("unlock", 0); } -/* +/******************************************************************************* * OUTPUT PROCESSING */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && + grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } +} + static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = @@ -732,7 +747,7 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -821,7 +836,7 @@ static void perform_stream_op_locked( maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } else { grpc_chttp2_complete_closure_step( @@ -838,7 +853,7 @@ static void perform_stream_op_locked( exec_ctx, &stream_global->send_message_finished, 0); } else if (stream_global->id != 0) { stream_global->send_message = op->send_message; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -858,7 +873,7 @@ static void perform_stream_op_locked( } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -944,12 +959,10 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, unlock(exec_ctx, t); } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; - int close_transport = 0; - - lock(t); +static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_transport_op *op) { + bool close_transport = false; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -968,8 +981,8 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport = !grpc_chttp2_has_streams(t); } - if (op->set_accept_stream != NULL) { - t->channel_callback.accept_stream = op->set_accept_stream; + if (op->set_accept_stream) { + t->channel_callback.accept_stream = op->set_accept_stream_fn; t->channel_callback.accept_stream_user_data = op->set_accept_stream_user_data; } @@ -990,16 +1003,31 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, close_transport_locked(exec_ctx, t); } - unlock(exec_ctx, t); - if (close_transport) { - lock(t); close_transport_locked(exec_ctx, t); - unlock(exec_ctx, t); } } -/* +static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; + + lock(t); + + /* If there's a set_accept_stream ensure that we're not parsing + to avoid changing things out from underneath */ + if (t->parsing_active && op->set_accept_stream) { + GPR_ASSERT(t->post_parsing_op == NULL); + t->post_parsing_op = gpr_malloc(sizeof(*op)); + memcpy(t->post_parsing_op, op, sizeof(*op)); + } else { + perform_transport_op_locked(exec_ctx, t, op); + } + + unlock(exec_ctx, t); +} + +/******************************************************************************* * INPUT PROCESSING */ @@ -1064,7 +1092,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (!s) { s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -1080,6 +1107,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); } + if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); @@ -1331,7 +1361,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1392,6 +1422,13 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { /* handle higher level things */ grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing); t->parsing_active = 0; + /* handle delayed transport ops (if there is one) */ + if (t->post_parsing_op) { + grpc_transport_op *op = t->post_parsing_op; + t->post_parsing_op = NULL; + perform_transport_op_locked(exec_ctx, t, op); + gpr_free(op); + } /* if a stream is in the stream map, and gets cancelled, we need to ensure * we are not parsing before continuing the cancellation to keep things in * a sane state */ @@ -1426,7 +1463,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { GPR_TIMER_END("recv_data", 0); } -/* +/******************************************************************************* * CALLBACK LOOP */ @@ -1440,7 +1477,7 @@ static void connectivity_state_set( state, reason); } -/* +/******************************************************************************* * POLLSET STUFF */ @@ -1468,7 +1505,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, unlock(exec_ctx, t); } -/* +/******************************************************************************* * BYTE STREAM */ @@ -1508,7 +1545,7 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1623,7 +1660,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( return incoming_byte_stream; } -/* +/******************************************************************************* * TRACING */ @@ -1709,7 +1746,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, gpr_free(prefix); } -/* +/******************************************************************************* * INTEGRATION GLUE */ diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 14912af7df..807ae071a3 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -43,11 +43,13 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> + #include "src/core/profiling/timers.h" #include "src/core/support/murmur_hash.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/bin_encoder.h" #include "src/core/transport/static_metadata.h" +#include "src/core/iomgr/iomgr_internal.h" /* There are two kinds of mdelem and mdstr instances. * Static instances are declared in static_metadata.{h,c} and @@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->elems); } @@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->strs); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 6e154b629a..3b555fa933 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif - gpr_ref(&refcount->refs); + gpr_ref_non_zero(&refcount->refs); } #ifdef GRPC_STREAM_REFCOUNT_DEBUG diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 8902c5d2f6..ed6e121c9c 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -123,7 +123,7 @@ typedef struct grpc_transport_stream_op { /** Transport op: a set of operations to perform on a transport as a whole */ typedef struct grpc_transport_op { - /** called when processing of this op is done */ + /** Called when processing of this op is done. */ grpc_closure *on_consumed; /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */ grpc_closure *on_connectivity_state_change; @@ -138,9 +138,13 @@ typedef struct grpc_transport_op { grpc_status_code goaway_status; gpr_slice *goaway_message; /** set the callback for accepting new streams; - this is a permanent callback, unlike the other one-shot closures */ - void (*set_accept_stream)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_transport *transport, const void *server_data); + this is a permanent callback, unlike the other one-shot closures. + If true, the callback is set to set_accept_stream_fn, with its + user_data argument set to set_accept_stream_user_data */ + bool set_accept_stream; + void (*set_accept_stream_fn)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, + const void *server_data); void *set_accept_stream_user_data; /** add this transport to a pollset */ grpc_pollset *bind_pollset; |