diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/alarm.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/api_trace.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/api_trace.h | 4 | ||||
-rw-r--r-- | src/core/lib/surface/byte_buffer_reader.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 395 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 6 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 15 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.c | 21 | ||||
-rw-r--r-- | src/core/lib/surface/channel_stack_type.c | 18 | ||||
-rw-r--r-- | src/core/lib/surface/channel_stack_type.h | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 333 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 31 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue_factory.c | 33 | ||||
-rw-r--r-- | src/core/lib/surface/init.c | 57 | ||||
-rw-r--r-- | src/core/lib/surface/init_secure.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.cc (renamed from src/core/lib/surface/lame_client.c) | 81 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 121 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 4 |
19 files changed, 703 insertions, 432 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index e71c0ebfc5..b72d534b7e 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -81,7 +81,9 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } void grpc_alarm_destroy(grpc_alarm *alarm) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_alarm_cancel(alarm); - GRPC_CQ_INTERNAL_UNREF(alarm->cq, "alarm"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm"); gpr_free(alarm); + grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/surface/api_trace.c b/src/core/lib/surface/api_trace.c index 79e3e5ca9b..d8941cdf42 100644 --- a/src/core/lib/surface/api_trace.c +++ b/src/core/lib/surface/api_trace.c @@ -32,5 +32,6 @@ */ #include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/debug/trace.h" -int grpc_api_trace = 0; +grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false); diff --git a/src/core/lib/surface/api_trace.h b/src/core/lib/surface/api_trace.h index c60aaba5e9..d4fbc8d90d 100644 --- a/src/core/lib/surface/api_trace.h +++ b/src/core/lib/surface/api_trace.h @@ -37,7 +37,7 @@ #include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" -extern int grpc_api_trace; +extern grpc_tracer_flag grpc_api_trace; /* Provide unwrapping macros because we're in C89 and variadic macros weren't introduced until C99... */ @@ -58,7 +58,7 @@ extern int grpc_api_trace; /* Due to the limitations of C89's preprocessor, the arity of the var-arg list 'nargs' must be specified. */ #define GRPC_API_TRACE(fmt, nargs, args) \ - if (grpc_api_trace) { \ + if (GRPC_TRACER_ON(grpc_api_trace)) { \ gpr_log(GPR_INFO, fmt GRPC_API_TRACE_UNWRAP##nargs args); \ } diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 1a6ccdaddb..539b014278 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -124,7 +124,7 @@ grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) { grpc_slice in_slice; size_t bytes_read = 0; const size_t input_size = grpc_byte_buffer_length(reader->buffer_out); - grpc_slice out_slice = grpc_slice_malloc(input_size); + grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size); uint8_t *const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index a9317a4694..201969cd45 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -117,41 +117,61 @@ static received_status unpack_received_status(gpr_atm atm) { typedef struct batch_control { grpc_call *call; - grpc_cq_completion cq_completion; + /* Share memory for cq_completion and notify_tag as they are never needed + simultaneously. Each byte used in this data structure count as six bytes + per call, so any savings we can make are worthwhile, + + We use notify_tag to determine whether or not to send notification to the + completion queue. Once we've made that determination, we can reuse the + memory for cq_completion. */ + union { + grpc_cq_completion cq_completion; + struct { + /* Any given op indicates completion by either (a) calling a closure or + (b) sending a notification on the call's completion queue. If + \a is_closure is true, \a tag indicates a closure to be invoked; + otherwise, \a tag indicates the tag to be used in the notification to + be sent to the completion queue. */ + void *tag; + bool is_closure; + } notify_tag; + } completion_data; grpc_closure finish_batch; - void *notify_tag; gpr_refcount steps_to_complete; grpc_error *errors[MAX_ERRORS_PER_BATCH]; gpr_atm num_errors; - uint8_t send_initial_metadata; - uint8_t send_message; - uint8_t send_final_op; - uint8_t recv_initial_metadata; - uint8_t recv_message; - uint8_t recv_final_op; - uint8_t is_notify_tag_closure; - - /* TODO(ctiller): now that this is inlined, figure out how much of the above - state can be eliminated */ - grpc_transport_stream_op op; + grpc_transport_stream_op_batch op; } batch_control; +typedef struct { + gpr_mu child_list_mu; + grpc_call *first_child; +} parent_call; + +typedef struct { + grpc_call *parent; + /** siblings: children of the same parent form a list, and this list is + protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; +} child_call; + struct grpc_call { + gpr_refcount ext_ref; gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; - grpc_call *parent; - grpc_call *first_child; gpr_timespec start_time; - /* protects first_child, and child next/prev links */ - gpr_mu child_list_mu; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child_call; /* client or server call */ bool is_client; - /** has grpc_call_destroy been called */ + /** has grpc_call_unref been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; @@ -168,7 +188,8 @@ struct grpc_call { /* have we received initial metadata */ bool has_initial_md_been_received; - batch_control active_batches[MAX_CONCURRENT_BATCHES]; + batch_control *active_batches[MAX_CONCURRENT_BATCHES]; + grpc_transport_stream_op_batch_payload stream_op_payload; /* first idx: is_receiving, second idx: is_trailing */ grpc_metadata_batch metadata_batch[2][2]; @@ -198,12 +219,6 @@ struct grpc_call { int send_extra_metadata_count; gpr_timespec send_deadline; - /** siblings: children of the same parent form a list, and this list is - protected under - parent->mu */ - grpc_call *sibling_next; - grpc_call *sibling_prev; - grpc_slice_buffer_stream sending_stream; grpc_byte_stream *receiving_stream; @@ -229,7 +244,8 @@ struct grpc_call { void *saved_receiving_stream_ready_bctlp; }; -int grpc_call_error_trace = 0; +grpc_tracer_flag grpc_call_error_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_compression_trace = GRPC_TRACER_INITIALIZER(false); #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) @@ -239,7 +255,7 @@ int grpc_call_error_trace = 0; CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op); + grpc_transport_stream_op_batch *op); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -268,6 +284,27 @@ static void add_init_error(grpc_error **composite, grpc_error *new) { *composite = grpc_error_add_child(*composite, new); } +void *grpc_call_arena_alloc(grpc_call *call, size_t size) { + return gpr_arena_alloc(call->arena, size); +} + +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + if (p == NULL) { + p = gpr_arena_alloc(call->arena, sizeof(*p)); + gpr_mu_init(&p->child_list_mu); + if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { + gpr_mu_destroy(&p->child_list_mu); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + } + } + return p; +} + +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); +} + grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, grpc_call **out_call) { @@ -281,16 +318,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_ref_init(&call->ext_ref, 1); call->arena = arena; *out_call = call; - gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; - call->parent = args->parent_call; call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; + call->stream_op_payload.context = call->context; grpc_slice path = grpc_empty_slice(); if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < @@ -316,12 +353,20 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_timespec send_deadline = gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); + bool immediately_cancel = false; + if (args->parent_call != NULL) { + child_call *cc = call->child_call = + gpr_arena_alloc(arena, sizeof(child_call)); + call->child_call->parent = args->parent_call; + GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->child_list_mu); + parent_call *pc = get_or_create_parent_call(args->parent_call); + + gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -350,28 +395,27 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { - cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); + immediately_cancel = true; } } - if (args->parent_call->first_child == NULL) { - args->parent_call->first_child = call; - call->sibling_next = call->sibling_prev = call; + if (pc->first_child == NULL) { + pc->first_child = call; + cc->sibling_next = cc->sibling_prev = call; } else { - call->sibling_next = args->parent_call->first_child; - call->sibling_prev = args->parent_call->first_child->sibling_prev; - call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = - call; + cc->sibling_next = pc->first_child; + cc->sibling_prev = pc->first_child->child_call->sibling_prev; + cc->sibling_next->child_call->sibling_prev = + cc->sibling_prev->child_call->sibling_next = call; } - gpr_mu_unlock(&args->parent_call->child_list_mu); + gpr_mu_unlock(&pc->child_list_mu); } call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ + /* initial refcount dropped by grpc_call_unref */ grpc_call_element_args call_args = { .call_stack = CALL_STACK_FROM_CALL(call), .server_transport_data = args->server_transport_data, @@ -386,6 +430,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } + if (immediately_cancel) { + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } if (args->cq != NULL) { GPR_ASSERT( args->pollset_set_alternative == NULL && @@ -460,7 +508,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->child_list_mu); + parent_call *pc = get_parent_call(c); + if (pc != NULL) { + gpr_mu_destroy(&pc->child_list_mu); + } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -470,7 +521,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } } if (c->cq) { - GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind"); } get_final_status(call, set_status_value_directly, &c->final_info.final_status, @@ -489,39 +540,43 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("destroy_call", 0); } -void grpc_call_destroy(grpc_call *c) { - int cancel; - grpc_call *parent = c->parent; +void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } + +void grpc_call_unref(grpc_call *c) { + if (!gpr_unref(&c->ext_ref)) return; + + child_call *cc = c->child_call; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_call_destroy", 0); - GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); + GPR_TIMER_BEGIN("grpc_call_unref", 0); + GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); - if (parent) { - gpr_mu_lock(&parent->child_list_mu); - if (c == parent->first_child) { - parent->first_child = c->sibling_next; - if (c == parent->first_child) { - parent->first_child = NULL; + if (cc) { + parent_call *pc = get_parent_call(cc->parent); + gpr_mu_lock(&pc->child_list_mu); + if (c == pc->first_child) { + pc->first_child = cc->sibling_next; + if (c == pc->first_child) { + pc->first_child = NULL; } } - c->sibling_prev->sibling_next = c->sibling_next; - c->sibling_next->sibling_prev = c->sibling_prev; - gpr_mu_unlock(&parent->child_list_mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); + cc->sibling_prev->child_call->sibling_next = cc->sibling_next; + cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + gpr_mu_unlock(&pc->child_list_mu); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && - !gpr_atm_acq_load(&c->received_final_op_atm); + bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && + gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_call_destroy", 0); + GPR_TIMER_END("grpc_call_unref", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { @@ -535,13 +590,12 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { } static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { grpc_call_element *elem; GPR_TIMER_BEGIN("execute_op", 0); elem = CALL_ELEM_FROM_CALL(call, 0); - op->context = call->context; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); GPR_TIMER_END("execute_op", 0); } @@ -594,9 +648,10 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); - op->cancel_error = error; + op->cancel_stream = true; + op->payload->cancel_stream.cancel_error = error; execute_op(exec_ctx, c, op); } @@ -647,7 +702,7 @@ static void get_final_status(grpc_call *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); } - if (grpc_call_error_trace) { + if (GRPC_TRACER_ON(grpc_call_error_trace)) { gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (status[i].is_set) { @@ -1025,16 +1080,17 @@ static batch_control *allocate_batch_control(grpc_call *call, const grpc_op *ops, size_t num_ops) { int slot = batch_slot_for_op(ops[0].op); - for (size_t i = 1; i < num_ops; i++) { - int op_slot = batch_slot_for_op(ops[i].op); - slot = GPR_MIN(slot, op_slot); + batch_control **pslot = &call->active_batches[slot]; + if (*pslot == NULL) { + *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control)); } - batch_control *bctl = &call->active_batches[slot]; + batch_control *bctl = *pslot; if (bctl->call != NULL) { return NULL; } memset(bctl, 0, sizeof(*bctl)); bctl->call = call; + bctl->op.payload = &call->stream_op_payload; return bctl; } @@ -1069,46 +1125,49 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - grpc_call *child_call; grpc_call *next_child_call; grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); - if (bctl->send_initial_metadata) { + if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } - if (bctl->send_message) { + if (bctl->op.send_message) { call->sending_message = false; } - if (bctl->send_final_op) { + if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( exec_ctx, &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_final_op) { + if (bctl->op.recv_trailing_metadata) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - gpr_mu_lock(&call->child_list_mu); - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + parent_call *pc = get_parent_call(call); + if (pc != NULL) { + grpc_call *child; + gpr_mu_lock(&pc->child_list_mu); + child = pc->first_child; + if (child != NULL) { + do { + next_child_call = child->child_call->sibling_next; + if (child->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); + cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel"); + } + child = next_child_call; + } while (child != pc->first_child); + } + gpr_mu_unlock(&pc->child_list_mu); } - gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, @@ -1123,15 +1182,16 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_NONE; } - if (bctl->is_notify_tag_closure) { + if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - grpc_closure_run(exec_ctx, bctl->notify_tag, error); + grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ - grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, error, - finish_batch_completion, bctl, &bctl->cq_completion); + grpc_cq_end_op( + exec_ctx, bctl->call->cq, bctl->completion_data.notify_tag.tag, error, + finish_batch_completion, bctl, &bctl->completion_data.cq_completion); } } @@ -1143,6 +1203,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1154,11 +1215,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, - &call->receiving_slice, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + &call->receiving_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + call->receiving_message = 0; + finish_batch_step(exec_ctx, bctl); + return; + } } else { return; } @@ -1169,20 +1241,36 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; + grpc_byte_stream *bs = call->receiving_stream; + bool release_error = false; if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - continue_receiving_slices(exec_ctx, bctl); - } else { - if (grpc_trace_operation_failures) { + grpc_slice slice; + error = grpc_byte_stream_pull(exec_ctx, bs, &slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + /* Error returned by grpc_byte_stream_pull needs to be released manually + */ + release_error = true; + } + } + + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; + call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); + if (release_error) { + GRPC_ERROR_UNREF(error); + } } } @@ -1267,8 +1355,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, GPR_ASSERT(call->encodings_accepted_by_peer != 0); if (!GPR_BITGET(call->encodings_accepted_by_peer, call->incoming_compression_algorithm)) { - extern int grpc_compression_trace; - if (grpc_compression_trace) { + if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name = NULL; grpc_compression_algorithm_name(call->incoming_compression_algorithm, &algo_name); @@ -1374,11 +1461,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (bctl == NULL) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } - bctl->notify_tag = notify_tag; - bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + bctl->completion_data.notify_tag.tag = notify_tag; + bctl->completion_data.notify_tag.is_closure = + (uint8_t)(is_notify_tag_closure != 0); - grpc_transport_stream_op *stream_op = &bctl->op; - memset(stream_op, 0, sizeof(*stream_op)); + grpc_transport_stream_op_batch *stream_op = &bctl->op; + grpc_transport_stream_op_batch_payload *stream_op_payload = + &call->stream_op_payload; stream_op->covered_by_poller = true; /* rewrite batch ops into a transport op */ @@ -1432,8 +1521,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - bctl->send_initial_metadata = 1; - call->sent_initial_metadata = 1; + stream_op->send_initial_metadata = true; + call->sent_initial_metadata = true; if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, @@ -1443,9 +1532,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op->send_initial_metadata = + stream_op_payload->send_initial_metadata.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op->send_initial_metadata_flags = op->flags; + stream_op_payload->send_initial_metadata.send_initial_metadata_flags = + op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1460,8 +1550,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - bctl->send_message = 1; - call->sending_message = 1; + stream_op->send_message = true; + call->sending_message = true; grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message.send_message->data.raw.slice_buffer, @@ -1473,7 +1563,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, GRPC_COMPRESS_NONE) { call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - stream_op->send_message = &call->sending_stream.base; + stream_op_payload->send_message.send_message = + &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1489,9 +1580,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - bctl->send_final_op = 1; - call->sent_final_op = 1; - stream_op->send_trailing_metadata = + stream_op->send_trailing_metadata = true; + call->sent_final_op = true; + stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1513,8 +1604,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - bctl->send_final_op = 1; - call->sent_final_op = 1; + stream_op->send_trailing_metadata = true; + call->sent_final_op = true; GPR_ASSERT(call->send_extra_metadata_count == 0); call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( @@ -1553,7 +1644,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op->send_trailing_metadata = + stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1570,16 +1661,16 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, from server.c. In that case, it's coming from accept_stream, and in that case we're not necessarily covered by a poller. */ stream_op->covered_by_poller = call->is_client; - call->received_initial_metadata = 1; + call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); - bctl->recv_initial_metadata = 1; - stream_op->recv_initial_metadata = + stream_op->recv_initial_metadata = true; + stream_op_payload->recv_initial_metadata.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op->recv_initial_metadata_ready = + stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1593,13 +1684,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->receiving_message = 1; - bctl->recv_message = 1; + call->receiving_message = true; + stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; - stream_op->recv_message = &call->receiving_stream; + stream_op_payload->recv_message.recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); - stream_op->recv_message_ready = &call->receiving_stream_ready; + stream_op_payload->recv_message.recv_message_ready = + &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1616,16 +1708,17 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; call->final_op.client.status_details = op->data.recv_status_on_client.status_details; - bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: @@ -1642,13 +1735,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->requested_final_op = 1; + call->requested_final_op = true; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; - bctl->recv_final_op = 1; - stream_op->recv_trailing_metadata = + stream_op->recv_trailing_metadata = true; + stream_op->collect_stats = true; + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op->collect_stats = + stream_op_payload->collect_stats.collect_stats = &call->final_info.stats.transport_stream_stats; break; } @@ -1660,7 +1754,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op->context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; @@ -1674,26 +1767,26 @@ done: done_with_error: /* reverse any mutations that occured */ - if (bctl->send_initial_metadata) { - call->sent_initial_metadata = 0; + if (stream_op->send_initial_metadata) { + call->sent_initial_metadata = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][0]); } - if (bctl->send_message) { - call->sending_message = 0; + if (stream_op->send_message) { + call->sending_message = false; grpc_byte_stream_destroy(exec_ctx, &call->sending_stream.base); } - if (bctl->send_final_op) { - call->sent_final_op = 0; + if (stream_op->send_trailing_metadata) { + call->sent_final_op = false; grpc_metadata_batch_clear(exec_ctx, &call->metadata_batch[0][1]); } - if (bctl->recv_initial_metadata) { - call->received_initial_metadata = 0; + if (stream_op->recv_initial_metadata) { + call->received_initial_metadata = false; } - if (bctl->recv_message) { - call->receiving_message = 0; + if (stream_op->recv_message) { + call->receiving_message = false; } - if (bctl->recv_final_op) { - call->requested_final_op = 0; + if (stream_op->recv_trailing_metadata) { + call->requested_final_op = false; } goto done; } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index 7d4d0db28d..256a5fa2fe 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -117,7 +117,8 @@ void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); #define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ - if (grpc_api_trace) grpc_call_log_batch(sev, call, ops, nops, tag) + if (GRPC_TRACER_ON(grpc_api_trace)) \ + grpc_call_log_batch(sev, call, ops, nops, tag) uint8_t grpc_call_is_client(grpc_call *call); @@ -126,7 +127,8 @@ uint8_t grpc_call_is_client(grpc_call *call); grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level); -extern int grpc_call_error_trace; +extern grpc_tracer_flag grpc_call_error_trace; +extern grpc_tracer_flag grpc_compression_trace; #ifdef __cplusplus } diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index b4bfb92042..b3ba826bbc 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -150,17 +150,20 @@ grpc_channel *grpc_channel_create_with_builder( } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) { channel->compression_options.default_level.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT); channel->compression_options.default_level.level = - (grpc_compression_level)args->args[i].value.integer; + (grpc_compression_level)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_NONE, + GRPC_COMPRESS_LEVEL_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) { channel->compression_options.default_algorithm.is_set = true; - GPR_ASSERT(args->args[i].value.integer >= 0 && - args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT); channel->compression_options.default_algorithm.algorithm = - (grpc_compression_algorithm)args->args[i].value.integer; + (grpc_compression_algorithm)grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, + GRPC_COMPRESS_ALGORITHMS_COUNT - 1}); } else if (0 == strcmp(args->args[i].key, GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) { diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c index 7acb444d9b..20f5753004 100644 --- a/src/core/lib/surface/channel_init.c +++ b/src/core/lib/surface/channel_init.c @@ -104,30 +104,13 @@ void grpc_channel_init_shutdown(void) { } } -static const char *name_for_type(grpc_channel_stack_type type) { - switch (type) { - case GRPC_CLIENT_CHANNEL: - return "CLIENT_CHANNEL"; - case GRPC_CLIENT_SUBCHANNEL: - return "CLIENT_SUBCHANNEL"; - case GRPC_SERVER_CHANNEL: - return "SERVER_CHANNEL"; - case GRPC_CLIENT_LAME_CHANNEL: - return "CLIENT_LAME_CHANNEL"; - case GRPC_CLIENT_DIRECT_CHANNEL: - return "CLIENT_DIRECT_CHANNEL"; - case GRPC_NUM_CHANNEL_STACK_TYPES: - break; - } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); -} - bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, grpc_channel_stack_type type) { GPR_ASSERT(g_finalized); - grpc_channel_stack_builder_set_name(builder, name_for_type(type)); + grpc_channel_stack_builder_set_name(builder, + grpc_channel_stack_type_string(type)); for (size_t i = 0; i < g_slots[type].num_slots; i++) { const stage_slot *slot = &g_slots[type].slots[i]; diff --git a/src/core/lib/surface/channel_stack_type.c b/src/core/lib/surface/channel_stack_type.c index c35d603ca3..ed3b53fb36 100644 --- a/src/core/lib/surface/channel_stack_type.c +++ b/src/core/lib/surface/channel_stack_type.c @@ -52,3 +52,21 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) { } GPR_UNREACHABLE_CODE(return true;); } + +const char *grpc_channel_stack_type_string(grpc_channel_stack_type type) { + switch (type) { + case GRPC_CLIENT_CHANNEL: + return "CLIENT_CHANNEL"; + case GRPC_CLIENT_SUBCHANNEL: + return "CLIENT_SUBCHANNEL"; + case GRPC_SERVER_CHANNEL: + return "SERVER_CHANNEL"; + case GRPC_CLIENT_LAME_CHANNEL: + return "CLIENT_LAME_CHANNEL"; + case GRPC_CLIENT_DIRECT_CHANNEL: + return "CLIENT_DIRECT_CHANNEL"; + case GRPC_NUM_CHANNEL_STACK_TYPES: + break; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h index 4eea4f1b01..ccf4e53d27 100644 --- a/src/core/lib/surface/channel_stack_type.h +++ b/src/core/lib/surface/channel_stack_type.h @@ -55,4 +55,6 @@ typedef enum { bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type); +const char *grpc_channel_stack_type_string(grpc_channel_stack_type type); + #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b4594817e4..df5b70205c 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -50,9 +50,9 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/event_string.h" -int grpc_trace_operation_failures; +grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false); #ifndef NDEBUG -int grpc_trace_pending_tags; +grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false); #endif typedef struct { @@ -60,10 +60,156 @@ typedef struct { void *tag; } plucker; +typedef struct { + bool can_get_pollset; + bool can_listen; + size_t (*size)(void); + void (*init)(grpc_pollset *pollset, gpr_mu **mu); + grpc_error *(*kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); + grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure); + void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); +} cq_poller_vtable; + +typedef struct non_polling_worker { + gpr_cv cv; + bool kicked; + struct non_polling_worker *next; + struct non_polling_worker *prev; +} non_polling_worker; + +typedef struct { + gpr_mu mu; + non_polling_worker *root; + grpc_closure *shutdown; +} non_polling_poller; + +static size_t non_polling_poller_size(void) { + return sizeof(non_polling_poller); +} + +static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_init(&npp->mu); + *mu = &npp->mu; +} + +static void non_polling_poller_destroy(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_destroy(&npp->mu); +} + +static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker **worker, + gpr_timespec now, + gpr_timespec deadline) { + non_polling_poller *npp = (non_polling_poller *)pollset; + if (npp->shutdown) return GRPC_ERROR_NONE; + non_polling_worker w; + gpr_cv_init(&w.cv); + if (worker != NULL) *worker = (grpc_pollset_worker *)&w; + if (npp->root == NULL) { + npp->root = w.next = w.prev = &w; + } else { + w.next = npp->root; + w.prev = w.next->prev; + w.next->prev = w.prev->next = &w; + } + w.kicked = false; + while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + ; + if (&w == npp->root) { + npp->root = w.next; + if (&w == npp->root) { + if (npp->shutdown) { + grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + } + npp->root = NULL; + } + } + w.next->prev = w.prev; + w.prev->next = w.next; + gpr_cv_destroy(&w.cv); + if (worker != NULL) *worker = NULL; + return GRPC_ERROR_NONE; +} + +static grpc_error *non_polling_poller_kick( + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + non_polling_poller *p = (non_polling_poller *)pollset; + if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; + if (specific_worker != NULL) { + non_polling_worker *w = (non_polling_worker *)specific_worker; + if (!w->kicked) { + w->kicked = true; + gpr_cv_signal(&w->cv); + } + } + return GRPC_ERROR_NONE; +} + +static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_closure *closure) { + non_polling_poller *p = (non_polling_poller *)pollset; + GPR_ASSERT(closure != NULL); + p->shutdown = closure; + if (p->root == NULL) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + } else { + non_polling_worker *w = p->root; + do { + gpr_cv_signal(&w->cv); + w = w->next; + } while (w != p->root); + } +} + +static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { + /* GRPC_CQ_DEFAULT_POLLING */ + {.can_get_pollset = true, + .can_listen = true, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_LISTENING */ + {.can_get_pollset = true, + .can_listen = false, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_POLLING */ + {.can_get_pollset = false, + .can_listen = false, + .size = non_polling_poller_size, + .init = non_polling_poller_init, + .kick = non_polling_poller_kick, + .work = non_polling_poller_work, + .shutdown = non_polling_poller_shutdown, + .destroy = non_polling_poller_destroy}, +}; + /* Completion queue structure */ struct grpc_completion_queue { /** owned by pollset */ gpr_mu *mu; + + grpc_cq_completion_type completion_type; + + const cq_poller_vtable *poller_vtable; + /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -79,6 +225,7 @@ struct grpc_completion_queue { int shutdown_called; int is_server_cq; /** Can the server cq accept incoming channels */ + /* TODO: sreek - This will no longer be needed. Use polling_type set */ int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; @@ -96,35 +243,46 @@ struct grpc_completion_queue { #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) #define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) -int grpc_cq_pluck_trace; -int grpc_cq_event_timeout_trace; +grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true); +grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); -#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ - if (grpc_api_trace && \ - (grpc_cq_pluck_trace || (event)->type != GRPC_QUEUE_TIMEOUT)) { \ - char *_ev = grpc_event_string(event); \ - gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \ - gpr_free(_ev); \ +#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ + if (GRPC_TRACER_ON(grpc_api_trace) && \ + (GRPC_TRACER_ON(grpc_cq_pluck_trace) || \ + (event)->type != GRPC_QUEUE_TIMEOUT)) { \ + char *_ev = grpc_event_string(event); \ + gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \ + gpr_free(_ev); \ } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); -grpc_completion_queue *grpc_completion_queue_create(void *reserved) { +grpc_completion_queue *grpc_completion_queue_create_internal( + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type) { grpc_completion_queue *cc; - GPR_ASSERT(!reserved); - GPR_TIMER_BEGIN("grpc_completion_queue_create", 0); + GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0); - GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + GRPC_API_TRACE( + "grpc_completion_queue_create_internal(completion_type=%d, " + "polling_type=%d)", + 2, (completion_type, polling_type)); + + const cq_poller_vtable *poller_vtable = + &g_poller_vtable_by_poller_type[polling_type]; - cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); - grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); + cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; #endif + cc->completion_type = completion_type; + cc->poller_vtable = poller_vtable; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ @@ -143,11 +301,15 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); - GPR_TIMER_END("grpc_completion_queue_create", 0); + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); return cc; } +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { + return cc->completion_type; +} + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { @@ -162,20 +324,21 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) { static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_completion_queue *cc = arg; - GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy"); } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, - const char *file, int line) { +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); #else -void grpc_cq_internal_unref(grpc_completion_queue *cc) { +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); #ifndef NDEBUG gpr_free(cc->outstanding_tags); #endif @@ -215,14 +378,16 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, #endif GPR_TIMER_BEGIN("grpc_cq_end_op", 0); - if (grpc_api_trace || - (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + if (GRPC_TRACER_ON(grpc_api_trace) || + (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " "done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } @@ -260,7 +425,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } grpc_error *kick_error = - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -275,8 +440,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } @@ -321,7 +486,7 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { #ifndef NDEBUG static void dump_pending_tags(grpc_completion_queue *cc) { - if (!grpc_trace_pending_tags) return; + if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; gpr_strvec v; gpr_strvec_init(&v); @@ -345,9 +510,15 @@ static void dump_pending_tags(grpc_completion_queue *cc) {} grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker *worker = NULL; gpr_timespec now; + if (cc->completion_type != GRPC_CQ_NEXT) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_next() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_NEXT"); + abort(); + } + GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( @@ -414,36 +585,23 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, dump_pending_tags(cc); break; } - /* Check alarms - these are a global resource so we just ping - each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ - gpr_timespec iteration_deadline = deadline; - if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { - GPR_TIMER_MARK("alarm_triggered", 0); + grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + NULL, now, deadline); + if (err != GRPC_ERROR_NONE) { gpr_mu_unlock(cc->mu); - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); - continue; - } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); - if (err != GRPC_ERROR_NONE) { - gpr_mu_unlock(cc->mu); - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); - GRPC_ERROR_UNREF(err); - memset(&ret, 0, sizeof(ret)); - ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); - break; - } + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); + break; } is_finished_arg.first_loop = false; } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "next"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -517,7 +675,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); - if (grpc_cq_pluck_trace) { + if (cc->completion_type != GRPC_CQ_PLUCK) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_pluck() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_PLUCK"); + abort(); + } + + if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" "cc=%p, tag=%p, " @@ -600,38 +765,26 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, dump_pending_tags(cc); break; } - /* Check alarms - these are a global resource so we just ping - each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ - gpr_timespec iteration_deadline = deadline; - if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { - GPR_TIMER_MARK("alarm_triggered", 0); + grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, deadline); + if (err != GRPC_ERROR_NONE) { + del_plucker(cc, tag, &worker); gpr_mu_unlock(cc->mu); - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); - } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); - if (err != GRPC_ERROR_NONE) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); - GRPC_ERROR_UNREF(err); - memset(&ret, 0, sizeof(ret)); - ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); - break; - } + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); + break; } is_finished_arg.first_loop = false; del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -656,8 +809,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -668,12 +821,14 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); - GRPC_CQ_INTERNAL_UNREF(cc, "destroy"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; } grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { @@ -681,13 +836,23 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { } void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO: sreek - use cc->polling_type field here and add a validation check + (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose + polling_type is set to GRPC_CQ_NON_LISTENING */ cc->is_non_listening_server_cq = 1; } bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */ return (cc->is_non_listening_server_cq == 1); } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } -int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } +bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { + return cc->is_server_cq; +} + +bool grpc_cq_can_listen(grpc_completion_queue *cc) { + return cc->poller_vtable->can_listen; +} diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 5d73dd7216..8d9ce2ec02 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -37,15 +37,16 @@ /* Internal API for completion queues */ #include <grpc/grpc.h> +#include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/pollset.h" /* These trace flags default to 1. The corresponding lines are only traced if grpc_api_trace is also truthy */ -extern int grpc_cq_pluck_trace; -extern int grpc_cq_event_timeout_trace; -extern int grpc_trace_operation_failures; +extern grpc_tracer_flag grpc_cq_pluck_trace; +extern grpc_tracer_flag grpc_cq_event_timeout_trace; +extern grpc_tracer_flag grpc_trace_operation_failures; #ifndef NDEBUG -extern int grpc_trace_pending_tags; +extern grpc_tracer_flag grpc_trace_pending_tags; #endif typedef struct grpc_cq_completion { @@ -65,17 +66,17 @@ typedef struct grpc_cq_completion { #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line); -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, - const char *file, int line); +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + const char *reason, const char *file, int line); #define GRPC_CQ_INTERNAL_REF(cc, reason) \ grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \ - grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__) +#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) \ + grpc_cq_internal_unref(ec, cc, reason, __FILE__, __LINE__) #else void grpc_cq_internal_ref(grpc_completion_queue *cc); -void grpc_cq_internal_unref(grpc_completion_queue *cc); +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc) +#define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif /* Flag that an operation is beginning: the completion channel will not finish @@ -94,9 +95,13 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -int grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_can_listen(grpc_completion_queue *cc); + +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); + +grpc_completion_queue *grpc_completion_queue_create_internal( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c index db67a5192b..d68b84eddd 100644 --- a/src/core/lib/surface/completion_queue_factory.c +++ b/src/core/lib/surface/completion_queue_factory.c @@ -36,12 +36,15 @@ #include <grpc/support/log.h> -/* TODO (sreek) - Currently this does not use the attributes arg. This will be - added in a future PR */ +/* + * == Default completion queue factory implementation == + */ + static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, - const grpc_completion_queue_attributes* attributes) { - return grpc_completion_queue_create(NULL); + const grpc_completion_queue_attributes* attr) { + return grpc_completion_queue_create_internal(attr->cq_completion_type, + attr->cq_polling_type); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create}; static const grpc_completion_queue_factory g_default_cq_factory = { "Default Factory", NULL, &default_vtable}; +/* + * == Completion queue factory APIs + */ + const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( const grpc_completion_queue_attributes* attributes) { - /* As we add more fields to grpc_completion_queue_attributes, we may have to - change this assert to: - GPR_ASSERT (attributes->version >= 1 && - attributes->version <= GRPC_CQ_CURRENT_VERSION) */ - GPR_ASSERT(attributes->version == 1); + GPR_ASSERT(attributes->version >= 1 && + attributes->version <= GRPC_CQ_CURRENT_VERSION); /* The default factory can handle version 1 of the attributes structure. We may have to change this as more fields are added to the structure */ return &g_default_cq_factory; } +/* + * == Completion queue creation APIs == + */ + grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, @@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { GRPC_CQ_DEFAULT_POLLING}; return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); } + +grpc_completion_queue* grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attr, void* reserved) { + GPR_ASSERT(!reserved); + return factory->vtable->create(factory, attr); +} diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index b46ecac18d..6163776152 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -41,14 +41,8 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/connected_channel.h" -#include "src/core/lib/channel/deadline_filter.h" #include "src/core/lib/channel/handshaker_registry.h" -#include "src/core/lib/channel/http_client_filter.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/channel/max_age_filter.h" -#include "src/core/lib/channel/message_size_filter.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/combiner.h" @@ -96,60 +90,13 @@ static bool prepend_filter(grpc_exec_ctx *exec_ctx, builder, (const grpc_channel_filter *)arg, NULL, NULL); } -static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx, - grpc_channel_stack_builder *builder, - void *arg) { - grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); - if (t && strstr(t->vtable->name, "http")) { - return grpc_channel_stack_builder_prepend_filter( - builder, (const grpc_channel_filter *)arg, NULL, NULL); - } - return true; -} - static void register_builtin_channel_init() { - grpc_channel_init_register_stage( - GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - prepend_filter, (void *)&grpc_client_deadline_filter); - grpc_channel_init_register_stage( - GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, - (void *)&grpc_server_deadline_filter); - grpc_channel_init_register_stage( - GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, - (void *)&grpc_max_age_filter); - grpc_channel_init_register_stage( - GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - prepend_filter, (void *)&grpc_message_size_filter); - grpc_channel_init_register_stage( - GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - prepend_filter, (void *)&grpc_message_size_filter); - grpc_channel_init_register_stage( - GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, - (void *)&grpc_message_size_filter); - grpc_channel_init_register_stage( - GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, - (void *)&grpc_compress_filter); - grpc_channel_init_register_stage( - GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - prepend_filter, (void *)&grpc_compress_filter); - grpc_channel_init_register_stage( - GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter, - (void *)&grpc_compress_filter); - grpc_channel_init_register_stage( - GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_http_filter, (void *)&grpc_http_client_filter); grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); - grpc_channel_init_register_stage( - GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_http_filter, (void *)&grpc_http_client_filter); grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); - grpc_channel_init_register_stage( - GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, - maybe_add_http_filter, (void *)&grpc_http_server_filter); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, grpc_add_connected_filter, NULL); @@ -193,16 +140,13 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("compression", &grpc_compression_trace); grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); grpc_register_tracer("combiner", &grpc_combiner_trace); grpc_register_tracer("server_channel", &grpc_server_channel_trace); grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace); // Default pluck trace to 1 - grpc_cq_pluck_trace = 1; grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace); // Default timeout trace to 1 - grpc_cq_event_timeout_trace = 1; grpc_register_tracer("op_failure", &grpc_trace_operation_failures); grpc_register_tracer("resource_quota", &grpc_resource_quota_trace); grpc_register_tracer("call_error", &grpc_call_error_trace); @@ -227,6 +171,7 @@ void grpc_init(void) { grpc_tracer_init("GRPC_TRACE"); /* no more changes to channel init pipelines */ grpc_channel_init_finalize(); + grpc_iomgr_start(); } gpr_mu_unlock(&g_init_mu); GRPC_API_TRACE("grpc_init(void)", 0, ()); diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 921ef87e36..746134676f 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -31,6 +31,8 @@ * */ +#include <grpc/support/port_platform.h> + #include "src/core/lib/surface/init.h" #include <limits.h> diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.cc index 0c408aa288..88f4eaac08 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.cc @@ -31,39 +31,50 @@ * */ -#include "src/core/lib/surface/lame_client.h" - #include <grpc/grpc.h> #include <string.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> + +#include "src/core/lib/support/atomic.h" + +extern "C" { #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/lame_client.h" #include "src/core/lib/transport/static_metadata.h" +} -typedef struct { +namespace grpc_core { + +namespace { + +struct CallData { grpc_linked_mdelem status; grpc_linked_mdelem details; - gpr_atm filled_metadata; -} call_data; + grpc_core::atomic<bool> filled_metadata; +}; -typedef struct { +struct ChannelData { grpc_status_code error_code; const char *error_message; -} channel_data; +}; static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *mdb) { - call_data *calld = elem->call_data; - if (!gpr_atm_no_barrier_cas(&calld->filled_metadata, 0, 1)) { + CallData *calld = static_cast<CallData *>(elem->call_data); + bool expected = false; + if (!calld->filled_metadata.compare_exchange_strong( + expected, true, grpc_core::memory_order_relaxed, + grpc_core::memory_order_relaxed)) { return; } - channel_data *chand = elem->channel_data; + ChannelData *chand = static_cast<ChannelData *>(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -80,16 +91,17 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } -static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->recv_initial_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_initial_metadata); - } else if (op->recv_trailing_metadata != NULL) { - fill_metadata(exec_ctx, elem, op->recv_trailing_metadata); +static void lame_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { + if (op->recv_initial_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_initial_metadata.recv_initial_metadata); + } else if (op->recv_trailing_metadata) { + fill_metadata(exec_ctx, elem, + op->payload->recv_trailing_metadata.recv_trailing_metadata); } - grpc_transport_stream_op_finish_with_failure( + grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } @@ -125,8 +137,6 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - gpr_atm_no_barrier_store(&calld->filled_metadata, 0); return GRPC_ERROR_NONE; } @@ -147,18 +157,22 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -const grpc_channel_filter grpc_lame_filter = { - lame_start_transport_stream_op, - lame_start_transport_op, - sizeof(call_data), - init_call_elem, +} // namespace + +} // namespace grpc_core + +extern "C" const grpc_channel_filter grpc_lame_filter = { + grpc_core::lame_start_transport_stream_op_batch, + grpc_core::lame_start_transport_op, + sizeof(grpc_core::CallData), + grpc_core::init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - lame_get_peer, - lame_get_channel_info, + grpc_core::destroy_call_elem, + sizeof(grpc_core::ChannelData), + grpc_core::init_channel_elem, + grpc_core::destroy_channel_elem, + grpc_core::lame_get_peer, + grpc_core::lame_get_channel_info, "lame-client", }; @@ -169,7 +183,6 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, const char *error_message) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_channel_element *elem; - channel_data *chand; grpc_channel *channel = grpc_channel_create(&exec_ctx, target, NULL, GRPC_CLIENT_LAME_CHANNEL, NULL); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); @@ -178,7 +191,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - chand = (channel_data *)elem->channel_data; + auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index a123c9ca43..560229e892 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/stack_lockfree.h" @@ -72,7 +73,7 @@ typedef struct registered_method registered_method; typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; -int grpc_server_channel_trace = 0; +grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { requested_call_type type; @@ -154,8 +155,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; - bool recv_idempotent_request; - bool recv_cacheable_request; + uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; request_matcher *request_matcher; @@ -212,6 +212,11 @@ struct grpc_server { gpr_mu mu_global; /* mutex for server and channel state */ gpr_mu mu_call; /* mutex for call-specific state */ + /* startup synchronization: flag is protected by mu_global, signals whether + we are doing the listener start routine or not */ + bool starting; + gpr_cv starting_cv; + registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; @@ -340,7 +345,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_destroy(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element(elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, @@ -389,6 +394,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { grpc_channel_args_destroy(exec_ctx, server->channel_args); gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_call); + gpr_cv_destroy(&server->starting_cv); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { @@ -402,7 +408,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { request_matcher_destroy(&server->unregistered_request_matcher); } for (i = 0; i < server->cq_count; i++) { - GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); if (server->started) { gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); gpr_free(server->requested_calls_per_cq[i]); @@ -450,7 +456,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_closure_init(&chand->finish_destroy_channel_closure, finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); - if (grpc_server_channel_trace && error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_server_channel_trace) && error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(error); gpr_log(GPR_INFO, "Disconnected client: %s", msg); } @@ -498,13 +504,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->data.batch.details->host = grpc_slice_ref_internal(calld->host); rc->data.batch.details->method = grpc_slice_ref_internal(calld->path); rc->data.batch.details->deadline = calld->deadline; - rc->data.batch.details->flags = - (calld->recv_idempotent_request - ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST - : 0) | - (calld->recv_cacheable_request - ? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST - : 0); + rc->data.batch.details->flags = calld->recv_initial_metadata_flags; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; @@ -632,7 +632,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!grpc_slice_eq(rm->host, calld->host)) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -649,7 +650,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (rm->has_host) continue; if (!grpc_slice_eq(rm->method, calld->path)) continue; if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && - !calld->recv_idempotent_request) { + 0 == (calld->recv_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) { continue; } finish_start_new_rpc(exec_ctx, server, elem, @@ -781,22 +783,25 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, } static void server_mutate_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op_batch *op) { call_data *calld = elem->call_data; - if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(op->recv_idempotent_request == NULL); - calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; - op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; - op->recv_idempotent_request = &calld->recv_idempotent_request; - op->recv_cacheable_request = &calld->recv_cacheable_request; + if (op->recv_initial_metadata) { + GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL); + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + calld->on_done_recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->server_on_recv_initial_metadata; + op->payload->recv_initial_metadata.recv_flags = + &calld->recv_initial_metadata_flags; } } -static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { +static void server_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); @@ -960,7 +965,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_server_top_filter = { - server_start_transport_stream_op, + server_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, @@ -976,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = { static void register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - bool is_non_listening, void *reserved) { + void *reserved) { size_t i, n; GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { @@ -985,10 +990,6 @@ static void register_completion_queue(grpc_server *server, grpc_cq_mark_server_cq(cq); - if (is_non_listening) { - grpc_cq_mark_non_listening_server_cq(cq); - } - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1002,16 +1003,16 @@ void grpc_server_register_completion_queue(grpc_server *server, GRPC_API_TRACE( "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, (server, cq, reserved)); - register_completion_queue(server, cq, false, reserved); -} -void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) { - GRPC_API_TRACE( - "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " - "reserved=%p)", - 3, (server, cq, reserved)); - register_completion_queue(server, cq, true, reserved); + if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) { + gpr_log(GPR_INFO, + "Completion queue which is not of type GRPC_CQ_NEXT is being " + "registered as a server-completion-queue"); + /* Ideally we should log an error and abort but ruby-wrapped-language API + calls grpc_completion_queue_pluck() on server completion queues */ + } + + register_completion_queue(server, cq, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1019,10 +1020,9 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { grpc_server *server = gpr_zalloc(sizeof(grpc_server)); - GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); + gpr_cv_init(&server->starting_cv); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -1079,8 +1079,22 @@ void *grpc_server_register_method( return m; } +static void start_listeners(grpc_exec_ctx *exec_ctx, void *s, + grpc_error *error) { + grpc_server *server = s; + for (listener *l = server->listeners; l; l = l->next) { + l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count); + } + + gpr_mu_lock(&server->mu_global); + server->starting = false; + gpr_cv_signal(&server->starting_cv); + gpr_mu_unlock(&server->mu_global); + + server_unref(exec_ctx, server); +} + void grpc_server_start(grpc_server *server) { - listener *l; size_t i; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -1094,7 +1108,7 @@ void grpc_server_start(grpc_server *server) { server->requested_calls_per_cq = gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } @@ -1114,10 +1128,11 @@ void grpc_server_start(grpc_server *server) { (size_t)server->max_requested_calls_per_cq, server); } - for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, - server->pollset_count); - } + server_ref(server); + server->starting = true; + grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server, + grpc_executor_scheduler), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -1251,8 +1266,14 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); - /* lock, and gather up some stuff to do */ + /* wait for startup to be finished: locks mu_global */ gpr_mu_lock(&server->mu_global); + while (server->starting) { + gpr_cv_wait(&server->starting_cv, &server->mu_global, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + + /* stay locked, and gather up some stuff to do */ grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index a85d9f4964..cd2fca0fe0 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -36,12 +36,13 @@ #include <grpc/grpc.h> #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/transport.h" extern const grpc_channel_filter grpc_server_top_filter; /** Lightweight tracing of server channel state */ -extern int grpc_server_channel_trace; +extern grpc_tracer_flag grpc_server_channel_trace; /* Add a listener to the server: when the server starts, it will call start, and when it shuts down, it will call destroy */ diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index ba80bd801e..cddc595e4c 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -36,6 +36,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "3.0.0-dev"; } +const char *grpc_version_string(void) { return "4.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "gentle"; } +const char *grpc_g_stands_for(void) { return "gregarious"; } |