diff options
author | vjpai <vpai@google.com> | 2016-03-03 06:59:16 -0800 |
---|---|---|
committer | vjpai <vpai@google.com> | 2016-03-03 06:59:16 -0800 |
commit | b7117e409bf35895dd9029bd811ac16fd02e3d83 (patch) | |
tree | 732339fe04361ad8ebfdb168dedc465c9cfdd5d5 /src/core | |
parent | c47c9497fa6518205a598c2bf74517df09541fed (diff) | |
parent | 40d27ba78455d9c5ef1feef250d0d889d6bfb746 (diff) |
Merge branch 'master' into debug_qps_stream
Diffstat (limited to 'src/core')
50 files changed, 599 insertions, 509 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c index e60330de64..89b8ee0b39 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -60,10 +60,10 @@ // limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS. // * Keep all tag information (keys/values/flags) in a single memory buffer, // that can be directly copied to the wire. -// * Binary tags share the same structure as, but are encoded separately from, -// non-binary tags. This is primarily because non-binary tags are far more -// likely to be repeated across multiple RPC calls, so are more efficiently -// cached and compressed in any metadata schemes. + +// min and max valid chars in tag keys and values. All printable ASCII is OK. +#define MIN_VALID_TAG_CHAR 32 // ' ' +#define MAX_VALID_TAG_CHAR 126 // '~' // Structure representing a set of tags. Essentially a count of number of tags // present, and pointer to a chunk of memory that contains the per-tag details. @@ -77,7 +77,7 @@ struct tag_set { char *kvm; // key/value memory. Consists of repeated entries of: // Offset Size Description // 0 1 Key length, including trailing 0. (K) - // 1 1 Value length. (V) + // 1 1 Value length, including trailing 0 (V) // 2 1 Flags // 3 K Key bytes // 3 + K V Value bytes @@ -108,19 +108,36 @@ struct raw_tag { #define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED #define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED) -// Primary (external) representation of a context. Composed of 3 underlying -// tag_set structs, one for each of the binary/printable propagated tags, and -// one for everything else. This is to efficiently support tag -// encoding/decoding. +// Primary representation of a context. Composed of 2 underlying tag_set +// structs, one each for propagated and local (non-propagated) tags. This is +// to efficiently support tag encoding/decoding. +// TODO(aveitch): need to add tracing id's/structure. struct census_context { - struct tag_set tags[3]; + struct tag_set tags[2]; census_context_status status; }; // Indices into the tags member of census_context #define PROPAGATED_TAGS 0 -#define PROPAGATED_BINARY_TAGS 1 -#define LOCAL_TAGS 2 +#define LOCAL_TAGS 1 + +// Validate (check all characters are in range and size is less than limit) a +// key or value string. Returns 0 if the string is invalid, or the length +// (including terminator) if valid. +static size_t validate_tag(const char *kv) { + size_t len = 1; + char ch; + while ((ch = *kv++) != 0) { + if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) { + return 0; + } + len++; + } + if (len > CENSUS_MAX_TAG_KV_LEN) { + return 0; + } + return len; +} // Extract a raw tag given a pointer (raw) to the tag header. Allow for some // extra bytes in the tag header (see encode/decode functions for usage: this @@ -166,9 +183,7 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, size_t key_len) { return ( tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) || - tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key, - key_len)); + tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len)); } // Add a tag to a tag_set. Return true on success, false if the tag could @@ -176,11 +191,11 @@ static bool context_delete_tag(census_context *context, const census_tag *tag, // not be called if the tag may already exist (in a non-deleted state) in // the tag_set, as that would result in two tags with the same key. static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) { return false; } - const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE; + const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE; if (tags->kvm_used + tag_size > tags->kvm_size) { // allocate new memory if needed tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE; @@ -191,13 +206,12 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, } char *kvp = tags->kvm + tags->kvm_used; *kvp++ = (char)key_len; - *kvp++ = (char)tag->value_len; + *kvp++ = (char)value_len; // ensure reserved flags are not used. - *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS | - CENSUS_TAG_BINARY)); + *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS)); memcpy(kvp, tag->key, key_len); kvp += key_len; - memcpy(kvp, tag->value, tag->value_len); + memcpy(kvp, tag->value, value_len); tags->kvm_used += tag_size; tags->ntags++; tags->ntags_alloc++; @@ -207,30 +221,20 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag, // Add/modify/delete a tag to/in a context. Caller must validate that tag key // etc. are valid. static void context_modify_tag(census_context *context, const census_tag *tag, - size_t key_len) { + size_t key_len, size_t value_len) { // First delete the tag if it is already present. bool deleted = context_delete_tag(context, tag, key_len); - // Determine if we need to add it back. - bool call_add = tag->value != NULL && tag->value_len != 0; bool added = false; - if (call_add) { - if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { - if (CENSUS_TAG_IS_BINARY(tag->flags)) { - added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag, - key_len); - } else { - added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len); - } - } else { - added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len); - } + if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) { + added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len, + value_len); + } else { + added = + tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len); } + if (deleted) { - if (call_add) { - context->status.n_modified_tags++; - } else { - context->status.n_deleted_tags++; - } + context->status.n_modified_tags++; } else { if (added) { context->status.n_added_tags++; @@ -292,8 +296,6 @@ census_context *census_context_create(const census_context *base, memset(context, 0, sizeof(census_context)); } else { tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]); - tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS], - &base->tags[PROPAGATED_BINARY_TAGS]); tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]); memset(&context->status, 0, sizeof(context->status)); } @@ -301,22 +303,29 @@ census_context *census_context_create(const census_context *base, // the context to add/replace/delete as required. for (int i = 0; i < ntags; i++) { const census_tag *tag = &tags[i]; - size_t key_len = strlen(tag->key) + 1; - // ignore the tag if it is too long/short. - if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN && - tag->value_len <= CENSUS_MAX_TAG_KV_LEN) { - context_modify_tag(context, tag, key_len); - } else { + size_t key_len = validate_tag(tag->key); + // ignore the tag if it is invalid or too short. + if (key_len <= 1) { context->status.n_invalid_tags++; + } else { + if (tag->value != NULL) { + size_t value_len = validate_tag(tag->value); + if (value_len != 0) { + context_modify_tag(context, tag, key_len, value_len); + } else { + context->status.n_invalid_tags++; + } + } else { + if (context_delete_tag(context, tag, key_len)) { + context->status.n_deleted_tags++; + } + } } } // Remove any deleted tags, update status if needed, and return. tag_set_flatten(&context->tags[PROPAGATED_TAGS]); - tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]); tag_set_flatten(&context->tags[LOCAL_TAGS]); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags; if (status) { *status = &context->status; @@ -331,7 +340,6 @@ const census_context_status *census_context_get_status( void census_context_destroy(census_context *context) { gpr_free(context->tags[PROPAGATED_TAGS].kvm); - gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm); gpr_free(context->tags[LOCAL_TAGS].kvm); gpr_free(context); } @@ -343,9 +351,6 @@ void census_context_initialize_iterator(const census_context *context, if (context->tags[PROPAGATED_TAGS].ntags != 0) { iterator->base = PROPAGATED_TAGS; iterator->kvm = context->tags[PROPAGATED_TAGS].kvm; - } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) { - iterator->base = PROPAGATED_BINARY_TAGS; - iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm; } else if (context->tags[LOCAL_TAGS].ntags != 0) { iterator->base = LOCAL_TAGS; iterator->kvm = context->tags[LOCAL_TAGS].kvm; @@ -363,7 +368,6 @@ int census_context_next_tag(census_context_iterator *iterator, iterator->kvm = decode_tag(&raw, iterator->kvm, 0); tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; if (++iterator->index == iterator->context->tags[iterator->base].ntags) { do { @@ -388,7 +392,6 @@ static bool tag_set_get_tag(const struct tag_set *tags, const char *key, if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) { tag->key = raw.key; tag->value = raw.value; - tag->value_len = raw.value_len; tag->flags = raw.flags; return true; } @@ -403,8 +406,6 @@ int census_context_get_tag(const census_context *context, const char *key, return 0; } if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) || - tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len, - tag) || tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) { return 1; } @@ -447,21 +448,9 @@ static size_t tag_set_encode(const struct tag_set *tags, char *buffer, return ENCODED_HEADER_SIZE + tags->kvm_used; } -char *census_context_encode(const census_context *context, char *buffer, - size_t buf_size, size_t *print_buf_size, - size_t *bin_buf_size) { - *print_buf_size = - tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); - if (*print_buf_size == 0) { - return NULL; - } - char *b_buffer = buffer + *print_buf_size; - *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS], - b_buffer, buf_size - *print_buf_size); - if (*bin_buf_size == 0) { - return NULL; - } - return b_buffer; +size_t census_context_encode(const census_context *context, char *buffer, + size_t buf_size) { + return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size); } // Decode a tag set. @@ -506,8 +495,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer, } } -census_context *census_context_decode(const char *buffer, size_t size, - const char *bin_buffer, size_t bin_size) { +census_context *census_context_decode(const char *buffer, size_t size) { census_context *context = gpr_malloc(sizeof(census_context)); memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set)); if (buffer == NULL) { @@ -515,16 +503,7 @@ census_context *census_context_decode(const char *buffer, size_t size, } else { tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size); } - if (bin_buffer == NULL) { - memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set)); - } else { - tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer, - bin_size); - } memset(&context->status, 0, sizeof(context->status)); context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags; - context->status.n_propagated_binary_tags = - context->tags[PROPAGATED_BINARY_TAGS].ntags; - // TODO(aveitch): check that BINARY flag is correct for each type. return context; } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 7176c01b05..eeac3c146c 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,8 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; - /** interested parties */ - grpc_pollset_set interested_parties; + /** interested parties (owned) */ + grpc_pollset_set *interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, + chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - &old_lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_del_pollset_set( + exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); } @@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - grpc_pollset_set_init(&chand->interested_parties); + chand->interested_parties = grpc_pollset_set_create(); } /* Destructor for channel_data */ @@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->interested_parties); + grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -501,7 +500,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); @@ -517,7 +516,7 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 459bbebb68..81167b31c8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,8 +31,8 @@ * */ -#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_factory.h" #include <string.h> @@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); @@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b1171c45b0..98d9acc75b 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } @@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: @@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, sd->connectivity_state, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d4672f6b25..0d8b007336 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); - grpc_pollset_set_init(&policy->interested_parties); + policy->interested_parties = grpc_pollset_set_create(); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(&policy->interested_parties); + grpc_pollset_set_destroy(policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index db5238c8ca..3457390606 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; - grpc_pollset_set interested_parties; + /* owned pointer to interested parties in load balancing decisions */ + grpc_pollset_set *interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6599c75dba..d91dd116b8 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -108,7 +108,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(&c->pollset_set); + grpc_pollset_set_destroy(c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); - grpc_pollset_set_init(&c->pollset_set); + c->pollset_set = grpc_pollset_set_create(); c->addr_len = args->addr_len; grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); @@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; + args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); @@ -395,7 +395,6 @@ void grpc_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { - int do_connect = 0; external_state_watcher *w; if (state == NULL) { @@ -415,7 +414,7 @@ void grpc_subchannel_notify_on_state_change( w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); @@ -425,17 +424,13 @@ void grpc_subchannel_notify_on_state_change( w->next->prev = w->prev->next = w; if (grpc_connectivity_state_notify_on_state_change( exec_ctx, &c->state_tracker, state, &w->closure)) { - do_connect = 1; c->connecting = 1; /* released by connection */ GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + start_connect(exec_ctx, c); } gpr_mu_unlock(&c->mu); } - - if (do_connect) { - start_connect(exec_ctx, c); - } } void grpc_connected_subchannel_process_transport_op( @@ -573,7 +568,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, + exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ @@ -635,11 +630,12 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { if (c->disconnected) { iomgr_success = 0; } - gpr_mu_unlock(&c->mu); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(exec_ctx, c); + gpr_mu_unlock(&c->mu); } else { + gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 71237bb614..1219c444c7 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -31,20 +31,22 @@ * */ -#include "src/core/iomgr/sockaddr.h" #include "src/core/httpcli/httpcli.h" +#include "src/core/iomgr/sockaddr.h" #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/httpcli/format_request.h" +#include "src/core/httpcli/parser.h" #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -84,18 +86,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", plaintext_handshake}; void grpc_httpcli_context_init(grpc_httpcli_context *context) { - grpc_pollset_set_init(&context->pollset_set); + context->pollset_set = grpc_pollset_set_create(); } void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(&context->pollset_set); + grpc_pollset_set_destroy(context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req); static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { - grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); @@ -197,7 +199,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) { addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, &req->context->pollset_set, + exec_ctx, &req->connected, &req->ep, req->context->pollset_set, (struct sockaddr *)&addr->addr, addr->len, req->deadline); } @@ -237,7 +239,7 @@ static void internal_request_begin( req->host = gpr_strdup(request->host); req->ssl_host_override = gpr_strdup(request->ssl_host_override); - grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 30875d71f1..c9cd987c79 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,6 +39,7 @@ #include <grpc/support/time.h> #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ @@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header { TODO(ctiller): allow caching and capturing multiple requests for the same content and combining them */ typedef struct grpc_httpcli_context { - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; } grpc_httpcli_context; typedef struct { diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c index 41ad1de6c0..156961a377 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/httpcli/httpcli_security_connector.c @@ -59,7 +59,7 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) { } static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, + grpc_channel_security_connector *sc, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { @@ -78,8 +78,8 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx, tsi_result_to_string(result)); cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); } else { - grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, - user_data); + grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true, + nonsecure_endpoint, cb, user_data); } } @@ -103,7 +103,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, } static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_check_peer}; static grpc_security_status httpcli_ssl_channel_security_connector_create( const unsigned char *pem_root_certs, size_t pem_root_certs_size, @@ -121,7 +121,6 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( memset(c, 0, sizeof(grpc_httpcli_ssl_channel_security_connector)); gpr_ref_init(&c->base.base.refcount, 1); - c->base.base.is_client_side = 1; c->base.base.vtable = &httpcli_ssl_vtable; if (secure_peer_name != NULL) { c->secure_peer_name = gpr_strdup(secure_peer_name); @@ -136,6 +135,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } + c->base.do_handshake = httpcli_ssl_do_handshake; *sc = &c->base; return GRPC_SECURITY_OK; } @@ -180,8 +180,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(httpcli_ssl_channel_security_connector_create( pem_root_certs, pem_root_certs_size, host, &sc) == GRPC_SECURITY_OK); - grpc_security_connector_do_handshake(exec_ctx, &sc->base, tcp, - on_secure_transport_setup_done, c); + grpc_channel_security_connector_do_handshake( + exec_ctx, sc, tcp, on_secure_transport_setup_done, c); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli"); } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 85eadd754b..4ba7c5df94 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,6 +46,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" + #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(&watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 04580150f3..9c89c2c08a 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -41,9 +41,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" +#include "src/core/support/env.h" #include "src/core/support/string.h" static gpr_mu g_mu; @@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } break; } @@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); gpr_free(obj->name); } + +bool grpc_iomgr_abort_on_leaks(void) { + char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); + if (env == NULL) return false; + static const char *truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) return true; + } + return false; +} diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index e372c18e8a..ac2c46ebe6 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H +#include <stdbool.h> + #include "src/core/iomgr/iomgr.h" #include <grpc/support/sync.h> @@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +bool grpc_iomgr_abort_on_leaks(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c6b0214dea..92a0374ddd 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,15 +49,11 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -83,7 +82,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); pollset lock */ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 4acae2bb71..2e0f27fab8 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,6 +45,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 809f8f39da..4dddfff230 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,15 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 1063727248..e895a77884 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,16 +42,16 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -186,8 +188,9 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -204,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); @@ -246,8 +248,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -255,16 +260,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int keep_polling = 0; GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ - worker->next = worker->prev = NULL; - worker->reevaluate_polling_on_wakeup = 0; + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; if (pollset->local_wakeup_cache != NULL) { - worker->wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd->next; + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); - grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); } - worker->kicked_specifically = 0; + worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && @@ -293,13 +298,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { - push_front_worker(pollset, worker); + push_front_worker(pollset, &worker); added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)worker); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; @@ -321,10 +326,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via grpc_pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker->reevaluate_polling_on_wakeup) { - worker->reevaluate_polling_on_wakeup = 0; + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; - if (queued_work || worker->kicked_specifically) { + if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -333,12 +338,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } if (added_worker) { - remove_worker(pollset, worker); + remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } /* release wakeup fd to the local pool */ - worker->wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd; + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { @@ -360,6 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + *worker_hdl = NULL; GPR_TIMER_END("grpc_pollset_work", 0); } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 5868b3fa21..bbedb66b00 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,8 +37,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -53,15 +55,15 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for @@ -81,7 +83,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -93,8 +95,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 09c04438f7..dddcd8313f 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,15 +41,9 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_set_posix.h" -#endif +typedef struct grpc_pollset_set grpc_pollset_set; -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_set_windows.h" -#endif - -void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +grpc_pollset_set *grpc_pollset_set_create(void); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..9dc9aff4a8 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,11 +41,30 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" -void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +grpc_pollset_set *grpc_pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); + return pollset_set; } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { @@ -57,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); + gpr_free(pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4820a61e4b..7d1aaf4181 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,23 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" - -typedef struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -} grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 157b46ec32..3b8eca28e6 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_windows.h" -void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} +grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index cada0d2b61..9661cd2c39 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 35a956b27f..c7f30f435f 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -89,12 +89,15 @@ static void push_front_worker(grpc_pollset_worker *root, worker->links[type].next->links[type].prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + *mu = &grpc_polling_mu; memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = @@ -125,22 +128,25 @@ void grpc_pollset_reset(grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + int added_worker = 0; - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; - worker->kicked = 0; - worker->pollset = pollset; - gpr_cv_init(&worker->cv); + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker.kicked = 0; + worker.pollset = pollset; + gpr_cv_init(&worker.cv); if (!pollset->kicked_without_pollers && !pollset->shutting_down) { if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; - g_active_poller = worker; + g_active_poller = &worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); grpc_exec_ctx_flush(exec_ctx); @@ -167,12 +173,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, - worker); + &worker); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, - worker); + &worker); added_worker = 1; - while (!worker->kicked) { - if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + while (!worker.kicked) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { break; } } @@ -186,10 +192,11 @@ done: gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } - gpr_cv_destroy(&worker->cv); + gpr_cv_destroy(&worker.cv); + *worker_hdl = NULL; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..dc0b7a4104 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,8 +72,4 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; -extern gpr_mu grpc_polling_mu; - -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c76c2e3b0f..15727856ab 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,17 +42,19 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..f74eb3fe51 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,11 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 73a21c80ab..a9d0489edf 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/fd_posix.h" /* Forward decl of grpc_server */ typedef struct grpc_server grpc_server; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index da11df67ef..c096dbfb30 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..68f195ee0d 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 57b367d00f..332d4259d2 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -310,7 +310,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(auth_context != NULL); /* initialize members */ - GPR_ASSERT(sc->is_client_side); chand->security_connector = (grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF( sc, "client_auth_filter"); diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index c58574bd6d..b4fa616fa7 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -166,7 +166,7 @@ void grpc_server_credentials_release(grpc_server_credentials *creds) { } grpc_security_status grpc_server_credentials_create_security_connector( - grpc_server_credentials *creds, grpc_security_connector **sc) { + grpc_server_credentials *creds, grpc_server_security_connector **sc) { if (creds == NULL || creds->vtable->create_security_connector == NULL) { gpr_log(GPR_ERROR, "Server credentials cannot create security context."); return GRPC_SECURITY_ERROR; @@ -298,7 +298,7 @@ static grpc_security_status ssl_create_security_connector( } static grpc_security_status ssl_server_create_security_connector( - grpc_server_credentials *creds, grpc_security_connector **sc) { + grpc_server_credentials *creds, grpc_server_security_connector **sc) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; return grpc_ssl_server_security_connector_create(&c->config, sc); } @@ -894,7 +894,7 @@ static grpc_security_status fake_transport_security_create_security_connector( static grpc_security_status fake_transport_security_server_create_security_connector( - grpc_server_credentials *c, grpc_security_connector **sc) { + grpc_server_credentials *c, grpc_server_security_connector **sc) { *sc = grpc_fake_server_security_connector_create(); return GRPC_SECURITY_OK; } diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 3cd652cd57..0de4cd9468 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -234,7 +234,7 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( typedef struct { void (*destruct)(grpc_server_credentials *c); grpc_security_status (*create_security_connector)( - grpc_server_credentials *c, grpc_security_connector **sc); + grpc_server_credentials *c, grpc_server_security_connector **sc); } grpc_server_credentials_vtable; struct grpc_server_credentials { @@ -245,7 +245,7 @@ struct grpc_server_credentials { }; grpc_security_status grpc_server_credentials_create_security_connector( - grpc_server_credentials *creds, grpc_security_connector **sc); + grpc_server_credentials *creds, grpc_server_security_connector **sc); grpc_server_credentials *grpc_server_credentials_ref( grpc_server_credentials *creds); diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index bd4b0e462a..1f4f3e4aa5 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -52,13 +52,14 @@ static grpc_channel_credentials *default_credentials = NULL; static int compute_engine_detection_done = 0; -static gpr_mu g_mu; +static gpr_mu g_state_mu; +static gpr_mu *g_polling_mu; static gpr_once g_once = GPR_ONCE_INIT; -static void init_default_credentials(void) { gpr_mu_init(&g_mu); } +static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); } typedef struct { - grpc_pollset pollset; + grpc_pollset *pollset; int is_done; int success; } compute_engine_detector; @@ -80,10 +81,10 @@ static void on_compute_engine_detection_http_response( } } } - gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset)); + gpr_mu_lock(g_polling_mu); detector->is_done = 1; - grpc_pollset_kick(&detector->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); + grpc_pollset_kick(detector->pollset, NULL); + gpr_mu_unlock(g_polling_mu); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { @@ -101,7 +102,8 @@ static int is_stack_running_on_compute_engine(void) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - grpc_pollset_init(&detector.pollset); + detector.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(detector.pollset, &g_polling_mu); detector.is_done = 0; detector.success = 0; @@ -112,7 +114,7 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollset, &request, + &exec_ctx, &context, detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), on_compute_engine_detection_http_response, &detector); @@ -120,19 +122,22 @@ static int is_stack_running_on_compute_engine(void) { /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ - gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_lock(g_polling_mu); while (!detector.is_done) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_unlock(g_polling_mu); grpc_httpcli_context_destroy(&context); - grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); - grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure); + grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset); + grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure); grpc_exec_ctx_finish(&exec_ctx); + g_polling_mu = NULL; + + gpr_free(detector.pollset); return detector.success; } @@ -184,7 +189,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { result = grpc_channel_credentials_ref(default_credentials); @@ -230,19 +235,19 @@ end: gpr_log(GPR_ERROR, "Could not create google default credentials."); } } - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); return result; } void grpc_flush_cached_google_default_credentials(void) { gpr_once_init(&g_once, init_default_credentials); - gpr_mu_lock(&g_mu); + gpr_mu_lock(&g_state_mu); if (default_credentials != NULL) { grpc_channel_credentials_unref(default_credentials); default_credentials = NULL; } compute_engine_detection_done = 0; - gpr_mu_unlock(&g_mu); + gpr_mu_unlock(&g_state_mu); } /* -- Well known credentials path. -- */ diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c index a8b2fef629..b5bb6667a7 100644 --- a/src/core/security/handshake.c +++ b/src/core/security/handshake.c @@ -33,6 +33,7 @@ #include "src/core/security/handshake.h" +#include <stdbool.h> #include <string.h> #include "src/core/security/security_context.h" @@ -46,6 +47,7 @@ typedef struct { grpc_security_connector *connector; tsi_handshaker *handshaker; + bool is_client_side; unsigned char *handshake_buffer; size_t handshake_buffer_size; grpc_endpoint *wrapped_endpoint; @@ -67,9 +69,11 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup, bool success); static void security_connector_remove_handshake(grpc_security_handshake *h) { + GPR_ASSERT(!h->is_client_side); grpc_security_connector_handshake_list *node; grpc_security_connector_handshake_list *tmp; - grpc_security_connector *sc = h->connector; + grpc_server_security_connector *sc = + (grpc_server_security_connector *)h->connector; gpr_mu_lock(&sc->mu); node = sc->handshaking_handshakes; if (node && node->handshake == h) { @@ -94,7 +98,7 @@ static void security_connector_remove_handshake(grpc_security_handshake *h) { static void security_handshake_done(grpc_exec_ctx *exec_ctx, grpc_security_handshake *h, int is_success) { - if (!h->connector->is_client_side) { + if (!h->is_client_side) { security_connector_remove_handshake(h); } if (is_success) { @@ -290,6 +294,7 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector, + bool is_client_side, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { @@ -298,6 +303,7 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx, memset(h, 0, sizeof(grpc_security_handshake)); h->handshaker = handshaker; h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); + h->is_client_side = is_client_side; h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); h->wrapped_endpoint = nonsecure_endpoint; @@ -310,13 +316,15 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_init(&h->left_overs); gpr_slice_buffer_init(&h->outgoing); gpr_slice_buffer_init(&h->incoming); - if (!connector->is_client_side) { + if (!is_client_side) { + grpc_server_security_connector *server_connector = + (grpc_server_security_connector *)connector; handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list)); handshake_node->handshake = h; - gpr_mu_lock(&connector->mu); - handshake_node->next = connector->handshaking_handshakes; - connector->handshaking_handshakes = handshake_node; - gpr_mu_unlock(&connector->mu); + gpr_mu_lock(&server_connector->mu); + handshake_node->next = server_connector->handshaking_handshakes; + server_connector->handshaking_handshakes = handshake_node; + gpr_mu_unlock(&server_connector->mu); } send_handshake_bytes_to_peer(exec_ctx, h); } diff --git a/src/core/security/handshake.h b/src/core/security/handshake.h index 44215d16ef..db8b374921 100644 --- a/src/core/security/handshake.h +++ b/src/core/security/handshake.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,6 +41,7 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector, + bool is_client_side, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data); diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 51a99b4492..33c62a20c2 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -33,6 +33,7 @@ #include "src/core/security/security_connector.h" +#include <stdbool.h> #include <string.h> #include <grpc/support/alloc.h> @@ -110,31 +111,39 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, return NULL; } -void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_security_connector *connector) { +void grpc_server_security_connector_shutdown( + grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector) { grpc_security_connector_handshake_list *tmp; - if (!connector->is_client_side) { - gpr_mu_lock(&connector->mu); - while (connector->handshaking_handshakes) { - tmp = connector->handshaking_handshakes; - grpc_security_handshake_shutdown( - exec_ctx, connector->handshaking_handshakes->handshake); - connector->handshaking_handshakes = tmp->next; - gpr_free(tmp); - } - gpr_mu_unlock(&connector->mu); + gpr_mu_lock(&connector->mu); + while (connector->handshaking_handshakes) { + tmp = connector->handshaking_handshakes; + grpc_security_handshake_shutdown( + exec_ctx, connector->handshaking_handshakes->handshake); + connector->handshaking_handshakes = tmp->next; + gpr_free(tmp); + } + gpr_mu_unlock(&connector->mu); +} + +void grpc_channel_security_connector_do_handshake( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, + void *user_data) { + if (sc == NULL || nonsecure_endpoint == NULL) { + cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); + } else { + sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data); } } -void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_security_handshake_done_cb cb, - void *user_data) { +void grpc_server_security_connector_do_handshake( + grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, + grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data) { if (sc == NULL || nonsecure_endpoint == NULL) { cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL); } else { - sc->vtable->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data); + sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, cb, user_data); } } @@ -248,7 +257,8 @@ static void fake_channel_destroy(grpc_security_connector *sc) { } static void fake_server_destroy(grpc_security_connector *sc) { - gpr_mu_destroy(&sc->mu); + grpc_server_security_connector *c = (grpc_server_security_connector *)sc; + gpr_mu_destroy(&c->mu); gpr_free(sc); } @@ -298,49 +308,52 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, } static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, + grpc_channel_security_connector *sc, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { - grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), sc, - nonsecure_endpoint, cb, user_data); + grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base, + true, nonsecure_endpoint, cb, user_data); } static void fake_server_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, + grpc_server_security_connector *sc, + grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { - grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), sc, - nonsecure_endpoint, cb, user_data); + grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base, + false, nonsecure_endpoint, cb, user_data); } static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_do_handshake, fake_check_peer}; + fake_channel_destroy, fake_check_peer}; -static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_do_handshake, fake_check_peer}; +static grpc_security_connector_vtable fake_server_vtable = {fake_server_destroy, + fake_check_peer}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( grpc_call_credentials *request_metadata_creds) { grpc_channel_security_connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); gpr_ref_init(&c->base.refcount, 1); - c->base.is_client_side = 1; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.vtable = &fake_channel_vtable; c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->check_call_host = fake_channel_check_call_host; + c->do_handshake = fake_channel_do_handshake; return c; } -grpc_security_connector *grpc_fake_server_security_connector_create(void) { - grpc_security_connector *c = gpr_malloc(sizeof(grpc_security_connector)); - memset(c, 0, sizeof(grpc_security_connector)); - gpr_ref_init(&c->refcount, 1); - c->is_client_side = 0; - c->vtable = &fake_server_vtable; - c->url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; +grpc_server_security_connector *grpc_fake_server_security_connector_create( + void) { + grpc_server_security_connector *c = + gpr_malloc(sizeof(grpc_server_security_connector)); + memset(c, 0, sizeof(*c)); + gpr_ref_init(&c->base.refcount, 1); + c->base.vtable = &fake_server_vtable; + c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; + c->do_handshake = fake_server_do_handshake; gpr_mu_init(&c->mu); return c; } @@ -355,7 +368,7 @@ typedef struct { } grpc_ssl_channel_security_connector; typedef struct { - grpc_security_connector base; + grpc_server_security_connector base; tsi_ssl_handshaker_factory *handshaker_factory; } grpc_ssl_server_security_connector; @@ -378,12 +391,12 @@ static void ssl_server_destroy(grpc_security_connector *sc) { if (c->handshaker_factory != NULL) { tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); } - gpr_mu_destroy(&sc->mu); + gpr_mu_destroy(&c->base.mu); gpr_free(sc); } static grpc_security_status ssl_create_handshaker( - tsi_ssl_handshaker_factory *handshaker_factory, int is_client, + tsi_ssl_handshaker_factory *handshaker_factory, bool is_client, const char *peer_name, tsi_handshaker **handshaker) { tsi_result result = TSI_OK; if (handshaker_factory == NULL) return GRPC_SECURITY_ERROR; @@ -398,7 +411,7 @@ static grpc_security_status ssl_create_handshaker( } static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, + grpc_channel_security_connector *sc, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { @@ -406,20 +419,21 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx, (grpc_ssl_channel_security_connector *)sc; tsi_handshaker *handshaker; grpc_security_status status = ssl_create_handshaker( - c->handshaker_factory, 1, + c->handshaker_factory, true, c->overridden_target_name != NULL ? c->overridden_target_name : c->target_name, &handshaker); if (status != GRPC_SECURITY_OK) { cb(exec_ctx, user_data, status, NULL, NULL); } else { - grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, - user_data); + grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true, + nonsecure_endpoint, cb, user_data); } } static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *sc, + grpc_server_security_connector *sc, + grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, void *user_data) { @@ -427,12 +441,12 @@ static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx, (grpc_ssl_server_security_connector *)sc; tsi_handshaker *handshaker; grpc_security_status status = - ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker); + ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker); if (status != GRPC_SECURITY_OK) { cb(exec_ctx, user_data, status, NULL, NULL); } else { - grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb, - user_data); + grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false, + nonsecure_endpoint, cb, user_data); } } @@ -603,10 +617,10 @@ static void ssl_channel_check_call_host(grpc_exec_ctx *exec_ctx, } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_check_peer}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_check_peer}; static gpr_slice compute_default_pem_root_certs_once(void) { gpr_slice result = gpr_empty_slice(); @@ -700,11 +714,11 @@ grpc_security_status grpc_ssl_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &ssl_channel_vtable; - c->base.base.is_client_side = 1; c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; + c->base.do_handshake = ssl_channel_do_handshake; gpr_split_host_port(target_name, &c->target_name, &port); gpr_free(port); if (overridden_target_name != NULL) { @@ -735,7 +749,7 @@ error: } grpc_security_status grpc_ssl_server_security_connector_create( - const grpc_ssl_server_config *config, grpc_security_connector **sc) { + const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); const unsigned char **alpn_protocol_strings = gpr_malloc(sizeof(const char *) * num_alpn_protocols); @@ -759,9 +773,9 @@ grpc_security_status grpc_ssl_server_security_connector_create( c = gpr_malloc(sizeof(grpc_ssl_server_security_connector)); memset(c, 0, sizeof(grpc_ssl_server_security_connector)); - gpr_ref_init(&c->base.refcount, 1); - c->base.url_scheme = GRPC_SSL_URL_SCHEME; - c->base.vtable = &ssl_server_vtable; + gpr_ref_init(&c->base.base.refcount, 1); + c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; + c->base.base.vtable = &ssl_server_vtable; result = tsi_create_ssl_server_handshaker_factory( (const unsigned char **)config->pem_private_keys, config->pem_private_keys_sizes, @@ -774,11 +788,12 @@ grpc_security_status grpc_ssl_server_security_connector_create( if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", tsi_result_to_string(result)); - ssl_server_destroy(&c->base); + ssl_server_destroy(&c->base.base); *sc = NULL; goto error; } gpr_mu_init(&c->base.mu); + c->base.do_handshake = ssl_server_do_handshake; *sc = &c->base; gpr_free((void *)alpn_protocol_strings); gpr_free(alpn_protocol_string_lengths); diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 39df7821f0..1e35d3f9b7 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -36,6 +36,7 @@ #include <grpc/grpc_security.h> #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/tcp_server.h" #include "src/core/tsi/transport_security_interface.h" /* --- status enum. --- */ @@ -68,9 +69,6 @@ typedef void (*grpc_security_handshake_done_cb)( typedef struct { void (*destroy)(grpc_security_connector *sc); - void (*do_handshake)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, - grpc_endpoint *nonsecure_endpoint, - grpc_security_handshake_done_cb cb, void *user_data); void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, grpc_security_peer_check_cb cb, void *user_data); @@ -84,13 +82,7 @@ typedef struct grpc_security_connector_handshake_list { struct grpc_security_connector { const grpc_security_connector_vtable *vtable; gpr_refcount refcount; - int is_client_side; const char *url_scheme; - /* Used on server side only. */ - /* TODO(yangg): Create a grpc_server_security_connector with these. */ - gpr_mu mu; - grpc_security_connector_handshake_list *handshaking_handshakes; - const grpc_channel_args *channel_args; }; /* Refcounting. */ @@ -113,13 +105,6 @@ grpc_security_connector *grpc_security_connector_ref( void grpc_security_connector_unref(grpc_security_connector *policy); #endif -/* Handshake. */ -void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx, - grpc_security_connector *connector, - grpc_endpoint *nonsecure_endpoint, - grpc_security_handshake_done_cb cb, - void *user_data); - /* Check the peer. Callee takes ownership of the peer object. The callback will include the resulting auth_context. */ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, @@ -128,9 +113,6 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_security_peer_check_cb cb, void *user_data); -void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx, - grpc_security_connector *connector); - /* Util to encapsulate the connector in a channel arg. */ grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc); @@ -153,12 +135,16 @@ typedef void (*grpc_security_call_host_check_cb)(grpc_exec_ctx *exec_ctx, grpc_security_status status); struct grpc_channel_security_connector { - grpc_security_connector base; /* requires is_client_side to be non 0. */ + grpc_security_connector base; grpc_call_credentials *request_metadata_creds; void (*check_call_host)(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, grpc_security_call_host_check_cb cb, void *user_data); + void (*do_handshake)(grpc_exec_ctx *exec_ctx, + grpc_channel_security_connector *sc, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data); }; /* Checks that the host that will be set for a call is acceptable. */ @@ -167,6 +153,39 @@ void grpc_channel_security_connector_check_call_host( const char *host, grpc_auth_context *auth_context, grpc_security_call_host_check_cb cb, void *user_data); +/* Handshake. */ +void grpc_channel_security_connector_do_handshake( + grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, + grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb, + void *user_data); + +/* --- server_security_connector object. --- + + A server security connector object represents away to configure the + underlying transport security mechanism on the server side. */ + +typedef struct grpc_server_security_connector grpc_server_security_connector; + +struct grpc_server_security_connector { + grpc_security_connector base; + gpr_mu mu; + grpc_security_connector_handshake_list *handshaking_handshakes; + const grpc_channel_args *channel_args; + void (*do_handshake)(grpc_exec_ctx *exec_ctx, + grpc_server_security_connector *sc, + grpc_tcp_server_acceptor *acceptor, + grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data); +}; + +void grpc_server_security_connector_do_handshake( + grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, + grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint, + grpc_security_handshake_done_cb cb, void *user_data); + +void grpc_server_security_connector_shutdown( + grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector); + /* --- Creation security connectors. --- */ /* For TESTING ONLY! @@ -176,7 +195,8 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( /* For TESTING ONLY! Creates a fake connector that emulates real server security. */ -grpc_security_connector *grpc_fake_server_security_connector_create(void); +grpc_server_security_connector *grpc_fake_server_security_connector_create( + void); /* Config for ssl clients. */ typedef struct { @@ -231,7 +251,7 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - const grpc_ssl_server_config *config, grpc_security_connector **sc); + const grpc_ssl_server_config *config, grpc_server_security_connector **sc); /* Util. */ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 84a883390c..91547eb26e 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -55,7 +55,7 @@ typedef struct grpc_server_secure_state { grpc_server *server; grpc_tcp_server *tcp; - grpc_security_connector *sc; + grpc_server_security_connector *sc; grpc_server_credentials *creds; int is_shutdown; gpr_mu mu; @@ -74,7 +74,7 @@ static void state_unref(grpc_server_secure_state *state) { gpr_mu_lock(&state->mu); gpr_mu_unlock(&state->mu); /* clean up */ - GRPC_SECURITY_CONNECTOR_UNREF(state->sc, "server"); + GRPC_SECURITY_CONNECTOR_UNREF(&state->sc->base, "server"); grpc_server_credentials_unref(state->creds); gpr_free(state); } @@ -130,8 +130,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, grpc_tcp_server_acceptor *acceptor) { grpc_server_secure_state *state = statep; state_ref(state); - grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp, - on_secure_handshake_done, state); + grpc_server_security_connector_do_handshake( + exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state); } /* Server callback: start listening on our ports */ @@ -148,7 +148,7 @@ static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, success); } - grpc_security_connector_shutdown(exec_ctx, state->sc); + grpc_server_security_connector_shutdown(exec_ctx, state->sc); state_unref(state); } @@ -176,7 +176,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, int port_num = -1; int port_temp; grpc_security_status status = GRPC_SECURITY_ERROR; - grpc_security_connector *sc = NULL; + grpc_server_security_connector *sc = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE( @@ -256,7 +256,7 @@ error: grpc_tcp_server_unref(&exec_ctx, tcp); } else { if (sc) { - GRPC_SECURITY_CONNECTOR_UNREF(sc, "server"); + GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); } if (state) { gpr_free(state); diff --git a/src/core/support/sync.c b/src/core/support/sync.c index d368422d9e..69e3e39c5c 100644 --- a/src/core/support/sync.c +++ b/src/core/support/sync.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } +void gpr_ref_non_zero(gpr_refcount *r) { + gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1); + GPR_ASSERT(prior > 0); +} + void gpr_refn(gpr_refcount *r, int n) { gpr_atm_no_barrier_fetch_add(&r->count, n); } diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c index fb496f6c47..8169ede065 100644 --- a/src/core/surface/alarm.c +++ b/src/core/surface/alarm.c @@ -64,8 +64,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->tag = tag; grpc_cq_begin_op(cq, tag); - grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(&exec_ctx, &alarm->alarm, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_exec_ctx_finish(&exec_ctx); return alarm; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 0a80680f02..f6a95ebbd3 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,26 +36,29 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/timer.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" -#include "src/core/profiling/timers.h" -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> typedef struct { - grpc_pollset_worker *worker; + grpc_pollset_worker **worker; void *tag; } plucker; /* Completion queue structure */ struct grpc_completion_queue { + /** owned by pollset */ + gpr_mu *mu; /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -63,8 +66,6 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** the set of low level i/o things that concern this cq */ - grpc_pollset pollset; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -82,6 +83,8 @@ struct grpc_completion_queue { grpc_completion_queue *next_free; }; +#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) + static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; @@ -94,7 +97,7 @@ void grpc_cq_global_shutdown(void) { gpr_mu_destroy(&g_freelist_mu); while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; - grpc_pollset_destroy(&g_freelist->pollset); + grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist)); #ifndef NDEBUG gpr_free(g_freelist->outstanding_tags); #endif @@ -124,8 +127,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { if (g_freelist == NULL) { gpr_mu_unlock(&g_freelist_mu); - cc = gpr_malloc(sizeof(grpc_completion_queue)); - grpc_pollset_init(&cc->pollset); + cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; @@ -184,7 +187,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_reset(&cc->pollset); + grpc_pollset_reset(POLLSET_FROM_CQ(cc)); gpr_mu_lock(&g_freelist_mu); cc->next_free = g_freelist; g_freelist = cc; @@ -194,7 +197,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); GPR_ASSERT(!cc->shutdown_called); if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); @@ -203,7 +206,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { cc->outstanding_tag_capacity); } cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); #endif gpr_ref(&cc->pending_events); } @@ -231,7 +234,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); #ifndef NDEBUG for (i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { @@ -252,12 +255,12 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, pluck_worker = NULL; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag) { - pluck_worker = cc->pluckers[i].worker; + pluck_worker = *cc->pluckers[i].worker; break; } } - grpc_pollset_kick(&cc->pollset, pluck_worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + gpr_mu_unlock(cc->mu); } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -265,8 +268,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); + gpr_mu_unlock(cc->mu); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -275,7 +279,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; int first_loop = 1; gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -294,7 +298,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; @@ -302,7 +306,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -310,14 +314,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -330,11 +334,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); + continue; } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } } @@ -348,7 +353,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } static int add_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -359,7 +364,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, } static void del_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { int i; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { @@ -376,7 +381,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now; int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); for (;;) { prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != @@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (c == cc->completed_tail) { cc->completed_tail = prev; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -447,11 +452,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); } else { - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } del_plucker(cc, tag, &worker); @@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(cc->mu); if (cc->shutdown_called) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } @@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); + grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return &cc->pollset; + return POLLSET_FROM_CQ(cc); } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 9c04426d87..aadfac4c91 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -130,9 +130,9 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, bool success) { connector *c = arg; - grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base, - c->connecting_endpoint, - on_secure_handshake_done, c); + grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, + c->connecting_endpoint, + on_secure_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { @@ -153,9 +153,8 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { - grpc_security_connector_do_handshake(exec_ctx, - &c->security_connector->base, tcp, - on_secure_handshake_done, c); + grpc_channel_security_connector_do_handshake( + exec_ctx, c->security_connector, tcp, on_secure_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index d76d31be23..891aad6ef2 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -417,7 +417,7 @@ typedef struct { /** HTTP2 stream id for this stream, or zero if one has not been assigned */ uint32_t id; uint8_t fetching; - uint8_t sent_initial_metadata; + bool sent_initial_metadata; uint8_t sent_message; uint8_t sent_trailing_metadata; uint8_t read_closed; @@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *global, grpc_chttp2_transport_parsing *parsing); -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); /** Get a writable stream @@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); + grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT; -/* returns 1 if stream added, 0 if it was already present */ -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT; + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_have_writing_streams( grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_list_pop_writing_stream( @@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *parsing, const uint8_t *opaque_8bytes); +/** add a ref to the stream and add it to the writable list; + ref will be dropped in writing.c */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); + #endif diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8fdebd7f13..0516f39fa9 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads( if (was_zero && !is_zero) { while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, &stream_global)) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index b284c78818..60fe735cfc 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } } -static void stream_list_maybe_remove(grpc_chttp2_transport *t, +static bool stream_list_maybe_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) { if (s->included[id]) { stream_list_remove(t, s, id); + return true; + } else { + return false; } } @@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, s->included[id] = 1; } -static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_chttp2_stream_list_id id) { +static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { if (s->included[id]) { - return 0; + return false; } stream_list_add_tail(t, s, id); - return 1; + return true; } /* wrappers for specializations */ -void grpc_chttp2_list_add_writable_stream( +bool grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } int grpc_chttp2_list_pop_writable_stream( @@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream( return r; } -void grpc_chttp2_list_remove_writable_stream( +bool grpc_chttp2_list_remove_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE); + return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); } -int grpc_chttp2_list_add_writing_stream( +void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), - GRPC_CHTTP2_LIST_WRITING); + GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_WRITING)); } int grpc_chttp2_list_have_writing_streams( @@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { if (is_window_available) { - grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); + grpc_chttp2_become_writable(&transport->global, &stream->global); } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, &stream->writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 356fd8174a..107725cbc7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes( (according to available window sizes) and add to the output buffer */ while (grpc_chttp2_list_pop_writable_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { - uint8_t sent_initial_metadata; + bool sent_initial_metadata = stream_writing->sent_initial_metadata; + bool become_writable = false; stream_writing->id = stream_global->id; stream_writing->read_closed = stream_global->read_closed; @@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes( outgoing_window, stream_global, outgoing_window); - sent_initial_metadata = stream_writing->sent_initial_metadata; if (!sent_initial_metadata && stream_global->send_initial_metadata) { stream_writing->send_initial_metadata = stream_global->send_initial_metadata; stream_global->send_initial_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } - sent_initial_metadata = 1; + become_writable = true; + sent_initial_metadata = true; } if (sent_initial_metadata) { if (stream_global->send_message != NULL) { @@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->flow_controlled_buffer.length > 0) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } else { grpc_chttp2_list_add_stalled_by_transport(transport_writing, stream_writing); @@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes( stream_writing->send_trailing_metadata = stream_global->send_trailing_metadata; stream_global->send_trailing_metadata = NULL; - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; } } @@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing, announce_window, stream_global, unannounced_incoming_window_for_writing); - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); - } + become_writable = true; + } + + if (become_writable) { + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing"); } } @@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, (stream_writing->send_message && !stream_writing->fetching)) && stream_writing->outgoing_window > 0) { if (transport_writing->outgoing_window > 0) { - if (grpc_chttp2_list_add_writing_stream(transport_writing, - stream_writing)) { - /* do nothing - already reffed */ - } + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } else { grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, stream_writing); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index b9f511e946..a1ca78d4c4 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control( static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global); -/* +/******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -521,7 +521,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->global.id) == NULL); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global, &s->global); grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global); @@ -583,7 +582,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( return &accepting->parsing; } -/* +/******************************************************************************* * LOCK MANAGEMENT */ @@ -611,10 +610,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_END("unlock", 0); } -/* +/******************************************************************************* * OUTPUT PROCESSING */ +void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && + grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { + GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + } +} + static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = @@ -732,7 +739,7 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = 1; transport_global->concurrent_stream_count++; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -821,7 +828,7 @@ static void perform_stream_op_locked( maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } else { grpc_chttp2_complete_closure_step( @@ -838,7 +845,7 @@ static void perform_stream_op_locked( exec_ctx, &stream_global->send_message_finished, 0); } else if (stream_global->id != 0) { stream_global->send_message = op->send_message; - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -858,7 +865,7 @@ static void perform_stream_op_locked( } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -999,7 +1006,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } } -/* +/******************************************************************************* * INPUT PROCESSING */ @@ -1064,7 +1071,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (!s) { s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } - grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -1080,6 +1086,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) { close_transport_locked(exec_ctx, t); } + if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) { + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing"); + } new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) + grpc_chttp2_stream_map_size(&t->new_stream_map); @@ -1331,7 +1340,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1426,7 +1435,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) { GPR_TIMER_END("recv_data", 0); } -/* +/******************************************************************************* * CALLBACK LOOP */ @@ -1440,7 +1449,7 @@ static void connectivity_state_set( state, reason); } -/* +/******************************************************************************* * POLLSET STUFF */ @@ -1468,7 +1477,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, unlock(exec_ctx, t); } -/* +/******************************************************************************* * BYTE STREAM */ @@ -1508,7 +1517,7 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + grpc_chttp2_become_writable(transport_global, stream_global); } } @@ -1623,7 +1632,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( return incoming_byte_stream; } -/* +/******************************************************************************* * TRACING */ @@ -1709,7 +1718,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, gpr_free(prefix); } -/* +/******************************************************************************* * INTEGRATION GLUE */ diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index 14912af7df..807ae071a3 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -43,11 +43,13 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> + #include "src/core/profiling/timers.h" #include "src/core/support/murmur_hash.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/bin_encoder.h" #include "src/core/transport/static_metadata.h" +#include "src/core/iomgr/iomgr_internal.h" /* There are two kinds of mdelem and mdstr instances. * Static instances are declared in static_metadata.{h,c} and @@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->elems); } @@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) { if (shard->count != 0) { gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked", shard->count); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } gpr_free(shard->strs); } diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 6e154b629a..3b555fa933 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) { #else void grpc_stream_ref(grpc_stream_refcount *refcount) { #endif - gpr_ref(&refcount->refs); + gpr_ref_non_zero(&refcount->refs); } #ifdef GRPC_STREAM_REFCOUNT_DEBUG |