aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /src/core/lib/transport
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'src/core/lib/transport')
-rw-r--r--src/core/lib/transport/bdp_estimator.cc4
-rw-r--r--src/core/lib/transport/bdp_estimator.h2
-rw-r--r--src/core/lib/transport/byte_stream.cc62
-rw-r--r--src/core/lib/transport/byte_stream.h27
-rw-r--r--src/core/lib/transport/connectivity_state.cc22
-rw-r--r--src/core/lib/transport/connectivity_state.h10
-rw-r--r--src/core/lib/transport/error_utils.cc10
-rw-r--r--src/core/lib/transport/error_utils.h5
-rw-r--r--src/core/lib/transport/metadata.cc40
-rw-r--r--src/core/lib/transport/metadata.h20
-rw-r--r--src/core/lib/transport/metadata_batch.cc79
-rw-r--r--src/core/lib/transport/metadata_batch.h42
-rw-r--r--src/core/lib/transport/service_config.cc24
-rw-r--r--src/core/lib/transport/service_config.h8
-rw-r--r--src/core/lib/transport/static_metadata.cc2
-rw-r--r--src/core/lib/transport/status_conversion.cc5
-rw-r--r--src/core/lib/transport/status_conversion.h3
-rw-r--r--src/core/lib/transport/transport.cc84
-rw-r--r--src/core/lib/transport/transport.h33
-rw-r--r--src/core/lib/transport/transport_impl.h27
20 files changed, 296 insertions, 213 deletions
diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc
index 5fcc62ec43..bb0e583045 100644
--- a/src/core/lib/transport/bdp_estimator.cc
+++ b/src/core/lib/transport/bdp_estimator.cc
@@ -37,7 +37,7 @@ BdpEstimator::BdpEstimator(const char* name)
bw_est_(0),
name_(name) {}
-grpc_millis BdpEstimator::CompletePing() {
+grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx* exec_ctx) {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_);
double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec;
@@ -78,7 +78,7 @@ grpc_millis BdpEstimator::CompletePing() {
}
ping_state_ = PingState::UNSCHEDULED;
accumulator_ = 0;
- return grpc_core::ExecCtx::Get()->Now() + inter_ping_delay_;
+ return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_;
}
} // namespace grpc_core
diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h
index e703af121c..df3a86c5f1 100644
--- a/src/core/lib/transport/bdp_estimator.h
+++ b/src/core/lib/transport/bdp_estimator.h
@@ -73,7 +73,7 @@ class BdpEstimator {
}
// Completes a previously started ping, returns when to schedule the next one
- grpc_millis CompletePing();
+ grpc_millis CompletePing(grpc_exec_ctx* exec_ctx);
private:
enum class PingState { UNSCHEDULED, SCHEDULED, STARTED };
diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc
index 8dcb1e0bdb..b8720250e7 100644
--- a/src/core/lib/transport/byte_stream.cc
+++ b/src/core/lib/transport/byte_stream.cc
@@ -25,28 +25,34 @@
#include "src/core/lib/slice/slice_internal.h"
-bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
+bool grpc_byte_stream_next(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete) {
- return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete);
+ return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
+ on_complete);
}
-grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
+grpc_error* grpc_byte_stream_pull(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_slice* slice) {
- return byte_stream->vtable->pull(byte_stream, slice);
+ return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
}
-void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
+void grpc_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_error* error) {
- byte_stream->vtable->shutdown(byte_stream, error);
+ byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
}
-void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) {
- byte_stream->vtable->destroy(byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream) {
+ byte_stream->vtable->destroy(exec_ctx, byte_stream);
}
// grpc_slice_buffer_stream
-static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream,
+static bool slice_buffer_stream_next(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
@@ -54,7 +60,8 @@ static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream,
return true;
}
-static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream,
+static grpc_error* slice_buffer_stream_pull(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
@@ -67,16 +74,18 @@ static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream,
return GRPC_ERROR_NONE;
}
-static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream,
+static void slice_buffer_stream_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
}
-static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) {
+static void slice_buffer_stream_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream) {
grpc_slice_buffer_stream* stream = (grpc_slice_buffer_stream*)byte_stream;
- grpc_slice_buffer_reset_and_unref_internal(stream->backing_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
@@ -104,22 +113,25 @@ void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
grpc_slice_buffer_init(&cache->cache_buffer);
}
-void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) {
- grpc_byte_stream_destroy(cache->underlying_stream);
- grpc_slice_buffer_destroy_internal(&cache->cache_buffer);
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream_cache* cache) {
+ grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
}
-static bool caching_byte_stream_next(grpc_byte_stream* byte_stream,
+static bool caching_byte_stream_next(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
if (stream->cursor < stream->cache->cache_buffer.count) return true;
- return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint,
- on_complete);
+ return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
+ max_size_hint, on_complete);
}
-static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
+static grpc_error* caching_byte_stream_pull(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
@@ -132,7 +144,7 @@ static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
return GRPC_ERROR_NONE;
}
grpc_error* error =
- grpc_byte_stream_pull(stream->cache->underlying_stream, slice);
+ grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
if (error == GRPC_ERROR_NONE) {
++stream->cursor;
grpc_slice_buffer_add(&stream->cache->cache_buffer,
@@ -141,15 +153,17 @@ static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
return error;
}
-static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream,
+static void caching_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = GRPC_ERROR_REF(error);
- grpc_byte_stream_shutdown(stream->cache->underlying_stream, error);
+ grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
}
-static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) {
+static void caching_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream) {
grpc_caching_byte_stream* stream = (grpc_caching_byte_stream*)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
}
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 52c7a07f56..6bca154cb5 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -31,11 +31,13 @@
typedef struct grpc_byte_stream grpc_byte_stream;
typedef struct {
- bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint,
- grpc_closure* on_complete);
- grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice);
- void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error);
- void (*destroy)(grpc_byte_stream* byte_stream);
+ bool (*next)(grpc_exec_ctx* exec_ctx, grpc_byte_stream* byte_stream,
+ size_t max_size_hint, grpc_closure* on_complete);
+ grpc_error* (*pull)(grpc_exec_ctx* exec_ctx, grpc_byte_stream* byte_stream,
+ grpc_slice* slice);
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_byte_stream* byte_stream,
+ grpc_error* error);
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_byte_stream* byte_stream);
} grpc_byte_stream_vtable;
struct grpc_byte_stream {
@@ -50,7 +52,8 @@ struct grpc_byte_stream {
//
// max_size_hint can be set as a hint as to the maximum number
// of bytes that would be acceptable to read.
-bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
+bool grpc_byte_stream_next(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete);
// Returns the next slice in the byte stream when it is ready (indicated by
@@ -58,7 +61,8 @@ bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
// grpc_byte_stream_next is called).
//
// Once a slice is returned into *slice, it is owned by the caller.
-grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
+grpc_error* grpc_byte_stream_pull(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_slice* slice);
// Shuts down the byte stream.
@@ -68,10 +72,12 @@ grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
//
// The next call to grpc_byte_stream_pull() (if any) will return the error
// passed to grpc_byte_stream_shutdown().
-void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
+void grpc_byte_stream_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream,
grpc_error* error);
-void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream);
+void grpc_byte_stream_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream* byte_stream);
// grpc_slice_buffer_stream
//
@@ -113,7 +119,8 @@ void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
grpc_byte_stream* underlying_stream);
// Must not be called while still in use by a grpc_caching_byte_stream.
-void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache);
+void grpc_byte_stream_cache_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_byte_stream_cache* cache);
typedef struct {
grpc_byte_stream base;
diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc
index c42cc9c8d0..e7e5dbd1f1 100644
--- a/src/core/lib/transport/connectivity_state.cc
+++ b/src/core/lib/transport/connectivity_state.cc
@@ -51,7 +51,8 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
tracker->name = gpr_strdup(name);
}
-void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
+void grpc_connectivity_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_connectivity_state_tracker* tracker) {
grpc_error* error;
grpc_connectivity_state_watcher* w;
while ((w = tracker->watchers)) {
@@ -64,7 +65,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
}
- GRPC_CLOSURE_SCHED(w->notify, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, w->notify, error);
gpr_free(w);
}
GRPC_ERROR_UNREF(tracker->current_error);
@@ -104,8 +105,8 @@ bool grpc_connectivity_state_has_watchers(
}
bool grpc_connectivity_state_notify_on_state_change(
- grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
- grpc_closure* notify) {
+ grpc_exec_ctx* exec_ctx, grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state* current, grpc_closure* notify) {
grpc_connectivity_state cur =
(grpc_connectivity_state)gpr_atm_no_barrier_load(
&tracker->current_state_atm);
@@ -122,7 +123,7 @@ bool grpc_connectivity_state_notify_on_state_change(
if (current == nullptr) {
grpc_connectivity_state_watcher* w = tracker->watchers;
if (w != nullptr && w->notify == notify) {
- GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED);
tracker->watchers = w->next;
gpr_free(w);
return false;
@@ -130,7 +131,7 @@ bool grpc_connectivity_state_notify_on_state_change(
while (w != nullptr) {
grpc_connectivity_state_watcher* rm_candidate = w->next;
if (rm_candidate != nullptr && rm_candidate->notify == notify) {
- GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED);
w->next = w->next->next;
gpr_free(rm_candidate);
return false;
@@ -141,7 +142,8 @@ bool grpc_connectivity_state_notify_on_state_change(
} else {
if (cur != *current) {
*current = cur;
- GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, notify,
+ GRPC_ERROR_REF(tracker->current_error));
} else {
grpc_connectivity_state_watcher* w =
(grpc_connectivity_state_watcher*)gpr_malloc(sizeof(*w));
@@ -154,7 +156,8 @@ bool grpc_connectivity_state_notify_on_state_change(
}
}
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
+void grpc_connectivity_state_set(grpc_exec_ctx* exec_ctx,
+ grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state state,
grpc_error* error, const char* reason) {
grpc_connectivity_state cur =
@@ -192,7 +195,8 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name,
w->notify);
}
- GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error));
+ GRPC_CLOSURE_SCHED(exec_ctx, w->notify,
+ GRPC_ERROR_REF(tracker->current_error));
gpr_free(w);
}
}
diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h
index c3a50f3211..653637ebea 100644
--- a/src/core/lib/transport/connectivity_state.h
+++ b/src/core/lib/transport/connectivity_state.h
@@ -51,11 +51,13 @@ const char* grpc_connectivity_state_name(grpc_connectivity_state state);
void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state init_state,
const char* name);
-void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker);
+void grpc_connectivity_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_connectivity_state_tracker* tracker);
/** Set connectivity state; not thread safe; access must be serialized with an
* external lock */
-void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
+void grpc_connectivity_state_set(grpc_exec_ctx* exec_ctx,
+ grpc_connectivity_state_tracker* tracker,
grpc_connectivity_state state,
grpc_error* associated_error,
const char* reason);
@@ -79,7 +81,7 @@ grpc_connectivity_state grpc_connectivity_state_get(
case).
Access must be serialized with an external lock. */
bool grpc_connectivity_state_notify_on_state_change(
- grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
- grpc_closure* notify);
+ grpc_exec_ctx* exec_ctx, grpc_connectivity_state_tracker* tracker,
+ grpc_connectivity_state* current, grpc_closure* notify);
#endif /* GRPC_CORE_LIB_TRANSPORT_CONNECTIVITY_STATE_H */
diff --git a/src/core/lib/transport/error_utils.cc b/src/core/lib/transport/error_utils.cc
index ffaf327081..69c8ae6de3 100644
--- a/src/core/lib/transport/error_utils.cc
+++ b/src/core/lib/transport/error_utils.cc
@@ -40,9 +40,9 @@ static grpc_error* recursively_find_error_with_field(grpc_error* error,
return nullptr;
}
-void grpc_error_get_status(grpc_error* error, grpc_millis deadline,
- grpc_status_code* code, grpc_slice* slice,
- grpc_http2_error_code* http_error,
+void grpc_error_get_status(grpc_exec_ctx* exec_ctx, grpc_error* error,
+ grpc_millis deadline, grpc_status_code* code,
+ grpc_slice* slice, grpc_http2_error_code* http_error,
const char** error_string) {
// Start with the parent error and recurse through the tree of children
// until we find the first one that has a status code.
@@ -65,8 +65,8 @@ void grpc_error_get_status(grpc_error* error, grpc_millis deadline,
status = (grpc_status_code)integer;
} else if (grpc_error_get_int(found_error, GRPC_ERROR_INT_HTTP2_ERROR,
&integer)) {
- status = grpc_http2_error_to_grpc_status((grpc_http2_error_code)integer,
- deadline);
+ status = grpc_http2_error_to_grpc_status(
+ exec_ctx, (grpc_http2_error_code)integer, deadline);
}
if (code != nullptr) *code = status;
diff --git a/src/core/lib/transport/error_utils.h b/src/core/lib/transport/error_utils.h
index 4100f65d6d..8b006ae992 100644
--- a/src/core/lib/transport/error_utils.h
+++ b/src/core/lib/transport/error_utils.h
@@ -30,8 +30,9 @@
/// be populated with the entire error string. If any of the attributes (code,
/// msg, http_status, error_string) are unneeded, they can be passed as
/// NULL.
-void grpc_error_get_status(grpc_error* error, grpc_millis deadline,
- grpc_status_code* code, grpc_slice* slice,
+void grpc_error_get_status(grpc_exec_ctx* exec_ctx, grpc_error* error,
+ grpc_millis deadline, grpc_status_code* code,
+ grpc_slice* slice,
grpc_http2_error_code* http_status,
const char** error_string);
diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc
index 5f0673e014..0f30c7533d 100644
--- a/src/core/lib/transport/metadata.cc
+++ b/src/core/lib/transport/metadata.cc
@@ -108,7 +108,7 @@ typedef struct mdtab_shard {
static mdtab_shard g_shards[SHARD_COUNT];
-static void gc_mdtab(mdtab_shard* shard);
+static void gc_mdtab(grpc_exec_ctx* exec_ctx, mdtab_shard* shard);
void grpc_mdctx_global_init(void) {
/* initialize shards */
@@ -123,11 +123,11 @@ void grpc_mdctx_global_init(void) {
}
}
-void grpc_mdctx_global_shutdown() {
+void grpc_mdctx_global_shutdown(grpc_exec_ctx* exec_ctx) {
for (size_t i = 0; i < SHARD_COUNT; i++) {
mdtab_shard* shard = &g_shards[i];
gpr_mu_destroy(&shard->mu);
- gc_mdtab(shard);
+ gc_mdtab(exec_ctx, shard);
/* TODO(ctiller): GPR_ASSERT(shard->count == 0); */
if (shard->count != 0) {
gpr_log(GPR_DEBUG, "WARNING: %" PRIuPTR " metadata elements were leaked",
@@ -165,7 +165,7 @@ static void ref_md_locked(mdtab_shard* shard,
}
}
-static void gc_mdtab(mdtab_shard* shard) {
+static void gc_mdtab(grpc_exec_ctx* exec_ctx, mdtab_shard* shard) {
size_t i;
interned_metadata** prev_next;
interned_metadata *md, *next;
@@ -178,8 +178,8 @@ static void gc_mdtab(mdtab_shard* shard) {
void* user_data = (void*)gpr_atm_no_barrier_load(&md->user_data);
next = md->bucket_next;
if (gpr_atm_acq_load(&md->refcnt) == 0) {
- grpc_slice_unref_internal(md->key);
- grpc_slice_unref_internal(md->value);
+ grpc_slice_unref_internal(exec_ctx, md->key);
+ grpc_slice_unref_internal(exec_ctx, md->value);
if (md->user_data) {
((destroy_user_data_func)gpr_atm_no_barrier_load(
&md->destroy_user_data))(user_data);
@@ -228,17 +228,17 @@ static void grow_mdtab(mdtab_shard* shard) {
GPR_TIMER_END("grow_mdtab", 0);
}
-static void rehash_mdtab(mdtab_shard* shard) {
+static void rehash_mdtab(grpc_exec_ctx* exec_ctx, mdtab_shard* shard) {
if (gpr_atm_no_barrier_load(&shard->free_estimate) >
(gpr_atm)(shard->capacity / 4)) {
- gc_mdtab(shard);
+ gc_mdtab(exec_ctx, shard);
} else {
grow_mdtab(shard);
}
}
grpc_mdelem grpc_mdelem_create(
- grpc_slice key, grpc_slice value,
+ grpc_exec_ctx* exec_ctx, grpc_slice key, grpc_slice value,
grpc_mdelem_data* compatible_external_backing_store) {
if (!grpc_slice_is_interned(key) || !grpc_slice_is_interned(value)) {
if (compatible_external_backing_store != nullptr) {
@@ -318,7 +318,7 @@ grpc_mdelem grpc_mdelem_create(
shard->count++;
if (shard->count > shard->capacity * 2) {
- rehash_mdtab(shard);
+ rehash_mdtab(exec_ctx, shard);
}
gpr_mu_unlock(&shard->mu);
@@ -328,20 +328,22 @@ grpc_mdelem grpc_mdelem_create(
return GRPC_MAKE_MDELEM(md, GRPC_MDELEM_STORAGE_INTERNED);
}
-grpc_mdelem grpc_mdelem_from_slices(grpc_slice key, grpc_slice value) {
- grpc_mdelem out = grpc_mdelem_create(key, value, nullptr);
- grpc_slice_unref_internal(key);
- grpc_slice_unref_internal(value);
+grpc_mdelem grpc_mdelem_from_slices(grpc_exec_ctx* exec_ctx, grpc_slice key,
+ grpc_slice value) {
+ grpc_mdelem out = grpc_mdelem_create(exec_ctx, key, value, nullptr);
+ grpc_slice_unref_internal(exec_ctx, key);
+ grpc_slice_unref_internal(exec_ctx, value);
return out;
}
-grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata) {
+grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_exec_ctx* exec_ctx,
+ grpc_metadata* metadata) {
bool changed = false;
grpc_slice key_slice =
grpc_slice_maybe_static_intern(metadata->key, &changed);
grpc_slice value_slice =
grpc_slice_maybe_static_intern(metadata->value, &changed);
- return grpc_mdelem_create(key_slice, value_slice,
+ return grpc_mdelem_create(exec_ctx, key_slice, value_slice,
changed ? nullptr : (grpc_mdelem_data*)metadata);
}
@@ -415,7 +417,7 @@ grpc_mdelem grpc_mdelem_ref(grpc_mdelem gmd DEBUG_ARGS) {
return gmd;
}
-void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) {
+void grpc_mdelem_unref(grpc_exec_ctx* exec_ctx, grpc_mdelem gmd DEBUG_ARGS) {
switch (GRPC_MDELEM_STORAGE(gmd)) {
case GRPC_MDELEM_STORAGE_EXTERNAL:
case GRPC_MDELEM_STORAGE_STATIC:
@@ -463,8 +465,8 @@ void grpc_mdelem_unref(grpc_mdelem gmd DEBUG_ARGS) {
const gpr_atm prev_refcount = gpr_atm_full_fetch_add(&md->refcnt, -1);
GPR_ASSERT(prev_refcount >= 1);
if (1 == prev_refcount) {
- grpc_slice_unref_internal(md->key);
- grpc_slice_unref_internal(md->value);
+ grpc_slice_unref_internal(exec_ctx, md->key);
+ grpc_slice_unref_internal(exec_ctx, md->value);
gpr_free(md);
}
break;
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 78e6beff9b..8d4868d031 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -107,18 +107,20 @@ struct grpc_mdelem {
(uintptr_t)GRPC_MDELEM_STORAGE_INTERNED_BIT))
/* Unrefs the slices. */
-grpc_mdelem grpc_mdelem_from_slices(grpc_slice key, grpc_slice value);
+grpc_mdelem grpc_mdelem_from_slices(grpc_exec_ctx* exec_ctx, grpc_slice key,
+ grpc_slice value);
/* Cheaply convert a grpc_metadata to a grpc_mdelem; may use the grpc_metadata
object as backing storage (so lifetimes should align) */
-grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata);
+grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_exec_ctx* exec_ctx,
+ grpc_metadata* metadata);
/* Does not unref the slices; if a new non-interned mdelem is needed, allocates
one if compatible_external_backing_store is NULL, or uses
compatible_external_backing_store if it is non-NULL (in which case it's the
users responsibility to ensure that it outlives usage) */
grpc_mdelem grpc_mdelem_create(
- grpc_slice key, grpc_slice value,
+ grpc_exec_ctx* exec_ctx, grpc_slice key, grpc_slice value,
grpc_mdelem_data* compatible_external_backing_store);
bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b);
@@ -134,14 +136,16 @@ void* grpc_mdelem_set_user_data(grpc_mdelem md, void (*destroy_func)(void*),
#ifndef NDEBUG
#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s), __FILE__, __LINE__)
-#define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s), __FILE__, __LINE__)
+#define GRPC_MDELEM_UNREF(exec_ctx, s) \
+ grpc_mdelem_unref((exec_ctx), (s), __FILE__, __LINE__)
grpc_mdelem grpc_mdelem_ref(grpc_mdelem md, const char* file, int line);
-void grpc_mdelem_unref(grpc_mdelem md, const char* file, int line);
+void grpc_mdelem_unref(grpc_exec_ctx* exec_ctx, grpc_mdelem md,
+ const char* file, int line);
#else
#define GRPC_MDELEM_REF(s) grpc_mdelem_ref((s))
-#define GRPC_MDELEM_UNREF(s) grpc_mdelem_unref((s))
+#define GRPC_MDELEM_UNREF(exec_ctx, s) grpc_mdelem_unref((exec_ctx), (s))
grpc_mdelem grpc_mdelem_ref(grpc_mdelem md);
-void grpc_mdelem_unref(grpc_mdelem md);
+void grpc_mdelem_unref(grpc_exec_ctx* exec_ctx, grpc_mdelem md);
#endif
#define GRPC_MDKEY(md) (GRPC_MDELEM_DATA(md)->key)
@@ -158,6 +162,6 @@ void grpc_mdelem_unref(grpc_mdelem md);
#define GRPC_MDSTR_KV_HASH(k_hash, v_hash) (GPR_ROTL((k_hash), 2) ^ (v_hash))
void grpc_mdctx_global_init(void);
-void grpc_mdctx_global_shutdown();
+void grpc_mdctx_global_shutdown(grpc_exec_ctx* exec_ctx);
#endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_H */
diff --git a/src/core/lib/transport/metadata_batch.cc b/src/core/lib/transport/metadata_batch.cc
index 9c95339ba0..5817765aa3 100644
--- a/src/core/lib/transport/metadata_batch.cc
+++ b/src/core/lib/transport/metadata_batch.cc
@@ -51,7 +51,8 @@ static void assert_valid_list(grpc_mdelem_list* list) {
#endif /* NDEBUG */
}
-static void assert_valid_callouts(grpc_metadata_batch* batch) {
+static void assert_valid_callouts(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch) {
#ifndef NDEBUG
for (grpc_linked_mdelem* l = batch->list.head; l != nullptr; l = l->next) {
grpc_slice key_interned = grpc_slice_intern(GRPC_MDKEY(l->md));
@@ -60,7 +61,7 @@ static void assert_valid_callouts(grpc_metadata_batch* batch) {
if (callout_idx != GRPC_BATCH_CALLOUTS_COUNT) {
GPR_ASSERT(batch->idx.array[callout_idx] == l);
}
- grpc_slice_unref_internal(key_interned);
+ grpc_slice_unref_internal(exec_ctx, key_interned);
}
#endif
}
@@ -76,10 +77,11 @@ void grpc_metadata_batch_init(grpc_metadata_batch* batch) {
batch->deadline = GRPC_MILLIS_INF_FUTURE;
}
-void grpc_metadata_batch_destroy(grpc_metadata_batch* batch) {
+void grpc_metadata_batch_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch) {
grpc_linked_mdelem* l;
for (l = batch->list.head; l; l = l->next) {
- GRPC_MDELEM_UNREF(l->md);
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
}
@@ -124,12 +126,13 @@ static void maybe_unlink_callout(grpc_metadata_batch* batch,
batch->idx.array[idx] = nullptr;
}
-grpc_error* grpc_metadata_batch_add_head(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_add_head(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage,
grpc_mdelem elem_to_add) {
GPR_ASSERT(!GRPC_MDISNULL(elem_to_add));
storage->md = elem_to_add;
- return grpc_metadata_batch_link_head(batch, storage);
+ return grpc_metadata_batch_link_head(exec_ctx, batch, storage);
}
static void link_head(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
@@ -147,25 +150,27 @@ static void link_head(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
assert_valid_list(list);
}
-grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_link_head(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
grpc_error* err = maybe_link_callout(batch, storage);
if (err != GRPC_ERROR_NONE) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
return err;
}
link_head(&batch->list, storage);
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
return GRPC_ERROR_NONE;
}
-grpc_error* grpc_metadata_batch_add_tail(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_add_tail(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage,
grpc_mdelem elem_to_add) {
GPR_ASSERT(!GRPC_MDISNULL(elem_to_add));
storage->md = elem_to_add;
- return grpc_metadata_batch_link_tail(batch, storage);
+ return grpc_metadata_batch_link_tail(exec_ctx, batch, storage);
}
static void link_tail(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
@@ -184,16 +189,17 @@ static void link_tail(grpc_mdelem_list* list, grpc_linked_mdelem* storage) {
assert_valid_list(list);
}
-grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_link_tail(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
grpc_error* err = maybe_link_callout(batch, storage);
if (err != GRPC_ERROR_NONE) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
return err;
}
link_tail(&batch->list, storage);
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
return GRPC_ERROR_NONE;
}
@@ -214,28 +220,31 @@ static void unlink_storage(grpc_mdelem_list* list,
assert_valid_list(list);
}
-void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
+void grpc_metadata_batch_remove(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
maybe_unlink_callout(batch, storage);
unlink_storage(&batch->list, storage);
- GRPC_MDELEM_UNREF(storage->md);
- assert_valid_callouts(batch);
+ GRPC_MDELEM_UNREF(exec_ctx, storage->md);
+ assert_valid_callouts(exec_ctx, batch);
}
-void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage,
+void grpc_metadata_batch_set_value(grpc_exec_ctx* exec_ctx,
+ grpc_linked_mdelem* storage,
grpc_slice value) {
grpc_mdelem old_mdelem = storage->md;
grpc_mdelem new_mdelem = grpc_mdelem_from_slices(
- grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value);
+ exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value);
storage->md = new_mdelem;
- GRPC_MDELEM_UNREF(old_mdelem);
+ GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
}
-grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_substitute(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage,
grpc_mdelem new_mdelem) {
- assert_valid_callouts(batch);
+ assert_valid_callouts(exec_ctx, batch);
grpc_error* error = GRPC_ERROR_NONE;
grpc_mdelem old_mdelem = storage->md;
if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) {
@@ -244,18 +253,19 @@ grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch,
error = maybe_link_callout(batch, storage);
if (error != GRPC_ERROR_NONE) {
unlink_storage(&batch->list, storage);
- GRPC_MDELEM_UNREF(storage->md);
+ GRPC_MDELEM_UNREF(exec_ctx, storage->md);
}
} else {
storage->md = new_mdelem;
}
- GRPC_MDELEM_UNREF(old_mdelem);
- assert_valid_callouts(batch);
+ GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
+ assert_valid_callouts(exec_ctx, batch);
return error;
}
-void grpc_metadata_batch_clear(grpc_metadata_batch* batch) {
- grpc_metadata_batch_destroy(batch);
+void grpc_metadata_batch_clear(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch) {
+ grpc_metadata_batch_destroy(exec_ctx, batch);
grpc_metadata_batch_init(batch);
}
@@ -282,7 +292,8 @@ static void add_error(grpc_error** composite, grpc_error* error,
*composite = grpc_error_add_child(*composite, error);
}
-grpc_error* grpc_metadata_batch_filter(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_filter(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_metadata_batch_filter_func func,
void* user_data,
const char* composite_error_string) {
@@ -290,12 +301,12 @@ grpc_error* grpc_metadata_batch_filter(grpc_metadata_batch* batch,
grpc_error* error = GRPC_ERROR_NONE;
while (l) {
grpc_linked_mdelem* next = l->next;
- grpc_filtered_mdelem new_mdelem = func(user_data, l->md);
+ grpc_filtered_mdelem new_mdelem = func(exec_ctx, user_data, l->md);
add_error(&error, new_mdelem.error, composite_error_string);
if (GRPC_MDISNULL(new_mdelem.md)) {
- grpc_metadata_batch_remove(batch, l);
+ grpc_metadata_batch_remove(exec_ctx, batch, l);
} else if (new_mdelem.md.payload != l->md.payload) {
- grpc_metadata_batch_substitute(batch, l, new_mdelem.md);
+ grpc_metadata_batch_substitute(exec_ctx, batch, l, new_mdelem.md);
}
l = next;
}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 8353a426f8..adfb2d8069 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -53,23 +53,28 @@ typedef struct grpc_metadata_batch {
} grpc_metadata_batch;
void grpc_metadata_batch_init(grpc_metadata_batch* batch);
-void grpc_metadata_batch_destroy(grpc_metadata_batch* batch);
-void grpc_metadata_batch_clear(grpc_metadata_batch* batch);
+void grpc_metadata_batch_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch);
+void grpc_metadata_batch_clear(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch);
bool grpc_metadata_batch_is_empty(grpc_metadata_batch* batch);
/* Returns the transport size of the batch. */
size_t grpc_metadata_batch_size(grpc_metadata_batch* batch);
/** Remove \a storage from the batch, unreffing the mdelem contained */
-void grpc_metadata_batch_remove(grpc_metadata_batch* batch,
+void grpc_metadata_batch_remove(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage);
/** Substitute a new mdelem for an old value */
-grpc_error* grpc_metadata_batch_substitute(grpc_metadata_batch* batch,
+grpc_error* grpc_metadata_batch_substitute(grpc_exec_ctx* exec_ctx,
+ grpc_metadata_batch* batch,
grpc_linked_mdelem* storage,
grpc_mdelem new_value);
-void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage,
+void grpc_metadata_batch_set_value(grpc_exec_ctx* exec_ctx,
+ grpc_linked_mdelem* storage,
grpc_slice value);
/** Add \a storage to the beginning of \a batch. storage->md is
@@ -77,17 +82,17 @@ void grpc_metadata_batch_set_value(grpc_linked_mdelem* storage,
\a storage is owned by the caller and must survive for the
lifetime of batch. This usually means it should be around
for the lifetime of the call. */
-grpc_error* grpc_metadata_batch_link_head(grpc_metadata_batch* batch,
- grpc_linked_mdelem* storage)
- GRPC_MUST_USE_RESULT;
+grpc_error* grpc_metadata_batch_link_head(
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* batch,
+ grpc_linked_mdelem* storage) GRPC_MUST_USE_RESULT;
/** Add \a storage to the end of \a batch. storage->md is
assumed to be valid.
\a storage is owned by the caller and must survive for the
lifetime of batch. This usually means it should be around
for the lifetime of the call. */
-grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch,
- grpc_linked_mdelem* storage)
- GRPC_MUST_USE_RESULT;
+grpc_error* grpc_metadata_batch_link_tail(
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* batch,
+ grpc_linked_mdelem* storage) GRPC_MUST_USE_RESULT;
/** Add \a elem_to_add as the first element in \a batch, using
\a storage as backing storage for the linked list element.
@@ -96,8 +101,8 @@ grpc_error* grpc_metadata_batch_link_tail(grpc_metadata_batch* batch,
for the lifetime of the call.
Takes ownership of \a elem_to_add */
grpc_error* grpc_metadata_batch_add_head(
- grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
- grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* batch,
+ grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;
/** Add \a elem_to_add as the last element in \a batch, using
\a storage as backing storage for the linked list element.
\a storage is owned by the caller and must survive for the
@@ -105,8 +110,8 @@ grpc_error* grpc_metadata_batch_add_head(
for the lifetime of the call.
Takes ownership of \a elem_to_add */
grpc_error* grpc_metadata_batch_add_tail(
- grpc_metadata_batch* batch, grpc_linked_mdelem* storage,
- grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* batch,
+ grpc_linked_mdelem* storage, grpc_mdelem elem_to_add) GRPC_MUST_USE_RESULT;
grpc_error* grpc_attach_md_to_error(grpc_error* src, grpc_mdelem md);
@@ -123,10 +128,11 @@ typedef struct {
{ GRPC_ERROR_NONE, GRPC_MDNULL }
typedef grpc_filtered_mdelem (*grpc_metadata_batch_filter_func)(
- void* user_data, grpc_mdelem elem);
+ grpc_exec_ctx* exec_ctx, void* user_data, grpc_mdelem elem);
grpc_error* grpc_metadata_batch_filter(
- grpc_metadata_batch* batch, grpc_metadata_batch_filter_func func,
- void* user_data, const char* composite_error_string) GRPC_MUST_USE_RESULT;
+ grpc_exec_ctx* exec_ctx, grpc_metadata_batch* batch,
+ grpc_metadata_batch_filter_func func, void* user_data,
+ const char* composite_error_string) GRPC_MUST_USE_RESULT;
#ifndef NDEBUG
void grpc_metadata_batch_assert_ok(grpc_metadata_batch* comd);
diff --git a/src/core/lib/transport/service_config.cc b/src/core/lib/transport/service_config.cc
index cbafc33840..adcec8c444 100644
--- a/src/core/lib/transport/service_config.cc
+++ b/src/core/lib/transport/service_config.cc
@@ -152,8 +152,10 @@ static char* parse_json_method_name(grpc_json* json) {
// each name found, incrementing \a idx for each entry added.
// Returns false on error.
static bool parse_json_method_config(
- grpc_json* json, void* (*create_value)(const grpc_json* method_config_json),
- void* (*ref_value)(void* value), void (*unref_value)(void* value),
+ grpc_exec_ctx* exec_ctx, grpc_json* json,
+ void* (*create_value)(const grpc_json* method_config_json),
+ void* (*ref_value)(void* value),
+ void (*unref_value)(grpc_exec_ctx* exec_ctx, void* value),
grpc_slice_hash_table_entry* entries, size_t* idx) {
// Construct value.
void* method_config = create_value(json);
@@ -182,15 +184,16 @@ static bool parse_json_method_config(
}
success = true;
done:
- unref_value(method_config);
+ unref_value(exec_ctx, method_config);
gpr_strvec_destroy(&paths);
return success;
}
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
- const grpc_service_config* service_config,
+ grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- void* (*ref_value)(void* value), void (*unref_value)(void* value)) {
+ void* (*ref_value)(void* value),
+ void (*unref_value)(grpc_exec_ctx* exec_ctx, void* value)) {
const grpc_json* json = service_config->json_tree;
// Traverse parsed JSON tree.
if (json->type != GRPC_JSON_OBJECT || json->key != nullptr) return nullptr;
@@ -214,11 +217,11 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
size_t idx = 0;
for (grpc_json* method = field->child; method != nullptr;
method = method->next) {
- if (!parse_json_method_config(method, create_value, ref_value,
+ if (!parse_json_method_config(exec_ctx, method, create_value, ref_value,
unref_value, entries, &idx)) {
for (size_t i = 0; i < idx; ++i) {
- grpc_slice_unref_internal(entries[i].key);
- unref_value(entries[i].value);
+ grpc_slice_unref_internal(exec_ctx, entries[i].key);
+ unref_value(exec_ctx, entries[i].value);
}
gpr_free(entries);
return nullptr;
@@ -237,7 +240,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
return method_config_table;
}
-void* grpc_method_config_table_get(const grpc_slice_hash_table* table,
+void* grpc_method_config_table_get(grpc_exec_ctx* exec_ctx,
+ const grpc_slice_hash_table* table,
grpc_slice path) {
void* value = grpc_slice_hash_table_get(table, path);
// If we didn't find a match for the path, try looking for a wildcard
@@ -253,7 +257,7 @@ void* grpc_method_config_table_get(const grpc_slice_hash_table* table,
grpc_slice wildcard_path = grpc_slice_from_copied_string(buf);
gpr_free(buf);
value = grpc_slice_hash_table_get(table, wildcard_path);
- grpc_slice_unref_internal(wildcard_path);
+ grpc_slice_unref_internal(exec_ctx, wildcard_path);
gpr_free(path_str);
}
return value;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index 98554b9f0f..75a290bfd8 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -45,9 +45,10 @@ const char* grpc_service_config_get_lb_policy_name(
/// \a ref_value() and \a unref_value() are used to ref and unref values.
/// Returns NULL on error.
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
- const grpc_service_config* service_config,
+ grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- void* (*ref_value)(void* value), void (*unref_value)(void* value));
+ void* (*ref_value)(void* value),
+ void (*unref_value)(grpc_exec_ctx* exec_ctx, void* value));
/// A helper function for looking up values in the table returned by
/// \a grpc_service_config_create_method_config_table().
@@ -55,7 +56,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
/// the form "/service/method".
/// Returns NULL if the method has no config.
/// Caller does NOT own a reference to the result.
-void* grpc_method_config_table_get(const grpc_slice_hash_table* table,
+void* grpc_method_config_table_get(grpc_exec_ctx* exec_ctx,
+ const grpc_slice_hash_table* table,
grpc_slice path);
#endif /* GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H */
diff --git a/src/core/lib/transport/static_metadata.cc b/src/core/lib/transport/static_metadata.cc
index 2213b30f56..844724cbeb 100644
--- a/src/core/lib/transport/static_metadata.cc
+++ b/src/core/lib/transport/static_metadata.cc
@@ -104,7 +104,7 @@ static uint8_t g_bytes[] = {
101, 44, 103, 122, 105, 112};
static void static_ref(void* unused) {}
-static void static_unref(void* unused) {}
+static void static_unref(grpc_exec_ctx* exec_ctx, void* unused) {}
static const grpc_slice_refcount_vtable static_sub_vtable = {
static_ref, static_unref, grpc_slice_default_eq_impl,
grpc_slice_default_hash_impl};
diff --git a/src/core/lib/transport/status_conversion.cc b/src/core/lib/transport/status_conversion.cc
index 46cba4292b..a0a5f1ba4b 100644
--- a/src/core/lib/transport/status_conversion.cc
+++ b/src/core/lib/transport/status_conversion.cc
@@ -37,7 +37,8 @@ grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status) {
}
}
-grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
+grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx* exec_ctx,
+ grpc_http2_error_code error,
grpc_millis deadline) {
switch (error) {
case GRPC_HTTP2_NO_ERROR:
@@ -46,7 +47,7 @@ grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
case GRPC_HTTP2_CANCEL:
/* http2 cancel translates to STATUS_CANCELLED iff deadline hasn't been
* exceeded */
- return grpc_core::ExecCtx::Get()->Now() > deadline
+ return grpc_exec_ctx_now(exec_ctx) > deadline
? GRPC_STATUS_DEADLINE_EXCEEDED
: GRPC_STATUS_CANCELLED;
case GRPC_HTTP2_ENHANCE_YOUR_CALM:
diff --git a/src/core/lib/transport/status_conversion.h b/src/core/lib/transport/status_conversion.h
index 107eb92a53..3637b82801 100644
--- a/src/core/lib/transport/status_conversion.h
+++ b/src/core/lib/transport/status_conversion.h
@@ -25,7 +25,8 @@
/* Conversion of grpc status codes to http2 error codes (for RST_STREAM) */
grpc_http2_error_code grpc_status_to_http2_error(grpc_status_code status);
-grpc_status_code grpc_http2_error_to_grpc_status(grpc_http2_error_code error,
+grpc_status_code grpc_http2_error_to_grpc_status(grpc_exec_ctx* exec_ctx,
+ grpc_http2_error_code error,
grpc_millis deadline);
/* Conversion of HTTP status codes (:status) to grpc status codes */
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 08aee04ac9..5bda1541a6 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -49,7 +49,8 @@ void grpc_stream_ref(grpc_stream_refcount* refcount) {
}
#ifndef NDEBUG
-void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
+void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount,
+ const char* reason) {
if (grpc_trace_stream_refcount.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
@@ -57,11 +58,11 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
val - 1, reason);
}
#else
-void grpc_stream_unref(grpc_stream_refcount* refcount) {
+void grpc_stream_unref(grpc_exec_ctx* exec_ctx,
+ grpc_stream_refcount* refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- if (grpc_core::ExecCtx::Get()->flags() &
- GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
+ if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
/* Ick.
The thread we're running on MAY be owned (indirectly) by a call-stack.
If that's the case, destroying the call-stack MAY try to destroy the
@@ -72,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) {
refcount->destroy.scheduler =
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
}
- GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -88,11 +89,11 @@ static void slice_stream_ref(void* p) {
#endif
}
-static void slice_stream_unref(void* p) {
+static void slice_stream_unref(grpc_exec_ctx* exec_ctx, void* p) {
#ifndef NDEBUG
- grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p), "slice");
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
#else
- grpc_stream_unref(STREAM_REF_FROM_SLICE_REF(p));
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
#endif
}
@@ -150,50 +151,59 @@ size_t grpc_transport_stream_size(grpc_transport* transport) {
return transport->vtable->sizeof_stream;
}
-void grpc_transport_destroy(grpc_transport* transport) {
- transport->vtable->destroy(transport);
+void grpc_transport_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport) {
+ transport->vtable->destroy(exec_ctx, transport);
}
-int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
+int grpc_transport_init_stream(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport, grpc_stream* stream,
grpc_stream_refcount* refcount,
const void* server_data, gpr_arena* arena) {
- return transport->vtable->init_stream(transport, stream, refcount,
+ return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
server_data, arena);
}
-void grpc_transport_perform_stream_op(grpc_transport* transport,
+void grpc_transport_perform_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_stream* stream,
grpc_transport_stream_op_batch* op) {
- transport->vtable->perform_stream_op(transport, stream, op);
+ transport->vtable->perform_stream_op(exec_ctx, transport, stream, op);
}
-void grpc_transport_perform_op(grpc_transport* transport,
+void grpc_transport_perform_op(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_transport_op* op) {
- transport->vtable->perform_op(transport, op);
+ transport->vtable->perform_op(exec_ctx, transport, op);
}
-void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
+void grpc_transport_set_pops(grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+ grpc_stream* stream,
grpc_polling_entity* pollent) {
grpc_pollset* pollset;
grpc_pollset_set* pollset_set;
if ((pollset = grpc_polling_entity_pollset(pollent)) != nullptr) {
- transport->vtable->set_pollset(transport, stream, pollset);
+ transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
} else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) !=
nullptr) {
- transport->vtable->set_pollset_set(transport, stream, pollset_set);
+ transport->vtable->set_pollset_set(exec_ctx, transport, stream,
+ pollset_set);
} else {
abort();
}
}
-void grpc_transport_destroy_stream(grpc_transport* transport,
+void grpc_transport_destroy_stream(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_stream* stream,
grpc_closure* then_schedule_closure) {
- transport->vtable->destroy_stream(transport, stream, then_schedule_closure);
+ transport->vtable->destroy_stream(exec_ctx, transport, stream,
+ then_schedule_closure);
}
-grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
- return transport->vtable->get_endpoint(transport);
+grpc_endpoint* grpc_transport_get_endpoint(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport) {
+ return transport->vtable->get_endpoint(exec_ctx, transport);
}
// This comment should be sung to the tune of
@@ -204,23 +214,25 @@ grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(
- grpc_transport_stream_op_batch* batch, grpc_error* error,
- grpc_call_combiner* call_combiner) {
+ grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* batch,
+ grpc_error* error, grpc_call_combiner* call_combiner) {
if (batch->send_message) {
- grpc_byte_stream_destroy(batch->payload->send_message.send_message);
+ grpc_byte_stream_destroy(exec_ctx,
+ batch->payload->send_message.send_message);
}
if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(
- call_combiner, batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
+ GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
+ batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error),
+ "failing recv_message_ready");
}
if (batch->recv_initial_metadata) {
GRPC_CALL_COMBINER_START(
- call_combiner,
+ exec_ctx, call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- GRPC_CLOSURE_SCHED(batch->on_complete, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (batch->cancel_stream) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
@@ -232,9 +244,10 @@ typedef struct {
grpc_transport_op op;
} made_transport_op;
-static void destroy_made_transport_op(void* arg, grpc_error* error) {
+static void destroy_made_transport_op(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
made_transport_op* op = (made_transport_op*)arg;
- GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
gpr_free(op);
}
@@ -255,11 +268,12 @@ typedef struct {
grpc_transport_stream_op_batch_payload payload;
} made_transport_stream_op;
-static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
+static void destroy_made_transport_stream_op(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
made_transport_stream_op* op = (made_transport_stream_op*)arg;
grpc_closure* c = op->inner_on_complete;
gpr_free(op);
- GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index b03c0218dc..b3cf04c22d 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -55,14 +55,15 @@ void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
grpc_iomgr_cb_func cb, void* cb_arg,
const char* object_type);
void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason);
-void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason);
+void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount,
+ const char* reason);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
grpc_iomgr_cb_func cb, void* cb_arg);
void grpc_stream_ref(grpc_stream_refcount* refcount);
-void grpc_stream_unref(grpc_stream_refcount* refcount);
+void grpc_stream_unref(grpc_exec_ctx* exec_ctx, grpc_stream_refcount* refcount);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
@@ -236,7 +237,8 @@ typedef struct grpc_transport_op {
If true, the callback is set to set_accept_stream_fn, with its
user_data argument set to set_accept_stream_user_data */
bool set_accept_stream;
- void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
+ void (*set_accept_stream_fn)(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_transport* transport,
const void* server_data);
void* set_accept_stream_user_data;
/** add this transport to a pollset */
@@ -267,12 +269,13 @@ size_t grpc_transport_stream_size(grpc_transport* transport);
stream - a pointer to uninitialized memory to initialize
server_data - either NULL for a client initiated stream, or a pointer
supplied from the accept_stream callback function */
-int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
+int grpc_transport_init_stream(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport, grpc_stream* stream,
grpc_stream_refcount* refcount,
const void* server_data, gpr_arena* arena);
-void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
- grpc_polling_entity* pollent);
+void grpc_transport_set_pops(grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+ grpc_stream* stream, grpc_polling_entity* pollent);
/* Destroy transport data for a stream.
@@ -284,13 +287,14 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
transport - the transport on which to create this stream
stream - the grpc_stream to destroy (memory is still owned by the
caller, but any child memory must be cleaned up) */
-void grpc_transport_destroy_stream(grpc_transport* transport,
+void grpc_transport_destroy_stream(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_stream* stream,
grpc_closure* then_schedule_closure);
void grpc_transport_stream_op_batch_finish_with_failure(
- grpc_transport_stream_op_batch* op, grpc_error* error,
- grpc_call_combiner* call_combiner);
+ grpc_exec_ctx* exec_ctx, grpc_transport_stream_op_batch* op,
+ grpc_error* error, grpc_call_combiner* call_combiner);
char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op);
char* grpc_transport_op_string(grpc_transport_op* op);
@@ -305,11 +309,13 @@ char* grpc_transport_op_string(grpc_transport_op* op);
non-NULL and previously initialized by the same transport.
op - a grpc_transport_stream_op_batch specifying the op to perform
*/
-void grpc_transport_perform_stream_op(grpc_transport* transport,
+void grpc_transport_perform_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_stream* stream,
grpc_transport_stream_op_batch* op);
-void grpc_transport_perform_op(grpc_transport* transport,
+void grpc_transport_perform_op(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport,
grpc_transport_op* op);
/* Send a ping on a transport
@@ -322,10 +328,11 @@ void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
grpc_slice debug_data);
/* Destroy the transport */
-void grpc_transport_destroy(grpc_transport* transport);
+void grpc_transport_destroy(grpc_exec_ctx* exec_ctx, grpc_transport* transport);
/* Get the endpoint used by \a transport */
-grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);
+grpc_endpoint* grpc_transport_get_endpoint(grpc_exec_ctx* exec_ctx,
+ grpc_transport* transport);
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 50b8a5f9b7..46be61427e 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -30,34 +30,37 @@ typedef struct grpc_transport_vtable {
const char* name;
/* implementation of grpc_transport_init_stream */
- int (*init_stream)(grpc_transport* self, grpc_stream* stream,
- grpc_stream_refcount* refcount, const void* server_data,
- gpr_arena* arena);
+ int (*init_stream)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_stream* stream, grpc_stream_refcount* refcount,
+ const void* server_data, gpr_arena* arena);
/* implementation of grpc_transport_set_pollset */
- void (*set_pollset)(grpc_transport* self, grpc_stream* stream,
- grpc_pollset* pollset);
+ void (*set_pollset)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_stream* stream, grpc_pollset* pollset);
/* implementation of grpc_transport_set_pollset */
- void (*set_pollset_set)(grpc_transport* self, grpc_stream* stream,
- grpc_pollset_set* pollset_set);
+ void (*set_pollset_set)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_stream* stream, grpc_pollset_set* pollset_set);
/* implementation of grpc_transport_perform_stream_op */
- void (*perform_stream_op)(grpc_transport* self, grpc_stream* stream,
+ void (*perform_stream_op)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_stream* stream,
grpc_transport_stream_op_batch* op);
/* implementation of grpc_transport_perform_op */
- void (*perform_op)(grpc_transport* self, grpc_transport_op* op);
+ void (*perform_op)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_transport_op* op);
/* implementation of grpc_transport_destroy_stream */
- void (*destroy_stream)(grpc_transport* self, grpc_stream* stream,
+ void (*destroy_stream)(grpc_exec_ctx* exec_ctx, grpc_transport* self,
+ grpc_stream* stream,
grpc_closure* then_schedule_closure);
/* implementation of grpc_transport_destroy */
- void (*destroy)(grpc_transport* self);
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_transport* self);
/* implementation of grpc_transport_get_endpoint */
- grpc_endpoint* (*get_endpoint)(grpc_transport* self);
+ grpc_endpoint* (*get_endpoint)(grpc_exec_ctx* exec_ctx, grpc_transport* self);
} grpc_transport_vtable;
/* an instance of a grpc transport */