diff options
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/channel_args.cc | 16 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 9 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.cc | 46 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 64 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.cc | 25 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack_builder.h | 11 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.cc | 57 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.h | 3 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.cc | 77 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.h | 33 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker_factory.cc | 12 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker_factory.h | 12 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker_registry.cc | 22 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker_registry.h | 5 |
14 files changed, 230 insertions, 162 deletions
diff --git a/src/core/lib/channel/channel_args.cc b/src/core/lib/channel/channel_args.cc index 578475b248..735fcbe405 100644 --- a/src/core/lib/channel/channel_args.cc +++ b/src/core/lib/channel/channel_args.cc @@ -188,7 +188,7 @@ grpc_channel_args* grpc_channel_args_normalize(const grpc_channel_args* a) { return b; } -void grpc_channel_args_destroy(grpc_channel_args* a) { +void grpc_channel_args_destroy(grpc_exec_ctx* exec_ctx, grpc_channel_args* a) { size_t i; if (!a) return; for (i = 0; i < a->num_args; i++) { @@ -199,7 +199,8 @@ void grpc_channel_args_destroy(grpc_channel_args* a) { case GRPC_ARG_INTEGER: break; case GRPC_ARG_POINTER: - a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p); + a->args[i].value.pointer.vtable->destroy(exec_ctx, + a->args[i].value.pointer.p); break; } gpr_free(a->args[i].key); @@ -298,7 +299,8 @@ static int find_stream_compression_algorithm_states_bitset( } grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args** a, grpc_compression_algorithm algorithm, int state) { + grpc_exec_ctx* exec_ctx, grpc_channel_args** a, + grpc_compression_algorithm algorithm, int state) { int* states_arg = nullptr; grpc_channel_args* result = *a; const int states_arg_found = @@ -331,15 +333,15 @@ grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( GPR_BITCLEAR((unsigned*)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); + grpc_channel_args_destroy(exec_ctx, *a); *a = result; } return result; } grpc_channel_args* grpc_channel_args_stream_compression_algorithm_set_state( - grpc_channel_args** a, grpc_stream_compression_algorithm algorithm, - int state) { + grpc_exec_ctx* exec_ctx, grpc_channel_args** a, + grpc_stream_compression_algorithm algorithm, int state) { int* states_arg = nullptr; grpc_channel_args* result = *a; const int states_arg_found = @@ -373,7 +375,7 @@ grpc_channel_args* grpc_channel_args_stream_compression_algorithm_set_state( GPR_BITCLEAR((unsigned*)&tmp.value.integer, algorithm); } result = grpc_channel_args_copy_and_add(*a, &tmp, 1); - grpc_channel_args_destroy(*a); + grpc_channel_args_destroy(exec_ctx, *a); *a = result; } return result; diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 9c7d06f34e..f6cb7fa73d 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -53,7 +53,7 @@ grpc_channel_args* grpc_channel_args_union(const grpc_channel_args* a, const grpc_channel_args* b); /** Destroy arguments created by \a grpc_channel_args_copy */ -void grpc_channel_args_destroy(grpc_channel_args* a); +void grpc_channel_args_destroy(grpc_exec_ctx* exec_ctx, grpc_channel_args* a); /** Returns the compression algorithm set in \a a. */ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( @@ -85,7 +85,8 @@ grpc_channel_args* grpc_channel_args_set_stream_compression_algorithm( * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( - grpc_channel_args** a, grpc_compression_algorithm algorithm, int enabled); + grpc_exec_ctx* exec_ctx, grpc_channel_args** a, + grpc_compression_algorithm algorithm, int enabled); /** Sets the support for the given stream compression algorithm. By default, all * stream compression algorithms are enabled. It's an error to disable an @@ -95,8 +96,8 @@ grpc_channel_args* grpc_channel_args_compression_algorithm_set_state( * modified to point to the returned instance (which may be different from the * input value of \a a). */ grpc_channel_args* grpc_channel_args_stream_compression_algorithm_set_state( - grpc_channel_args** a, grpc_stream_compression_algorithm algorithm, - int enabled); + grpc_exec_ctx* exec_ctx, grpc_channel_args** a, + grpc_stream_compression_algorithm algorithm, int enabled); /** Returns the bitset representing the support state (true for enabled, false * for disabled) for compression algorithms. diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index 195fe0b195..7629d18789 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -88,8 +88,8 @@ grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack, } grpc_error* grpc_channel_stack_init( - int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, - const grpc_channel_filter** filters, size_t filter_count, + grpc_exec_ctx* exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, + void* destroy_arg, const grpc_channel_filter** filters, size_t filter_count, const grpc_channel_args* channel_args, grpc_transport* optional_transport, const char* name, grpc_channel_stack* stack) { size_t call_size = @@ -117,7 +117,8 @@ grpc_error* grpc_channel_stack_init( args.is_last = i == (filter_count - 1); elems[i].filter = filters[i]; elems[i].channel_data = user_data; - grpc_error* error = elems[i].filter->init_channel_elem(&elems[i], &args); + grpc_error* error = + elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args); if (error != GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) { first_error = error; @@ -137,18 +138,20 @@ grpc_error* grpc_channel_stack_init( return first_error; } -void grpc_channel_stack_destroy(grpc_channel_stack* stack) { +void grpc_channel_stack_destroy(grpc_exec_ctx* exec_ctx, + grpc_channel_stack* stack) { grpc_channel_element* channel_elems = CHANNEL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]); + channel_elems[i].filter->destroy_channel_elem(exec_ctx, &channel_elems[i]); } } -grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, +grpc_error* grpc_call_stack_init(grpc_exec_ctx* exec_ctx, + grpc_channel_stack* channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, const grpc_call_element_args* elem_args) { @@ -171,8 +174,8 @@ grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - grpc_error* error = - call_elems[i].filter->init_call_elem(&call_elems[i], elem_args); + grpc_error* error = call_elems[i].filter->init_call_elem( + exec_ctx, &call_elems[i], elem_args); if (error != GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) { first_error = error; @@ -186,7 +189,8 @@ grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, return first_error; } -void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, +void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_call_stack* call_stack, grpc_polling_entity* pollent) { size_t count = call_stack->count; grpc_call_element* call_elems; @@ -199,16 +203,18 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, /* init per-filter data */ for (i = 0; i < count; i++) { - call_elems[i].filter->set_pollset_or_pollset_set(&call_elems[i], pollent); + call_elems[i].filter->set_pollset_or_pollset_set(exec_ctx, &call_elems[i], + pollent); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } } void grpc_call_stack_ignore_set_pollset_or_pollset_set( - grpc_call_element* elem, grpc_polling_entity* pollent) {} + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_polling_entity* pollent) {} -void grpc_call_stack_destroy(grpc_call_stack* stack, +void grpc_call_stack_destroy(grpc_exec_ctx* exec_ctx, grpc_call_stack* stack, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { grpc_call_element* elems = CALL_ELEMS_FROM_STACK(stack); @@ -218,27 +224,29 @@ void grpc_call_stack_destroy(grpc_call_stack* stack, /* destroy per-filter data */ for (i = 0; i < count; i++) { elems[i].filter->destroy_call_elem( - &elems[i], final_info, + exec_ctx, &elems[i], final_info, i == count - 1 ? then_schedule_closure : nullptr); } } -void grpc_call_next_op(grpc_call_element* elem, +void grpc_call_next_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { grpc_call_element* next_elem = elem + 1; GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op); - next_elem->filter->start_transport_stream_op_batch(next_elem, op); + next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op); } -void grpc_channel_next_get_info(grpc_channel_element* elem, +void grpc_channel_next_get_info(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, const grpc_channel_info* channel_info) { grpc_channel_element* next_elem = elem + 1; - next_elem->filter->get_channel_info(next_elem, channel_info); + next_elem->filter->get_channel_info(exec_ctx, next_elem, channel_info); } -void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op) { +void grpc_channel_next_op(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, + grpc_transport_op* op) { grpc_channel_element* next_elem = elem + 1; - next_elem->filter->start_transport_op(next_elem, op); + next_elem->filter->start_transport_op(exec_ctx, next_elem, op); } grpc_channel_stack* grpc_channel_stack_from_top_element( diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 716866be26..1b6e5396a5 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -96,12 +96,14 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*start_transport_stream_op_batch)(grpc_call_element* elem, + void (*start_transport_stream_op_batch)(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_transport_stream_op_batch* op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ - void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op); + void (*start_transport_op)(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_transport_op* op); /* sizeof(per call data) */ size_t sizeof_call_data; @@ -114,9 +116,11 @@ typedef struct { transport and is on the server. Most filters want to ignore this argument. Implementations may assume that elem->call_data is all zeros. */ - grpc_error* (*init_call_elem)(grpc_call_element* elem, + grpc_error* (*init_call_elem)(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args); - void (*set_pollset_or_pollset_set)(grpc_call_element* elem, + void (*set_pollset_or_pollset_set)(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_polling_entity* pollent); /* Destroy per call data. The filter does not need to do any chaining. @@ -124,7 +128,7 @@ typedef struct { \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when destruction is complete. \a final_info contains data about the completed call, mainly for reporting purposes. */ - void (*destroy_call_elem)(grpc_call_element* elem, + void (*destroy_call_elem)(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure); @@ -137,14 +141,16 @@ typedef struct { useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining. Implementations may assume that elem->call_data is all zeros. */ - grpc_error* (*init_channel_elem)(grpc_channel_element* elem, + grpc_error* (*init_channel_elem)(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args); /* Destroy per channel data. The filter does not need to do any chaining */ - void (*destroy_channel_elem)(grpc_channel_element* elem); + void (*destroy_channel_elem)(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem); /* Implement grpc_channel_get_info() */ - void (*get_channel_info)(grpc_channel_element* elem, + void (*get_channel_info)(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, const grpc_channel_info* channel_info); /* The name of this filter */ @@ -202,62 +208,68 @@ size_t grpc_channel_stack_size(const grpc_channel_filter** filters, size_t filter_count); /* Initialize a channel stack given some filters */ grpc_error* grpc_channel_stack_init( - int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, - const grpc_channel_filter** filters, size_t filter_count, + grpc_exec_ctx* exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy, + void* destroy_arg, const grpc_channel_filter** filters, size_t filter_count, const grpc_channel_args* args, grpc_transport* optional_transport, const char* name, grpc_channel_stack* stack); /* Destroy a channel stack */ -void grpc_channel_stack_destroy(grpc_channel_stack* stack); +void grpc_channel_stack_destroy(grpc_exec_ctx* exec_ctx, + grpc_channel_stack* stack); /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error* grpc_call_stack_init(grpc_channel_stack* channel_stack, +grpc_error* grpc_call_stack_init(grpc_exec_ctx* exec_ctx, + grpc_channel_stack* channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg, const grpc_call_element_args* elem_args); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ -void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, +void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_call_stack* call_stack, grpc_polling_entity* pollent); #ifndef NDEBUG #define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount, reason) -#define GRPC_CALL_STACK_UNREF(call_stack, reason) \ - grpc_stream_unref(&(call_stack)->refcount, reason) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ grpc_stream_ref(&(channel_stack)->refcount, reason) -#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \ - grpc_stream_unref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) #else #define GRPC_CALL_STACK_REF(call_stack, reason) \ grpc_stream_ref(&(call_stack)->refcount) -#define GRPC_CALL_STACK_UNREF(call_stack, reason) \ - grpc_stream_unref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount) #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ grpc_stream_ref(&(channel_stack)->refcount) -#define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \ - grpc_stream_unref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) #endif /* Destroy a call stack */ -void grpc_call_stack_destroy(grpc_call_stack* stack, +void grpc_call_stack_destroy(grpc_exec_ctx* exec_ctx, grpc_call_stack* stack, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure); /* Ignore set pollset{_set} - used by filters if they don't care about pollsets * at all. Does nothing. */ void grpc_call_stack_ignore_set_pollset_or_pollset_set( - grpc_call_element* elem, grpc_polling_entity* pollent); + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_polling_entity* pollent); /* Call the next operation in a call stack */ -void grpc_call_next_op(grpc_call_element* elem, +void grpc_call_next_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op); /* Call the next operation (depending on call directionality) in a channel stack */ -void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op); +void grpc_channel_next_op(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, + grpc_transport_op* op); /* Pass through a request to get_channel_info() to the next child element */ -void grpc_channel_next_get_info(grpc_channel_element* elem, +void grpc_channel_next_get_info(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, const grpc_channel_info* channel_info); /* Given the top element of a channel stack, get the channel stack itself */ diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc index fcba826644..77b7854f94 100644 --- a/src/core/lib/channel/channel_stack_builder.cc +++ b/src/core/lib/channel/channel_stack_builder.cc @@ -150,9 +150,10 @@ void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder* builder, } void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder* builder, const grpc_channel_args* args) { + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, + const grpc_channel_args* args) { if (builder->args != nullptr) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } builder->args = grpc_channel_args_copy(args); } @@ -240,7 +241,8 @@ bool grpc_channel_stack_builder_add_filter_after( return true; } -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder) { +void grpc_channel_stack_builder_destroy(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder) { filter_node* p = builder->begin.next; while (p != &builder->end) { filter_node* next = p->next; @@ -248,15 +250,16 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder) { p = next; } if (builder->args != nullptr) { - grpc_channel_args_destroy(builder->args); + grpc_channel_args_destroy(exec_ctx, builder->args); } gpr_free(builder->target); gpr_free(builder); } grpc_error* grpc_channel_stack_builder_finish( - grpc_channel_stack_builder* builder, size_t prefix_bytes, int initial_refs, - grpc_iomgr_cb_func destroy, void* destroy_arg, void** result) { + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, + size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy, + void* destroy_arg, void** result) { // count the number of filters size_t num_filters = 0; for (filter_node* p = builder->begin.next; p != &builder->end; p = p->next) { @@ -281,12 +284,12 @@ grpc_error* grpc_channel_stack_builder_finish( (grpc_channel_stack*)((char*)(*result) + prefix_bytes); // and initialize it grpc_error* error = grpc_channel_stack_init( - initial_refs, destroy, destroy_arg == nullptr ? *result : destroy_arg, - filters, num_filters, builder->args, builder->transport, builder->name, - channel_stack); + exec_ctx, initial_refs, destroy, + destroy_arg == nullptr ? *result : destroy_arg, filters, num_filters, + builder->args, builder->transport, builder->name, channel_stack); if (error != GRPC_ERROR_NONE) { - grpc_channel_stack_destroy(channel_stack); + grpc_channel_stack_destroy(exec_ctx, channel_stack); gpr_free(*result); *result = nullptr; } else { @@ -302,7 +305,7 @@ grpc_error* grpc_channel_stack_builder_finish( } } - grpc_channel_stack_builder_destroy(builder); + grpc_channel_stack_builder_destroy(exec_ctx, builder); gpr_free((grpc_channel_filter**)filters); return error; diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index d00ddc698c..10019542b1 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -54,7 +54,8 @@ grpc_transport* grpc_channel_stack_builder_get_transport( /// Set channel arguments: copies args void grpc_channel_stack_builder_set_channel_arguments( - grpc_channel_stack_builder* builder, const grpc_channel_args* args); + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, + const grpc_channel_args* args); /// Return a borrowed pointer to the channel arguments const grpc_channel_args* grpc_channel_stack_builder_get_channel_arguments( @@ -147,11 +148,13 @@ void grpc_channel_stack_builder_iterator_destroy( /// \a initial_refs, \a destroy, \a destroy_arg are as per /// grpc_channel_stack_init grpc_error* grpc_channel_stack_builder_finish( - grpc_channel_stack_builder* builder, size_t prefix_bytes, int initial_refs, - grpc_iomgr_cb_func destroy, void* destroy_arg, void** result); + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, + size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy, + void* destroy_arg, void** result); /// Destroy the builder without creating a channel stack -void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder); +void grpc_channel_stack_builder_destroy(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder); extern grpc_core::TraceFlag grpc_trace_channel_stack_builder; diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 9d07cfff4e..af2f88ab2e 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,14 +51,17 @@ typedef struct connected_channel_call_data { callback_state recv_message_ready; } call_data; -static void run_in_call_combiner(void* arg, grpc_error* error) { +static void run_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { callback_state* state = (callback_state*)arg; - GRPC_CALL_COMBINER_START(state->call_combiner, state->original_closure, - GRPC_ERROR_REF(error), state->reason); + GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, + state->original_closure, GRPC_ERROR_REF(error), + state->reason); } -static void run_cancel_in_call_combiner(void* arg, grpc_error* error) { - run_in_call_combiner(arg, error); +static void run_cancel_in_call_combiner(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + run_in_call_combiner(exec_ctx, arg, error); gpr_free(arg); } @@ -95,7 +98,8 @@ static callback_state* get_state_for_batch( /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ static void con_start_transport_stream_op_batch( - grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; if (batch->recv_initial_metadata) { @@ -122,52 +126,58 @@ static void con_start_transport_stream_op_batch( callback_state* state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } - grpc_transport_perform_stream_op( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), batch); - GRPC_CALL_COMBINER_STOP(calld->call_combiner, "passed batch to transport"); + grpc_transport_perform_stream_op(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + batch); + GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + "passed batch to transport"); } -static void con_start_transport_op(grpc_channel_element* elem, +static void con_start_transport_op(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_transport_op* op) { channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_perform_op(chand->transport, op); + grpc_transport_perform_op(exec_ctx, chand->transport, op); } /* Constructor for call_data */ -static grpc_error* init_call_elem(grpc_call_element* elem, +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, const grpc_call_element_args* args) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "transport stream initialization failed"); } -static void set_pollset_or_pollset_set(grpc_call_element* elem, +static void set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_polling_entity* pollent) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_set_pops(chand->transport, + grpc_transport_set_pops(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element* elem, +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, grpc_closure* then_schedule_closure) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - grpc_transport_destroy_stream(chand->transport, + grpc_transport_destroy_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), then_schedule_closure); } /* Constructor for channel_data */ -static grpc_error* init_channel_elem(grpc_channel_element* elem, +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, grpc_channel_element_args* args) { channel_data* cd = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_last); @@ -176,15 +186,17 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem, } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_channel_element* elem) { +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { channel_data* cd = (channel_data*)elem->channel_data; if (cd->transport) { - grpc_transport_destroy(cd->transport); + grpc_transport_destroy(exec_ctx, cd->transport); } } /* No-op. */ -static void con_get_channel_info(grpc_channel_element* elem, +static void con_get_channel_info(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, const grpc_channel_info* channel_info) {} const grpc_channel_filter grpc_connected_filter = { @@ -218,7 +230,8 @@ static void bind_transport(grpc_channel_stack* channel_stack, grpc_transport_stream_size((grpc_transport*)t); } -bool grpc_add_connected_filter(grpc_channel_stack_builder* builder, +bool grpc_add_connected_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg_must_be_null) { GPR_ASSERT(arg_must_be_null == nullptr); grpc_transport* t = grpc_channel_stack_builder_get_transport(builder); diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index 91de8022db..cab8aad154 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -23,7 +23,8 @@ extern const grpc_channel_filter grpc_connected_filter; -bool grpc_add_connected_filter(grpc_channel_stack_builder* builder, +bool grpc_add_connected_filter(grpc_exec_ctx* exec_ctx, + grpc_channel_stack_builder* builder, void* arg_must_be_null); /* Debug helper to dig the transport stream out of a call element */ diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index dcb149c03e..58c30b165b 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -34,20 +34,23 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, handshaker->vtable = vtable; } -void grpc_handshaker_destroy(grpc_handshaker* handshaker) { - handshaker->vtable->destroy(handshaker); +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker) { + handshaker->vtable->destroy(exec_ctx, handshaker); } -void grpc_handshaker_shutdown(grpc_handshaker* handshaker, grpc_error* why) { - handshaker->vtable->shutdown(handshaker, why); +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why) { + handshaker->vtable->shutdown(exec_ctx, handshaker, why); } -void grpc_handshaker_do_handshake(grpc_handshaker* handshaker, +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args) { - handshaker->vtable->do_handshake(handshaker, acceptor, on_handshake_done, - args); + handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, + on_handshake_done, args); } // @@ -113,9 +116,9 @@ void grpc_handshake_manager_pending_list_remove(grpc_handshake_manager** head, } void grpc_handshake_manager_pending_list_shutdown_all( - grpc_handshake_manager* head, grpc_error* why) { + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* head, grpc_error* why) { while (head != nullptr) { - grpc_handshake_manager_shutdown(head, GRPC_ERROR_REF(why)); + grpc_handshake_manager_shutdown(exec_ctx, head, GRPC_ERROR_REF(why)); head = head->next; } GRPC_ERROR_UNREF(why); @@ -142,10 +145,11 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr, gpr_mu_unlock(&mgr->mu); } -static void grpc_handshake_manager_unref(grpc_handshake_manager* mgr) { +static void grpc_handshake_manager_unref(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { if (gpr_unref(&mgr->refs)) { for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_destroy(mgr->handshakers[i]); + grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]); } gpr_free(mgr->handshakers); gpr_mu_destroy(&mgr->mu); @@ -153,17 +157,19 @@ static void grpc_handshake_manager_unref(grpc_handshake_manager* mgr) { } } -void grpc_handshake_manager_destroy(grpc_handshake_manager* mgr) { - grpc_handshake_manager_unref(mgr); +void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr) { + grpc_handshake_manager_unref(exec_ctx, mgr); } -void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, +void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr, grpc_error* why) { gpr_mu_lock(&mgr->mu); // Shutdown the handshaker that's currently in progress, if any. if (!mgr->shutdown && mgr->index > 0) { mgr->shutdown = true; - grpc_handshaker_shutdown(mgr->handshakers[mgr->index - 1], + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1], GRPC_ERROR_REF(why)); } gpr_mu_unlock(&mgr->mu); @@ -173,7 +179,8 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, // Helper function to call either the next handshaker or the // on_handshake_done callback. // Returns true if we've scheduled the on_handshake_done callback. -static bool call_next_handshaker_locked(grpc_handshake_manager* mgr, +static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr, grpc_error* error) { GPR_ASSERT(mgr->index <= mgr->count); // If we got an error or we've been shut down or we're exiting early or @@ -183,12 +190,13 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr, mgr->index == mgr->count) { // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. - grpc_timer_cancel(&mgr->deadline_timer); - GRPC_CLOSURE_SCHED(&mgr->on_handshake_done, error); + grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); + GRPC_CLOSURE_SCHED(exec_ctx, &mgr->on_handshake_done, error); mgr->shutdown = true; } else { - grpc_handshaker_do_handshake(mgr->handshakers[mgr->index], mgr->acceptor, - &mgr->call_next_handshaker, &mgr->args); + grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index], + mgr->acceptor, &mgr->call_next_handshaker, + &mgr->args); } ++mgr->index; return mgr->shutdown; @@ -196,34 +204,37 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr, // A function used as the handshaker-done callback when chaining // handshakers together. -static void call_next_handshaker(void* arg, grpc_error* error) { +static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { grpc_handshake_manager* mgr = (grpc_handshake_manager*)arg; gpr_mu_lock(&mgr->mu); - bool done = call_next_handshaker_locked(mgr, GRPC_ERROR_REF(error)); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error)); gpr_mu_unlock(&mgr->mu); // If we're invoked the final callback, we won't be coming back // to this function, so we can release our reference to the // handshake manager. if (done) { - grpc_handshake_manager_unref(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } } // Callback invoked when deadline is exceeded. -static void on_timeout(void* arg, grpc_error* error) { +static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_handshake_manager* mgr = (grpc_handshake_manager*)arg; if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. grpc_handshake_manager_shutdown( - mgr, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out")); + exec_ctx, mgr, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Handshake timed out")); } - grpc_handshake_manager_unref(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } void grpc_handshake_manager_do_handshake( - grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties, - grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, - grpc_iomgr_cb_func on_handshake_done, void* user_data) { + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_pollset_set* interested_parties, grpc_endpoint* endpoint, + const grpc_channel_args* channel_args, grpc_millis deadline, + grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, + void* user_data) { gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); GPR_ASSERT(!mgr->shutdown); @@ -246,12 +257,12 @@ void grpc_handshake_manager_do_handshake( gpr_ref(&mgr->refs); GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr, grpc_schedule_on_exec_ctx); - grpc_timer_init(&mgr->deadline_timer, deadline, &mgr->on_timeout); + grpc_timer_init(exec_ctx, &mgr->deadline_timer, deadline, &mgr->on_timeout); // Start first handshaker, which also owns a ref. gpr_ref(&mgr->refs); - bool done = call_next_handshaker_locked(mgr, GRPC_ERROR_NONE); + bool done = call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE); gpr_mu_unlock(&mgr->mu); if (done) { - grpc_handshake_manager_unref(mgr); + grpc_handshake_manager_unref(exec_ctx, mgr); } } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 68e5463123..8e4114dc64 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -68,17 +68,18 @@ typedef struct { typedef struct { /// Destroys the handshaker. - void (*destroy)(grpc_handshaker* handshaker); + void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker); /// Shuts down the handshaker (e.g., to clean up when the operation is /// aborted in the middle). - void (*shutdown)(grpc_handshaker* handshaker, grpc_error* why); + void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, + grpc_error* why); /// Performs handshaking, modifying \a args as needed (e.g., to /// replace \a endpoint with a wrapped endpoint). /// When finished, invokes \a on_handshake_done. /// \a acceptor will be NULL for client-side handshakers. - void (*do_handshake)(grpc_handshaker* handshaker, + void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args); @@ -94,9 +95,12 @@ struct grpc_handshaker { void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, grpc_handshaker* handshaker); -void grpc_handshaker_destroy(grpc_handshaker* handshaker); -void grpc_handshaker_shutdown(grpc_handshaker* handshaker, grpc_error* why); -void grpc_handshaker_do_handshake(grpc_handshaker* handshaker, +void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker); +void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_error* why); +void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, + grpc_handshaker* handshaker, grpc_tcp_server_acceptor* acceptor, grpc_closure* on_handshake_done, grpc_handshaker_args* args); @@ -116,13 +120,15 @@ void grpc_handshake_manager_add(grpc_handshake_manager* mgr, grpc_handshaker* handshaker); /// Destroys the handshake manager. -void grpc_handshake_manager_destroy(grpc_handshake_manager* mgr); +void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr); /// Shuts down the handshake manager (e.g., to clean up when the operation is /// aborted in the middle). /// The caller must still call grpc_handshake_manager_destroy() after /// calling this function. -void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, +void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, + grpc_handshake_manager* mgr, grpc_error* why); /// Invokes handshakers in the order they were added. @@ -140,10 +146,11 @@ void grpc_handshake_manager_shutdown(grpc_handshake_manager* mgr, /// the necessary clean-up. Otherwise, the callback takes ownership of /// the arguments. void grpc_handshake_manager_do_handshake( - grpc_handshake_manager* mgr, grpc_pollset_set* interested_parties, - grpc_endpoint* endpoint, const grpc_channel_args* channel_args, - grpc_millis deadline, grpc_tcp_server_acceptor* acceptor, - grpc_iomgr_cb_func on_handshake_done, void* user_data); + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, + grpc_pollset_set* interested_parties, grpc_endpoint* endpoint, + const grpc_channel_args* channel_args, grpc_millis deadline, + grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, + void* user_data); /// Add \a mgr to the server side list of all pending handshake managers, the /// list starts with \a *head. @@ -159,6 +166,6 @@ void grpc_handshake_manager_pending_list_remove(grpc_handshake_manager** head, /// Shutdown all pending handshake managers on the server side. // Not thread-safe. Caller needs to synchronize. void grpc_handshake_manager_pending_list_shutdown_all( - grpc_handshake_manager* head, grpc_error* why); + grpc_exec_ctx* exec_ctx, grpc_handshake_manager* head, grpc_error* why); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */ diff --git a/src/core/lib/channel/handshaker_factory.cc b/src/core/lib/channel/handshaker_factory.cc index 2380d98300..015006ade0 100644 --- a/src/core/lib/channel/handshaker_factory.cc +++ b/src/core/lib/channel/handshaker_factory.cc @@ -21,19 +21,19 @@ #include <grpc/support/log.h> void grpc_handshaker_factory_add_handshakers( - grpc_handshaker_factory* handshaker_factory, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { if (handshaker_factory != nullptr) { GPR_ASSERT(handshaker_factory->vtable != nullptr); - handshaker_factory->vtable->add_handshakers(handshaker_factory, args, - handshake_mgr); + handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory, + args, handshake_mgr); } } void grpc_handshaker_factory_destroy( - grpc_handshaker_factory* handshaker_factory) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory) { if (handshaker_factory != nullptr) { GPR_ASSERT(handshaker_factory->vtable != nullptr); - handshaker_factory->vtable->destroy(handshaker_factory); + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); } } diff --git a/src/core/lib/channel/handshaker_factory.h b/src/core/lib/channel/handshaker_factory.h index 8a7c0157e8..ca7c26b50a 100644 --- a/src/core/lib/channel/handshaker_factory.h +++ b/src/core/lib/channel/handshaker_factory.h @@ -29,10 +29,12 @@ typedef struct grpc_handshaker_factory grpc_handshaker_factory; typedef struct { - void (*add_handshakers)(grpc_handshaker_factory* handshaker_factory, + void (*add_handshakers)(grpc_exec_ctx* exec_ctx, + grpc_handshaker_factory* handshaker_factory, const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr); - void (*destroy)(grpc_handshaker_factory* handshaker_factory); + void (*destroy)(grpc_exec_ctx* exec_ctx, + grpc_handshaker_factory* handshaker_factory); } grpc_handshaker_factory_vtable; struct grpc_handshaker_factory { @@ -40,10 +42,10 @@ struct grpc_handshaker_factory { }; void grpc_handshaker_factory_add_handshakers( - grpc_handshaker_factory* handshaker_factory, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr); + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr); void grpc_handshaker_factory_destroy( - grpc_handshaker_factory* handshaker_factory); + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory* handshaker_factory); #endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_FACTORY_H */ diff --git a/src/core/lib/channel/handshaker_registry.cc b/src/core/lib/channel/handshaker_registry.cc index 098eabf084..c6bc87d704 100644 --- a/src/core/lib/channel/handshaker_registry.cc +++ b/src/core/lib/channel/handshaker_registry.cc @@ -47,17 +47,18 @@ static void grpc_handshaker_factory_list_register( } static void grpc_handshaker_factory_list_add_handshakers( - grpc_handshaker_factory_list* list, const grpc_channel_args* args, - grpc_handshake_manager* handshake_mgr) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list, + const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { for (size_t i = 0; i < list->num_factories; ++i) { - grpc_handshaker_factory_add_handshakers(list->list[i], args, handshake_mgr); + grpc_handshaker_factory_add_handshakers(exec_ctx, list->list[i], args, + handshake_mgr); } } static void grpc_handshaker_factory_list_destroy( - grpc_handshaker_factory_list* list) { + grpc_exec_ctx* exec_ctx, grpc_handshaker_factory_list* list) { for (size_t i = 0; i < list->num_factories; ++i) { - grpc_handshaker_factory_destroy(list->list[i]); + grpc_handshaker_factory_destroy(exec_ctx, list->list[i]); } gpr_free(list->list); } @@ -73,9 +74,10 @@ void grpc_handshaker_factory_registry_init() { memset(g_handshaker_factory_lists, 0, sizeof(g_handshaker_factory_lists)); } -void grpc_handshaker_factory_registry_shutdown() { +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx) { for (size_t i = 0; i < NUM_HANDSHAKER_TYPES; ++i) { - grpc_handshaker_factory_list_destroy(&g_handshaker_factory_lists[i]); + grpc_handshaker_factory_list_destroy(exec_ctx, + &g_handshaker_factory_lists[i]); } } @@ -86,9 +88,11 @@ void grpc_handshaker_factory_register(bool at_start, &g_handshaker_factory_lists[handshaker_type], at_start, factory); } -void grpc_handshakers_add(grpc_handshaker_type handshaker_type, +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr) { grpc_handshaker_factory_list_add_handshakers( - &g_handshaker_factory_lists[handshaker_type], args, handshake_mgr); + exec_ctx, &g_handshaker_factory_lists[handshaker_type], args, + handshake_mgr); } diff --git a/src/core/lib/channel/handshaker_registry.h b/src/core/lib/channel/handshaker_registry.h index 0b05531b7e..a3b2ac1dc7 100644 --- a/src/core/lib/channel/handshaker_registry.h +++ b/src/core/lib/channel/handshaker_registry.h @@ -31,7 +31,7 @@ typedef enum { } grpc_handshaker_type; void grpc_handshaker_factory_registry_init(); -void grpc_handshaker_factory_registry_shutdown(); +void grpc_handshaker_factory_registry_shutdown(grpc_exec_ctx* exec_ctx); /// Registers a new handshaker factory. Takes ownership. /// If \a at_start is true, the new handshaker will be at the beginning of @@ -40,7 +40,8 @@ void grpc_handshaker_factory_register(bool at_start, grpc_handshaker_type handshaker_type, grpc_handshaker_factory* factory); -void grpc_handshakers_add(grpc_handshaker_type handshaker_type, +void grpc_handshakers_add(grpc_exec_ctx* exec_ctx, + grpc_handshaker_type handshaker_type, const grpc_channel_args* args, grpc_handshake_manager* handshake_mgr); |