aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar vjpai <vpai@google.com>2016-03-03 06:59:16 -0800
committerGravatar vjpai <vpai@google.com>2016-03-03 06:59:16 -0800
commitb7117e409bf35895dd9029bd811ac16fd02e3d83 (patch)
tree732339fe04361ad8ebfdb168dedc465c9cfdd5d5 /src/core
parentc47c9497fa6518205a598c2bf74517df09541fed (diff)
parent40d27ba78455d9c5ef1feef250d0d889d6bfb746 (diff)
Merge branch 'master' into debug_qps_stream
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/context.c149
-rw-r--r--src/core/channel/client_channel.c31
-rw-r--r--src/core/client_config/lb_policies/pick_first.c21
-rw-r--r--src/core/client_config/lb_policies/round_robin.c15
-rw-r--r--src/core/client_config/lb_policy.c6
-rw-r--r--src/core/client_config/lb_policy.h5
-rw-r--r--src/core/client_config/subchannel.c24
-rw-r--r--src/core/httpcli/httpcli.c24
-rw-r--r--src/core/httpcli/httpcli.h5
-rw-r--r--src/core/httpcli/httpcli_security_connector.c14
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/iomgr.c16
-rw-r--r--src/core/iomgr/iomgr_internal.h6
-rw-r--r--src/core/iomgr/pollset.h19
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c1
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c8
-rw-r--r--src/core/iomgr/pollset_posix.c54
-rw-r--r--src/core/iomgr/pollset_posix.h12
-rw-r--r--src/core/iomgr/pollset_set.h12
-rw-r--r--src/core/iomgr/pollset_set_posix.c26
-rw-r--r--src/core/iomgr/pollset_set_posix.h20
-rw-r--r--src/core/iomgr/pollset_set_windows.c6
-rw-r--r--src/core/iomgr/pollset_set_windows.h4
-rw-r--r--src/core/iomgr/pollset_windows.c41
-rw-r--r--src/core/iomgr/pollset_windows.h6
-rw-r--r--src/core/iomgr/tcp_client_posix.c12
-rw-r--r--src/core/iomgr/tcp_posix.c6
-rw-r--r--src/core/iomgr/udp_server.h1
-rw-r--r--src/core/iomgr/workqueue_posix.c1
-rw-r--r--src/core/iomgr/workqueue_posix.h4
-rw-r--r--src/core/security/client_auth_filter.c1
-rw-r--r--src/core/security/credentials.c6
-rw-r--r--src/core/security/credentials.h6
-rw-r--r--src/core/security/google_default_credentials.c41
-rw-r--r--src/core/security/handshake.c22
-rw-r--r--src/core/security/handshake.h3
-rw-r--r--src/core/security/security_connector.c125
-rw-r--r--src/core/security/security_connector.h64
-rw-r--r--src/core/security/server_secure_chttp2.c14
-rw-r--r--src/core/support/sync.c7
-rw-r--r--src/core/surface/alarm.c5
-rw-r--r--src/core/surface/completion_queue.c96
-rw-r--r--src/core/surface/secure_channel_create.c11
-rw-r--r--src/core/transport/chttp2/internal.h18
-rw-r--r--src/core/transport/chttp2/parsing.c6
-rw-r--r--src/core/transport/chttp2/stream_lists.c38
-rw-r--r--src/core/transport/chttp2/writing.c37
-rw-r--r--src/core/transport/chttp2_transport.c43
-rw-r--r--src/core/transport/metadata.c8
-rw-r--r--src/core/transport/transport.c2
50 files changed, 599 insertions, 509 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c
index e60330de64..89b8ee0b39 100644
--- a/src/core/census/context.c
+++ b/src/core/census/context.c
@@ -60,10 +60,10 @@
// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
// * Keep all tag information (keys/values/flags) in a single memory buffer,
// that can be directly copied to the wire.
-// * Binary tags share the same structure as, but are encoded separately from,
-// non-binary tags. This is primarily because non-binary tags are far more
-// likely to be repeated across multiple RPC calls, so are more efficiently
-// cached and compressed in any metadata schemes.
+
+// min and max valid chars in tag keys and values. All printable ASCII is OK.
+#define MIN_VALID_TAG_CHAR 32 // ' '
+#define MAX_VALID_TAG_CHAR 126 // '~'
// Structure representing a set of tags. Essentially a count of number of tags
// present, and pointer to a chunk of memory that contains the per-tag details.
@@ -77,7 +77,7 @@ struct tag_set {
char *kvm; // key/value memory. Consists of repeated entries of:
// Offset Size Description
// 0 1 Key length, including trailing 0. (K)
- // 1 1 Value length. (V)
+ // 1 1 Value length, including trailing 0 (V)
// 2 1 Flags
// 3 K Key bytes
// 3 + K V Value bytes
@@ -108,19 +108,36 @@ struct raw_tag {
#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
-// Primary (external) representation of a context. Composed of 3 underlying
-// tag_set structs, one for each of the binary/printable propagated tags, and
-// one for everything else. This is to efficiently support tag
-// encoding/decoding.
+// Primary representation of a context. Composed of 2 underlying tag_set
+// structs, one each for propagated and local (non-propagated) tags. This is
+// to efficiently support tag encoding/decoding.
+// TODO(aveitch): need to add tracing id's/structure.
struct census_context {
- struct tag_set tags[3];
+ struct tag_set tags[2];
census_context_status status;
};
// Indices into the tags member of census_context
#define PROPAGATED_TAGS 0
-#define PROPAGATED_BINARY_TAGS 1
-#define LOCAL_TAGS 2
+#define LOCAL_TAGS 1
+
+// Validate (check all characters are in range and size is less than limit) a
+// key or value string. Returns 0 if the string is invalid, or the length
+// (including terminator) if valid.
+static size_t validate_tag(const char *kv) {
+ size_t len = 1;
+ char ch;
+ while ((ch = *kv++) != 0) {
+ if (ch < MIN_VALID_TAG_CHAR || ch > MAX_VALID_TAG_CHAR) {
+ return 0;
+ }
+ len++;
+ }
+ if (len > CENSUS_MAX_TAG_KV_LEN) {
+ return 0;
+ }
+ return len;
+}
// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
// extra bytes in the tag header (see encode/decode functions for usage: this
@@ -166,9 +183,7 @@ static bool context_delete_tag(census_context *context, const census_tag *tag,
size_t key_len) {
return (
tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key,
- key_len));
+ tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len));
}
// Add a tag to a tag_set. Return true on success, false if the tag could
@@ -176,11 +191,11 @@ static bool context_delete_tag(census_context *context, const census_tag *tag,
// not be called if the tag may already exist (in a non-deleted state) in
// the tag_set, as that would result in two tags with the same key.
static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
- size_t key_len) {
+ size_t key_len, size_t value_len) {
if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
return false;
}
- const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE;
+ const size_t tag_size = key_len + value_len + TAG_HEADER_SIZE;
if (tags->kvm_used + tag_size > tags->kvm_size) {
// allocate new memory if needed
tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
@@ -191,13 +206,12 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
}
char *kvp = tags->kvm + tags->kvm_used;
*kvp++ = (char)key_len;
- *kvp++ = (char)tag->value_len;
+ *kvp++ = (char)value_len;
// ensure reserved flags are not used.
- *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS |
- CENSUS_TAG_BINARY));
+ *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS));
memcpy(kvp, tag->key, key_len);
kvp += key_len;
- memcpy(kvp, tag->value, tag->value_len);
+ memcpy(kvp, tag->value, value_len);
tags->kvm_used += tag_size;
tags->ntags++;
tags->ntags_alloc++;
@@ -207,30 +221,20 @@ static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
// Add/modify/delete a tag to/in a context. Caller must validate that tag key
// etc. are valid.
static void context_modify_tag(census_context *context, const census_tag *tag,
- size_t key_len) {
+ size_t key_len, size_t value_len) {
// First delete the tag if it is already present.
bool deleted = context_delete_tag(context, tag, key_len);
- // Determine if we need to add it back.
- bool call_add = tag->value != NULL && tag->value_len != 0;
bool added = false;
- if (call_add) {
- if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
- if (CENSUS_TAG_IS_BINARY(tag->flags)) {
- added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag,
- key_len);
- } else {
- added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len);
- }
- } else {
- added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len);
- }
+ if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len,
+ value_len);
+ } else {
+ added =
+ tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len, value_len);
}
+
if (deleted) {
- if (call_add) {
- context->status.n_modified_tags++;
- } else {
- context->status.n_deleted_tags++;
- }
+ context->status.n_modified_tags++;
} else {
if (added) {
context->status.n_added_tags++;
@@ -292,8 +296,6 @@ census_context *census_context_create(const census_context *base,
memset(context, 0, sizeof(census_context));
} else {
tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
- tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS],
- &base->tags[PROPAGATED_BINARY_TAGS]);
tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
memset(&context->status, 0, sizeof(context->status));
}
@@ -301,22 +303,29 @@ census_context *census_context_create(const census_context *base,
// the context to add/replace/delete as required.
for (int i = 0; i < ntags; i++) {
const census_tag *tag = &tags[i];
- size_t key_len = strlen(tag->key) + 1;
- // ignore the tag if it is too long/short.
- if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN &&
- tag->value_len <= CENSUS_MAX_TAG_KV_LEN) {
- context_modify_tag(context, tag, key_len);
- } else {
+ size_t key_len = validate_tag(tag->key);
+ // ignore the tag if it is invalid or too short.
+ if (key_len <= 1) {
context->status.n_invalid_tags++;
+ } else {
+ if (tag->value != NULL) {
+ size_t value_len = validate_tag(tag->value);
+ if (value_len != 0) {
+ context_modify_tag(context, tag, key_len, value_len);
+ } else {
+ context->status.n_invalid_tags++;
+ }
+ } else {
+ if (context_delete_tag(context, tag, key_len)) {
+ context->status.n_deleted_tags++;
+ }
+ }
}
}
// Remove any deleted tags, update status if needed, and return.
tag_set_flatten(&context->tags[PROPAGATED_TAGS]);
- tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]);
tag_set_flatten(&context->tags[LOCAL_TAGS]);
context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
- context->status.n_propagated_binary_tags =
- context->tags[PROPAGATED_BINARY_TAGS].ntags;
context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags;
if (status) {
*status = &context->status;
@@ -331,7 +340,6 @@ const census_context_status *census_context_get_status(
void census_context_destroy(census_context *context) {
gpr_free(context->tags[PROPAGATED_TAGS].kvm);
- gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm);
gpr_free(context->tags[LOCAL_TAGS].kvm);
gpr_free(context);
}
@@ -343,9 +351,6 @@ void census_context_initialize_iterator(const census_context *context,
if (context->tags[PROPAGATED_TAGS].ntags != 0) {
iterator->base = PROPAGATED_TAGS;
iterator->kvm = context->tags[PROPAGATED_TAGS].kvm;
- } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) {
- iterator->base = PROPAGATED_BINARY_TAGS;
- iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm;
} else if (context->tags[LOCAL_TAGS].ntags != 0) {
iterator->base = LOCAL_TAGS;
iterator->kvm = context->tags[LOCAL_TAGS].kvm;
@@ -363,7 +368,6 @@ int census_context_next_tag(census_context_iterator *iterator,
iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
tag->key = raw.key;
tag->value = raw.value;
- tag->value_len = raw.value_len;
tag->flags = raw.flags;
if (++iterator->index == iterator->context->tags[iterator->base].ntags) {
do {
@@ -388,7 +392,6 @@ static bool tag_set_get_tag(const struct tag_set *tags, const char *key,
if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
tag->key = raw.key;
tag->value = raw.value;
- tag->value_len = raw.value_len;
tag->flags = raw.flags;
return true;
}
@@ -403,8 +406,6 @@ int census_context_get_tag(const census_context *context, const char *key,
return 0;
}
if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) ||
- tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len,
- tag) ||
tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) {
return 1;
}
@@ -447,21 +448,9 @@ static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
return ENCODED_HEADER_SIZE + tags->kvm_used;
}
-char *census_context_encode(const census_context *context, char *buffer,
- size_t buf_size, size_t *print_buf_size,
- size_t *bin_buf_size) {
- *print_buf_size =
- tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size);
- if (*print_buf_size == 0) {
- return NULL;
- }
- char *b_buffer = buffer + *print_buf_size;
- *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS],
- b_buffer, buf_size - *print_buf_size);
- if (*bin_buf_size == 0) {
- return NULL;
- }
- return b_buffer;
+size_t census_context_encode(const census_context *context, char *buffer,
+ size_t buf_size) {
+ return tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size);
}
// Decode a tag set.
@@ -506,8 +495,7 @@ static void tag_set_decode(struct tag_set *tags, const char *buffer,
}
}
-census_context *census_context_decode(const char *buffer, size_t size,
- const char *bin_buffer, size_t bin_size) {
+census_context *census_context_decode(const char *buffer, size_t size) {
census_context *context = gpr_malloc(sizeof(census_context));
memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
if (buffer == NULL) {
@@ -515,16 +503,7 @@ census_context *census_context_decode(const char *buffer, size_t size,
} else {
tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size);
}
- if (bin_buffer == NULL) {
- memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set));
- } else {
- tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer,
- bin_size);
- }
memset(&context->status, 0, sizeof(context->status));
context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
- context->status.n_propagated_binary_tags =
- context->tags[PROPAGATED_BINARY_TAGS].ntags;
- // TODO(aveitch): check that BINARY flag is correct for each type.
return context;
}
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 7176c01b05..eeac3c146c 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -78,8 +78,8 @@ typedef struct client_channel_channel_data {
int exit_idle_when_lb_policy_arrives;
/** owning stack */
grpc_channel_stack *owning_stack;
- /** interested parties */
- grpc_pollset_set interested_parties;
+ /** interested parties (owned) */
+ grpc_pollset_set *interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
if (lb_policy != NULL) {
- grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties,
- &chand->interested_parties);
+ grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
+ chand->interested_parties);
}
gpr_mu_lock(&chand->mu_config);
@@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
- grpc_pollset_set_del_pollset_set(exec_ctx,
- &old_lb_policy->interested_parties,
- &chand->interested_parties);
+ grpc_pollset_set_del_pollset_set(
+ exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(op->set_accept_stream == NULL);
if (op->bind_pollset != NULL) {
- grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties,
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
op->bind_pollset);
}
@@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
- &chand->lb_policy->interested_parties,
- &chand->interested_parties);
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
@@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
- grpc_pollset_set_init(&chand->interested_parties);
+ chand->interested_parties = grpc_pollset_set_create();
}
/* Destructor for channel_data */
@@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
- &chand->lb_policy->interested_parties,
- &chand->interested_parties);
+ chand->lb_policy->interested_parties,
+ chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
- grpc_pollset_set_destroy(&chand->interested_parties);
+ grpc_pollset_set_destroy(chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@@ -501,7 +500,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
bool iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
- grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
w->pollset);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
@@ -517,7 +516,7 @@ void grpc_client_channel_watch_connectivity_state(
w->chand = chand;
w->pollset = pollset;
w->on_complete = on_complete;
- grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 459bbebb68..81167b31c8 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -31,8 +31,8 @@
*
*/
-#include "src/core/client_config/lb_policy_factory.h"
#include "src/core/client_config/lb_policies/pick_first.h"
+#include "src/core/client_config/lb_policy_factory.h"
#include <string.h>
@@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
@@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->base.interested_parties, &p->checking_connectivity,
+ p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
}
@@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
- pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, &p->base.interested_parties,
+ exec_ctx, selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
@@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, selected, &p->base.interested_parties,
+ exec_ctx, selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
@@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->base.interested_parties, &p->checking_connectivity,
+ p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
} else {
goto loop;
@@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
- &p->base.interested_parties, &p->checking_connectivity,
+ p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index b1171c45b0..98d9acc75b 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
@@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
subchannel_data *sd = p->subchannels[i];
sd->connectivity_state = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, &p->base.interested_parties,
+ exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity");
}
@@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties,
- pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
+ grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, &p->base.interested_parties,
+ exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_CONNECTING:
@@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
sd->connectivity_state,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, &p->base.interested_parties,
+ exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* renew state notification */
grpc_subchannel_notify_on_state_change(
- exec_ctx, sd->subchannel, &p->base.interested_parties,
+ exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
/* remove from ready list if still present */
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index d4672f6b25..0d8b007336 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS);
- grpc_pollset_set_init(&policy->interested_parties);
+ policy->interested_parties = grpc_pollset_set_create();
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
gpr_atm old_val =
ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF"));
if (old_val == 1) {
- grpc_pollset_set_destroy(&policy->interested_parties);
+ grpc_pollset_set_destroy(policy->interested_parties);
policy->vtable->destroy(exec_ctx, policy);
}
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index db5238c8ca..3457390606 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_atm ref_pair;
- grpc_pollset_set interested_parties;
+ /* owned pointer to interested parties in load balancing decisions */
+ grpc_pollset_set *interested_parties;
};
struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6599c75dba..d91dd116b8 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -108,7 +108,7 @@ struct grpc_subchannel {
/** pollset_set tracking who's interested in a connection
being setup */
- grpc_pollset_set pollset_set;
+ grpc_pollset_set *pollset_set;
/** active connection, or null; of type grpc_connected_subchannel */
gpr_atm connected_subchannel;
@@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_slice_unref(c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
- grpc_pollset_set_destroy(&c->pollset_set);
+ grpc_pollset_set_destroy(c->pollset_set);
grpc_subchannel_key_destroy(exec_ctx, c->key);
gpr_free(c);
}
@@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
- grpc_pollset_set_init(&c->pollset_set);
+ c->pollset_set = grpc_pollset_set_create();
c->addr_len = args->addr_len;
grpc_set_initial_connect_string(&c->addr, &c->addr_len,
&c->initial_connect_string);
@@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
- args.interested_parties = &c->pollset_set;
+ args.interested_parties = c->pollset_set;
args.addr = c->addr;
args.addr_len = c->addr_len;
args.deadline = compute_connect_deadline(c);
@@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
- grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set,
+ grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu);
@@ -395,7 +395,6 @@ void grpc_subchannel_notify_on_state_change(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
grpc_closure *notify) {
- int do_connect = 0;
external_state_watcher *w;
if (state == NULL) {
@@ -415,7 +414,7 @@ void grpc_subchannel_notify_on_state_change(
w->notify = notify;
grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
if (interested_parties != NULL) {
- grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set,
+ grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set,
interested_parties);
}
GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
@@ -425,17 +424,13 @@ void grpc_subchannel_notify_on_state_change(
w->next->prev = w->prev->next = w;
if (grpc_connectivity_state_notify_on_state_change(
exec_ctx, &c->state_tracker, state, &w->closure)) {
- do_connect = 1;
c->connecting = 1;
/* released by connection */
GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
+ start_connect(exec_ctx, c);
}
gpr_mu_unlock(&c->mu);
}
-
- if (do_connect) {
- start_connect(exec_ctx, c);
- }
}
void grpc_connected_subchannel_process_transport_op(
@@ -573,7 +568,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
grpc_connected_subchannel_notify_on_state_change(
- exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state,
+ exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state,
&sw_subchannel->closure);
/* signal completion */
@@ -635,11 +630,12 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
if (c->disconnected) {
iomgr_success = 0;
}
- gpr_mu_unlock(&c->mu);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
} else {
+ gpr_mu_unlock(&c->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting");
}
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 71237bb614..1219c444c7 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -31,20 +31,22 @@
*
*/
-#include "src/core/iomgr/sockaddr.h"
#include "src/core/httpcli/httpcli.h"
+#include "src/core/iomgr/sockaddr.h"
#include <string.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/httpcli/format_request.h"
+#include "src/core/httpcli/parser.h"
#include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
-#include "src/core/httpcli/format_request.h"
-#include "src/core/httpcli/parser.h"
#include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
typedef struct {
gpr_slice request_text;
@@ -84,18 +86,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http",
plaintext_handshake};
void grpc_httpcli_context_init(grpc_httpcli_context *context) {
- grpc_pollset_set_init(&context->pollset_set);
+ context->pollset_set = grpc_pollset_set_create();
}
void grpc_httpcli_context_destroy(grpc_httpcli_context *context) {
- grpc_pollset_set_destroy(&context->pollset_set);
+ grpc_pollset_set_destroy(context->pollset_set);
}
static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
int success) {
- grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set,
+ grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
@@ -197,7 +199,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) {
addr = &req->addresses->addrs[req->next_address++];
grpc_closure_init(&req->connected, on_connected, req);
grpc_tcp_client_connect(
- exec_ctx, &req->connected, &req->ep, &req->context->pollset_set,
+ exec_ctx, &req->connected, &req->ep, req->context->pollset_set,
(struct sockaddr *)&addr->addr, addr->len, req->deadline);
}
@@ -237,7 +239,7 @@ static void internal_request_begin(
req->host = gpr_strdup(request->host);
req->ssl_host_override = gpr_strdup(request->ssl_host_override);
- grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set,
+ grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
grpc_resolve_address(request->host, req->handshaker->default_port,
on_resolved, req);
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index 30875d71f1..c9cd987c79 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
#include <grpc/support/time.h>
#include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset_set.h"
/* User agent this library reports */
@@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header {
TODO(ctiller): allow caching and capturing multiple requests for the
same content and combining them */
typedef struct grpc_httpcli_context {
- grpc_pollset_set pollset_set;
+ grpc_pollset_set *pollset_set;
} grpc_httpcli_context;
typedef struct {
diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/httpcli/httpcli_security_connector.c
index 41ad1de6c0..156961a377 100644
--- a/src/core/httpcli/httpcli_security_connector.c
+++ b/src/core/httpcli/httpcli_security_connector.c
@@ -59,7 +59,7 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
}
static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
+ grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -78,8 +78,8 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
tsi_result_to_string(result));
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
- grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
- user_data);
+ grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
+ nonsecure_endpoint, cb, user_data);
}
}
@@ -103,7 +103,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
}
static grpc_security_connector_vtable httpcli_ssl_vtable = {
- httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer};
+ httpcli_ssl_destroy, httpcli_ssl_check_peer};
static grpc_security_status httpcli_ssl_channel_security_connector_create(
const unsigned char *pem_root_certs, size_t pem_root_certs_size,
@@ -121,7 +121,6 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
memset(c, 0, sizeof(grpc_httpcli_ssl_channel_security_connector));
gpr_ref_init(&c->base.base.refcount, 1);
- c->base.base.is_client_side = 1;
c->base.base.vtable = &httpcli_ssl_vtable;
if (secure_peer_name != NULL) {
c->secure_peer_name = gpr_strdup(secure_peer_name);
@@ -136,6 +135,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL;
return GRPC_SECURITY_ERROR;
}
+ c->base.do_handshake = httpcli_ssl_do_handshake;
*sc = &c->base;
return GRPC_SECURITY_OK;
}
@@ -180,8 +180,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
- grpc_security_connector_do_handshake(exec_ctx, &sc->base, tcp,
- on_secure_transport_setup_done, c);
+ grpc_channel_security_connector_do_handshake(
+ exec_ctx, sc, tcp, on_secure_transport_setup_done, c);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 85eadd754b..4ba7c5df94 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -46,6 +46,8 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/pollset_posix.h"
+
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
@@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
}
static void pollset_kick_locked(grpc_fd_watcher *watcher) {
- gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+ gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
- gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
+ gpr_mu_unlock(&watcher->pollset->mu);
}
static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 04580150f3..9c89c2c08a 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -41,9 +41,11 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h"
+#include "src/core/support/env.h"
#include "src/core/support/string.h"
static gpr_mu g_mu;
@@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) {
"memory leaks are likely",
count_objects());
dump_objects("LEAKED");
+ if (grpc_iomgr_abort_on_leaks()) {
+ abort();
+ }
}
break;
}
@@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu);
gpr_free(obj->name);
}
+
+bool grpc_iomgr_abort_on_leaks(void) {
+ char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
+ if (env == NULL) return false;
+ static const char *truthy[] = {"yes", "Yes", "YES", "true",
+ "True", "TRUE", "1"};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ if (0 == strcmp(env, truthy[i])) return true;
+ }
+ return false;
+}
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index e372c18e8a..ac2c46ebe6 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
+#include <stdbool.h>
+
#include "src/core/iomgr/iomgr.h"
#include <grpc/support/sync.h>
@@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void);
+bool grpc_iomgr_abort_on_leaks(void);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c6b0214dea..92a0374ddd 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,8 +35,11 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H
#include <grpc/support/port_platform.h>
+#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/iomgr/exec_ctx.h"
+
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
/* A grpc_pollset is a set of file descriptors that a higher level item is
@@ -46,15 +49,11 @@
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_posix.h"
-#endif
-
-#ifdef GPR_WIN32
-#include "src/core/iomgr/pollset_windows.h"
-#endif
+typedef struct grpc_pollset grpc_pollset;
+typedef struct grpc_pollset_worker grpc_pollset_worker;
-void grpc_pollset_init(grpc_pollset *pollset);
+size_t grpc_pollset_size(void);
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
/* Begin shutting down the pollset, and call closure when done.
* GRPC_POLLSET_MU(pollset) must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -83,7 +82,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
pollset
lock */
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *worker, gpr_timespec now,
+ grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
/* Break one polling thread out of polling work for this pollset.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 4acae2bb71..2e0f27fab8 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -45,6 +45,7 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/profiling/timers.h"
#include "src/core/support/block_annotate.h"
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 809f8f39da..4dddfff230 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -42,13 +42,15 @@
#include <stdlib.h>
#include <string.h>
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/support/block_annotate.h"
+
typedef struct {
/* all polled fds */
size_t fd_count;
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 1063727248..e895a77884 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -42,16 +42,16 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/iomgr_internal.h"
-#include "src/core/iomgr/socket_utils_posix.h"
-#include "src/core/profiling/timers.h"
-#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
@@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
+size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
+
void grpc_pollset_kick_ext(grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
@@ -186,8 +188,9 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
-void grpc_pollset_init(grpc_pollset *pollset) {
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
@@ -204,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
GPR_ASSERT(!grpc_pollset_has_workers(pollset));
GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
pollset->vtable->destroy(pollset);
- gpr_mu_destroy(&pollset->mu);
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
@@ -246,8 +248,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *worker, gpr_timespec now,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
@@ -255,16 +260,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int keep_polling = 0;
GPR_TIMER_BEGIN("grpc_pollset_work", 0);
/* this must happen before we (potentially) drop pollset->mu */
- worker->next = worker->prev = NULL;
- worker->reevaluate_polling_on_wakeup = 0;
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
if (pollset->local_wakeup_cache != NULL) {
- worker->wakeup_fd = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker->wakeup_fd->next;
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
} else {
- worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd));
- grpc_wakeup_fd_init(&worker->wakeup_fd->fd);
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
}
- worker->kicked_specifically = 0;
+ worker.kicked_specifically = 0;
/* If there's work waiting for the pollset to be idle, and the
pollset is idle, then do that work */
if (!grpc_pollset_has_workers(pollset) &&
@@ -293,13 +298,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
keep_polling = 0;
if (!pollset->kicked_without_pollers) {
if (!added_worker) {
- push_front_worker(pollset, worker);
+ push_front_worker(pollset, &worker);
added_worker = 1;
- gpr_tls_set(&g_current_thread_worker, (intptr_t)worker);
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
}
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
- pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
+ pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker,
deadline, now);
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
@@ -321,10 +326,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* If we're forced to re-evaluate polling (via grpc_pollset_kick with
GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
a loop */
- if (worker->reevaluate_polling_on_wakeup) {
- worker->reevaluate_polling_on_wakeup = 0;
+ if (worker.reevaluate_polling_on_wakeup) {
+ worker.reevaluate_polling_on_wakeup = 0;
pollset->kicked_without_pollers = 0;
- if (queued_work || worker->kicked_specifically) {
+ if (queued_work || worker.kicked_specifically) {
/* If there's queued work on the list, then set the deadline to be
immediate so we get back out of the polling loop quickly */
deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
@@ -333,12 +338,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
if (added_worker) {
- remove_worker(pollset, worker);
+ remove_worker(pollset, &worker);
gpr_tls_set(&g_current_thread_worker, 0);
}
/* release wakeup fd to the local pool */
- worker->wakeup_fd->next = pollset->local_wakeup_cache;
- pollset->local_wakeup_cache = worker->wakeup_fd;
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
/* check shutdown conditions */
if (pollset->shutting_down) {
if (grpc_pollset_has_workers(pollset)) {
@@ -360,6 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
}
}
+ *worker_hdl = NULL;
GPR_TIMER_END("grpc_pollset_work", 0);
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 5868b3fa21..bbedb66b00 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -37,8 +37,10 @@
#include <poll.h>
#include <grpc/support/sync.h>
+
#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -53,15 +55,15 @@ typedef struct grpc_cached_wakeup_fd {
struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;
-typedef struct grpc_pollset_worker {
+struct grpc_pollset_worker {
grpc_cached_wakeup_fd *wakeup_fd;
int reevaluate_polling_on_wakeup;
int kicked_specifically;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
-} grpc_pollset_worker;
+};
-typedef struct grpc_pollset {
+struct grpc_pollset {
/* pollsets under posix can mutate representation as fds are added and
removed.
For example, we may choose a poll() based implementation on linux for
@@ -81,7 +83,7 @@ typedef struct grpc_pollset {
} data;
/* Local cache of eventfds for workers */
grpc_cached_wakeup_fd *local_wakeup_cache;
-} grpc_pollset;
+};
struct grpc_pollset_vtable {
void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -93,8 +95,6 @@ struct grpc_pollset_vtable {
void (*destroy)(grpc_pollset *pollset);
};
-#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
-
/* Add an fd to a pollset */
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 09c04438f7..dddcd8313f 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,15 +41,9 @@
fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/iomgr/pollset_set_posix.h"
-#endif
+typedef struct grpc_pollset_set grpc_pollset_set;
-#ifdef GPR_WIN32
-#include "src/core/iomgr/pollset_set_windows.h"
-#endif
-
-void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
+grpc_pollset_set *grpc_pollset_set_create(void);
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set,
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index 4ec92202e3..9dc9aff4a8 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,11 +41,30 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
-#include "src/core/iomgr/pollset_set.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/pollset_set_posix.h"
-void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+grpc_pollset_set *grpc_pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
memset(pollset_set, 0, sizeof(*pollset_set));
gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
}
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
@@ -57,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
gpr_free(pollset_set->pollsets);
gpr_free(pollset_set->pollset_sets);
gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
}
void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 4820a61e4b..7d1aaf4181 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,23 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/iomgr/pollset_posix.h"
-
-typedef struct grpc_pollset_set {
- gpr_mu mu;
-
- size_t pollset_count;
- size_t pollset_capacity;
- grpc_pollset **pollsets;
-
- size_t pollset_set_count;
- size_t pollset_set_capacity;
- struct grpc_pollset_set **pollset_sets;
-
- size_t fd_count;
- size_t fd_capacity;
- grpc_fd **fds;
-} grpc_pollset_set;
+#include "src/core/iomgr/pollset_set.h"
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 157b46ec32..3b8eca28e6 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,9 +35,9 @@
#ifdef GPR_WINSOCK_SOCKET
-#include "src/core/iomgr/pollset_set.h"
+#include "src/core/iomgr/pollset_set_windows.h"
-void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {}
+grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; }
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h
index cada0d2b61..9661cd2c39 100644
--- a/src/core/iomgr/pollset_set_windows.h
+++ b/src/core/iomgr/pollset_set_windows.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,6 +34,6 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H
-typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set;
+#include "src/core/iomgr/pollset_set.h"
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 35a956b27f..c7f30f435f 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -89,12 +89,15 @@ static void push_front_worker(grpc_pollset_worker *root,
worker->links[type].next->links[type].prev = worker;
}
+size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
+
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
-void grpc_pollset_init(grpc_pollset *pollset) {
+void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ *mu = &grpc_polling_mu;
memset(pollset, 0, sizeof(*pollset));
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
@@ -125,22 +128,25 @@ void grpc_pollset_reset(grpc_pollset *pollset) {
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker *worker, gpr_timespec now,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
int added_worker = 0;
- worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
- worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
- worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
- worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
- worker->kicked = 0;
- worker->pollset = pollset;
- gpr_cv_init(&worker->cv);
+ worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
+ worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
+ worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
+ worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL;
+ worker.kicked = 0;
+ worker.pollset = pollset;
+ gpr_cv_init(&worker.cv);
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
if (g_active_poller == NULL) {
grpc_pollset_worker *next_worker;
/* become poller */
pollset->is_iocp_worker = 1;
- g_active_poller = worker;
+ g_active_poller = &worker;
gpr_mu_unlock(&grpc_polling_mu);
grpc_iocp_work(exec_ctx, deadline);
grpc_exec_ctx_flush(exec_ctx);
@@ -167,12 +173,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
goto done;
}
push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL,
- worker);
+ &worker);
push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET,
- worker);
+ &worker);
added_worker = 1;
- while (!worker->kicked) {
- if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) {
+ while (!worker.kicked) {
+ if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) {
break;
}
}
@@ -186,10 +192,11 @@ done:
gpr_mu_lock(&grpc_polling_mu);
}
if (added_worker) {
- remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
- remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
+ remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL);
+ remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
}
- gpr_cv_destroy(&worker->cv);
+ gpr_cv_destroy(&worker.cv);
+ *worker_hdl = NULL;
}
void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index 65ba80619b..dc0b7a4104 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -72,8 +72,4 @@ struct grpc_pollset {
grpc_closure *on_shutdown;
};
-extern gpr_mu grpc_polling_mu;
-
-#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu)
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index c76c2e3b0f..15727856ab 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -42,17 +42,19 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/timer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
#include "src/core/iomgr/iomgr_posix.h"
#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
+#include "src/core/iomgr/timer.h"
#include "src/core/support/string.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/time.h>
extern int grpc_tcp_trace;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 048e907441..f74eb3fe51 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -40,8 +40,8 @@
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/types.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -51,9 +51,11 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
+#include "src/core/iomgr/pollset_posix.h"
+#include "src/core/iomgr/pollset_set_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/string.h"
#ifdef GPR_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index 73a21c80ab..a9d0489edf 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H
#include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/fd_posix.h"
/* Forward decl of grpc_server */
typedef struct grpc_server grpc_server;
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index da11df67ef..c096dbfb30 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -44,6 +44,7 @@
#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index 589034fe1b..68f195ee0d 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,6 +34,8 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
+#include "src/core/iomgr/wakeup_fd_posix.h"
+
struct grpc_fd;
struct grpc_workqueue {
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 57b367d00f..332d4259d2 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -310,7 +310,6 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(auth_context != NULL);
/* initialize members */
- GPR_ASSERT(sc->is_client_side);
chand->security_connector =
(grpc_channel_security_connector *)GRPC_SECURITY_CONNECTOR_REF(
sc, "client_auth_filter");
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index c58574bd6d..b4fa616fa7 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -166,7 +166,7 @@ void grpc_server_credentials_release(grpc_server_credentials *creds) {
}
grpc_security_status grpc_server_credentials_create_security_connector(
- grpc_server_credentials *creds, grpc_security_connector **sc) {
+ grpc_server_credentials *creds, grpc_server_security_connector **sc) {
if (creds == NULL || creds->vtable->create_security_connector == NULL) {
gpr_log(GPR_ERROR, "Server credentials cannot create security context.");
return GRPC_SECURITY_ERROR;
@@ -298,7 +298,7 @@ static grpc_security_status ssl_create_security_connector(
}
static grpc_security_status ssl_server_create_security_connector(
- grpc_server_credentials *creds, grpc_security_connector **sc) {
+ grpc_server_credentials *creds, grpc_server_security_connector **sc) {
grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
return grpc_ssl_server_security_connector_create(&c->config, sc);
}
@@ -894,7 +894,7 @@ static grpc_security_status fake_transport_security_create_security_connector(
static grpc_security_status
fake_transport_security_server_create_security_connector(
- grpc_server_credentials *c, grpc_security_connector **sc) {
+ grpc_server_credentials *c, grpc_server_security_connector **sc) {
*sc = grpc_fake_server_security_connector_create();
return GRPC_SECURITY_OK;
}
diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h
index 3cd652cd57..0de4cd9468 100644
--- a/src/core/security/credentials.h
+++ b/src/core/security/credentials.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -234,7 +234,7 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
typedef struct {
void (*destruct)(grpc_server_credentials *c);
grpc_security_status (*create_security_connector)(
- grpc_server_credentials *c, grpc_security_connector **sc);
+ grpc_server_credentials *c, grpc_server_security_connector **sc);
} grpc_server_credentials_vtable;
struct grpc_server_credentials {
@@ -245,7 +245,7 @@ struct grpc_server_credentials {
};
grpc_security_status grpc_server_credentials_create_security_connector(
- grpc_server_credentials *creds, grpc_security_connector **sc);
+ grpc_server_credentials *creds, grpc_server_security_connector **sc);
grpc_server_credentials *grpc_server_credentials_ref(
grpc_server_credentials *creds);
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index bd4b0e462a..1f4f3e4aa5 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -52,13 +52,14 @@
static grpc_channel_credentials *default_credentials = NULL;
static int compute_engine_detection_done = 0;
-static gpr_mu g_mu;
+static gpr_mu g_state_mu;
+static gpr_mu *g_polling_mu;
static gpr_once g_once = GPR_ONCE_INIT;
-static void init_default_credentials(void) { gpr_mu_init(&g_mu); }
+static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); }
typedef struct {
- grpc_pollset pollset;
+ grpc_pollset *pollset;
int is_done;
int success;
} compute_engine_detector;
@@ -80,10 +81,10 @@ static void on_compute_engine_detection_http_response(
}
}
}
- gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset));
+ gpr_mu_lock(g_polling_mu);
detector->is_done = 1;
- grpc_pollset_kick(&detector->pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
+ grpc_pollset_kick(detector->pollset, NULL);
+ gpr_mu_unlock(g_polling_mu);
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
@@ -101,7 +102,8 @@ static int is_stack_running_on_compute_engine(void) {
on compute engine. */
gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
- grpc_pollset_init(&detector.pollset);
+ detector.pollset = gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(detector.pollset, &g_polling_mu);
detector.is_done = 0;
detector.success = 0;
@@ -112,7 +114,7 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(
- &exec_ctx, &context, &detector.pollset, &request,
+ &exec_ctx, &context, detector.pollset, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
on_compute_engine_detection_http_response, &detector);
@@ -120,19 +122,22 @@ static int is_stack_running_on_compute_engine(void) {
/* Block until we get the response. This is not ideal but this should only be
called once for the lifetime of the process by the default credentials. */
- gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
+ gpr_mu_lock(g_polling_mu);
while (!detector.is_done) {
- grpc_pollset_worker worker;
- grpc_pollset_work(&exec_ctx, &detector.pollset, &worker,
+ grpc_pollset_worker *worker = NULL;
+ grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
+ gpr_mu_unlock(g_polling_mu);
grpc_httpcli_context_destroy(&context);
- grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset);
- grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure);
+ grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset);
+ grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure);
grpc_exec_ctx_finish(&exec_ctx);
+ g_polling_mu = NULL;
+
+ gpr_free(detector.pollset);
return detector.success;
}
@@ -184,7 +189,7 @@ grpc_channel_credentials *grpc_google_default_credentials_create(void) {
gpr_once_init(&g_once, init_default_credentials);
- gpr_mu_lock(&g_mu);
+ gpr_mu_lock(&g_state_mu);
if (default_credentials != NULL) {
result = grpc_channel_credentials_ref(default_credentials);
@@ -230,19 +235,19 @@ end:
gpr_log(GPR_ERROR, "Could not create google default credentials.");
}
}
- gpr_mu_unlock(&g_mu);
+ gpr_mu_unlock(&g_state_mu);
return result;
}
void grpc_flush_cached_google_default_credentials(void) {
gpr_once_init(&g_once, init_default_credentials);
- gpr_mu_lock(&g_mu);
+ gpr_mu_lock(&g_state_mu);
if (default_credentials != NULL) {
grpc_channel_credentials_unref(default_credentials);
default_credentials = NULL;
}
compute_engine_detection_done = 0;
- gpr_mu_unlock(&g_mu);
+ gpr_mu_unlock(&g_state_mu);
}
/* -- Well known credentials path. -- */
diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c
index a8b2fef629..b5bb6667a7 100644
--- a/src/core/security/handshake.c
+++ b/src/core/security/handshake.c
@@ -33,6 +33,7 @@
#include "src/core/security/handshake.h"
+#include <stdbool.h>
#include <string.h>
#include "src/core/security/security_context.h"
@@ -46,6 +47,7 @@
typedef struct {
grpc_security_connector *connector;
tsi_handshaker *handshaker;
+ bool is_client_side;
unsigned char *handshake_buffer;
size_t handshake_buffer_size;
grpc_endpoint *wrapped_endpoint;
@@ -67,9 +69,11 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
bool success);
static void security_connector_remove_handshake(grpc_security_handshake *h) {
+ GPR_ASSERT(!h->is_client_side);
grpc_security_connector_handshake_list *node;
grpc_security_connector_handshake_list *tmp;
- grpc_security_connector *sc = h->connector;
+ grpc_server_security_connector *sc =
+ (grpc_server_security_connector *)h->connector;
gpr_mu_lock(&sc->mu);
node = sc->handshaking_handshakes;
if (node && node->handshake == h) {
@@ -94,7 +98,7 @@ static void security_connector_remove_handshake(grpc_security_handshake *h) {
static void security_handshake_done(grpc_exec_ctx *exec_ctx,
grpc_security_handshake *h,
int is_success) {
- if (!h->connector->is_client_side) {
+ if (!h->is_client_side) {
security_connector_remove_handshake(h);
}
if (is_success) {
@@ -290,6 +294,7 @@ static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
tsi_handshaker *handshaker,
grpc_security_connector *connector,
+ bool is_client_side,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -298,6 +303,7 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
memset(h, 0, sizeof(grpc_security_handshake));
h->handshaker = handshaker;
h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake");
+ h->is_client_side = is_client_side;
h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE;
h->handshake_buffer = gpr_malloc(h->handshake_buffer_size);
h->wrapped_endpoint = nonsecure_endpoint;
@@ -310,13 +316,15 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_init(&h->left_overs);
gpr_slice_buffer_init(&h->outgoing);
gpr_slice_buffer_init(&h->incoming);
- if (!connector->is_client_side) {
+ if (!is_client_side) {
+ grpc_server_security_connector *server_connector =
+ (grpc_server_security_connector *)connector;
handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list));
handshake_node->handshake = h;
- gpr_mu_lock(&connector->mu);
- handshake_node->next = connector->handshaking_handshakes;
- connector->handshaking_handshakes = handshake_node;
- gpr_mu_unlock(&connector->mu);
+ gpr_mu_lock(&server_connector->mu);
+ handshake_node->next = server_connector->handshaking_handshakes;
+ server_connector->handshaking_handshakes = handshake_node;
+ gpr_mu_unlock(&server_connector->mu);
}
send_handshake_bytes_to_peer(exec_ctx, h);
}
diff --git a/src/core/security/handshake.h b/src/core/security/handshake.h
index 44215d16ef..db8b374921 100644
--- a/src/core/security/handshake.h
+++ b/src/core/security/handshake.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@
void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
tsi_handshaker *handshaker,
grpc_security_connector *connector,
+ bool is_client_side,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data);
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index 51a99b4492..33c62a20c2 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -33,6 +33,7 @@
#include "src/core/security/security_connector.h"
+#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -110,31 +111,39 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL;
}
-void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *connector) {
+void grpc_server_security_connector_shutdown(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector) {
grpc_security_connector_handshake_list *tmp;
- if (!connector->is_client_side) {
- gpr_mu_lock(&connector->mu);
- while (connector->handshaking_handshakes) {
- tmp = connector->handshaking_handshakes;
- grpc_security_handshake_shutdown(
- exec_ctx, connector->handshaking_handshakes->handshake);
- connector->handshaking_handshakes = tmp->next;
- gpr_free(tmp);
- }
- gpr_mu_unlock(&connector->mu);
+ gpr_mu_lock(&connector->mu);
+ while (connector->handshaking_handshakes) {
+ tmp = connector->handshaking_handshakes;
+ grpc_security_handshake_shutdown(
+ exec_ctx, connector->handshaking_handshakes->handshake);
+ connector->handshaking_handshakes = tmp->next;
+ gpr_free(tmp);
+ }
+ gpr_mu_unlock(&connector->mu);
+}
+
+void grpc_channel_security_connector_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
+ grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb,
+ void *user_data) {
+ if (sc == NULL || nonsecure_endpoint == NULL) {
+ cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
+ } else {
+ sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data);
}
}
-void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+void grpc_server_security_connector_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
+ grpc_security_handshake_done_cb cb, void *user_data) {
if (sc == NULL || nonsecure_endpoint == NULL) {
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
- sc->vtable->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data);
+ sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, cb, user_data);
}
}
@@ -248,7 +257,8 @@ static void fake_channel_destroy(grpc_security_connector *sc) {
}
static void fake_server_destroy(grpc_security_connector *sc) {
- gpr_mu_destroy(&sc->mu);
+ grpc_server_security_connector *c = (grpc_server_security_connector *)sc;
+ gpr_mu_destroy(&c->mu);
gpr_free(sc);
}
@@ -298,49 +308,52 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
}
static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
+ grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), sc,
- nonsecure_endpoint, cb, user_data);
+ grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base,
+ true, nonsecure_endpoint, cb, user_data);
}
static void fake_server_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
+ grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
- grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), sc,
- nonsecure_endpoint, cb, user_data);
+ grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base,
+ false, nonsecure_endpoint, cb, user_data);
}
static grpc_security_connector_vtable fake_channel_vtable = {
- fake_channel_destroy, fake_channel_do_handshake, fake_check_peer};
+ fake_channel_destroy, fake_check_peer};
-static grpc_security_connector_vtable fake_server_vtable = {
- fake_server_destroy, fake_server_do_handshake, fake_check_peer};
+static grpc_security_connector_vtable fake_server_vtable = {fake_server_destroy,
+ fake_check_peer};
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
grpc_call_credentials *request_metadata_creds) {
grpc_channel_security_connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
gpr_ref_init(&c->base.refcount, 1);
- c->base.is_client_side = 1;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
c->base.vtable = &fake_channel_vtable;
c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds);
c->check_call_host = fake_channel_check_call_host;
+ c->do_handshake = fake_channel_do_handshake;
return c;
}
-grpc_security_connector *grpc_fake_server_security_connector_create(void) {
- grpc_security_connector *c = gpr_malloc(sizeof(grpc_security_connector));
- memset(c, 0, sizeof(grpc_security_connector));
- gpr_ref_init(&c->refcount, 1);
- c->is_client_side = 0;
- c->vtable = &fake_server_vtable;
- c->url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
+grpc_server_security_connector *grpc_fake_server_security_connector_create(
+ void) {
+ grpc_server_security_connector *c =
+ gpr_malloc(sizeof(grpc_server_security_connector));
+ memset(c, 0, sizeof(*c));
+ gpr_ref_init(&c->base.refcount, 1);
+ c->base.vtable = &fake_server_vtable;
+ c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
+ c->do_handshake = fake_server_do_handshake;
gpr_mu_init(&c->mu);
return c;
}
@@ -355,7 +368,7 @@ typedef struct {
} grpc_ssl_channel_security_connector;
typedef struct {
- grpc_security_connector base;
+ grpc_server_security_connector base;
tsi_ssl_handshaker_factory *handshaker_factory;
} grpc_ssl_server_security_connector;
@@ -378,12 +391,12 @@ static void ssl_server_destroy(grpc_security_connector *sc) {
if (c->handshaker_factory != NULL) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
- gpr_mu_destroy(&sc->mu);
+ gpr_mu_destroy(&c->base.mu);
gpr_free(sc);
}
static grpc_security_status ssl_create_handshaker(
- tsi_ssl_handshaker_factory *handshaker_factory, int is_client,
+ tsi_ssl_handshaker_factory *handshaker_factory, bool is_client,
const char *peer_name, tsi_handshaker **handshaker) {
tsi_result result = TSI_OK;
if (handshaker_factory == NULL) return GRPC_SECURITY_ERROR;
@@ -398,7 +411,7 @@ static grpc_security_status ssl_create_handshaker(
}
static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
+ grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -406,20 +419,21 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
(grpc_ssl_channel_security_connector *)sc;
tsi_handshaker *handshaker;
grpc_security_status status = ssl_create_handshaker(
- c->handshaker_factory, 1,
+ c->handshaker_factory, true,
c->overridden_target_name != NULL ? c->overridden_target_name
: c->target_name,
&handshaker);
if (status != GRPC_SECURITY_OK) {
cb(exec_ctx, user_data, status, NULL, NULL);
} else {
- grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
- user_data);
+ grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
+ nonsecure_endpoint, cb, user_data);
}
}
static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *sc,
+ grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -427,12 +441,12 @@ static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
(grpc_ssl_server_security_connector *)sc;
tsi_handshaker *handshaker;
grpc_security_status status =
- ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker);
+ ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker);
if (status != GRPC_SECURITY_OK) {
cb(exec_ctx, user_data, status, NULL, NULL);
} else {
- grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
- user_data);
+ grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false,
+ nonsecure_endpoint, cb, user_data);
}
}
@@ -603,10 +617,10 @@ static void ssl_channel_check_call_host(grpc_exec_ctx *exec_ctx,
}
static grpc_security_connector_vtable ssl_channel_vtable = {
- ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer};
+ ssl_channel_destroy, ssl_channel_check_peer};
static grpc_security_connector_vtable ssl_server_vtable = {
- ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer};
+ ssl_server_destroy, ssl_server_check_peer};
static gpr_slice compute_default_pem_root_certs_once(void) {
gpr_slice result = gpr_empty_slice();
@@ -700,11 +714,11 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
gpr_ref_init(&c->base.base.refcount, 1);
c->base.base.vtable = &ssl_channel_vtable;
- c->base.base.is_client_side = 1;
c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host;
+ c->base.do_handshake = ssl_channel_do_handshake;
gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port);
if (overridden_target_name != NULL) {
@@ -735,7 +749,7 @@ error:
}
grpc_security_status grpc_ssl_server_security_connector_create(
- const grpc_ssl_server_config *config, grpc_security_connector **sc) {
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc) {
size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
const unsigned char **alpn_protocol_strings =
gpr_malloc(sizeof(const char *) * num_alpn_protocols);
@@ -759,9 +773,9 @@ grpc_security_status grpc_ssl_server_security_connector_create(
c = gpr_malloc(sizeof(grpc_ssl_server_security_connector));
memset(c, 0, sizeof(grpc_ssl_server_security_connector));
- gpr_ref_init(&c->base.refcount, 1);
- c->base.url_scheme = GRPC_SSL_URL_SCHEME;
- c->base.vtable = &ssl_server_vtable;
+ gpr_ref_init(&c->base.base.refcount, 1);
+ c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
+ c->base.base.vtable = &ssl_server_vtable;
result = tsi_create_ssl_server_handshaker_factory(
(const unsigned char **)config->pem_private_keys,
config->pem_private_keys_sizes,
@@ -774,11 +788,12 @@ grpc_security_status grpc_ssl_server_security_connector_create(
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
tsi_result_to_string(result));
- ssl_server_destroy(&c->base);
+ ssl_server_destroy(&c->base.base);
*sc = NULL;
goto error;
}
gpr_mu_init(&c->base.mu);
+ c->base.do_handshake = ssl_server_do_handshake;
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index 39df7821f0..1e35d3f9b7 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -36,6 +36,7 @@
#include <grpc/grpc_security.h>
#include "src/core/iomgr/endpoint.h"
+#include "src/core/iomgr/tcp_server.h"
#include "src/core/tsi/transport_security_interface.h"
/* --- status enum. --- */
@@ -68,9 +69,6 @@ typedef void (*grpc_security_handshake_done_cb)(
typedef struct {
void (*destroy)(grpc_security_connector *sc);
- void (*do_handshake)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint,
- grpc_security_handshake_done_cb cb, void *user_data);
void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc,
tsi_peer peer, grpc_security_peer_check_cb cb,
void *user_data);
@@ -84,13 +82,7 @@ typedef struct grpc_security_connector_handshake_list {
struct grpc_security_connector {
const grpc_security_connector_vtable *vtable;
gpr_refcount refcount;
- int is_client_side;
const char *url_scheme;
- /* Used on server side only. */
- /* TODO(yangg): Create a grpc_server_security_connector with these. */
- gpr_mu mu;
- grpc_security_connector_handshake_list *handshaking_handshakes;
- const grpc_channel_args *channel_args;
};
/* Refcounting. */
@@ -113,13 +105,6 @@ grpc_security_connector *grpc_security_connector_ref(
void grpc_security_connector_unref(grpc_security_connector *policy);
#endif
-/* Handshake. */
-void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *connector,
- grpc_endpoint *nonsecure_endpoint,
- grpc_security_handshake_done_cb cb,
- void *user_data);
-
/* Check the peer. Callee takes ownership of the peer object.
The callback will include the resulting auth_context. */
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
@@ -128,9 +113,6 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_peer_check_cb cb,
void *user_data);
-void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_security_connector *connector);
-
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);
@@ -153,12 +135,16 @@ typedef void (*grpc_security_call_host_check_cb)(grpc_exec_ctx *exec_ctx,
grpc_security_status status);
struct grpc_channel_security_connector {
- grpc_security_connector base; /* requires is_client_side to be non 0. */
+ grpc_security_connector base;
grpc_call_credentials *request_metadata_creds;
void (*check_call_host)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
+ void (*do_handshake)(grpc_exec_ctx *exec_ctx,
+ grpc_channel_security_connector *sc,
+ grpc_endpoint *nonsecure_endpoint,
+ grpc_security_handshake_done_cb cb, void *user_data);
};
/* Checks that the host that will be set for a call is acceptable. */
@@ -167,6 +153,39 @@ void grpc_channel_security_connector_check_call_host(
const char *host, grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data);
+/* Handshake. */
+void grpc_channel_security_connector_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
+ grpc_endpoint *nonsecure_endpoint, grpc_security_handshake_done_cb cb,
+ void *user_data);
+
+/* --- server_security_connector object. ---
+
+ A server security connector object represents away to configure the
+ underlying transport security mechanism on the server side. */
+
+typedef struct grpc_server_security_connector grpc_server_security_connector;
+
+struct grpc_server_security_connector {
+ grpc_security_connector base;
+ gpr_mu mu;
+ grpc_security_connector_handshake_list *handshaking_handshakes;
+ const grpc_channel_args *channel_args;
+ void (*do_handshake)(grpc_exec_ctx *exec_ctx,
+ grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor,
+ grpc_endpoint *nonsecure_endpoint,
+ grpc_security_handshake_done_cb cb, void *user_data);
+};
+
+void grpc_server_security_connector_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
+ grpc_security_handshake_done_cb cb, void *user_data);
+
+void grpc_server_security_connector_shutdown(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector);
+
/* --- Creation security connectors. --- */
/* For TESTING ONLY!
@@ -176,7 +195,8 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
/* For TESTING ONLY!
Creates a fake connector that emulates real server security. */
-grpc_security_connector *grpc_fake_server_security_connector_create(void);
+grpc_server_security_connector *grpc_fake_server_security_connector_create(
+ void);
/* Config for ssl clients. */
typedef struct {
@@ -231,7 +251,7 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_server_security_connector_create(
- const grpc_ssl_server_config *config, grpc_security_connector **sc);
+ const grpc_ssl_server_config *config, grpc_server_security_connector **sc);
/* Util. */
const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 84a883390c..91547eb26e 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -55,7 +55,7 @@
typedef struct grpc_server_secure_state {
grpc_server *server;
grpc_tcp_server *tcp;
- grpc_security_connector *sc;
+ grpc_server_security_connector *sc;
grpc_server_credentials *creds;
int is_shutdown;
gpr_mu mu;
@@ -74,7 +74,7 @@ static void state_unref(grpc_server_secure_state *state) {
gpr_mu_lock(&state->mu);
gpr_mu_unlock(&state->mu);
/* clean up */
- GRPC_SECURITY_CONNECTOR_UNREF(state->sc, "server");
+ GRPC_SECURITY_CONNECTOR_UNREF(&state->sc->base, "server");
grpc_server_credentials_unref(state->creds);
gpr_free(state);
}
@@ -130,8 +130,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
grpc_tcp_server_acceptor *acceptor) {
grpc_server_secure_state *state = statep;
state_ref(state);
- grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,
- on_secure_handshake_done, state);
+ grpc_server_security_connector_do_handshake(
+ exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state);
}
/* Server callback: start listening on our ports */
@@ -148,7 +148,7 @@ static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
success);
}
- grpc_security_connector_shutdown(exec_ctx, state->sc);
+ grpc_server_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
}
@@ -176,7 +176,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
int port_num = -1;
int port_temp;
grpc_security_status status = GRPC_SECURITY_ERROR;
- grpc_security_connector *sc = NULL;
+ grpc_server_security_connector *sc = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_API_TRACE(
@@ -256,7 +256,7 @@ error:
grpc_tcp_server_unref(&exec_ctx, tcp);
} else {
if (sc) {
- GRPC_SECURITY_CONNECTOR_UNREF(sc, "server");
+ GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server");
}
if (state) {
gpr_free(state);
diff --git a/src/core/support/sync.c b/src/core/support/sync.c
index d368422d9e..69e3e39c5c 100644
--- a/src/core/support/sync.c
+++ b/src/core/support/sync.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -98,6 +98,11 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
+void gpr_ref_non_zero(gpr_refcount *r) {
+ gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
+ GPR_ASSERT(prior > 0);
+}
+
void gpr_refn(gpr_refcount *r, int n) {
gpr_atm_no_barrier_fetch_add(&r->count, n);
}
diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c
index fb496f6c47..8169ede065 100644
--- a/src/core/surface/alarm.c
+++ b/src/core/surface/alarm.c
@@ -64,8 +64,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->tag = tag;
grpc_cq_begin_op(cq, tag);
- grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm,
- gpr_now(GPR_CLOCK_MONOTONIC));
+ grpc_timer_init(&exec_ctx, &alarm->alarm,
+ gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
+ alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 0a80680f02..f6a95ebbd3 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,26 +36,29 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/timer.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/atm.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/timer.h"
+#include "src/core/profiling/timers.h"
#include "src/core/support/string.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
-#include "src/core/profiling/timers.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/atm.h>
-#include <grpc/support/log.h>
-#include <grpc/support/time.h>
typedef struct {
- grpc_pollset_worker *worker;
+ grpc_pollset_worker **worker;
void *tag;
} plucker;
/* Completion queue structure */
struct grpc_completion_queue {
+ /** owned by pollset */
+ gpr_mu *mu;
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -63,8 +66,6 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- /** the set of low level i/o things that concern this cq */
- grpc_pollset pollset;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@@ -82,6 +83,8 @@ struct grpc_completion_queue {
grpc_completion_queue *next_free;
};
+#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
+
static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
@@ -94,7 +97,7 @@ void grpc_cq_global_shutdown(void) {
gpr_mu_destroy(&g_freelist_mu);
while (g_freelist) {
grpc_completion_queue *next = g_freelist->next_free;
- grpc_pollset_destroy(&g_freelist->pollset);
+ grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist));
#ifndef NDEBUG
gpr_free(g_freelist->outstanding_tags);
#endif
@@ -124,8 +127,8 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
if (g_freelist == NULL) {
gpr_mu_unlock(&g_freelist_mu);
- cc = gpr_malloc(sizeof(grpc_completion_queue));
- grpc_pollset_init(&cc->pollset);
+ cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
+ grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
#ifndef NDEBUG
cc->outstanding_tags = NULL;
cc->outstanding_tag_capacity = 0;
@@ -184,7 +187,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
if (gpr_unref(&cc->owning_refs)) {
GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head);
- grpc_pollset_reset(&cc->pollset);
+ grpc_pollset_reset(POLLSET_FROM_CQ(cc));
gpr_mu_lock(&g_freelist_mu);
cc->next_free = g_freelist;
g_freelist = cc;
@@ -194,7 +197,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
#ifndef NDEBUG
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
GPR_ASSERT(!cc->shutdown_called);
if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) {
cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity);
@@ -203,7 +206,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
cc->outstanding_tag_capacity);
}
cc->outstanding_tags[cc->outstanding_tag_count++] = tag;
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
#endif
gpr_ref(&cc->pending_events);
}
@@ -231,7 +234,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
storage->next =
((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0));
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
#ifndef NDEBUG
for (i = 0; i < (int)cc->outstanding_tag_count; i++) {
if (cc->outstanding_tags[i] == tag) {
@@ -252,12 +255,12 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
pluck_worker = NULL;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag) {
- pluck_worker = cc->pluckers[i].worker;
+ pluck_worker = *cc->pluckers[i].worker;
break;
}
}
- grpc_pollset_kick(&cc->pollset, pluck_worker);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker);
+ gpr_mu_unlock(cc->mu);
} else {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@@ -265,8 +268,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
cc->shutdown = 1;
- grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
+ gpr_mu_unlock(cc->mu);
}
GPR_TIMER_END("grpc_cq_end_op", 0);
@@ -275,7 +279,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
- grpc_pollset_worker worker;
+ grpc_pollset_worker *worker = NULL;
int first_loop = 1;
gpr_timespec now;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -294,7 +298,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
for (;;) {
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
@@ -302,7 +306,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
if (c == cc->completed_tail) {
cc->completed_tail = &cc->completed_head;
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -310,14 +314,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break;
}
if (cc->shutdown) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
}
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -330,11 +334,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
+ continue;
} else {
- grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
+ grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
}
}
@@ -348,7 +353,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
}
static int add_plucker(grpc_completion_queue *cc, void *tag,
- grpc_pollset_worker *worker) {
+ grpc_pollset_worker **worker) {
if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -359,7 +364,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
}
static void del_plucker(grpc_completion_queue *cc, void *tag,
- grpc_pollset_worker *worker) {
+ grpc_pollset_worker **worker) {
int i;
for (i = 0; i < cc->num_pluckers; i++) {
if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
@@ -376,7 +381,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
- grpc_pollset_worker worker;
+ grpc_pollset_worker *worker = NULL;
gpr_timespec now;
int first_loop = 1;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
for (;;) {
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
@@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if (c == cc->completed_tail) {
cc->completed_tail = prev;
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
@@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
prev = c;
}
if (cc->shutdown) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_SHUTDOWN;
break;
@@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
"Too many outstanding grpc_completion_queue_pluck calls: maximum "
"is %d",
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
/* TODO(ctiller): should we use a different result here */
ret.type = GRPC_QUEUE_TIMEOUT;
@@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
del_plucker(cc, tag, &worker);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
break;
@@ -447,11 +452,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec iteration_deadline = deadline;
if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) {
GPR_TIMER_MARK("alarm_triggered", 0);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
grpc_exec_ctx_flush(&exec_ctx);
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
} else {
- grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now,
+ grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now,
iteration_deadline);
}
del_plucker(cc, tag, &worker);
@@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(cc->mu);
if (cc->shutdown_called) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
@@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->pending_events)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
+ grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc),
+ &cc->pollset_shutdown_done);
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(cc->mu);
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
@@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return &cc->pollset;
+ return POLLSET_FROM_CQ(cc);
}
void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 9c04426d87..aadfac4c91 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -130,9 +130,9 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
bool success) {
connector *c = arg;
- grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
- c->connecting_endpoint,
- on_secure_handshake_done, c);
+ grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
+ c->connecting_endpoint,
+ on_secure_handshake_done, c);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
@@ -153,9 +153,8 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
- grpc_security_connector_do_handshake(exec_ctx,
- &c->security_connector->base, tcp,
- on_secure_handshake_done, c);
+ grpc_channel_security_connector_do_handshake(
+ exec_ctx, c->security_connector, tcp, on_secure_handshake_done, c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index d76d31be23..891aad6ef2 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -417,7 +417,7 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
uint8_t fetching;
- uint8_t sent_initial_metadata;
+ bool sent_initial_metadata;
uint8_t sent_message;
uint8_t sent_trailing_metadata;
uint8_t read_closed;
@@ -509,7 +509,7 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
-void grpc_chttp2_list_add_writable_stream(
+bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
/** Get a writable stream
@@ -519,14 +519,13 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_global **stream_global,
grpc_chttp2_stream_writing **stream_writing);
-void grpc_chttp2_list_remove_writable_stream(
+bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global *stream_global);
+ grpc_chttp2_stream_global *stream_global) GRPC_MUST_USE_RESULT;
-/* returns 1 if stream added, 0 if it was already present */
-int grpc_chttp2_list_add_writing_stream(
+void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
- grpc_chttp2_stream_writing *stream_writing) GRPC_MUST_USE_RESULT;
+ grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_have_writing_streams(
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_list_pop_writing_stream(
@@ -770,4 +769,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_parsing *parsing,
const uint8_t *opaque_8bytes);
+/** add a ref to the stream and add it to the writable list;
+ ref will be dropped in writing.c */
+void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global);
+
#endif
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8fdebd7f13..0516f39fa9 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -149,7 +149,7 @@ void grpc_chttp2_publish_reads(
if (was_zero && !is_zero) {
while (grpc_chttp2_list_pop_stalled_by_transport(transport_global,
&stream_global)) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -178,7 +178,7 @@ void grpc_chttp2_publish_reads(
outgoing_window);
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
stream_global->max_recv_bytes -= (uint32_t)GPR_MIN(
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index b284c78818..60fe735cfc 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -100,11 +100,14 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
}
}
-static void stream_list_maybe_remove(grpc_chttp2_transport *t,
+static bool stream_list_maybe_remove(grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
stream_list_remove(t, s, id);
+ return true;
+ } else {
+ return false;
}
}
@@ -125,23 +128,24 @@ static void stream_list_add_tail(grpc_chttp2_transport *t,
s->included[id] = 1;
}
-static int stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_list_id id) {
if (s->included[id]) {
- return 0;
+ return false;
}
stream_list_add_tail(t, s, id);
- return 1;
+ return true;
}
/* wrappers for specializations */
-void grpc_chttp2_list_add_writable_stream(
+bool grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
GPR_ASSERT(stream_global->id != 0);
- stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
+ return stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_WRITABLE);
}
int grpc_chttp2_list_pop_writable_stream(
@@ -159,20 +163,20 @@ int grpc_chttp2_list_pop_writable_stream(
return r;
}
-void grpc_chttp2_list_remove_writable_stream(
+bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
- stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
- STREAM_FROM_GLOBAL(stream_global),
- GRPC_CHTTP2_LIST_WRITABLE);
+ return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+ STREAM_FROM_GLOBAL(stream_global),
+ GRPC_CHTTP2_LIST_WRITABLE);
}
-int grpc_chttp2_list_add_writing_stream(
+void grpc_chttp2_list_add_writing_stream(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
- return stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
- STREAM_FROM_WRITING(stream_writing),
- GRPC_CHTTP2_LIST_WRITING);
+ GPR_ASSERT(stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_WRITING));
}
int grpc_chttp2_list_have_writing_streams(
@@ -332,7 +336,7 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
while (stream_list_pop(transport, &stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
if (is_window_available) {
- grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
+ grpc_chttp2_become_writable(&transport->global, &stream->global);
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
&stream->writing);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 356fd8174a..107725cbc7 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -83,7 +83,8 @@ int grpc_chttp2_unlocking_check_writes(
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- uint8_t sent_initial_metadata;
+ bool sent_initial_metadata = stream_writing->sent_initial_metadata;
+ bool become_writable = false;
stream_writing->id = stream_global->id;
stream_writing->read_closed = stream_global->read_closed;
@@ -92,16 +93,12 @@ int grpc_chttp2_unlocking_check_writes(
outgoing_window, stream_global,
outgoing_window);
- sent_initial_metadata = stream_writing->sent_initial_metadata;
if (!sent_initial_metadata && stream_global->send_initial_metadata) {
stream_writing->send_initial_metadata =
stream_global->send_initial_metadata;
stream_global->send_initial_metadata = NULL;
- if (grpc_chttp2_list_add_writing_stream(transport_writing,
- stream_writing)) {
- GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
- }
- sent_initial_metadata = 1;
+ become_writable = true;
+ sent_initial_metadata = true;
}
if (sent_initial_metadata) {
if (stream_global->send_message != NULL) {
@@ -128,10 +125,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->flow_controlled_buffer.length > 0) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
- if (grpc_chttp2_list_add_writing_stream(transport_writing,
- stream_writing)) {
- GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
- }
+ become_writable = true;
} else {
grpc_chttp2_list_add_stalled_by_transport(transport_writing,
stream_writing);
@@ -141,10 +135,7 @@ int grpc_chttp2_unlocking_check_writes(
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
- if (grpc_chttp2_list_add_writing_stream(transport_writing,
- stream_writing)) {
- GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
- }
+ become_writable = true;
}
}
@@ -153,10 +144,13 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
- if (grpc_chttp2_list_add_writing_stream(transport_writing,
- stream_writing)) {
- GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
- }
+ become_writable = true;
+ }
+
+ if (become_writable) {
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
+ } else {
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
@@ -310,10 +304,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
- if (grpc_chttp2_list_add_writing_stream(transport_writing,
- stream_writing)) {
- /* do nothing - already reffed */
- }
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index b9f511e946..a1ca78d4c4 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -142,7 +142,7 @@ static void incoming_byte_stream_update_flow_control(
static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global);
-/*
+/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -521,7 +521,6 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->global.id) == NULL);
}
- grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
&s->global);
grpc_chttp2_list_remove_stalled_by_transport(&t->global, &s->global);
@@ -583,7 +582,7 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
return &accepting->parsing;
}
-/*
+/*******************************************************************************
* LOCK MANAGEMENT
*/
@@ -611,10 +610,18 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_END("unlock", 0);
}
-/*
+/*******************************************************************************
* OUTPUT PROCESSING
*/
+void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global) {
+ if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed &&
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) {
+ GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
+ }
+}
+
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value) {
const grpc_chttp2_setting_parameters *sp =
@@ -732,7 +739,7 @@ static void maybe_start_some_streams(
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
stream_global->in_stream_map = 1;
transport_global->concurrent_stream_count++;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -821,7 +828,7 @@ static void perform_stream_op_locked(
maybe_start_some_streams(exec_ctx, transport_global);
} else {
GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
} else {
grpc_chttp2_complete_closure_step(
@@ -838,7 +845,7 @@ static void perform_stream_op_locked(
exec_ctx, &stream_global->send_message_finished, 0);
} else if (stream_global->id != 0) {
stream_global->send_message = op->send_message;
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -858,7 +865,7 @@ static void perform_stream_op_locked(
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -999,7 +1006,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
}
-/*
+/*******************************************************************************
* INPUT PROCESSING
*/
@@ -1064,7 +1071,6 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (!s) {
s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
}
- grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
GPR_ASSERT(s);
s->global.in_stream_map = 0;
if (t->parsing.incoming_stream == &s->parsing) {
@@ -1080,6 +1086,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(exec_ctx, t);
}
+ if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
+ }
new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
grpc_chttp2_stream_map_size(&t->new_stream_map);
@@ -1331,7 +1340,7 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global->outgoing_window <= 0;
if (was_zero && !is_zero) {
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -1426,7 +1435,7 @@ static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
GPR_TIMER_END("recv_data", 0);
}
-/*
+/*******************************************************************************
* CALLBACK LOOP
*/
@@ -1440,7 +1449,7 @@ static void connectivity_state_set(
state, reason);
}
-/*
+/*******************************************************************************
* POLLSET STUFF
*/
@@ -1468,7 +1477,7 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
unlock(exec_ctx, t);
}
-/*
+/*******************************************************************************
* BYTE STREAM
*/
@@ -1508,7 +1517,7 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_become_writable(transport_global, stream_global);
}
}
@@ -1623,7 +1632,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
return incoming_byte_stream;
}
-/*
+/*******************************************************************************
* TRACING
*/
@@ -1709,7 +1718,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
gpr_free(prefix);
}
-/*
+/*******************************************************************************
* INTEGRATION GLUE
*/
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 14912af7df..807ae071a3 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -43,11 +43,13 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+
#include "src/core/profiling/timers.h"
#include "src/core/support/murmur_hash.h"
#include "src/core/support/string.h"
#include "src/core/transport/chttp2/bin_encoder.h"
#include "src/core/transport/static_metadata.h"
+#include "src/core/iomgr/iomgr_internal.h"
/* There are two kinds of mdelem and mdstr instances.
* Static instances are declared in static_metadata.{h,c} and
@@ -227,6 +229,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata elements were leaked",
shard->count);
+ if (grpc_iomgr_abort_on_leaks()) {
+ abort();
+ }
}
gpr_free(shard->elems);
}
@@ -237,6 +242,9 @@ void grpc_mdctx_global_shutdown(void) {
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %d metadata strings were leaked",
shard->count);
+ if (grpc_iomgr_abort_on_leaks()) {
+ abort();
+ }
}
gpr_free(shard->strs);
}
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 6e154b629a..3b555fa933 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -45,7 +45,7 @@ void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
- gpr_ref(&refcount->refs);
+ gpr_ref_non_zero(&refcount->refs);
}
#ifdef GRPC_STREAM_REFCOUNT_DEBUG