diff options
author | 2016-09-16 13:12:21 -0700 | |
---|---|---|
committer | 2016-09-16 13:12:21 -0700 | |
commit | eaea5caab3e5c34eea8c17bb8df7e97796f1d5b5 (patch) | |
tree | 07199f6c9b642ccafca86180d09280a12c740759 /src/core/lib/transport | |
parent | 6e3084b02ce806d57225d6404a7866349e98f5c7 (diff) | |
parent | ee43d7bbb11a36c91c6f1ebff6bf3da70b7546b2 (diff) |
Merge remote-tracking branch 'upstream/master' into deadline_filter
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r-- | src/core/lib/transport/metadata.c | 18 | ||||
-rw-r--r-- | src/core/lib/transport/metadata.h | 2 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 27 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 21 |
4 files changed, 60 insertions, 8 deletions
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 0677f29766..4b40c275ad 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -278,7 +278,7 @@ static void ref_md_locked(mdtab_shard *shard, internal_metadata *md DEBUG_ARGS) { #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM REF:%p:%d->%d: '%s' = '%s'", md, + "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), @@ -566,7 +566,7 @@ grpc_mdelem *grpc_mdelem_from_metadata_strings(grpc_mdstr *mkey, shard->elems[idx] = md; gpr_mu_init(&md->mu_user_data); #ifdef GRPC_METADATA_REFCOUNT_DEBUG - gpr_log(GPR_DEBUG, "ELM NEW:%p:%d: '%s' = '%s'", md, + gpr_log(GPR_DEBUG, "ELM NEW:%p:%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); @@ -639,7 +639,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { if (is_mdelem_static(gmd)) return gmd; #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM REF:%p:%d->%d: '%s' = '%s'", md, + "ELM REF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) + 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), @@ -649,7 +649,7 @@ grpc_mdelem *grpc_mdelem_ref(grpc_mdelem *gmd DEBUG_ARGS) { this function - meaning that no adjustment to mdtab_free is necessary, simplifying the logic here to be just an atomic increment */ /* use C assert to have this removed in opt builds */ - assert(gpr_atm_no_barrier_load(&md->refcnt) >= 1); + GPR_ASSERT(gpr_atm_no_barrier_load(&md->refcnt) >= 1); gpr_atm_no_barrier_fetch_add(&md->refcnt, 1); return gmd; } @@ -660,14 +660,16 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { if (is_mdelem_static(gmd)) return; #ifdef GRPC_METADATA_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "ELM UNREF:%p:%d->%d: '%s' = '%s'", md, + "ELM UNREF:%p:%zu->%zu: '%s' = '%s'", (void *)md, gpr_atm_no_barrier_load(&md->refcnt), gpr_atm_no_barrier_load(&md->refcnt) - 1, grpc_mdstr_as_c_string((grpc_mdstr *)md->key), grpc_mdstr_as_c_string((grpc_mdstr *)md->value)); #endif uint32_t hash = GRPC_MDSTR_KV_HASH(md->key->hash, md->value->hash); - if (1 == gpr_atm_full_fetch_add(&md->refcnt, -1)) { + const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1); + GPR_ASSERT(prev_refcount >= 1); + if (1 == prev_refcount) { /* once the refcount hits zero, some other thread can come along and free md at any time: it's unsafe from this point on to access it */ mdtab_shard *shard = @@ -676,10 +678,12 @@ void grpc_mdelem_unref(grpc_mdelem *gmd DEBUG_ARGS) { } } -const char *grpc_mdstr_as_c_string(grpc_mdstr *s) { +const char *grpc_mdstr_as_c_string(const grpc_mdstr *s) { return (const char *)GPR_SLICE_START_PTR(s->slice); } +size_t grpc_mdstr_length(const grpc_mdstr *s) { return GRPC_MDSTR_LENGTH(s); } + grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; if (is_mdstr_static(gs)) return gs; diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 2b0921c8d7..71eff0acf2 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -147,7 +147,7 @@ void grpc_mdelem_unref(grpc_mdelem *md); /* Recover a char* from a grpc_mdstr. The returned string is null terminated. Does not promise that the returned string has no embedded nulls however. */ -const char *grpc_mdstr_as_c_string(grpc_mdstr *s); +const char *grpc_mdstr_as_c_string(const grpc_mdstr *s); #define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice)) diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 205a136742..82fc605218 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -32,10 +32,14 @@ */ #include "src/core/lib/transport/transport.h" + +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> + #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport_impl.h" @@ -247,3 +251,26 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); add_error(op, &op->close_error, error); } + +typedef struct { + grpc_closure outer_on_complete; + grpc_closure *inner_on_complete; + grpc_transport_op op; +} made_transport_op; + +static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + made_transport_op *op = arg; + grpc_exec_ctx_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error), + NULL); + gpr_free(op); +} + +grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { + made_transport_op *op = gpr_malloc(sizeof(*op)); + grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op); + op->inner_on_complete = on_complete; + memset(&op->op, 0, sizeof(op->op)); + op->op.on_consumed = &op->outer_on_complete; + return &op->op; +} diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 26ed6cb839..8dc393fd61 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -100,6 +100,11 @@ void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, void grpc_transport_move_stats(grpc_transport_stream_stats *from, grpc_transport_stream_stats *to); +typedef struct { + grpc_closure closure; + void *args[2]; +} grpc_transport_private_op_data; + /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { @@ -149,6 +154,12 @@ typedef struct grpc_transport_stream_op { /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; + + /*************************************************************************** + * remaining fields are initialized and used at the discretion of the + * transport implementation */ + + grpc_transport_private_op_data transport_private; } grpc_transport_stream_op; /** Transport op: a set of operations to perform on a transport as a whole */ @@ -182,6 +193,12 @@ typedef struct grpc_transport_op { grpc_pollset_set *bind_pollset_set; /** send a ping, call this back if not NULL */ grpc_closure *send_ping; + + /*************************************************************************** + * remaining fields are initialized and used at the discretion of the + * transport implementation */ + + grpc_transport_private_op_data transport_private; } grpc_transport_op; /* Returns the amount of memory required to store a grpc_stream for this @@ -273,6 +290,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to + \a on_consumed and then delete the returned transport op */ +grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed); + #ifdef __cplusplus } #endif |