aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.c37
-rw-r--r--src/core/lib/surface/byte_buffer.c6
-rw-r--r--src/core/lib/surface/call.c583
-rw-r--r--src/core/lib/surface/call.h12
-rw-r--r--src/core/lib/surface/call_log_batch.c2
-rw-r--r--src/core/lib/surface/call_test_only.h12
-rw-r--r--src/core/lib/surface/channel.c43
-rw-r--r--src/core/lib/surface/channel_init.c12
-rw-r--r--src/core/lib/surface/channel_ping.c4
-rw-r--r--src/core/lib/surface/completion_queue.c113
-rw-r--r--src/core/lib/surface/init.c14
-rw-r--r--src/core/lib/surface/lame_client.cc19
-rw-r--r--src/core/lib/surface/server.c153
-rw-r--r--src/core/lib/surface/version.c2
14 files changed, 692 insertions, 320 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index 7d60b1de17..7712f560b9 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -44,7 +44,9 @@ static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
static void alarm_unref(grpc_alarm *alarm) {
if (gpr_unref(&alarm->refs)) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ if (alarm->cq != NULL) {
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ }
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(alarm);
}
@@ -78,12 +80,12 @@ static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason,
static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
grpc_cq_completion *c) {
- grpc_alarm *alarm = arg;
+ grpc_alarm *alarm = (grpc_alarm *)arg;
GRPC_ALARM_UNREF(alarm, "dequeue-end-op");
}
static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_alarm *alarm = arg;
+ grpc_alarm *alarm = (grpc_alarm *)arg;
/* We are queuing an op on completion queue. This means, the alarm's structure
cannot be destroyed until the op is dequeued. Adding an extra ref
@@ -93,12 +95,8 @@ static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
(void *)alarm, &alarm->completion);
}
-grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
- void *tag) {
- grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm));
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
- gpr_ref_init(&alarm->refs, 1);
+grpc_alarm *grpc_alarm_create(void *reserved) {
+ grpc_alarm *alarm = (grpc_alarm *)gpr_malloc(sizeof(grpc_alarm));
#ifndef NDEBUG
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
@@ -106,27 +104,36 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
}
#endif
+ gpr_ref_init(&alarm->refs, 1);
+ grpc_timer_init_unset(&alarm->alarm);
+ alarm->cq = NULL;
+ GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
+ grpc_schedule_on_exec_ctx);
+ return alarm;
+}
+
+void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
+ gpr_timespec deadline, void *tag, void *reserved) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
- GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
- grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&alarm->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_exec_ctx_finish(&exec_ctx);
- return alarm;
}
-void grpc_alarm_cancel(grpc_alarm *alarm) {
+void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_timer_cancel(&exec_ctx, &alarm->alarm);
grpc_exec_ctx_finish(&exec_ctx);
}
-void grpc_alarm_destroy(grpc_alarm *alarm) {
- grpc_alarm_cancel(alarm);
+void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) {
+ grpc_alarm_cancel(alarm, reserved);
GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c
index 0bc990d487..7ed550ef87 100644
--- a/src/core/lib/surface/byte_buffer.c
+++ b/src/core/lib/surface/byte_buffer.c
@@ -32,7 +32,8 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
grpc_slice *slices, size_t nslices,
grpc_compression_algorithm compression) {
size_t i;
- grpc_byte_buffer *bb = gpr_malloc(sizeof(grpc_byte_buffer));
+ grpc_byte_buffer *bb =
+ (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer));
bb->type = GRPC_BB_RAW;
bb->data.raw.compression = compression;
grpc_slice_buffer_init(&bb->data.raw.slice_buffer);
@@ -45,7 +46,8 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
grpc_byte_buffer_reader *reader) {
- grpc_byte_buffer *bb = gpr_malloc(sizeof(grpc_byte_buffer));
+ grpc_byte_buffer *bb =
+ (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer));
grpc_slice slice;
bb->type = GRPC_BB_RAW;
bb->data.raw.compression = GRPC_COMPRESS_NONE;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 00ec9c7c9a..03f47553a1 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -32,6 +32,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -121,6 +122,7 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
+ grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -144,15 +146,19 @@ typedef struct {
grpc_call *sibling_prev;
} child_call;
+#define RECV_NONE ((gpr_atm)0)
+#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
+
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
+ grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
gpr_timespec start_time;
/* parent_call* */ gpr_atm parent_call_atm;
- child_call *child_call;
+ child_call *child;
/* client or server call */
bool is_client;
@@ -170,9 +176,6 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- /* have we received initial metadata */
- bool has_initial_md_been_received;
-
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
@@ -183,6 +186,11 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata compression_md;
+
+ // A char* indicating the peer name.
+ gpr_atm peer_string;
+
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -192,8 +200,12 @@ struct grpc_call {
/* Compression algorithm for *incoming* data */
grpc_compression_algorithm incoming_compression_algorithm;
+ /* Stream compression algorithm for *incoming* data */
+ grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer;
+ /* Supported stream encodings (stream compression algorithms), a bitset */
+ uint32_t stream_encodings_accepted_by_peer;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -226,7 +238,23 @@ struct grpc_call {
} server;
} final_op;
- void *saved_receiving_stream_ready_bctlp;
+ /* recv_state can contain one of the following values:
+ RECV_NONE : : no initial metadata and messages received
+ RECV_INITIAL_METADATA_FIRST : received initial metadata first
+ a batch_control* : received messages first
+
+ +------1------RECV_NONE------3-----+
+ | |
+ | |
+ v v
+ RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
+ | ^ | ^
+ | | | |
+ +-----2-----+ +-----4-----+
+
+ For 1, 4: See receiving_initial_metadata_ready() function
+ For 2, 3: See receiving_stream_ready() function */
+ gpr_atm recv_state;
};
grpc_tracer_flag grpc_call_error_trace =
@@ -241,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op);
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op,
+ grpc_closure *start_batch_closure);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -264,11 +293,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error, bool has_cancelled);
-static void add_init_error(grpc_error **composite, grpc_error *new) {
- if (new == GRPC_ERROR_NONE) return;
+static void add_init_error(grpc_error **composite, grpc_error *new_err) {
+ if (new_err == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE)
*composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
- *composite = grpc_error_add_child(*composite, new);
+ *composite = grpc_error_add_child(*composite, new_err);
}
void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
@@ -278,7 +307,7 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
static parent_call *get_or_create_parent_call(grpc_call *call) {
parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
if (p == NULL) {
- p = gpr_arena_alloc(call->arena, sizeof(*p));
+ p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
@@ -301,12 +330,14 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_channel_get_channel_stack(args->channel);
grpc_call *call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
- gpr_arena *arena =
- gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel));
- call = gpr_arena_alloc(arena,
- sizeof(grpc_call) + channel_stack->call_stack_size);
+ size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
+ GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, initial_size);
+ gpr_arena *arena = gpr_arena_create(initial_size);
+ call = (grpc_call *)gpr_arena_alloc(
+ arena, sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
+ grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -314,6 +345,11 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
+ if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx);
+ } else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx);
+ }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
@@ -342,24 +378,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
bool immediately_cancel = false;
- if (args->parent_call != NULL) {
- child_call *cc = call->child_call =
- gpr_arena_alloc(arena, sizeof(child_call));
- call->child_call->parent = args->parent_call;
+ if (args->parent != NULL) {
+ child_call *cc = call->child =
+ (child_call *)gpr_arena_alloc(arena, sizeof(child_call));
+ call->child->parent = args->parent;
- GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
+ GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
- GPR_ASSERT(!args->parent_call->is_client);
+ GPR_ASSERT(!args->parent->is_client);
- parent_call *pc = get_or_create_parent_call(args->parent_call);
+ parent_call *pc = get_or_create_parent_call(args->parent);
gpr_mu_lock(&pc->child_list_mu);
if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
- args->parent_call->send_deadline.clock_type),
- args->parent_call->send_deadline);
+ args->parent->send_deadline.clock_type),
+ args->parent->send_deadline);
}
/* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
* GRPC_PROPAGATE_STATS_CONTEXT */
@@ -371,9 +407,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
"Census tracing propagation requested "
"without Census context propagation"));
}
- grpc_call_context_set(
- call, GRPC_CONTEXT_TRACING,
- args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
+ args->parent->context[GRPC_CONTEXT_TRACING].value,
+ NULL);
} else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Census context propagation requested "
@@ -381,7 +417,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
}
if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
call->cancellation_is_inherited = 1;
- if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) {
+ if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
immediately_cancel = true;
}
}
@@ -391,9 +427,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
cc->sibling_next = cc->sibling_prev = call;
} else {
cc->sibling_next = pc->first_child;
- cc->sibling_prev = pc->first_child->child_call->sibling_prev;
- cc->sibling_next->child_call->sibling_prev =
- cc->sibling_prev->child_call->sibling_next = call;
+ cc->sibling_prev = pc->first_child->child->sibling_prev;
+ cc->sibling_next->child->sibling_prev =
+ cc->sibling_prev->child->sibling_next = call;
}
gpr_mu_unlock(&pc->child_list_mu);
@@ -410,7 +446,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena};
+ .arena = call->arena,
+ .call_combiner = &call->call_combiner};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -475,8 +512,10 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
- grpc_call *c = call;
+ grpc_call *c = (grpc_call *)call;
grpc_channel *channel = c->channel;
+ grpc_call_combiner_destroy(&c->call_combiner);
+ gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -486,7 +525,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
size_t i;
int ii;
- grpc_call *c = call;
+ grpc_call *c = (grpc_call *)call;
GPR_TIMER_BEGIN("destroy_call", 0);
for (i = 0; i < 2; i++) {
grpc_metadata_batch_destroy(
@@ -511,7 +550,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
GRPC_CQ_INTERNAL_UNREF(exec_ctx, c->cq, "bind");
}
- get_final_status(call, set_status_value_directly, &c->final_info.final_status,
+ get_final_status(c, set_status_value_directly, &c->final_info.final_status,
NULL);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
@@ -532,7 +571,7 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); }
void grpc_call_unref(grpc_call *c) {
if (!gpr_unref(&c->ext_ref)) return;
- child_call *cc = c->child_call;
+ child_call *cc = c->child;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
@@ -547,8 +586,8 @@ void grpc_call_unref(grpc_call *c) {
pc->first_child = NULL;
}
}
- cc->sibling_prev->child_call->sibling_next = cc->sibling_next;
- cc->sibling_next->child_call->sibling_prev = cc->sibling_prev;
+ cc->sibling_prev->child->sibling_next = cc->sibling_next;
+ cc->sibling_next->child->sibling_prev = cc->sibling_prev;
gpr_mu_unlock(&pc->child_list_mu);
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child");
}
@@ -560,6 +599,12 @@ void grpc_call_unref(grpc_call *c) {
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
+ } else {
+ // Unset the call combiner cancellation closure. This has the
+ // effect of scheduling the previously set cancellation closure, if
+ // any, so that it can release any internal references it may be
+ // holding to the call stack.
+ grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL);
}
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
@@ -576,30 +621,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op) {
- grpc_call_element *elem;
-
- GPR_TIMER_BEGIN("execute_op", 0);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
- GPR_TIMER_END("execute_op", 0);
+// This is called via the call combiner to start sending a batch down
+// the filter stack.
+static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *ignored) {
+ grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
+ grpc_call *call = (grpc_call *)batch->handler_private.extra_arg;
+ GPR_TIMER_BEGIN("execute_batch", 0);
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ GPR_TIMER_END("execute_batch", 0);
+}
+
+// start_batch_closure points to a caller-allocated closure to be used
+// for entering the call combiner.
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *batch,
+ grpc_closure *start_batch_closure) {
+ batch->handler_private.extra_arg = call;
+ GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_ERROR_NONE, "executing batch");
}
char *grpc_call_get_peer(grpc_call *call) {
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result;
- GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
- result = elem->filter->get_peer(&exec_ctx, elem);
- if (result == NULL) {
- result = grpc_channel_get_target(call->channel);
- }
- if (result == NULL) {
- result = gpr_strdup("unknown");
- }
- grpc_exec_ctx_finish(&exec_ctx);
- return result;
+ char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+ if (peer_string != NULL) return gpr_strdup(peer_string);
+ peer_string = grpc_channel_get_target(call->channel);
+ if (peer_string != NULL) return peer_string;
+ return gpr_strdup("unknown");
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -626,20 +678,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
+typedef struct {
+ grpc_call *call;
+ grpc_closure start_batch;
+ grpc_closure finish_batch;
+} cancel_state;
+
+// The on_complete callback used when sending a cancel_stream batch down
+// the filter stack. Yields the call combiner when the batch is done.
+static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
+ cancel_state *state = (cancel_state *)arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ "on_complete for cancel_stream op");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ gpr_free(state);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
+ // Inform the call combiner of the cancellation, so that it can cancel
+ // any in-flight asynchronous actions that may be holding the call
+ // combiner. This ensures that the cancel_stream batch can be sent
+ // down the filter stack in a timely manner.
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
+ cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ state->call = c;
+ GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_batch *op =
+ grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_op(exec_ctx, c, op);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -752,6 +825,12 @@ static void set_incoming_compression_algorithm(
call->incoming_compression_algorithm = algo;
}
+static void set_incoming_stream_compression_algorithm(
+ grpc_call *call, grpc_stream_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
+ call->incoming_stream_compression_algorithm = algo;
+}
+
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
@@ -765,6 +844,13 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
call->encodings_accepted_by_peer);
}
+static grpc_stream_compression_algorithm
+stream_compression_algorithm_for_level_locked(
+ grpc_call *call, grpc_stream_compression_level level) {
+ return grpc_stream_compression_algorithm_for_level(
+ level, call->stream_encodings_accepted_by_peer);
+}
+
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
flags = call->test_only_last_message_flags;
@@ -819,12 +905,70 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
(void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
+static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call *call,
+ grpc_mdelem mdel) {
+ size_t i;
+ grpc_stream_compression_algorithm algorithm;
+ grpc_slice_buffer accept_encoding_parts;
+ grpc_slice accept_encoding_slice;
+ void *accepted_user_data;
+
+ accepted_user_data =
+ grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
+ if (accepted_user_data != NULL) {
+ call->stream_encodings_accepted_by_peer =
+ (uint32_t)(((uintptr_t)accepted_user_data) - 1);
+ return;
+ }
+
+ accept_encoding_slice = GRPC_MDVALUE(mdel);
+ grpc_slice_buffer_init(&accept_encoding_parts);
+ grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
+
+ /* Always support no compression */
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer,
+ GRPC_STREAM_COMPRESS_NONE);
+ for (i = 0; i < accept_encoding_parts.count; i++) {
+ grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
+ if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice,
+ &algorithm)) {
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm);
+ } else {
+ char *accept_encoding_entry_str =
+ grpc_slice_to_c_string(accept_encoding_entry_slice);
+ gpr_log(GPR_ERROR,
+ "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
+ accept_encoding_entry_str);
+ gpr_free(accept_encoding_entry_str);
+ }
+ }
+
+ grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
+
+ grpc_mdelem_set_user_data(
+ mdel, destroy_encodings_accepted_by_peer,
+ (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
+}
+
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
uint32_t encodings_accepted_by_peer;
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
return encodings_accepted_by_peer;
}
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call) {
+ uint32_t stream_encodings_accepted_by_peer;
+ stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer;
+ return stream_encodings_accepted_by_peer;
+}
+
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
+ return call->incoming_stream_compression_algorithm;
+}
+
static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
@@ -936,6 +1080,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
return algorithm;
}
+static grpc_stream_compression_algorithm decode_stream_compression(
+ grpc_mdelem md) {
+ grpc_stream_compression_algorithm algorithm =
+ grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid incoming stream compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ gpr_free(md_c_str);
+ return GRPC_STREAM_COMPRESS_NONE;
+ }
+ return algorithm;
+}
+
static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
int is_trailing) {
if (b->list.count == 0) return;
@@ -946,8 +1106,8 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
if (dest->count + b->list.count > dest->capacity) {
dest->capacity =
GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
- dest->metadata =
- gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ dest->metadata = (grpc_metadata *)gpr_realloc(
+ dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) {
mdusr = &dest->metadata[dest->count++];
@@ -960,7 +1120,19 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
- if (b->idx.named.grpc_encoding != NULL) {
+ if (b->idx.named.content_encoding != NULL) {
+ if (b->idx.named.grpc_encoding != NULL) {
+ gpr_log(GPR_ERROR,
+ "Received both content-encoding and grpc-encoding header. "
+ "Ignoring grpc-encoding.");
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ }
+ GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
+ set_incoming_stream_compression_algorithm(
+ call, decode_stream_compression(b->idx.named.content_encoding->md));
+ GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
+ } else if (b->idx.named.grpc_encoding != NULL) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
call, decode_compression(b->idx.named.grpc_encoding->md));
@@ -974,12 +1146,19 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
+ if (b->idx.named.accept_encoding != NULL) {
+ GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
+ set_stream_encodings_accepted_by_peer(exec_ctx, call,
+ b->idx.named.accept_encoding->md);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
+ GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
+ }
publish_app_metadata(call, b, false);
}
static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
grpc_metadata_batch *b) {
- grpc_call *call = args;
+ grpc_call *call = (grpc_call *)args;
if (b->idx.named.grpc_status != NULL) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
grpc_error *error =
@@ -1063,7 +1242,8 @@ static batch_control *allocate_batch_control(grpc_call *call,
int slot = batch_slot_for_op(ops[0].op);
batch_control **pslot = &call->active_batches[slot];
if (*pslot == NULL) {
- *pslot = gpr_arena_alloc(call->arena, sizeof(batch_control));
+ *pslot =
+ (batch_control *)gpr_arena_alloc(call->arena, sizeof(batch_control));
}
batch_control *bctl = *pslot;
if (bctl->call != NULL) {
@@ -1077,7 +1257,7 @@ static batch_control *allocate_batch_control(grpc_call *call,
static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_cq_completion *storage) {
- batch_control *bctl = user_data;
+ batch_control *bctl = (batch_control *)user_data;
grpc_call *call = bctl->call;
bctl->call = NULL;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
@@ -1137,7 +1317,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
child = pc->first_child;
if (child != NULL) {
do {
- next_child_call = child->child_call->sibling_next;
+ next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE,
@@ -1166,7 +1346,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
- GRPC_CLOSURE_RUN(exec_ctx, bctl->completion_data.notify_tag.tag, error);
+ GRPC_CLOSURE_RUN(
+ exec_ctx, (grpc_closure *)bctl->completion_data.notify_tag.tag, error);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
/* unrefs bctl->error */
@@ -1220,7 +1401,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
- batch_control *bctl = bctlp;
+ batch_control *bctl = (batch_control *)bctlp;
grpc_call *call = bctl->call;
grpc_byte_stream *bs = call->receiving_stream;
bool release_error = false;
@@ -1279,7 +1460,7 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx,
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
- batch_control *bctl = bctlp;
+ batch_control *bctl = (batch_control *)bctlp;
grpc_call *call = bctl->call;
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != NULL) {
@@ -1290,19 +1471,73 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
- if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
- call->receiving_stream == NULL) {
- process_data_after_md(exec_ctx, bctlp);
- } else {
- call->saved_receiving_stream_ready_bctlp = bctlp;
+ /* If recv_state is RECV_NONE, we will save the batch_control
+ * object with rel_cas, and will not use it after the cas. Its corresponding
+ * acq_load is in receiving_initial_metadata_ready() */
+ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
+ !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
+ process_data_after_md(exec_ctx, bctl);
}
}
+// The recv_message_ready callback used when sending a batch containing
+// a recv_message op down the filter stack. Yields the call combiner
+// before processing the received message.
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *bctlp,
+ grpc_error *error) {
+ batch_control *bctl = (batch_control *)bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
+}
+
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
- /* validate call->incoming_compression_algorithm */
- if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
+ /* validate compression algorithms */
+ if (call->incoming_stream_compression_algorithm !=
+ GRPC_STREAM_COMPRESS_NONE) {
+ const grpc_stream_compression_algorithm algo =
+ call->incoming_stream_compression_algorithm;
+ char *error_msg = NULL;
+ const grpc_compression_options compression_options =
+ grpc_channel_compression_options(call->channel);
+ if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ gpr_asprintf(&error_msg,
+ "Invalid stream compression algorithm value '%d'.", algo);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
+ &compression_options, algo) == 0) {
+ /* check if algorithm is supported by current channel config */
+ const char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(algo, &algo_name);
+ gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
+ algo_name);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ }
+ gpr_free(error_msg);
+
+ GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
+ call->incoming_stream_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ const char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(
+ call->incoming_stream_compression_algorithm, &algo_name);
+ gpr_log(
+ GPR_ERROR,
+ "Stream compression algorithm (content-encoding = '%s') not "
+ "present in the bitset of accepted encodings (accept-encodings: "
+ "'0x%x')",
+ algo_name, call->stream_encodings_accepted_by_peer);
+ }
+ }
+ } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
const grpc_compression_algorithm algo =
call->incoming_compression_algorithm;
char *error_msg = NULL;
@@ -1318,7 +1553,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- char *algo_name = NULL;
+ const char *algo_name = NULL;
grpc_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
@@ -1329,22 +1564,20 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
call->incoming_compression_algorithm = algo;
}
gpr_free(error_msg);
- }
- /* make sure the received grpc-encoding is amongst the ones listed in
- * grpc-accept-encoding */
- GPR_ASSERT(call->encodings_accepted_by_peer != 0);
- if (!GPR_BITGET(call->encodings_accepted_by_peer,
- call->incoming_compression_algorithm)) {
- if (GRPC_TRACER_ON(grpc_compression_trace)) {
- char *algo_name = NULL;
- grpc_compression_algorithm_name(call->incoming_compression_algorithm,
- &algo_name);
- gpr_log(GPR_ERROR,
- "Compression algorithm (grpc-encoding = '%s') not present in "
- "the bitset of accepted encodings (grpc-accept-encodings: "
- "'0x%x')",
- algo_name, call->encodings_accepted_by_peer);
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->incoming_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ const char *algo_name = NULL;
+ grpc_compression_algorithm_name(call->incoming_compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
}
}
}
@@ -1362,9 +1595,12 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *bctlp, grpc_error *error) {
- batch_control *bctl = bctlp;
+ batch_control *bctl = (batch_control *)bctlp;
grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1384,12 +1620,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
- call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
- grpc_schedule_on_exec_ctx);
- call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_closure *saved_rsr_closure = NULL;
+ while (true) {
+ gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
+ /* Should only receive initial metadata once */
+ GPR_ASSERT(rsr_bctlp != 1);
+ if (rsr_bctlp == 0) {
+ /* We haven't seen initial metadata and messages before, thus initial
+ * metadata is received first.
+ * no_barrier_cas is used, as this function won't access the batch_control
+ * object saved by receiving_stream_ready() if the initial metadata is
+ * received first. */
+ if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
+ RECV_INITIAL_METADATA_FIRST)) {
+ break;
+ }
+ } else {
+ /* Already received messages */
+ saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
+ (batch_control *)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
+ /* No need to modify recv_state */
+ break;
+ }
+ }
+ if (saved_rsr_closure != NULL) {
GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
@@ -1398,8 +1653,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
- batch_control *bctl = bctlp;
-
+ batch_control *bctl = (batch_control *)bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1418,9 +1674,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
batch_control *bctl;
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
-
- // sent_initial_metadata guards against variable reuse.
- grpc_metadata compression_md;
+ grpc_transport_stream_op_batch *stream_op;
+ grpc_transport_stream_op_batch_payload *stream_op_payload;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1428,11 +1683,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
- grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
- free_no_op_completion, NULL,
- gpr_malloc(sizeof(grpc_cq_completion)));
+ grpc_cq_end_op(
+ exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
+ free_no_op_completion, NULL,
+ (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, notify_tag, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;
@@ -1446,9 +1702,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
bctl->completion_data.notify_tag.is_closure =
(uint8_t)(is_notify_tag_closure != 0);
- grpc_transport_stream_op_batch *stream_op = &bctl->op;
- grpc_transport_stream_op_batch_payload *stream_op_payload =
- &call->stream_op_payload;
+ stream_op = &bctl->op;
+ stream_op_payload = &call->stream_op_payload;
/* rewrite batch ops into a transport op */
for (i = 0; i < nops; i++) {
@@ -1458,7 +1713,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
switch (op->op) {
- case GRPC_OP_SEND_INITIAL_METADATA:
+ case GRPC_OP_SEND_INITIAL_METADATA: {
/* Flag validation: currently allow no flags */
if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1469,31 +1724,60 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&compression_md, 0, sizeof(compression_md));
+ memset(&call->compression_md, 0, sizeof(call->compression_md));
size_t additional_metadata_count = 0;
- grpc_compression_level effective_compression_level;
+ grpc_compression_level effective_compression_level =
+ GRPC_COMPRESS_LEVEL_NONE;
+ grpc_stream_compression_level effective_stream_compression_level =
+ GRPC_STREAM_COMPRESS_LEVEL_NONE;
bool level_set = false;
- if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
+ bool stream_compression = false;
+ if (op->data.send_initial_metadata.maybe_stream_compression_level
+ .is_set) {
+ effective_stream_compression_level =
+ op->data.send_initial_metadata.maybe_stream_compression_level
+ .level;
+ level_set = true;
+ stream_compression = true;
+ } else if (op->data.send_initial_metadata.maybe_compression_level
+ .is_set) {
effective_compression_level =
op->data.send_initial_metadata.maybe_compression_level.level;
level_set = true;
} else {
const grpc_compression_options copts =
grpc_channel_compression_options(call->channel);
- level_set = copts.default_level.is_set;
- if (level_set) {
+ if (copts.default_stream_compression_level.is_set) {
+ level_set = true;
+ effective_stream_compression_level =
+ copts.default_stream_compression_level.level;
+ stream_compression = true;
+ } else if (copts.default_level.is_set) {
+ level_set = true;
effective_compression_level = copts.default_level.level;
}
}
if (level_set && !call->is_client) {
- const grpc_compression_algorithm calgo =
- compression_algorithm_for_level_locked(
- call, effective_compression_level);
- // the following will be picked up by the compress filter and used as
- // the call's compression algorithm.
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
- additional_metadata_count++;
+ if (stream_compression) {
+ const grpc_stream_compression_algorithm calgo =
+ stream_compression_algorithm_for_level_locked(
+ call, effective_stream_compression_level);
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_stream_compression_algorithm_slice(calgo);
+ } else {
+ const grpc_compression_algorithm calgo =
+ compression_algorithm_for_level_locked(
+ call, effective_compression_level);
+ /* the following will be picked up by the compress filter and used
+ * as the call's compression algorithm. */
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_compression_algorithm_slice(calgo);
+ additional_metadata_count++;
+ }
}
if (op->data.send_initial_metadata.count + additional_metadata_count >
@@ -1506,7 +1790,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &compression_md, (int)additional_metadata_count)) {
+ &call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1518,8 +1802,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
+ if (call->is_client) {
+ stream_op_payload->send_initial_metadata.peer_string =
+ &call->peer_string;
+ }
break;
- case GRPC_OP_SEND_MESSAGE:
+ }
+ case GRPC_OP_SEND_MESSAGE: {
if (!are_write_flags_valid(op->flags)) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
goto done_with_error;
@@ -1548,7 +1837,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op_payload->send_message.send_message =
&call->sending_stream.base;
break;
- case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ }
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1567,7 +1857,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
- case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ }
+ case GRPC_OP_SEND_STATUS_FROM_SERVER: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1629,7 +1920,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
break;
- case GRPC_OP_RECV_INITIAL_METADATA:
+ }
+ case GRPC_OP_RECV_INITIAL_METADATA: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1650,9 +1942,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
+ if (!call->is_client) {
+ stream_op_payload->recv_initial_metadata.peer_string =
+ &call->peer_string;
+ }
num_completion_callbacks_needed++;
break;
- case GRPC_OP_RECV_MESSAGE:
+ }
+ case GRPC_OP_RECV_MESSAGE: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1666,13 +1963,15 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
- bctl, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
+ receiving_stream_ready_in_call_combiner, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
break;
- case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ }
+ case GRPC_OP_RECV_STATUS_ON_CLIENT: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1699,7 +1998,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
- case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ }
+ case GRPC_OP_RECV_CLOSE_ON_SERVER: {
/* Flag validation: currently allow no flags */
if (op->flags != 0) {
error = GRPC_CALL_ERROR_INVALID_FLAGS;
@@ -1723,6 +2023,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op_payload->collect_stats.collect_stats =
&call->final_info.stats.transport_stream_stats;
break;
+ }
}
}
@@ -1737,7 +2038,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_op(exec_ctx, call, stream_op);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 185bfccb77..c680139cf6 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -19,6 +19,10 @@
#ifndef GRPC_CORE_LIB_SURFACE_CALL_H
#define GRPC_CORE_LIB_SURFACE_CALL_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/surface/api_trace.h"
@@ -26,10 +30,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *call, int success,
void *user_data);
@@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
typedef struct grpc_call_create_args {
grpc_channel *channel;
- grpc_call *parent_call;
+ grpc_call *parent;
uint32_t propagation_mask;
grpc_completion_queue *cq;
@@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag);
diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c
index 4443aba58a..4a1c265817 100644
--- a/src/core/lib/surface/call_log_batch.c
+++ b/src/core/lib/surface/call_log_batch.c
@@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) {
return out;
}
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag) {
char *tmp;
diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h
index 2f1b80bfd7..a5a01b3679 100644
--- a/src/core/lib/surface/call_test_only.h
+++ b/src/core/lib/surface/call_test_only.h
@@ -42,6 +42,18 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call);
* To be indexed by grpc_compression_algorithm enum values. */
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call);
+/** Returns a bitset for the stream encodings (stream compression algorithms)
+ * supported by \a call's peer.
+ *
+ * To be indexed by grpc_stream_compression_algorithm enum values. */
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call);
+
+/** Returns the incoming stream compression algorithm (content-encoding header)
+ * received by a call. */
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 5780a18ce8..48962e5e45 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -27,6 +27,7 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/support/string.h"
@@ -77,6 +78,11 @@ grpc_channel *grpc_channel_create_with_builder(
grpc_channel_args *args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
grpc_channel *channel;
+ if (channel_stack_type == GRPC_SERVER_CHANNEL) {
+ GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx);
+ } else {
+ GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx);
+ }
grpc_error *error = grpc_channel_stack_builder_finish(
exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL,
(void **)&channel);
@@ -142,6 +148,16 @@ grpc_channel *grpc_channel_create_with_builder(
GRPC_COMPRESS_LEVEL_NONE,
GRPC_COMPRESS_LEVEL_COUNT - 1});
} else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
+ channel->compression_options.default_stream_compression_level.is_set =
+ true;
+ channel->compression_options.default_stream_compression_level.level =
+ (grpc_stream_compression_level)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
channel->compression_options.default_algorithm.algorithm =
@@ -149,12 +165,31 @@ grpc_channel *grpc_channel_create_with_builder(
&args->args[i],
(grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_ALGORITHMS_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
+ channel->compression_options.default_stream_compression_algorithm.is_set =
+ true;
+ channel->compression_options.default_stream_compression_algorithm
+ .algorithm =
+ (grpc_stream_compression_algorithm)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){
+ GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE,
+ GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT - 1});
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
(uint32_t)args->args[i].value.integer |
0x1; /* always support no compression */
+ } else if (0 ==
+ strcmp(
+ args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
+ channel->compression_options
+ .enabled_stream_compression_algorithms_bitset =
+ (uint32_t)args->args[i].value.integer |
+ 0x1; /* always support no compression */
}
}
@@ -247,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal(
grpc_call_create_args args;
memset(&args, 0, sizeof(args));
args.channel = channel;
- args.parent_call = parent_call;
+ args.parent = parent_call;
args.propagation_mask = propagation_mask;
args.cq = cq;
args.pollset_set_alternative = pollset_set_alternative;
@@ -298,7 +333,7 @@ grpc_call *grpc_channel_create_pollset_set_call(
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host, void *reserved) {
- registered_call *rc = gpr_malloc(sizeof(registered_call));
+ registered_call *rc = (registered_call *)gpr_malloc(sizeof(registered_call));
GRPC_API_TRACE(
"grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
4, (channel, method, host, reserved));
@@ -325,7 +360,7 @@ grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline, void *reserved) {
- registered_call *rc = registered_call_handle;
+ registered_call *rc = (registered_call *)registered_call_handle;
GRPC_API_TRACE(
"grpc_channel_create_registered_call("
"channel=%p, parent_call=%p, propagation_mask=%x, completion_queue=%p, "
@@ -363,7 +398,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_channel *channel = arg;
+ grpc_channel *channel = (grpc_channel *)arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
registered_call *rc = channel->registered_calls;
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index a1391ffe56..33f444b89e 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -53,9 +53,9 @@ void grpc_channel_init_register_stage(grpc_channel_stack_type type,
GPR_ASSERT(!g_finalized);
if (g_slots[type].cap_slots == g_slots[type].num_slots) {
g_slots[type].cap_slots = GPR_MAX(8, 3 * g_slots[type].cap_slots / 2);
- g_slots[type].slots =
- gpr_realloc(g_slots[type].slots,
- g_slots[type].cap_slots * sizeof(*g_slots[type].slots));
+ g_slots[type].slots = (stage_slot *)gpr_realloc(
+ g_slots[type].slots,
+ g_slots[type].cap_slots * sizeof(*g_slots[type].slots));
}
stage_slot *s = &g_slots[type].slots[g_slots[type].num_slots++];
s->insertion_order = g_slots[type].num_slots;
@@ -65,8 +65,8 @@ void grpc_channel_init_register_stage(grpc_channel_stack_type type,
}
static int compare_slots(const void *a, const void *b) {
- const stage_slot *sa = a;
- const stage_slot *sb = b;
+ const stage_slot *sa = (const stage_slot *)a;
+ const stage_slot *sb = (const stage_slot *)b;
int c = GPR_ICMP(sa->priority, sb->priority);
if (c != 0) return c;
@@ -85,7 +85,7 @@ void grpc_channel_init_finalize(void) {
void grpc_channel_init_shutdown(void) {
for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
gpr_free(g_slots[i].slots);
- g_slots[i].slots = (void *)(uintptr_t)0xdeadbeef;
+ g_slots[i].slots = (stage_slot *)(void *)(uintptr_t)0xdeadbeef;
}
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index e85b308850..f45b568958 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -39,7 +39,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
}
static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- ping_result *pr = arg;
+ ping_result *pr = (ping_result *)arg;
grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy,
pr, &pr->completion_storage);
}
@@ -49,7 +49,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4,
(channel, cq, tag, reserved));
grpc_transport_op *op = grpc_make_transport_op(NULL);
- ping_result *pr = gpr_malloc(sizeof(*pr));
+ ping_result *pr = (ping_result *)gpr_malloc(sizeof(*pr));
grpc_channel_element *top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index c20cfbc740..fed66e3a20 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -26,6 +26,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
@@ -54,7 +55,7 @@ typedef struct {
bool can_listen;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
- grpc_error *(*kick)(grpc_pollset *pollset,
+ grpc_error *(*kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
@@ -130,7 +131,8 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
}
static grpc_error *non_polling_poller_kick(
- grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *specific_worker) {
non_polling_poller *p = (non_polling_poller *)pollset;
if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
if (specific_worker != NULL) {
@@ -327,25 +329,12 @@ static void cq_destroy_pluck(void *data);
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */
- {.data_size = sizeof(cq_next_data),
- .cq_completion_type = GRPC_CQ_NEXT,
- .init = cq_init_next,
- .shutdown = cq_shutdown_next,
- .destroy = cq_destroy_next,
- .begin_op = cq_begin_op_for_next,
- .end_op = cq_end_op_for_next,
- .next = cq_next,
- .pluck = NULL},
+ {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
+ cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next, NULL},
/* GRPC_CQ_PLUCK */
- {.data_size = sizeof(cq_pluck_data),
- .cq_completion_type = GRPC_CQ_PLUCK,
- .init = cq_init_pluck,
- .shutdown = cq_shutdown_pluck,
- .destroy = cq_destroy_pluck,
- .begin_op = cq_begin_op_for_pluck,
- .end_op = cq_end_op_for_pluck,
- .next = NULL,
- .pluck = cq_pluck},
+ {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
+ cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, NULL,
+ cq_pluck},
};
#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
@@ -420,8 +409,13 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable *poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
- cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
- poller_vtable->size());
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_STATS_INC_CQS_CREATED(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) +
+ vtable->data_size +
+ poller_vtable->size());
cq->vtable = vtable;
cq->poller_vtable = poller_vtable;
@@ -441,7 +435,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
}
static void cq_init_next(void *ptr) {
- cq_next_data *cqd = ptr;
+ cq_next_data *cqd = (cq_next_data *)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
@@ -450,13 +444,13 @@ static void cq_init_next(void *ptr) {
}
static void cq_destroy_next(void *ptr) {
- cq_next_data *cqd = ptr;
+ cq_next_data *cqd = (cq_next_data *)ptr;
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
}
static void cq_init_pluck(void *ptr) {
- cq_pluck_data *cqd = ptr;
+ cq_pluck_data *cqd = (cq_pluck_data *)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
@@ -468,7 +462,7 @@ static void cq_init_pluck(void *ptr) {
}
static void cq_destroy_pluck(void *ptr) {
- cq_pluck_data *cqd = ptr;
+ cq_pluck_data *cqd = (cq_pluck_data *)ptr;
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
@@ -501,7 +495,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq) {
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_completion_queue *cq = arg;
+ grpc_completion_queue *cq = (grpc_completion_queue *)arg;
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy");
}
@@ -559,13 +553,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
* true if the increment was successful; false if the counter is zero */
static bool atm_inc_if_nonzero(gpr_atm *counter) {
while (true) {
- gpr_atm count = gpr_atm_no_barrier_load(counter);
+ gpr_atm count = gpr_atm_acq_load(counter);
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
* increment) to maintain the contract: do not increment the counter if it
* is zero. */
if (count == 0) {
return false;
- } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
+ } else if (gpr_atm_full_cas(counter, count, count + 1)) {
break;
}
}
@@ -574,12 +568,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) {
}
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
@@ -588,9 +582,9 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
+ cq->outstanding_tags = (void **)gpr_realloc(
+ cq->outstanding_tags,
+ sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity);
}
cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
gpr_mu_unlock(cq->mu);
@@ -624,7 +618,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
}
}
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -637,15 +631,19 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Add the completion to the queue */
bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
- bool will_definitely_shutdown =
- gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+ /* Since we do not hold the cq lock here, it is important to do an 'acquire'
+ load here (instead of a 'no_barrier' load) to match with the release store
+ (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
+ */
+ bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
if (!will_definitely_shutdown) {
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
grpc_error *kick_error =
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
@@ -685,7 +683,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -731,7 +729,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error *kick_error =
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
+ cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
@@ -766,9 +764,9 @@ typedef struct {
} cq_is_finished_arg;
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
- cq_is_finished_arg *a = arg;
+ cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -819,7 +817,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) {
grpc_event ret;
gpr_timespec now;
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -882,7 +880,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
}
}
- if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
+ if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
@@ -928,9 +926,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
}
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
- gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+ gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
}
@@ -952,7 +950,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
this function */
static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
@@ -963,7 +961,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
+ cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_next() below, that would call pollset shutdown.
@@ -976,10 +974,12 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
cqd->shutdown_called = true;
+ /* Doing a full_fetch_add (i.e acq/release) here to match with
+ * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
+ * on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_next(exec_ctx, cq);
}
@@ -994,7 +994,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -1006,7 +1006,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag,
static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -1018,9 +1018,9 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
}
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
- cq_is_finished_arg *a = arg;
+ cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq;
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -1057,7 +1057,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1181,7 +1181,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
@@ -1195,7 +1195,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
* merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
@@ -1208,7 +1208,6 @@ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
cqd->shutdown_called = true;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index d199ac060e..b089da2c54 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -28,12 +28,15 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/handshaker_registry.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/alarm_internal.h"
@@ -118,6 +121,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
+ grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
@@ -127,6 +131,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
+ grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
@@ -175,17 +180,20 @@ void grpc_shutdown(void) {
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
- grpc_iomgr_shutdown(&exec_ctx);
- gpr_timers_global_destroy();
- grpc_tracer_shutdown();
+ grpc_executor_shutdown(&exec_ctx);
+ grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != NULL) {
g_all_of_the_plugins[i].destroy();
}
}
+ grpc_iomgr_shutdown(&exec_ctx);
+ gpr_timers_global_destroy();
+ grpc_tracer_shutdown();
grpc_mdctx_global_shutdown(&exec_ctx);
grpc_handshaker_factory_registry_shutdown(&exec_ctx);
grpc_slice_intern_shutdown();
+ grpc_stats_shutdown();
}
gpr_mu_unlock(&g_init_mu);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index a0791080a9..6286f9159d 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,6 +40,7 @@ namespace grpc_core {
namespace {
struct CallData {
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -52,14 +53,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = static_cast<CallData *>(elem->call_data);
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
-}
-
-static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return NULL;
+ exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
+ calld->call_combiner);
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
- grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 66dcc299aa..1d0fd472d0 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -29,6 +29,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -75,7 +76,7 @@ typedef struct requested_call {
grpc_call_details *details;
} batch;
struct {
- registered_method *registered_method;
+ registered_method *method;
gpr_timespec *deadline;
grpc_byte_buffer **optional_payload;
} registered;
@@ -144,7 +145,7 @@ struct call_data {
uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata;
- request_matcher *request_matcher;
+ request_matcher *matcher;
grpc_byte_buffer *payload;
grpc_closure got_initial_metadata;
@@ -170,7 +171,7 @@ struct registered_method {
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
/* one request matcher per method */
- request_matcher request_matcher;
+ request_matcher matcher;
registered_method *next;
};
@@ -250,7 +251,8 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
count++;
}
cb->num_channels = count;
- cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ cb->channels =
+ (grpc_channel **)gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel;
@@ -265,14 +267,15 @@ struct shutdown_cleanup_args {
static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- struct shutdown_cleanup_args *a = arg;
+ struct shutdown_cleanup_args *a = (struct shutdown_cleanup_args *)arg;
grpc_slice_unref_internal(exec_ctx, a->slice);
gpr_free(a);
}
static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
bool send_goaway, grpc_error *send_disconnect) {
- struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc));
+ struct shutdown_cleanup_args *sc =
+ (struct shutdown_cleanup_args *)gpr_malloc(sizeof(*sc));
GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
grpc_schedule_on_exec_ctx);
grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
@@ -314,8 +317,8 @@ static void request_matcher_init(request_matcher *rm, size_t entries,
grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
- rm->requests_per_cq =
- gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
+ rm->requests_per_cq = (gpr_stack_lockfree **)gpr_malloc(
+ sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
}
@@ -331,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) {
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem,
grpc_error *error) {
- grpc_call_unref(grpc_call_from_top_element(elem));
+ grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem));
}
static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
@@ -384,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
if (server->started) {
- request_matcher_destroy(&rm->request_matcher);
+ request_matcher_destroy(&rm->matcher);
}
gpr_free(rm->method);
gpr_free(rm->host);
@@ -426,7 +429,7 @@ static void orphan_channel(channel_data *chand) {
static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
grpc_error *error) {
- channel_data *chand = cd;
+ channel_data *chand = (channel_data *)cd;
grpc_server *server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
server_unref(exec_ctx, server);
@@ -459,7 +462,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
grpc_cq_completion *c) {
- requested_call *rc = req;
+ requested_call *rc = (requested_call *)req;
grpc_server *server = rc->server;
if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
@@ -505,7 +508,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- channel_data *chand = elem->channel_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
server_ref(chand->server);
grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
done_request_event, rc, &rc->completion);
@@ -513,10 +516,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *call_elem = arg;
- call_data *calld = call_elem->call_data;
- channel_data *chand = call_elem->channel_data;
- request_matcher *rm = calld->request_matcher;
+ grpc_call_element *call_elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)call_elem->call_data;
+ channel_data *chand = (channel_data *)call_elem->channel_data;
+ request_matcher *rm = calld->matcher;
grpc_server *server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
@@ -538,6 +541,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
if (request_id == -1) {
continue;
} else {
+ GRPC_STATS_INC_SERVER_CQS_CHECKED(exec_ctx, i);
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
@@ -548,6 +552,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
}
/* no cq to take the request found: queue it on the slow list */
+ GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx);
gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state);
calld->state = PENDING;
@@ -566,7 +571,7 @@ static void finish_start_new_rpc(
grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
request_matcher *rm,
grpc_server_register_method_payload_handling payload_handling) {
- call_data *calld = elem->call_data;
+ call_data *calld = (call_data *)elem->call_data;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
gpr_mu_lock(&calld->mu_state);
@@ -578,7 +583,7 @@ static void finish_start_new_rpc(
return;
}
- calld->request_matcher = rm;
+ calld->matcher = rm;
switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
@@ -599,8 +604,8 @@ static void finish_start_new_rpc(
}
static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
+ call_data *calld = (call_data *)elem->call_data;
grpc_server *server = chand->server;
uint32_t i;
uint32_t hash;
@@ -624,7 +629,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
+ &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
@@ -642,7 +647,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
+ &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling);
return;
}
@@ -663,7 +668,7 @@ static int num_listeners(grpc_server *server) {
static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
grpc_cq_completion *completion) {
- server_unref(exec_ctx, server);
+ server_unref(exec_ctx, (grpc_server *)server);
}
static int num_channels(grpc_server *server) {
@@ -686,9 +691,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, &server->unregistered_request_matcher);
for (registered_method *rm = server->registered_methods; rm;
rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher,
+ request_matcher_kill_requests(exec_ctx, server, &rm->matcher,
GRPC_ERROR_REF(error));
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
+ request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher);
}
}
GRPC_ERROR_UNREF(error);
@@ -732,8 +737,8 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
grpc_error *error) {
- grpc_call_element *elem = ptr;
- call_data *calld = elem->call_data;
+ grpc_call_element *elem = (grpc_call_element *)ptr;
+ call_data *calld = (call_data *)elem->call_data;
gpr_timespec op_deadline;
if (error == GRPC_ERROR_NONE) {
@@ -771,7 +776,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- call_data *calld = elem->call_data;
+ call_data *calld = (call_data *)elem->call_data;
if (op->recv_initial_metadata) {
GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL);
@@ -789,15 +794,14 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
grpc_error *error) {
- grpc_call_element *elem = ptr;
- call_data *calld = elem->call_data;
+ grpc_call_element *elem = (grpc_call_element *)ptr;
+ call_data *calld = (call_data *)elem->call_data;
if (error == GRPC_ERROR_NONE) {
start_new_rpc(exec_ctx, elem);
} else {
@@ -823,7 +827,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
grpc_transport *transport,
const void *transport_server_data) {
- channel_data *chand = cd;
+ channel_data *chand = (channel_data *)cd;
/* create a call */
grpc_call_create_args args;
memset(&args, 0, sizeof(args));
@@ -839,7 +843,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
GRPC_ERROR_UNREF(error);
return;
}
- call_data *calld = elem->call_data;
+ call_data *calld = (call_data *)elem->call_data;
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
@@ -853,7 +857,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
grpc_error *error) {
- channel_data *chand = cd;
+ channel_data *chand = (channel_data *)cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
grpc_transport_op *op = grpc_make_transport_op(NULL);
@@ -874,8 +878,8 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
+ call_data *calld = (call_data *)elem->call_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
memset(calld, 0, sizeof(call_data));
calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
calld->call = grpc_call_from_top_element(elem);
@@ -892,8 +896,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
grpc_closure *ignored) {
- channel_data *chand = elem->channel_data;
- call_data *calld = elem->call_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
+ call_data *calld = (call_data *)elem->call_data;
GPR_ASSERT(calld->state != PENDING);
@@ -914,7 +918,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
- channel_data *chand = elem->channel_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(!args->is_last);
chand->server = NULL;
@@ -931,7 +935,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
size_t i;
- channel_data *chand = elem->channel_data;
+ channel_data *chand = (channel_data *)elem->channel_data;
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(exec_ctx, chand->registered_methods[i].method);
@@ -962,7 +966,6 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};
@@ -978,8 +981,8 @@ static void register_completion_queue(grpc_server *server,
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
- server->cqs = gpr_realloc(server->cqs,
- server->cq_count * sizeof(grpc_completion_queue *));
+ server->cqs = (grpc_completion_queue **)gpr_realloc(
+ server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
server->cqs[n] = cq;
}
@@ -1004,7 +1007,7 @@ void grpc_server_register_completion_queue(grpc_server *server,
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- grpc_server *server = gpr_zalloc(sizeof(grpc_server));
+ grpc_server *server = (grpc_server *)gpr_zalloc(sizeof(grpc_server));
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
@@ -1055,7 +1058,7 @@ void *grpc_server_register_method(
flags);
return NULL;
}
- m = gpr_zalloc(sizeof(registered_method));
+ m = (registered_method *)gpr_zalloc(sizeof(registered_method));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -1067,7 +1070,7 @@ void *grpc_server_register_method(
static void start_listeners(grpc_exec_ctx *exec_ctx, void *s,
grpc_error *error) {
- grpc_server *server = s;
+ grpc_server *server = (grpc_server *)s;
for (listener *l = server->listeners; l; l = l->next) {
l->start(exec_ctx, server, l->arg, server->pollsets, server->pollset_count);
}
@@ -1088,11 +1091,12 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
- server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
- server->request_freelist_per_cq =
- gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
- server->requested_calls_per_cq =
- gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
+ server->pollsets =
+ (grpc_pollset **)gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq = (gpr_stack_lockfree **)gpr_malloc(
+ sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq = (requested_call **)gpr_malloc(
+ sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
@@ -1103,22 +1107,24 @@ void grpc_server_start(grpc_server *server) {
for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
}
- server->requested_calls_per_cq[i] =
- gpr_malloc((size_t)server->max_requested_calls_per_cq *
- sizeof(*server->requested_calls_per_cq[i]));
+ server->requested_calls_per_cq[i] = (requested_call *)gpr_malloc(
+ (size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher,
+ request_matcher_init(&rm->matcher,
(size_t)server->max_requested_calls_per_cq, server);
}
server_ref(server);
server->starting = true;
- GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server,
- grpc_executor_scheduler),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ &exec_ctx,
+ GRPC_CLOSURE_CREATE(start_listeners, server,
+ grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
+ GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -1173,7 +1179,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
if (num_registered_methods > 0) {
slots = 2 * num_registered_methods;
alloc = sizeof(channel_registered_method) * slots;
- chand->registered_methods = gpr_zalloc(alloc);
+ chand->registered_methods = (channel_registered_method *)gpr_zalloc(alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
grpc_slice host;
bool has_host;
@@ -1234,7 +1240,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
grpc_error *error) {
- grpc_server *server = s;
+ grpc_server *server = (grpc_server *)s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
maybe_finish_shutdown(exec_ctx, server);
@@ -1261,14 +1267,15 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* stay locked, and gather up some stuff to do */
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
- grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
- NULL, gpr_malloc(sizeof(grpc_cq_completion)));
+ grpc_cq_end_op(
+ &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
+ (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
gpr_mu_unlock(&server->mu_global);
goto done;
}
- server->shutdown_tags =
- gpr_realloc(server->shutdown_tags,
- sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
+ server->shutdown_tags = (shutdown_tag *)gpr_realloc(
+ server->shutdown_tags,
+ sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
sdt->tag = tag;
sdt->cq = cq;
@@ -1351,7 +1358,7 @@ void grpc_server_add_listener(
grpc_pollset **pollsets, size_t pollset_count),
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
grpc_closure *on_done)) {
- listener *l = gpr_malloc(sizeof(listener));
+ listener *l = (listener *)gpr_malloc(sizeof(listener));
l->arg = arg;
l->start = start;
l->destroy = destroy;
@@ -1384,7 +1391,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &server->unregistered_request_matcher;
break;
case REGISTERED_CALL:
- rm = &rc->data.registered.registered_method->request_matcher;
+ rm = &rc->data.registered.method->matcher;
break;
}
server->requested_calls_per_cq[cq_idx][request_id] = *rc;
@@ -1428,7 +1435,8 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue *cq_for_notification, void *tag) {
grpc_call_error error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- requested_call *rc = gpr_malloc(sizeof(*rc));
+ requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc));
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx);
GRPC_API_TRACE(
"grpc_server_request_call("
"server=%p, call=%p, details=%p, initial_metadata=%p, "
@@ -1473,8 +1481,9 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_for_notification, void *tag) {
grpc_call_error error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- requested_call *rc = gpr_malloc(sizeof(*rc));
- registered_method *rm = rmp;
+ requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc));
+ registered_method *rm = (registered_method *)rmp;
+ GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx);
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@@ -1511,7 +1520,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call;
- rc->data.registered.registered_method = rm;
+ rc->data.registered.method = rm;
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 96c16105e7..fd6ea4daa9 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -21,6 +21,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "4.0.0-dev"; }
+const char *grpc_version_string(void) { return "5.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gambit"; }