aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.cc38
-rw-r--r--src/core/lib/surface/byte_buffer.cc24
-rw-r--r--src/core/lib/surface/byte_buffer_reader.cc18
-rw-r--r--src/core/lib/surface/call.cc480
-rw-r--r--src/core/lib/surface/call.h56
-rw-r--r--src/core/lib/surface/call_log_batch.cc16
-rw-r--r--src/core/lib/surface/call_test_only.h10
-rw-r--r--src/core/lib/surface/channel.cc123
-rw-r--r--src/core/lib/surface/channel.h36
-rw-r--r--src/core/lib/surface/channel_init.cc22
-rw-r--r--src/core/lib/surface/channel_init.h8
-rw-r--r--src/core/lib/surface/channel_ping.cc20
-rw-r--r--src/core/lib/surface/channel_stack_type.cc2
-rw-r--r--src/core/lib/surface/channel_stack_type.h2
-rw-r--r--src/core/lib/surface/completion_queue.cc515
-rw-r--r--src/core/lib/surface/completion_queue.h39
-rw-r--r--src/core/lib/surface/event_string.cc14
-rw-r--r--src/core/lib/surface/event_string.h2
-rw-r--r--src/core/lib/surface/init.cc15
-rw-r--r--src/core/lib/surface/init_secure.cc8
-rw-r--r--src/core/lib/surface/lame_client.cc52
-rw-r--r--src/core/lib/surface/server.cc615
-rw-r--r--src/core/lib/surface/server.h24
-rw-r--r--src/core/lib/surface/validate_metadata.cc18
-rw-r--r--src/core/lib/surface/validate_metadata.h4
-rw-r--r--src/core/lib/surface/version.cc4
26 files changed, 1084 insertions, 1081 deletions
diff --git a/src/core/lib/surface/alarm.cc b/src/core/lib/surface/alarm.cc
index 4f3bc5172d..395ffd393c 100644
--- a/src/core/lib/surface/alarm.cc
+++ b/src/core/lib/surface/alarm.cc
@@ -38,14 +38,14 @@ struct grpc_alarm {
grpc_closure on_alarm;
grpc_cq_completion completion;
/** completion queue where events about this alarm will be posted */
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
/** user supplied tag */
- void *tag;
+ void* tag;
};
-static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
+static void alarm_ref(grpc_alarm* alarm) { gpr_ref(&alarm->refs); }
-static void alarm_unref(grpc_alarm *alarm) {
+static void alarm_unref(grpc_alarm* alarm) {
if (gpr_unref(&alarm->refs)) {
ExecCtx _local_exec_ctx;
if (alarm->cq != NULL) {
@@ -57,8 +57,8 @@ static void alarm_unref(grpc_alarm *alarm) {
}
#ifndef NDEBUG
-static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason,
- const char *file, int line) {
+static void alarm_ref_dbg(grpc_alarm* alarm, const char* reason,
+ const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -69,8 +69,8 @@ static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason,
alarm_ref(alarm);
}
-static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason,
- const char *file, int line) {
+static void alarm_unref_dbg(grpc_alarm* alarm, const char* reason,
+ const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -82,24 +82,24 @@ static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason,
}
#endif
-static void alarm_end_completion(void *arg, grpc_cq_completion *c) {
- grpc_alarm *alarm = (grpc_alarm *)arg;
+static void alarm_end_completion(void* arg, grpc_cq_completion* c) {
+ grpc_alarm* alarm = (grpc_alarm*)arg;
GRPC_ALARM_UNREF(alarm, "dequeue-end-op");
}
-static void alarm_cb(void *arg, grpc_error *error) {
- grpc_alarm *alarm = (grpc_alarm *)arg;
+static void alarm_cb(void* arg, grpc_error* error) {
+ grpc_alarm* alarm = (grpc_alarm*)arg;
/* We are queuing an op on completion queue. This means, the alarm's structure
cannot be destroyed until the op is dequeued. Adding an extra ref
here and unref'ing when the op is dequeued will achieve this */
GRPC_ALARM_REF(alarm, "queue-end-op");
grpc_cq_end_op(alarm->cq, alarm->tag, error, alarm_end_completion,
- (void *)alarm, &alarm->completion);
+ (void*)alarm, &alarm->completion);
}
-grpc_alarm *grpc_alarm_create(void *reserved) {
- grpc_alarm *alarm = (grpc_alarm *)gpr_malloc(sizeof(grpc_alarm));
+grpc_alarm* grpc_alarm_create(void* reserved) {
+ grpc_alarm* alarm = (grpc_alarm*)gpr_malloc(sizeof(grpc_alarm));
#ifndef NDEBUG
if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
@@ -115,8 +115,8 @@ grpc_alarm *grpc_alarm_create(void *reserved) {
return alarm;
}
-void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
- gpr_timespec deadline, void *tag, void *reserved) {
+void grpc_alarm_set(grpc_alarm* alarm, grpc_completion_queue* cq,
+ gpr_timespec deadline, void* tag, void* reserved) {
ExecCtx _local_exec_ctx;
GRPC_CQ_INTERNAL_REF(cq, "alarm");
@@ -129,13 +129,13 @@ void grpc_alarm_set(grpc_alarm *alarm, grpc_completion_queue *cq,
grpc_exec_ctx_finish();
}
-void grpc_alarm_cancel(grpc_alarm *alarm, void *reserved) {
+void grpc_alarm_cancel(grpc_alarm* alarm, void* reserved) {
ExecCtx _local_exec_ctx;
grpc_timer_cancel(&alarm->alarm);
grpc_exec_ctx_finish();
}
-void grpc_alarm_destroy(grpc_alarm *alarm, void *reserved) {
+void grpc_alarm_destroy(grpc_alarm* alarm, void* reserved) {
grpc_alarm_cancel(alarm, reserved);
GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/byte_buffer.cc b/src/core/lib/surface/byte_buffer.cc
index 5123189671..f3c10797f3 100644
--- a/src/core/lib/surface/byte_buffer.cc
+++ b/src/core/lib/surface/byte_buffer.cc
@@ -22,18 +22,18 @@
#include "src/core/lib/slice/slice_internal.h"
-grpc_byte_buffer *grpc_raw_byte_buffer_create(grpc_slice *slices,
+grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slices,
size_t nslices) {
return grpc_raw_compressed_byte_buffer_create(slices, nslices,
GRPC_COMPRESS_NONE);
}
-grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
- grpc_slice *slices, size_t nslices,
+grpc_byte_buffer* grpc_raw_compressed_byte_buffer_create(
+ grpc_slice* slices, size_t nslices,
grpc_compression_algorithm compression) {
size_t i;
- grpc_byte_buffer *bb =
- (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer));
+ grpc_byte_buffer* bb =
+ (grpc_byte_buffer*)gpr_malloc(sizeof(grpc_byte_buffer));
bb->type = GRPC_BB_RAW;
bb->data.raw.compression = compression;
grpc_slice_buffer_init(&bb->data.raw.slice_buffer);
@@ -44,10 +44,10 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
return bb;
}
-grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
- grpc_byte_buffer_reader *reader) {
- grpc_byte_buffer *bb =
- (grpc_byte_buffer *)gpr_malloc(sizeof(grpc_byte_buffer));
+grpc_byte_buffer* grpc_raw_byte_buffer_from_reader(
+ grpc_byte_buffer_reader* reader) {
+ grpc_byte_buffer* bb =
+ (grpc_byte_buffer*)gpr_malloc(sizeof(grpc_byte_buffer));
grpc_slice slice;
bb->type = GRPC_BB_RAW;
bb->data.raw.compression = GRPC_COMPRESS_NONE;
@@ -59,7 +59,7 @@ grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
return bb;
}
-grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
+grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
switch (bb->type) {
case GRPC_BB_RAW:
return grpc_raw_compressed_byte_buffer_create(
@@ -69,7 +69,7 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
GPR_UNREACHABLE_CODE(return NULL);
}
-void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
+void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
if (!bb) return;
ExecCtx _local_exec_ctx;
switch (bb->type) {
@@ -81,7 +81,7 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
grpc_exec_ctx_finish();
}
-size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
+size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) {
switch (bb->type) {
case GRPC_BB_RAW:
return bb->data.raw.slice_buffer.length;
diff --git a/src/core/lib/surface/byte_buffer_reader.cc b/src/core/lib/surface/byte_buffer_reader.cc
index 5f5596c803..fb66829baa 100644
--- a/src/core/lib/surface/byte_buffer_reader.cc
+++ b/src/core/lib/surface/byte_buffer_reader.cc
@@ -29,7 +29,7 @@
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/slice/slice_internal.h"
-static int is_compressed(grpc_byte_buffer *buffer) {
+static int is_compressed(grpc_byte_buffer* buffer) {
switch (buffer->type) {
case GRPC_BB_RAW:
if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
@@ -40,8 +40,8 @@ static int is_compressed(grpc_byte_buffer *buffer) {
return 1 /* GPR_TRUE */;
}
-int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer) {
+int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) {
ExecCtx _local_exec_ctx;
grpc_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
@@ -74,7 +74,7 @@ int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
return 1;
}
-void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
+void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
/* keeping the same if-else structure as in the init function */
@@ -85,11 +85,11 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
}
}
-int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
- grpc_slice *slice) {
+int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ grpc_slice* slice) {
switch (reader->buffer_in->type) {
case GRPC_BB_RAW: {
- grpc_slice_buffer *slice_buffer;
+ grpc_slice_buffer* slice_buffer;
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = grpc_slice_ref_internal(
@@ -103,12 +103,12 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
return 0;
}
-grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
+grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader* reader) {
grpc_slice in_slice;
size_t bytes_read = 0;
const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size);
- uint8_t *const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */
+ uint8_t* const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */
ExecCtx _local_exec_ctx;
while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 26002056ea..5e1c0badd0 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -86,7 +86,7 @@ typedef enum {
typedef struct {
bool is_set;
- grpc_error *error;
+ grpc_error* error;
} received_status;
static gpr_atm pack_received_status(received_status r) {
@@ -97,14 +97,14 @@ static received_status unpack_received_status(gpr_atm atm) {
if ((atm & 1) == 0) {
return {false, GRPC_ERROR_NONE};
} else {
- return {true, (grpc_error *)(atm & ~(gpr_atm)1)};
+ return {true, (grpc_error*)(atm & ~(gpr_atm)1)};
}
}
#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
- grpc_call *call;
+ grpc_call* call;
/* Share memory for cq_completion and notify_tag as they are never needed
simultaneously. Each byte used in this data structure count as six bytes
per call, so any savings we can make are worthwhile,
@@ -120,7 +120,7 @@ typedef struct batch_control {
\a is_closure is true, \a tag indicates a closure to be invoked;
otherwise, \a tag indicates the tag to be used in the notification to
be sent to the completion queue. */
- void *tag;
+ void* tag;
bool is_closure;
} notify_tag;
} completion_data;
@@ -128,7 +128,7 @@ typedef struct batch_control {
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
- grpc_error *errors[MAX_ERRORS_PER_BATCH];
+ grpc_error* errors[MAX_ERRORS_PER_BATCH];
gpr_atm num_errors;
grpc_transport_stream_op_batch op;
@@ -136,16 +136,16 @@ typedef struct batch_control {
typedef struct {
gpr_mu child_list_mu;
- grpc_call *first_child;
+ grpc_call* first_child;
} parent_call;
typedef struct {
- grpc_call *parent;
+ grpc_call* parent;
/** siblings: children of the same parent form a list, and this list is
protected under
parent->mu */
- grpc_call *sibling_next;
- grpc_call *sibling_prev;
+ grpc_call* sibling_next;
+ grpc_call* sibling_prev;
} child_call;
#define RECV_NONE ((gpr_atm)0)
@@ -153,14 +153,14 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
- gpr_arena *arena;
+ gpr_arena* arena;
grpc_call_combiner call_combiner;
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
grpc_polling_entity pollent;
- grpc_channel *channel;
+ grpc_channel* channel;
gpr_timespec start_time;
/* parent_call* */ gpr_atm parent_call_atm;
- child_call *child;
+ child_call* child;
/* client or server call */
bool is_client;
@@ -178,7 +178,7 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- batch_control *active_batches[MAX_CONCURRENT_BATCHES];
+ batch_control* active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
/* first idx: is_receiving, second idx: is_trailing */
@@ -186,7 +186,7 @@ struct grpc_call {
/* Buffered read metadata waiting to be returned to the application.
Element 0 is initial metadata, element 1 is trailing metadata. */
- grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata_array* buffered_metadata[2];
grpc_metadata compression_md;
@@ -220,8 +220,8 @@ struct grpc_call {
grpc_slice_buffer_stream sending_stream;
- grpc_byte_stream *receiving_stream;
- grpc_byte_buffer **receiving_buffer;
+ grpc_byte_stream* receiving_stream;
+ grpc_byte_buffer** receiving_buffer;
grpc_slice receiving_slice;
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
@@ -232,11 +232,11 @@ struct grpc_call {
union {
struct {
- grpc_status_code *status;
- grpc_slice *status_details;
+ grpc_status_code* status;
+ grpc_slice* status_details;
} client;
struct {
- int *cancelled;
+ int* cancelled;
} server;
} final_op;
@@ -264,74 +264,74 @@ grpc_tracer_flag grpc_call_error_trace =
grpc_tracer_flag grpc_compression_trace =
GRPC_TRACER_INITIALIZER(false, "compression");
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
-#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
+#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_batch(grpc_call *call, grpc_transport_stream_op_batch *op,
- grpc_closure *start_batch_closure);
-static void cancel_with_status(grpc_call *c, status_source source,
+static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
+ grpc_closure* start_batch_closure);
+static void cancel_with_status(grpc_call* c, status_source source,
grpc_status_code status,
- const char *description);
-static void cancel_with_error(grpc_call *c, status_source source,
- grpc_error *error);
-static void destroy_call(void *call_stack, grpc_error *error);
-static void receiving_slice_ready(void *bctlp, grpc_error *error);
-static void get_final_status(grpc_call *call,
+ const char* description);
+static void cancel_with_error(grpc_call* c, status_source source,
+ grpc_error* error);
+static void destroy_call(void* call_stack, grpc_error* error);
+static void receiving_slice_ready(void* bctlp, grpc_error* error);
+static void get_final_status(grpc_call* call,
void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data, grpc_slice *details);
-static void set_status_value_directly(grpc_status_code status, void *dest);
-static void set_status_from_error(grpc_call *call, status_source source,
- grpc_error *error);
-static void process_data_after_md(batch_control *bctl);
-static void post_batch_completion(batch_control *bctl);
-static void add_batch_error(batch_control *bctl, grpc_error *error,
+ void* user_data),
+ void* set_value_user_data, grpc_slice* details);
+static void set_status_value_directly(grpc_status_code status, void* dest);
+static void set_status_from_error(grpc_call* call, status_source source,
+ grpc_error* error);
+static void process_data_after_md(batch_control* bctl);
+static void post_batch_completion(batch_control* bctl);
+static void add_batch_error(batch_control* bctl, grpc_error* error,
bool has_cancelled);
-static void add_init_error(grpc_error **composite, grpc_error *new_err) {
+static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE)
*composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
*composite = grpc_error_add_child(*composite, new_err);
}
-void *grpc_call_arena_alloc(grpc_call *call, size_t size) {
+void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
return gpr_arena_alloc(call->arena, size);
}
-static parent_call *get_or_create_parent_call(grpc_call *call) {
- parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+static parent_call* get_or_create_parent_call(grpc_call* call) {
+ parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
if (p == NULL) {
- p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p));
+ p = (parent_call*)gpr_arena_alloc(call->arena, sizeof(*p));
gpr_mu_init(&p->child_list_mu);
if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) {
gpr_mu_destroy(&p->child_list_mu);
- p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+ p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
}
return p;
}
-static parent_call *get_parent_call(grpc_call *call) {
- return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm);
+static parent_call* get_parent_call(grpc_call* call) {
+ return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
-grpc_error *grpc_call_create(const grpc_call_create_args *args,
- grpc_call **out_call) {
+grpc_error* grpc_call_create(const grpc_call_create_args* args,
+ grpc_call** out_call) {
size_t i, j;
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_channel_stack *channel_stack =
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_channel_stack* channel_stack =
grpc_channel_get_channel_stack(args->channel);
- grpc_call *call;
+ grpc_call* call;
GPR_TIMER_BEGIN("grpc_call_create", 0);
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
- gpr_arena *arena = gpr_arena_create(initial_size);
- call = (grpc_call *)gpr_arena_alloc(
+ gpr_arena* arena = gpr_arena_create(initial_size);
+ call = (grpc_call*)gpr_arena_alloc(
arena, sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
@@ -376,15 +376,15 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
bool immediately_cancel = false;
if (args->parent != NULL) {
- child_call *cc = call->child =
- (child_call *)gpr_arena_alloc(arena, sizeof(child_call));
+ child_call* cc = call->child =
+ (child_call*)gpr_arena_alloc(arena, sizeof(child_call));
call->child->parent = args->parent;
GRPC_CALL_INTERNAL_REF(args->parent, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!args->parent->is_client);
- parent_call *pc = get_or_create_parent_call(args->parent);
+ parent_call* pc = get_or_create_parent_call(args->parent);
gpr_mu_lock(&pc->child_list_mu);
@@ -472,8 +472,8 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
return error;
}
-void grpc_call_set_completion_queue(grpc_call *call,
- grpc_completion_queue *cq) {
+void grpc_call_set_completion_queue(grpc_call* call,
+ grpc_completion_queue* cq) {
GPR_ASSERT(cq);
if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
@@ -489,32 +489,32 @@ void grpc_call_set_completion_queue(grpc_call *call,
#ifndef NDEBUG
#define REF_REASON reason
-#define REF_ARG , const char *reason
+#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
-void grpc_call_internal_ref(grpc_call *c REF_ARG) {
+void grpc_call_internal_ref(grpc_call* c REF_ARG) {
GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-void grpc_call_internal_unref(grpc_call *c REF_ARG) {
+void grpc_call_internal_unref(grpc_call* c REF_ARG) {
GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void release_call(void *call, grpc_error *error) {
- grpc_call *c = (grpc_call *)call;
- grpc_channel *channel = c->channel;
+static void release_call(void* call, grpc_error* error) {
+ grpc_call* c = (grpc_call*)call;
+ grpc_channel* channel = c->channel;
grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char *)c->peer_string);
+ gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
-static void set_status_value_directly(grpc_status_code status, void *dest);
-static void destroy_call(void *call, grpc_error *error) {
+static void set_status_value_directly(grpc_status_code status, void* dest);
+static void destroy_call(void* call, grpc_error* error) {
size_t i;
int ii;
- grpc_call *c = (grpc_call *)call;
+ grpc_call* c = (grpc_call*)call;
GPR_TIMER_BEGIN("destroy_call", 0);
for (i = 0; i < 2; i++) {
grpc_metadata_batch_destroy(
@@ -523,7 +523,7 @@ static void destroy_call(void *call, grpc_error *error) {
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(c->receiving_stream);
}
- parent_call *pc = get_parent_call(c);
+ parent_call* pc = get_parent_call(c);
if (pc != NULL) {
gpr_mu_destroy(&pc->child_list_mu);
}
@@ -555,19 +555,19 @@ static void destroy_call(void *call, grpc_error *error) {
GPR_TIMER_END("destroy_call", 0);
}
-void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); }
+void grpc_call_ref(grpc_call* c) { gpr_ref(&c->ext_ref); }
-void grpc_call_unref(grpc_call *c) {
+void grpc_call_unref(grpc_call* c) {
if (!gpr_unref(&c->ext_ref)) return;
- child_call *cc = c->child;
+ child_call* cc = c->child;
ExecCtx _local_exec_ctx;
GPR_TIMER_BEGIN("grpc_call_unref", 0);
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
if (cc) {
- parent_call *pc = get_parent_call(cc->parent);
+ parent_call* pc = get_parent_call(cc->parent);
gpr_mu_lock(&pc->child_list_mu);
if (c == pc->first_child) {
pc->first_child = cc->sibling_next;
@@ -599,7 +599,7 @@ void grpc_call_unref(grpc_call *c) {
GPR_TIMER_END("grpc_call_unref", 0);
}
-grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
+grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
ExecCtx _local_exec_ctx;
@@ -610,11 +610,11 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
// This is called via the call combiner to start sending a batch down
// the filter stack.
-static void execute_batch_in_call_combiner(void *arg, grpc_error *ignored) {
- grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
- grpc_call *call = (grpc_call *)batch->handler_private.extra_arg;
+static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
+ grpc_transport_stream_op_batch* batch = (grpc_transport_stream_op_batch*)arg;
+ grpc_call* call = (grpc_call*)batch->handler_private.extra_arg;
GPR_TIMER_BEGIN("execute_batch", 0);
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
elem->filter->start_transport_stream_op_batch(elem, batch);
GPR_TIMER_END("execute_batch", 0);
@@ -622,9 +622,9 @@ static void execute_batch_in_call_combiner(void *arg, grpc_error *ignored) {
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
-static void execute_batch(grpc_call *call,
- grpc_transport_stream_op_batch *batch,
- grpc_closure *start_batch_closure) {
+static void execute_batch(grpc_call* call,
+ grpc_transport_stream_op_batch* batch,
+ grpc_closure* start_batch_closure) {
batch->handler_private.extra_arg = call;
GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
@@ -632,15 +632,15 @@ static void execute_batch(grpc_call *call,
GRPC_ERROR_NONE, "executing batch");
}
-char *grpc_call_get_peer(grpc_call *call) {
- char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+char* grpc_call_get_peer(grpc_call* call) {
+ char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
if (peer_string != NULL) return gpr_strdup(peer_string);
peer_string = grpc_channel_get_target(call->channel);
if (peer_string != NULL) return peer_string;
return gpr_strdup("unknown");
}
-grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
+grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
return CALL_FROM_TOP_ELEM(elem);
}
@@ -648,10 +648,10 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
* CANCELLATION
*/
-grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
+grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
- const char *description,
- void *reserved) {
+ const char* description,
+ void* reserved) {
ExecCtx _local_exec_ctx;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
@@ -664,23 +664,23 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
}
typedef struct {
- grpc_call *call;
+ grpc_call* call;
grpc_closure start_batch;
grpc_closure finish_batch;
} cancel_state;
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack. Yields the call combiner when the batch is done.
-static void done_termination(void *arg, grpc_error *error) {
- cancel_state *state = (cancel_state *)arg;
+static void done_termination(void* arg, grpc_error* error) {
+ cancel_state* state = (cancel_state*)arg;
GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
"on_complete for cancel_stream op");
GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
gpr_free(state);
}
-static void cancel_with_error(grpc_call *c, status_source source,
- grpc_error *error) {
+static void cancel_with_error(grpc_call* c, status_source source,
+ grpc_error* error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
@@ -688,19 +688,19 @@ static void cancel_with_error(grpc_call *c, status_source source,
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(c, source, GRPC_ERROR_REF(error));
- cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ cancel_state* state = (cancel_state*)gpr_malloc(sizeof(*state));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
grpc_schedule_on_exec_ctx);
- grpc_transport_stream_op_batch *op =
+ grpc_transport_stream_op_batch* op =
grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
execute_batch(c, op, &state->start_batch);
}
-static grpc_error *error_from_status(grpc_status_code status,
- const char *description) {
+static grpc_error* error_from_status(grpc_status_code status,
+ const char* description) {
// copying 'description' is needed to ensure the grpc_call_cancel_with_status
// guarantee that can be short-lived.
return grpc_error_set_int(
@@ -710,9 +710,9 @@ static grpc_error *error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_call *c, status_source source,
+static void cancel_with_status(grpc_call* c, status_source source,
grpc_status_code status,
- const char *description) {
+ const char* description) {
cancel_with_error(c, source, error_from_status(status, description));
}
@@ -721,9 +721,9 @@ static void cancel_with_status(grpc_call *c, status_source source,
*/
static bool get_final_status_from(
- grpc_call *call, grpc_error *error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void *user_data),
- void *set_value_user_data, grpc_slice *details) {
+ grpc_call* call, grpc_error* error, bool allow_ok_status,
+ void (*set_value)(grpc_status_code code, void* user_data),
+ void* set_value_user_data, grpc_slice* details) {
grpc_status_code code;
grpc_slice slice = grpc_empty_slice();
grpc_error_get_status(error, call->send_deadline, &code, &slice, NULL);
@@ -738,10 +738,10 @@ static bool get_final_status_from(
return true;
}
-static void get_final_status(grpc_call *call,
+static void get_final_status(grpc_call* call,
void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data, grpc_slice *details) {
+ void* user_data),
+ void* set_value_user_data, grpc_slice* details) {
int i;
received_status status[STATUS_SOURCE_COUNT];
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
@@ -787,8 +787,8 @@ static void get_final_status(grpc_call *call,
}
}
-static void set_status_from_error(grpc_call *call, status_source source,
- grpc_error *error) {
+static void set_status_from_error(grpc_call* call, status_source source,
+ grpc_error* error) {
if (!gpr_atm_rel_cas(&call->status[source],
pack_received_status({false, GRPC_ERROR_NONE}),
pack_received_status({true, error}))) {
@@ -801,51 +801,51 @@ static void set_status_from_error(grpc_call *call, status_source source,
*/
static void set_incoming_compression_algorithm(
- grpc_call *call, grpc_compression_algorithm algo) {
+ grpc_call* call, grpc_compression_algorithm algo) {
GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
call->incoming_compression_algorithm = algo;
}
static void set_incoming_stream_compression_algorithm(
- grpc_call *call, grpc_stream_compression_algorithm algo) {
+ grpc_call* call, grpc_stream_compression_algorithm algo) {
GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
call->incoming_stream_compression_algorithm = algo;
}
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
- grpc_call *call) {
+ grpc_call* call) {
grpc_compression_algorithm algorithm;
algorithm = call->incoming_compression_algorithm;
return algorithm;
}
static grpc_compression_algorithm compression_algorithm_for_level_locked(
- grpc_call *call, grpc_compression_level level) {
+ grpc_call* call, grpc_compression_level level) {
return grpc_compression_algorithm_for_level(level,
call->encodings_accepted_by_peer);
}
static grpc_stream_compression_algorithm
stream_compression_algorithm_for_level_locked(
- grpc_call *call, grpc_stream_compression_level level) {
+ grpc_call* call, grpc_stream_compression_level level) {
return grpc_stream_compression_algorithm_for_level(
level, call->stream_encodings_accepted_by_peer);
}
-uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
+uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
uint32_t flags;
flags = call->test_only_last_message_flags;
return flags;
}
-static void destroy_encodings_accepted_by_peer(void *p) { return; }
+static void destroy_encodings_accepted_by_peer(void* p) { return; }
-static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem mdel) {
+static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel) {
size_t i;
grpc_compression_algorithm algorithm;
grpc_slice_buffer accept_encoding_parts;
grpc_slice accept_encoding_slice;
- void *accepted_user_data;
+ void* accepted_user_data;
accepted_user_data =
grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
@@ -869,7 +869,7 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem mdel) {
&algorithm)) {
GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
} else {
- char *accept_encoding_entry_str =
+ char* accept_encoding_entry_str =
grpc_slice_to_c_string(accept_encoding_entry_slice);
gpr_log(GPR_ERROR,
"Invalid entry in accept encoding metadata: '%s'. Ignoring.",
@@ -882,16 +882,16 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem mdel) {
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
- (void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
+ (void*)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
-static void set_stream_encodings_accepted_by_peer(grpc_call *call,
+static void set_stream_encodings_accepted_by_peer(grpc_call* call,
grpc_mdelem mdel) {
size_t i;
grpc_stream_compression_algorithm algorithm;
grpc_slice_buffer accept_encoding_parts;
grpc_slice accept_encoding_slice;
- void *accepted_user_data;
+ void* accepted_user_data;
accepted_user_data =
grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
@@ -914,7 +914,7 @@ static void set_stream_encodings_accepted_by_peer(grpc_call *call,
&algorithm)) {
GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm);
} else {
- char *accept_encoding_entry_str =
+ char* accept_encoding_entry_str =
grpc_slice_to_c_string(accept_encoding_entry_slice);
gpr_log(GPR_ERROR,
"Invalid entry in accept encoding metadata: '%s'. Ignoring.",
@@ -927,54 +927,54 @@ static void set_stream_encodings_accepted_by_peer(grpc_call *call,
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
- (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
+ (void*)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
}
-uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
+uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
uint32_t encodings_accepted_by_peer;
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
return encodings_accepted_by_peer;
}
uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
- grpc_call *call) {
+ grpc_call* call) {
uint32_t stream_encodings_accepted_by_peer;
stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer;
return stream_encodings_accepted_by_peer;
}
grpc_stream_compression_algorithm
-grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
return call->incoming_stream_compression_algorithm;
}
-static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
- return (grpc_linked_mdelem *)&md->internal_data;
+static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
+ return (grpc_linked_mdelem*)&md->internal_data;
}
-static grpc_metadata *get_md_elem(grpc_metadata *metadata,
- grpc_metadata *additional_metadata, int i,
+static grpc_metadata* get_md_elem(grpc_metadata* metadata,
+ grpc_metadata* additional_metadata, int i,
int count) {
- grpc_metadata *res =
+ grpc_metadata* res =
i < count ? &metadata[i] : &additional_metadata[i - count];
GPR_ASSERT(res);
return res;
}
-static int prepare_application_metadata(grpc_call *call, int count,
- grpc_metadata *metadata,
+static int prepare_application_metadata(grpc_call* call, int count,
+ grpc_metadata* metadata,
int is_trailing,
int prepend_extra_metadata,
- grpc_metadata *additional_metadata,
+ grpc_metadata* additional_metadata,
int additional_metadata_count) {
int total_count = count + additional_metadata_count;
int i;
- grpc_metadata_batch *batch =
+ grpc_metadata_batch* batch =
&call->metadata_batch[0 /* is_receiving */][is_trailing];
for (i = 0; i < total_count; i++) {
- const grpc_metadata *md =
+ const grpc_metadata* md =
get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_linked_mdelem* l = linked_from_md(md);
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
if (!GRPC_LOG_IF_ERROR("validate_metadata",
grpc_validate_header_key_is_legal(md->key))) {
@@ -985,13 +985,13 @@ static int prepare_application_metadata(grpc_call *call, int count,
grpc_validate_header_nonbin_value_is_legal(md->value))) {
break;
}
- l->md = grpc_mdelem_from_grpc_metadata((grpc_metadata *)md);
+ l->md = grpc_mdelem_from_grpc_metadata((grpc_metadata*)md);
}
if (i != total_count) {
for (int j = 0; j < i; j++) {
- const grpc_metadata *md =
+ const grpc_metadata* md =
get_md_elem(metadata, additional_metadata, j, count);
- grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_linked_mdelem* l = linked_from_md(md);
GRPC_MDELEM_UNREF(l->md);
}
return 0;
@@ -1008,9 +1008,9 @@ static int prepare_application_metadata(grpc_call *call, int count,
}
}
for (i = 0; i < total_count; i++) {
- grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = linked_from_md(md);
- grpc_error *error = grpc_metadata_batch_link_tail(batch, l);
+ grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
+ grpc_linked_mdelem* l = linked_from_md(md);
+ grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
if (error != GRPC_ERROR_NONE) {
GRPC_MDELEM_UNREF(l->md);
}
@@ -1025,11 +1025,11 @@ static int prepare_application_metadata(grpc_call *call, int count,
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
#define STATUS_OFFSET 1
-static void destroy_status(void *ignored) {}
+static void destroy_status(void* ignored) {}
static uint32_t decode_status(grpc_mdelem md) {
uint32_t status;
- void *user_data;
+ void* user_data;
if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0;
if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1;
if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2;
@@ -1041,7 +1041,7 @@ static uint32_t decode_status(grpc_mdelem md) {
status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
}
grpc_mdelem_set_user_data(md, destroy_status,
- (void *)(intptr_t)(status + STATUS_OFFSET));
+ (void*)(intptr_t)(status + STATUS_OFFSET));
}
return status;
}
@@ -1050,7 +1050,7 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
grpc_compression_algorithm algorithm =
grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md));
if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
- char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid incoming compression algorithm: '%s'. Interpreting "
"incoming data as uncompressed.",
@@ -1066,7 +1066,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
grpc_stream_compression_algorithm algorithm =
grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
- char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
gpr_log(GPR_ERROR,
"Invalid incoming stream compression algorithm: '%s'. Interpreting "
"incoming data as uncompressed.",
@@ -1077,20 +1077,20 @@ static grpc_stream_compression_algorithm decode_stream_compression(
return algorithm;
}
-static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
+static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
int is_trailing) {
if (b->list.count == 0) return;
GPR_TIMER_BEGIN("publish_app_metadata", 0);
- grpc_metadata_array *dest;
- grpc_metadata *mdusr;
+ grpc_metadata_array* dest;
+ grpc_metadata* mdusr;
dest = call->buffered_metadata[is_trailing];
if (dest->count + b->list.count > dest->capacity) {
dest->capacity =
GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
- dest->metadata = (grpc_metadata *)gpr_realloc(
+ dest->metadata = (grpc_metadata*)gpr_realloc(
dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
- for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) {
+ for (grpc_linked_mdelem* l = b->list.head; l != NULL; l = l->next) {
mdusr = &dest->metadata[dest->count++];
/* we pass back borrowed slices that are valid whilst the call is valid */
mdusr->key = GRPC_MDKEY(l->md);
@@ -1099,7 +1099,7 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
GPR_TIMER_END("publish_app_metadata", 0);
}
-static void recv_initial_filter(grpc_call *call, grpc_metadata_batch *b) {
+static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
if (b->idx.named.content_encoding != NULL) {
if (b->idx.named.grpc_encoding != NULL) {
gpr_log(GPR_ERROR,
@@ -1135,11 +1135,11 @@ static void recv_initial_filter(grpc_call *call, grpc_metadata_batch *b) {
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(void *args, grpc_metadata_batch *b) {
- grpc_call *call = (grpc_call *)args;
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+ grpc_call* call = (grpc_call*)args;
if (b->idx.named.grpc_status != NULL) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
- grpc_error *error =
+ grpc_error* error =
status_code == GRPC_STATUS_OK
? GRPC_ERROR_NONE
: grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -1161,7 +1161,7 @@ static void recv_trailing_filter(void *args, grpc_metadata_batch *b) {
publish_app_metadata(call, b, true);
}
-grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
+grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
return CALL_STACK_FROM_CALL(call);
}
@@ -1169,12 +1169,12 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
* BATCH API IMPLEMENTATION
*/
-static void set_status_value_directly(grpc_status_code status, void *dest) {
- *(grpc_status_code *)dest = status;
+static void set_status_value_directly(grpc_status_code status, void* dest) {
+ *(grpc_status_code*)dest = status;
}
-static void set_cancelled_value(grpc_status_code status, void *dest) {
- *(int *)dest = (status != GRPC_STATUS_OK);
+static void set_cancelled_value(grpc_status_code status, void* dest) {
+ *(int*)dest = (status != GRPC_STATUS_OK);
}
static bool are_write_flags_valid(uint32_t flags) {
@@ -1214,16 +1214,16 @@ static int batch_slot_for_op(grpc_op_type type) {
GPR_UNREACHABLE_CODE(return 123456789);
}
-static batch_control *allocate_batch_control(grpc_call *call,
- const grpc_op *ops,
+static batch_control* allocate_batch_control(grpc_call* call,
+ const grpc_op* ops,
size_t num_ops) {
int slot = batch_slot_for_op(ops[0].op);
- batch_control **pslot = &call->active_batches[slot];
+ batch_control** pslot = &call->active_batches[slot];
if (*pslot == NULL) {
*pslot =
- (batch_control *)gpr_arena_alloc(call->arena, sizeof(batch_control));
+ (batch_control*)gpr_arena_alloc(call->arena, sizeof(batch_control));
}
- batch_control *bctl = *pslot;
+ batch_control* bctl = *pslot;
if (bctl->call != NULL) {
return NULL;
}
@@ -1233,26 +1233,26 @@ static batch_control *allocate_batch_control(grpc_call *call,
return bctl;
}
-static void finish_batch_completion(void *user_data,
- grpc_cq_completion *storage) {
- batch_control *bctl = (batch_control *)user_data;
- grpc_call *call = bctl->call;
+static void finish_batch_completion(void* user_data,
+ grpc_cq_completion* storage) {
+ batch_control* bctl = (batch_control*)user_data;
+ grpc_call* call = bctl->call;
bctl->call = NULL;
GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
-static grpc_error *consolidate_batch_errors(batch_control *bctl) {
+static grpc_error* consolidate_batch_errors(batch_control* bctl) {
size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors);
if (n == 0) {
return GRPC_ERROR_NONE;
} else if (n == 1) {
/* Skip creating a composite error in the case that only one error was
logged */
- grpc_error *e = bctl->errors[0];
+ grpc_error* e = bctl->errors[0];
bctl->errors[0] = NULL;
return e;
} else {
- grpc_error *error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Call batch failed", bctl->errors, n);
for (size_t i = 0; i < n; i++) {
GRPC_ERROR_UNREF(bctl->errors[i]);
@@ -1262,10 +1262,10 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) {
}
}
-static void post_batch_completion(batch_control *bctl) {
- grpc_call *next_child_call;
- grpc_call *call = bctl->call;
- grpc_error *error = consolidate_batch_errors(bctl);
+static void post_batch_completion(batch_control* bctl) {
+ grpc_call* next_child_call;
+ grpc_call* call = bctl->call;
+ grpc_error* error = consolidate_batch_errors(bctl);
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@@ -1279,15 +1279,15 @@ static void post_batch_completion(batch_control *bctl) {
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch *md =
+ grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
- parent_call *pc = get_parent_call(call);
+ parent_call* pc = get_parent_call(call);
if (pc != NULL) {
- grpc_call *child;
+ grpc_call* child;
gpr_mu_lock(&pc->child_list_mu);
child = pc->first_child;
if (child != NULL) {
@@ -1326,7 +1326,7 @@ static void post_batch_completion(batch_control *bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = NULL;
- GRPC_CLOSURE_RUN((grpc_closure *)bctl->completion_data.notify_tag.tag,
+ GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
@@ -1337,15 +1337,15 @@ static void post_batch_completion(batch_control *bctl) {
}
}
-static void finish_batch_step(batch_control *bctl) {
+static void finish_batch_step(batch_control* bctl) {
if (gpr_unref(&bctl->steps_to_complete)) {
post_batch_completion(bctl);
}
}
-static void continue_receiving_slices(batch_control *bctl) {
- grpc_error *error;
- grpc_call *call = bctl->call;
+static void continue_receiving_slices(batch_control* bctl) {
+ grpc_error* error;
+ grpc_call* call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
(*call->receiving_buffer)->data.raw.slice_buffer.length;
@@ -1378,10 +1378,10 @@ static void continue_receiving_slices(batch_control *bctl) {
}
}
-static void receiving_slice_ready(void *bctlp, grpc_error *error) {
- batch_control *bctl = (batch_control *)bctlp;
- grpc_call *call = bctl->call;
- grpc_byte_stream *bs = call->receiving_stream;
+static void receiving_slice_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = (batch_control*)bctlp;
+ grpc_call* call = bctl->call;
+ grpc_byte_stream* bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) {
@@ -1414,8 +1414,8 @@ static void receiving_slice_ready(void *bctlp, grpc_error *error) {
}
}
-static void process_data_after_md(batch_control *bctl) {
- grpc_call *call = bctl->call;
+static void process_data_after_md(batch_control* bctl) {
+ grpc_call* call = bctl->call;
if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL;
call->receiving_message = 0;
@@ -1435,9 +1435,9 @@ static void process_data_after_md(batch_control *bctl) {
}
}
-static void receiving_stream_ready(void *bctlp, grpc_error *error) {
- batch_control *bctl = (batch_control *)bctlp;
- grpc_call *call = bctl->call;
+static void receiving_stream_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = (batch_control*)bctlp;
+ grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != NULL) {
grpc_byte_stream_destroy(call->receiving_stream);
@@ -1458,22 +1458,22 @@ static void receiving_stream_ready(void *bctlp, grpc_error *error) {
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack. Yields the call combiner
// before processing the received message.
-static void receiving_stream_ready_in_call_combiner(void *bctlp,
- grpc_error *error) {
- batch_control *bctl = (batch_control *)bctlp;
- grpc_call *call = bctl->call;
+static void receiving_stream_ready_in_call_combiner(void* bctlp,
+ grpc_error* error) {
+ batch_control* bctl = (batch_control*)bctlp;
+ grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
receiving_stream_ready(bctlp, error);
}
-static void validate_filtered_metadata(batch_control *bctl) {
- grpc_call *call = bctl->call;
+static void validate_filtered_metadata(batch_control* bctl) {
+ grpc_call* call = bctl->call;
/* validate compression algorithms */
if (call->incoming_stream_compression_algorithm !=
GRPC_STREAM_COMPRESS_NONE) {
const grpc_stream_compression_algorithm algo =
call->incoming_stream_compression_algorithm;
- char *error_msg = NULL;
+ char* error_msg = NULL;
const grpc_compression_options compression_options =
grpc_channel_compression_options(call->channel);
if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
@@ -1485,7 +1485,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
} else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- const char *algo_name = NULL;
+ const char* algo_name = NULL;
grpc_stream_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
algo_name);
@@ -1499,7 +1499,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
call->incoming_stream_compression_algorithm)) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- const char *algo_name = NULL;
+ const char* algo_name = NULL;
grpc_stream_compression_algorithm_name(
call->incoming_stream_compression_algorithm, &algo_name);
gpr_log(
@@ -1513,7 +1513,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
} else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
const grpc_compression_algorithm algo =
call->incoming_compression_algorithm;
- char *error_msg = NULL;
+ char* error_msg = NULL;
const grpc_compression_options compression_options =
grpc_channel_compression_options(call->channel);
/* check if algorithm is known */
@@ -1526,7 +1526,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- const char *algo_name = NULL;
+ const char* algo_name = NULL;
grpc_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
@@ -1542,7 +1542,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
if (!GPR_BITGET(call->encodings_accepted_by_peer,
call->incoming_compression_algorithm)) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- const char *algo_name = NULL;
+ const char* algo_name = NULL;
grpc_compression_algorithm_name(call->incoming_compression_algorithm,
&algo_name);
gpr_log(GPR_ERROR,
@@ -1555,7 +1555,7 @@ static void validate_filtered_metadata(batch_control *bctl) {
}
}
-static void add_batch_error(batch_control *bctl, grpc_error *error,
+static void add_batch_error(batch_control* bctl, grpc_error* error,
bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1);
@@ -1565,15 +1565,15 @@ static void add_batch_error(batch_control *bctl, grpc_error *error,
bctl->errors[idx] = error;
}
-static void receiving_initial_metadata_ready(void *bctlp, grpc_error *error) {
- batch_control *bctl = (batch_control *)bctlp;
- grpc_call *call = bctl->call;
+static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = (batch_control*)bctlp;
+ grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
- grpc_metadata_batch *md =
+ grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
recv_initial_filter(call, md);
@@ -1587,7 +1587,7 @@ static void receiving_initial_metadata_ready(void *bctlp, grpc_error *error) {
}
}
- grpc_closure *saved_rsr_closure = NULL;
+ grpc_closure* saved_rsr_closure = NULL;
while (true) {
gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
/* Should only receive initial metadata once */
@@ -1604,9 +1604,9 @@ static void receiving_initial_metadata_ready(void *bctlp, grpc_error *error) {
}
} else {
/* Already received messages */
- saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
- (batch_control *)rsr_bctlp,
- grpc_schedule_on_exec_ctx);
+ saved_rsr_closure =
+ GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
/* No need to modify recv_state */
break;
}
@@ -1618,28 +1618,28 @@ static void receiving_initial_metadata_ready(void *bctlp, grpc_error *error) {
finish_batch_step(bctl);
}
-static void finish_batch(void *bctlp, grpc_error *error) {
- batch_control *bctl = (batch_control *)bctlp;
- grpc_call *call = bctl->call;
+static void finish_batch(void* bctlp, grpc_error* error) {
+ batch_control* bctl = (batch_control*)bctlp;
+ grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(bctl);
}
-static void free_no_op_completion(void *p, grpc_cq_completion *completion) {
+static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
gpr_free(completion);
}
-static grpc_call_error call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *notify_tag,
+static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
+ size_t nops, void* notify_tag,
int is_notify_tag_closure) {
size_t i;
- const grpc_op *op;
- batch_control *bctl;
+ const grpc_op* op;
+ batch_control* bctl;
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
- grpc_transport_stream_op_batch *stream_op;
- grpc_transport_stream_op_batch_payload *stream_op_payload;
+ grpc_transport_stream_op_batch* stream_op;
+ grpc_transport_stream_op_batch_payload* stream_op_payload;
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1649,9 +1649,9 @@ static grpc_call_error call_start_batch(grpc_call *call, const grpc_op *ops,
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(
call->cq, notify_tag, GRPC_ERROR_NONE, free_no_op_completion, NULL,
- (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
+ (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
} else {
- GRPC_CLOSURE_SCHED((grpc_closure *)notify_tag, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;
@@ -1847,7 +1847,7 @@ static grpc_call_error call_start_batch(grpc_call *call, const grpc_op *ops,
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
call->channel, op->data.send_status_from_server.status);
{
- grpc_error *override_error = GRPC_ERROR_NONE;
+ grpc_error* override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error from server send status");
@@ -1858,7 +1858,7 @@ static grpc_call_error call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_slice_ref_internal(
*op->data.send_status_from_server.status_details));
call->send_extra_metadata_count++;
- char *msg = grpc_slice_to_c_string(
+ char* msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
override_error =
grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -2032,8 +2032,8 @@ done_with_error:
goto done;
}
-grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *tag, void *reserved) {
+grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
+ size_t nops, void* tag, void* reserved) {
ExecCtx _local_exec_ctx;
grpc_call_error err;
@@ -2052,15 +2052,15 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return err;
}
-grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call,
- const grpc_op *ops,
+grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
+ const grpc_op* ops,
size_t nops,
- grpc_closure *closure) {
+ grpc_closure* closure) {
return call_start_batch(call, ops, nops, closure, 1);
}
-void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
- void *value, void (*destroy)(void *value)) {
+void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
+ void* value, void (*destroy)(void* value)) {
if (call->context[elem].destroy) {
call->context[elem].destroy(call->context[elem].value);
}
@@ -2068,20 +2068,20 @@ void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
call->context[elem].destroy = destroy;
}
-void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
+void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
return call->context[elem].value;
}
-uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; }
+uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
grpc_compression_algorithm grpc_call_compression_for_level(
- grpc_call *call, grpc_compression_level level) {
+ grpc_call* call, grpc_compression_level level) {
grpc_compression_algorithm algo =
compression_algorithm_for_level_locked(call, level);
return algo;
}
-const char *grpc_call_error_to_string(grpc_call_error error) {
+const char* grpc_call_error_to_string(grpc_call_error error) {
switch (error) {
case GRPC_CALL_ERROR:
return "GRPC_CALL_ERROR";
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index eba96e04b9..61cc6520da 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -30,22 +30,22 @@ extern "C" {
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
- void *user_data);
+typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
+ void* user_data);
typedef struct grpc_call_create_args {
- grpc_channel *channel;
+ grpc_channel* channel;
- grpc_call *parent;
+ grpc_call* parent;
uint32_t propagation_mask;
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
/* if not NULL, it'll be used in lieu of cq */
- grpc_pollset_set *pollset_set_alternative;
+ grpc_pollset_set* pollset_set_alternative;
- const void *server_transport_data;
+ const void* server_transport_data;
- grpc_mdelem *add_initial_metadata;
+ grpc_mdelem* add_initial_metadata;
size_t add_initial_metadata_count;
grpc_millis send_deadline;
@@ -54,57 +54,57 @@ typedef struct grpc_call_create_args {
/* Create a new call based on \a args.
Regardless of success or failure, always returns a valid new call into *call
*/
-grpc_error *grpc_call_create(const grpc_call_create_args *args,
- grpc_call **call);
+grpc_error* grpc_call_create(const grpc_call_create_args* args,
+ grpc_call** call);
-void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
+void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq);
#ifndef NDEBUG
-void grpc_call_internal_ref(grpc_call *call, const char *reason);
-void grpc_call_internal_unref(grpc_call *call, const char *reason);
+void grpc_call_internal_ref(grpc_call* call, const char* reason);
+void grpc_call_internal_unref(grpc_call* call, const char* reason);
#define GRPC_CALL_INTERNAL_REF(call, reason) \
grpc_call_internal_ref(call, reason)
#define GRPC_CALL_INTERNAL_UNREF(call, reason) \
grpc_call_internal_unref(call, reason)
#else
-void grpc_call_internal_ref(grpc_call *call);
-void grpc_call_internal_unref(grpc_call *call);
+void grpc_call_internal_ref(grpc_call* call);
+void grpc_call_internal_unref(grpc_call* call);
#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
#define GRPC_CALL_INTERNAL_UNREF(call, reason) grpc_call_internal_unref(call)
#endif
-grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
+grpc_call_stack* grpc_call_get_call_stack(grpc_call* call);
-grpc_call_error grpc_call_start_batch_and_execute(grpc_call *call,
- const grpc_op *ops,
+grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
+ const grpc_op* ops,
size_t nops,
- grpc_closure *closure);
+ grpc_closure* closure);
/* Given the top call_element, get the call object. */
-grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
+grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element);
-void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
- grpc_call *call, const grpc_op *ops, size_t nops,
- void *tag);
+void grpc_call_log_batch(const char* file, int line, gpr_log_severity severity,
+ grpc_call* call, const grpc_op* ops, size_t nops,
+ void* tag);
/* Set a context pointer.
No thread safety guarantees are made wrt this value. */
/* TODO(#9731): add exec_ctx to destroy */
-void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
- void *value, void (*destroy)(void *value));
+void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
+ void* value, void (*destroy)(void* value));
/* Get a context pointer. */
-void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
+void* grpc_call_context_get(grpc_call* call, grpc_context_index elem);
#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
if (GRPC_TRACER_ON(grpc_api_trace)) \
grpc_call_log_batch(sev, call, ops, nops, tag)
-uint8_t grpc_call_is_client(grpc_call *call);
+uint8_t grpc_call_is_client(grpc_call* call);
/* Return an appropriate compression algorithm for the requested compression \a
* level in the context of \a call. */
grpc_compression_algorithm grpc_call_compression_for_level(
- grpc_call *call, grpc_compression_level level);
+ grpc_call* call, grpc_compression_level level);
extern grpc_tracer_flag grpc_call_error_trace;
extern grpc_tracer_flag grpc_compression_trace;
diff --git a/src/core/lib/surface/call_log_batch.cc b/src/core/lib/surface/call_log_batch.cc
index 5557927b7c..030964675d 100644
--- a/src/core/lib/surface/call_log_batch.cc
+++ b/src/core/lib/surface/call_log_batch.cc
@@ -25,7 +25,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) {
+static void add_metadata(gpr_strvec* b, const grpc_metadata* md, size_t count) {
size_t i;
if (md == NULL) {
gpr_strvec_add(b, gpr_strdup("(nil)"));
@@ -41,9 +41,9 @@ static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) {
}
}
-char *grpc_op_string(const grpc_op *op) {
- char *tmp;
- char *out;
+char* grpc_op_string(const grpc_op* op) {
+ char* tmp;
+ char* out;
gpr_strvec b;
gpr_strvec_init(&b);
@@ -105,10 +105,10 @@ char *grpc_op_string(const grpc_op *op) {
return out;
}
-void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
- grpc_call *call, const grpc_op *ops, size_t nops,
- void *tag) {
- char *tmp;
+void grpc_call_log_batch(const char* file, int line, gpr_log_severity severity,
+ grpc_call* call, const grpc_op* ops, size_t nops,
+ void* tag) {
+ char* tmp;
size_t i;
for (i = 0; i < nops; i++) {
tmp = grpc_op_string(&ops[i]);
diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h
index a5a01b3679..2ff4a487d5 100644
--- a/src/core/lib/surface/call_test_only.h
+++ b/src/core/lib/surface/call_test_only.h
@@ -29,30 +29,30 @@ extern "C" {
*
* \warning This function should \b only be used in test code. */
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
- grpc_call *call);
+ grpc_call* call);
/** Return the message flags from \a call.
*
* \warning This function should \b only be used in test code. */
-uint32_t grpc_call_test_only_get_message_flags(grpc_call *call);
+uint32_t grpc_call_test_only_get_message_flags(grpc_call* call);
/** Returns a bitset for the encodings (compression algorithms) supported by \a
* call's peer.
*
* To be indexed by grpc_compression_algorithm enum values. */
-uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call);
+uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call);
/** Returns a bitset for the stream encodings (stream compression algorithms)
* supported by \a call's peer.
*
* To be indexed by grpc_stream_compression_algorithm enum values. */
uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
- grpc_call *call);
+ grpc_call* call);
/** Returns the incoming stream compression algorithm (content-encoding header)
* received by a call. */
grpc_stream_compression_algorithm
-grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call);
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 0511f561ac..dc1fc1632e 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -47,7 +47,7 @@
typedef struct registered_call {
grpc_mdelem path;
grpc_mdelem authority;
- struct registered_call *next;
+ struct registered_call* next;
} registered_call;
struct grpc_channel {
@@ -58,34 +58,34 @@ struct grpc_channel {
gpr_atm call_size_estimate;
gpr_mu registered_call_mu;
- registered_call *registered_calls;
+ registered_call* registered_calls;
- char *target;
+ char* target;
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
- (((grpc_channel *)(channel_stack)) - 1)
+ (((grpc_channel*)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
-static void destroy_channel(void *arg, grpc_error *error);
+static void destroy_channel(void* arg, grpc_error* error);
-grpc_channel *grpc_channel_create_with_builder(
- grpc_channel_stack_builder *builder,
+grpc_channel* grpc_channel_create_with_builder(
+ grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type) {
- char *target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
- grpc_channel_args *args = grpc_channel_args_copy(
+ char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
+ grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
- grpc_channel *channel;
+ grpc_channel* channel;
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
GRPC_STATS_INC_SERVER_CHANNELS_CREATED();
} else {
GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
}
- grpc_error *error = grpc_channel_stack_builder_finish(
+ grpc_error* error = grpc_channel_stack_builder_finish(
builder, sizeof(grpc_channel), 1, destroy_channel, NULL,
- (void **)&channel);
+ (void**)&channel);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_string(error));
@@ -194,11 +194,11 @@ done:
return channel;
}
-grpc_channel *grpc_channel_create(const char *target,
- const grpc_channel_args *input_args,
+grpc_channel* grpc_channel_create(const char* target,
+ const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport *optional_transport) {
- grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_transport* optional_transport) {
+ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(builder, input_args);
grpc_channel_stack_builder_set_target(builder, target);
grpc_channel_stack_builder_set_transport(builder, optional_transport);
@@ -209,7 +209,7 @@ grpc_channel *grpc_channel_create(const char *target,
return grpc_channel_create_with_builder(builder, channel_stack_type);
}
-size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) {
+size_t grpc_channel_get_call_size_estimate(grpc_channel* channel) {
#define ROUND_UP_SIZE 256
/* We round up our current estimate to the NEXT value of ROUND_UP_SIZE.
This ensures:
@@ -222,7 +222,7 @@ size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) {
~(size_t)(ROUND_UP_SIZE - 1);
}
-void grpc_channel_update_call_size_estimate(grpc_channel *channel,
+void grpc_channel_update_call_size_estimate(grpc_channel* channel,
size_t size) {
size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate);
if (cur < size) {
@@ -241,23 +241,23 @@ void grpc_channel_update_call_size_estimate(grpc_channel *channel,
}
}
-char *grpc_channel_get_target(grpc_channel *channel) {
+char* grpc_channel_get_target(grpc_channel* channel) {
GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel));
return gpr_strdup(channel->target);
}
-void grpc_channel_get_info(grpc_channel *channel,
- const grpc_channel_info *channel_info) {
+void grpc_channel_get_info(grpc_channel* channel,
+ const grpc_channel_info* channel_info) {
ExecCtx _local_exec_ctx;
- grpc_channel_element *elem =
+ grpc_channel_element* elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
elem->filter->get_channel_info(elem, channel_info);
grpc_exec_ctx_finish();
}
-static grpc_call *grpc_channel_create_call_internal(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
+static grpc_call* grpc_channel_create_call_internal(
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
grpc_mdelem path_mdelem, grpc_mdelem authority_mdelem,
grpc_millis deadline) {
grpc_mdelem send_metadata[2];
@@ -285,20 +285,20 @@ static grpc_call *grpc_channel_create_call_internal(
args.add_initial_metadata_count = num_metadata;
args.send_deadline = deadline;
- grpc_call *call;
+ grpc_call* call;
GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
return call;
}
-grpc_call *grpc_channel_create_call(grpc_channel *channel,
- grpc_call *parent_call,
+grpc_call* grpc_channel_create_call(grpc_channel* channel,
+ grpc_call* parent_call,
uint32_t propagation_mask,
- grpc_completion_queue *cq,
- grpc_slice method, const grpc_slice *host,
- gpr_timespec deadline, void *reserved) {
+ grpc_completion_queue* cq,
+ grpc_slice method, const grpc_slice* host,
+ gpr_timespec deadline, void* reserved) {
GPR_ASSERT(!reserved);
ExecCtx _local_exec_ctx;
- grpc_call *call = grpc_channel_create_call_internal(
+ grpc_call* call = grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq, NULL,
grpc_mdelem_from_slices(GRPC_MDSTR_PATH, grpc_slice_ref_internal(method)),
host != NULL ? grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
@@ -309,10 +309,10 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
return call;
}
-grpc_call *grpc_channel_create_pollset_set_call(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_pollset_set *pollset_set, grpc_slice method, const grpc_slice *host,
- grpc_millis deadline, void *reserved) {
+grpc_call* grpc_channel_create_pollset_set_call(
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_pollset_set* pollset_set, grpc_slice method, const grpc_slice* host,
+ grpc_millis deadline, void* reserved) {
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, NULL, pollset_set,
@@ -323,9 +323,9 @@ grpc_call *grpc_channel_create_pollset_set_call(
deadline);
}
-void *grpc_channel_register_call(grpc_channel *channel, const char *method,
- const char *host, void *reserved) {
- registered_call *rc = (registered_call *)gpr_malloc(sizeof(registered_call));
+void* grpc_channel_register_call(grpc_channel* channel, const char* method,
+ const char* host, void* reserved) {
+ registered_call* rc = (registered_call*)gpr_malloc(sizeof(registered_call));
GRPC_API_TRACE(
"grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
4, (channel, method, host, reserved));
@@ -348,11 +348,11 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
return rc;
}
-grpc_call *grpc_channel_create_registered_call(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *completion_queue, void *registered_call_handle,
- gpr_timespec deadline, void *reserved) {
- registered_call *rc = (registered_call *)registered_call_handle;
+grpc_call* grpc_channel_create_registered_call(
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_completion_queue* completion_queue, void* registered_call_handle,
+ gpr_timespec deadline, void* reserved) {
+ registered_call* rc = (registered_call*)registered_call_handle;
GRPC_API_TRACE(
"grpc_channel_create_registered_call("
"channel=%p, parent_call=%p, propagation_mask=%x, completion_queue=%p, "
@@ -360,12 +360,13 @@ grpc_call *grpc_channel_create_registered_call(
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 9, (channel, parent_call, (unsigned)propagation_mask, completion_queue,
- registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 9,
+ (channel, parent_call, (unsigned)propagation_mask, completion_queue,
+ registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
ExecCtx _local_exec_ctx;
- grpc_call *call = grpc_channel_create_call_internal(
+ grpc_call* call = grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue, NULL,
GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority),
grpc_timespec_to_millis_round_up(deadline));
@@ -375,24 +376,24 @@ grpc_call *grpc_channel_create_registered_call(
#ifndef NDEBUG
#define REF_REASON reason
-#define REF_ARG , const char *reason
+#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
-void grpc_channel_internal_ref(grpc_channel *c REF_ARG) {
+void grpc_channel_internal_ref(grpc_channel* c REF_ARG) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-void grpc_channel_internal_unref(grpc_channel *c REF_ARG) {
+void grpc_channel_internal_unref(grpc_channel* c REF_ARG) {
GRPC_CHANNEL_STACK_UNREF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-static void destroy_channel(void *arg, grpc_error *error) {
- grpc_channel *channel = (grpc_channel *)arg;
+static void destroy_channel(void* arg, grpc_error* error) {
+ grpc_channel* channel = (grpc_channel*)arg;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
- registered_call *rc = channel->registered_calls;
+ registered_call* rc = channel->registered_calls;
channel->registered_calls = rc->next;
GRPC_MDELEM_UNREF(rc->path);
GRPC_MDELEM_UNREF(rc->authority);
@@ -404,9 +405,9 @@ static void destroy_channel(void *arg, grpc_error *error) {
gpr_free(channel);
}
-void grpc_channel_destroy(grpc_channel *channel) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- grpc_channel_element *elem;
+void grpc_channel_destroy(grpc_channel* channel) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ grpc_channel_element* elem;
ExecCtx _local_exec_ctx;
GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel));
op->disconnect_with_error =
@@ -419,16 +420,16 @@ void grpc_channel_destroy(grpc_channel *channel) {
grpc_exec_ctx_finish();
}
-grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
+grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
}
grpc_compression_options grpc_channel_compression_options(
- const grpc_channel *channel) {
+ const grpc_channel* channel) {
return channel->compression_options;
}
-grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
+grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel* channel, int i) {
char tmp[GPR_LTOA_MIN_BUFSIZE];
switch (i) {
case 0:
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index dd8b8983ba..6499486896 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -27,13 +27,13 @@
extern "C" {
#endif
-grpc_channel *grpc_channel_create(const char *target,
- const grpc_channel_args *args,
+grpc_channel* grpc_channel_create(const char* target,
+ const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
- grpc_transport *optional_transport);
+ grpc_transport* optional_transport);
-grpc_channel *grpc_channel_create_with_builder(
- grpc_channel_stack_builder *builder,
+grpc_channel* grpc_channel_create_with_builder(
+ grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type);
/** Create a call given a grpc_channel, in order to call \a method.
@@ -44,34 +44,34 @@ grpc_channel *grpc_channel_create_with_builder(
non-NULL, it must be a server-side call. It will be used to propagate
properties from the server call to this new client call, depending on the
value of \a propagation_mask (see propagation_bits.h for possible values) */
-grpc_call *grpc_channel_create_pollset_set_call(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_pollset_set *pollset_set, grpc_slice method, const grpc_slice *host,
- grpc_millis deadline, void *reserved);
+grpc_call* grpc_channel_create_pollset_set_call(
+ grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
+ grpc_pollset_set* pollset_set, grpc_slice method, const grpc_slice* host,
+ grpc_millis deadline, void* reserved);
/** Get a (borrowed) pointer to this channels underlying channel stack */
-grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+grpc_channel_stack* grpc_channel_get_channel_stack(grpc_channel* channel);
/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
status_code.
The returned elem is owned by the caller. */
-grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel *channel,
+grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_channel* channel,
int status_code);
-size_t grpc_channel_get_call_size_estimate(grpc_channel *channel);
-void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size);
+size_t grpc_channel_get_call_size_estimate(grpc_channel* channel);
+void grpc_channel_update_call_size_estimate(grpc_channel* channel, size_t size);
#ifndef NDEBUG
-void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
-void grpc_channel_internal_unref(grpc_channel *channel, const char *reason);
+void grpc_channel_internal_ref(grpc_channel* channel, const char* reason);
+void grpc_channel_internal_unref(grpc_channel* channel, const char* reason);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel, reason)
#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
grpc_channel_internal_unref(channel, reason)
#else
-void grpc_channel_internal_ref(grpc_channel *channel);
-void grpc_channel_internal_unref(grpc_channel *channel);
+void grpc_channel_internal_ref(grpc_channel* channel);
+void grpc_channel_internal_unref(grpc_channel* channel);
#define GRPC_CHANNEL_INTERNAL_REF(channel, reason) \
grpc_channel_internal_ref(channel)
#define GRPC_CHANNEL_INTERNAL_UNREF(channel, reason) \
@@ -80,7 +80,7 @@ void grpc_channel_internal_unref(grpc_channel *channel);
/** Return the channel's compression options. */
grpc_compression_options grpc_channel_compression_options(
- const grpc_channel *channel);
+ const grpc_channel* channel);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/channel_init.cc b/src/core/lib/surface/channel_init.cc
index a2ea3160c0..065de5e742 100644
--- a/src/core/lib/surface/channel_init.cc
+++ b/src/core/lib/surface/channel_init.cc
@@ -23,13 +23,13 @@
typedef struct stage_slot {
grpc_channel_init_stage fn;
- void *arg;
+ void* arg;
int priority;
size_t insertion_order;
} stage_slot;
typedef struct stage_slots {
- stage_slot *slots;
+ stage_slot* slots;
size_t num_slots;
size_t cap_slots;
} stage_slots;
@@ -49,24 +49,24 @@ void grpc_channel_init_init(void) {
void grpc_channel_init_register_stage(grpc_channel_stack_type type,
int priority,
grpc_channel_init_stage stage,
- void *stage_arg) {
+ void* stage_arg) {
GPR_ASSERT(!g_finalized);
if (g_slots[type].cap_slots == g_slots[type].num_slots) {
g_slots[type].cap_slots = GPR_MAX(8, 3 * g_slots[type].cap_slots / 2);
- g_slots[type].slots = (stage_slot *)gpr_realloc(
+ g_slots[type].slots = (stage_slot*)gpr_realloc(
g_slots[type].slots,
g_slots[type].cap_slots * sizeof(*g_slots[type].slots));
}
- stage_slot *s = &g_slots[type].slots[g_slots[type].num_slots++];
+ stage_slot* s = &g_slots[type].slots[g_slots[type].num_slots++];
s->insertion_order = g_slots[type].num_slots;
s->priority = priority;
s->fn = stage;
s->arg = stage_arg;
}
-static int compare_slots(const void *a, const void *b) {
- const stage_slot *sa = (const stage_slot *)a;
- const stage_slot *sb = (const stage_slot *)b;
+static int compare_slots(const void* a, const void* b) {
+ const stage_slot* sa = (const stage_slot*)a;
+ const stage_slot* sb = (const stage_slot*)b;
int c = GPR_ICMP(sa->priority, sb->priority);
if (c != 0) return c;
@@ -85,11 +85,11 @@ void grpc_channel_init_finalize(void) {
void grpc_channel_init_shutdown(void) {
for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
gpr_free(g_slots[i].slots);
- g_slots[i].slots = (stage_slot *)(void *)(uintptr_t)0xdeadbeef;
+ g_slots[i].slots = (stage_slot*)(void*)(uintptr_t)0xdeadbeef;
}
}
-bool grpc_channel_init_create_stack(grpc_channel_stack_builder *builder,
+bool grpc_channel_init_create_stack(grpc_channel_stack_builder* builder,
grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
@@ -97,7 +97,7 @@ bool grpc_channel_init_create_stack(grpc_channel_stack_builder *builder,
grpc_channel_stack_type_string(type));
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
- const stage_slot *slot = &g_slots[type].slots[i];
+ const stage_slot* slot = &g_slots[type].slots[i];
if (!slot->fn(builder, slot->arg)) {
return false;
}
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index 1c5e0a2976..b4bee7c673 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -36,8 +36,8 @@ extern "C" {
/// One stage of mutation: call functions against \a builder to influence the
/// finally constructed channel stack
-typedef bool (*grpc_channel_init_stage)(grpc_channel_stack_builder *builder,
- void *arg);
+typedef bool (*grpc_channel_init_stage)(grpc_channel_stack_builder* builder,
+ void* arg);
/// Global initialization of the system
void grpc_channel_init_init(void);
@@ -50,7 +50,7 @@ void grpc_channel_init_init(void);
void grpc_channel_init_register_stage(grpc_channel_stack_type type,
int priority,
grpc_channel_init_stage stage_fn,
- void *stage_arg);
+ void* stage_arg);
/// Finalize registration. No more calls to grpc_channel_init_register_stage are
/// allowed.
@@ -69,7 +69,7 @@ void grpc_channel_init_shutdown(void);
/// \a optional_transport is either NULL or a constructed transport object
/// Returns a pointer to the base of the memory allocated (the actual channel
/// stack object will be prefix_bytes past that pointer)
-bool grpc_channel_init_create_stack(grpc_channel_stack_builder *builder,
+bool grpc_channel_init_create_stack(grpc_channel_stack_builder* builder,
grpc_channel_stack_type type);
#ifdef __cplusplus
diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc
index 5d41b8e9e4..0966a8d967 100644
--- a/src/core/lib/surface/channel_ping.cc
+++ b/src/core/lib/surface/channel_ping.cc
@@ -28,28 +28,28 @@
typedef struct {
grpc_closure closure;
- void *tag;
- grpc_completion_queue *cq;
+ void* tag;
+ grpc_completion_queue* cq;
grpc_cq_completion completion_storage;
} ping_result;
-static void ping_destroy(void *arg, grpc_cq_completion *storage) {
+static void ping_destroy(void* arg, grpc_cq_completion* storage) {
gpr_free(arg);
}
-static void ping_done(void *arg, grpc_error *error) {
- ping_result *pr = (ping_result *)arg;
+static void ping_done(void* arg, grpc_error* error) {
+ ping_result* pr = (ping_result*)arg;
grpc_cq_end_op(pr->cq, pr->tag, GRPC_ERROR_REF(error), ping_destroy, pr,
&pr->completion_storage);
}
-void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
- void *tag, void *reserved) {
+void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
+ void* tag, void* reserved) {
GRPC_API_TRACE("grpc_channel_ping(channel=%p, cq=%p, tag=%p, reserved=%p)", 4,
(channel, cq, tag, reserved));
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- ping_result *pr = (ping_result *)gpr_malloc(sizeof(*pr));
- grpc_channel_element *top_elem =
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ ping_result* pr = (ping_result*)gpr_malloc(sizeof(*pr));
+ grpc_channel_element* top_elem =
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
ExecCtx _local_exec_ctx;
GPR_ASSERT(reserved == NULL);
diff --git a/src/core/lib/surface/channel_stack_type.cc b/src/core/lib/surface/channel_stack_type.cc
index 5f5c877727..366c452942 100644
--- a/src/core/lib/surface/channel_stack_type.cc
+++ b/src/core/lib/surface/channel_stack_type.cc
@@ -38,7 +38,7 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
GPR_UNREACHABLE_CODE(return true;);
}
-const char *grpc_channel_stack_type_string(grpc_channel_stack_type type) {
+const char* grpc_channel_stack_type_string(grpc_channel_stack_type type) {
switch (type) {
case GRPC_CLIENT_CHANNEL:
return "CLIENT_CHANNEL";
diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h
index c77848794c..feecd3aa44 100644
--- a/src/core/lib/surface/channel_stack_type.h
+++ b/src/core/lib/surface/channel_stack_type.h
@@ -44,7 +44,7 @@ typedef enum {
bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type);
-const char *grpc_channel_stack_type_string(grpc_channel_stack_type type);
+const char* grpc_channel_stack_type_string(grpc_channel_stack_type type);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 8abcb70d25..b69d40534d 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
+#include <grpc/support/tls.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -48,60 +49,68 @@ grpc_tracer_flag grpc_trace_cq_refcount =
GRPC_TRACER_INITIALIZER(false, "cq_refcount");
#endif
+// Specifies a cq thread local cache.
+// The first event that occurs on a thread
+// with a cq cache will go into that cache, and
+// will only be returned on the thread that initialized the cache.
+// NOTE: Only one event will ever be cached.
+GPR_TLS_DECL(g_cached_event);
+GPR_TLS_DECL(g_cached_cq);
+
typedef struct {
- grpc_pollset_worker **worker;
- void *tag;
+ grpc_pollset_worker** worker;
+ void* tag;
} plucker;
typedef struct {
bool can_get_pollset;
bool can_listen;
size_t (*size)(void);
- void (*init)(grpc_pollset *pollset, gpr_mu **mu);
- grpc_error *(*kick)(grpc_pollset *pollset,
- grpc_pollset_worker *specific_worker);
- grpc_error *(*work)(grpc_pollset *pollset, grpc_pollset_worker **worker,
+ void (*init)(grpc_pollset* pollset, gpr_mu** mu);
+ grpc_error* (*kick)(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker);
+ grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
grpc_millis deadline);
- void (*shutdown)(grpc_pollset *pollset, grpc_closure *closure);
- void (*destroy)(grpc_pollset *pollset);
+ void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
+ void (*destroy)(grpc_pollset* pollset);
} cq_poller_vtable;
typedef struct non_polling_worker {
gpr_cv cv;
bool kicked;
- struct non_polling_worker *next;
- struct non_polling_worker *prev;
+ struct non_polling_worker* next;
+ struct non_polling_worker* prev;
} non_polling_worker;
typedef struct {
gpr_mu mu;
- non_polling_worker *root;
- grpc_closure *shutdown;
+ non_polling_worker* root;
+ grpc_closure* shutdown;
} non_polling_poller;
static size_t non_polling_poller_size(void) {
return sizeof(non_polling_poller);
}
-static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+static void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
+ non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_init(&npp->mu);
*mu = &npp->mu;
}
-static void non_polling_poller_destroy(grpc_pollset *pollset) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+static void non_polling_poller_destroy(grpc_pollset* pollset) {
+ non_polling_poller* npp = (non_polling_poller*)pollset;
gpr_mu_destroy(&npp->mu);
}
-static grpc_error *non_polling_poller_work(grpc_pollset *pollset,
- grpc_pollset_worker **worker,
+static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker,
grpc_millis deadline) {
- non_polling_poller *npp = (non_polling_poller *)pollset;
+ non_polling_poller* npp = (non_polling_poller*)pollset;
if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
gpr_cv_init(&w.cv);
- if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
+ if (worker != NULL) *worker = (grpc_pollset_worker*)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
@@ -115,6 +124,7 @@ static grpc_error *non_polling_poller_work(grpc_pollset *pollset,
while (!npp->shutdown && !w.kicked &&
!gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
;
+ grpc_exec_ctx_invalidate_now();
if (&w == npp->root) {
npp->root = w.next;
if (&w == npp->root) {
@@ -131,12 +141,12 @@ static grpc_error *non_polling_poller_work(grpc_pollset *pollset,
return GRPC_ERROR_NONE;
}
-static grpc_error *non_polling_poller_kick(
- grpc_pollset *pollset, grpc_pollset_worker *specific_worker) {
- non_polling_poller *p = (non_polling_poller *)pollset;
- if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root;
+static grpc_error* non_polling_poller_kick(
+ grpc_pollset* pollset, grpc_pollset_worker* specific_worker) {
+ non_polling_poller* p = (non_polling_poller*)pollset;
+ if (specific_worker == NULL) specific_worker = (grpc_pollset_worker*)p->root;
if (specific_worker != NULL) {
- non_polling_worker *w = (non_polling_worker *)specific_worker;
+ non_polling_worker* w = (non_polling_worker*)specific_worker;
if (!w->kicked) {
w->kicked = true;
gpr_cv_signal(&w->cv);
@@ -145,15 +155,15 @@ static grpc_error *non_polling_poller_kick(
return GRPC_ERROR_NONE;
}
-static void non_polling_poller_shutdown(grpc_pollset *pollset,
- grpc_closure *closure) {
- non_polling_poller *p = (non_polling_poller *)pollset;
+static void non_polling_poller_shutdown(grpc_pollset* pollset,
+ grpc_closure* closure) {
+ non_polling_poller* p = (non_polling_poller*)pollset;
GPR_ASSERT(closure != NULL);
p->shutdown = closure;
if (p->root == NULL) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
} else {
- non_polling_worker *w = p->root;
+ non_polling_worker* w = p->root;
do {
gpr_cv_signal(&w->cv);
w = w->next;
@@ -177,17 +187,17 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type;
size_t data_size;
- void (*init)(void *data);
- void (*shutdown)(grpc_completion_queue *cq);
- void (*destroy)(void *data);
- bool (*begin_op)(grpc_completion_queue *cq, void *tag);
- void (*end_op)(grpc_completion_queue *cq, void *tag, grpc_error *error,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
- grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved);
- grpc_event (*pluck)(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved);
+ void (*init)(void* data);
+ void (*shutdown)(grpc_completion_queue* cq);
+ void (*destroy)(void* data);
+ bool (*begin_op)(grpc_completion_queue* cq, void* tag);
+ void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
+ grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved);
+ grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved);
} cq_vtable;
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
@@ -224,7 +234,7 @@ typedef struct cq_next_data {
typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head;
- grpc_cq_completion *completed_tail;
+ grpc_cq_completion* completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
gpr_atm pending_events;
@@ -251,13 +261,13 @@ struct grpc_completion_queue {
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
- gpr_mu *mu;
+ gpr_mu* mu;
- const cq_vtable *vtable;
- const cq_poller_vtable *poller_vtable;
+ const cq_vtable* vtable;
+ const cq_poller_vtable* poller_vtable;
#ifndef NDEBUG
- void **outstanding_tags;
+ void** outstanding_tags;
size_t outstanding_tag_count;
size_t outstanding_tag_capacity;
#endif
@@ -267,36 +277,36 @@ struct grpc_completion_queue {
};
/* Forward declarations */
-static void cq_finish_shutdown_next(grpc_completion_queue *cq);
-static void cq_finish_shutdown_pluck(grpc_completion_queue *cq);
-static void cq_shutdown_next(grpc_completion_queue *cq);
-static void cq_shutdown_pluck(grpc_completion_queue *cq);
+static void cq_finish_shutdown_next(grpc_completion_queue* cq);
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
+static void cq_shutdown_next(grpc_completion_queue* cq);
+static void cq_shutdown_pluck(grpc_completion_queue* cq);
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
-static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
-static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
-static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved);
+static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved);
-static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved);
+static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved);
-static void cq_init_next(void *data);
-static void cq_init_pluck(void *data);
-static void cq_destroy_next(void *data);
-static void cq_destroy_pluck(void *data);
+static void cq_init_next(void* data);
+static void cq_init_pluck(void* data);
+static void cq_destroy_next(void* data);
+static void cq_destroy_pluck(void* data);
/* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = {
@@ -309,9 +319,9 @@ static const cq_vtable g_cq_vtable[] = {
cq_pluck},
};
-#define DATA_FROM_CQ(cq) ((void *)(cq + 1))
+#define DATA_FROM_CQ(cq) ((void*)(cq + 1))
#define POLLSET_FROM_CQ(cq) \
- ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq)))
+ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
grpc_tracer_flag grpc_cq_pluck_trace =
GRPC_TRACER_INITIALIZER(true, "queue_pluck");
@@ -322,37 +332,77 @@ grpc_tracer_flag grpc_cq_event_timeout_trace =
if (GRPC_TRACER_ON(grpc_api_trace) && \
(GRPC_TRACER_ON(grpc_cq_pluck_trace) || \
(event)->type != GRPC_QUEUE_TIMEOUT)) { \
- char *_ev = grpc_event_string(event); \
+ char* _ev = grpc_event_string(event); \
gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
gpr_free(_ev); \
}
-static void on_pollset_shutdown_done(void *cq, grpc_error *error);
+static void on_pollset_shutdown_done(void* cq, grpc_error* error);
+
+void grpc_cq_global_init() {
+ gpr_tls_init(&g_cached_event);
+ gpr_tls_init(&g_cached_cq);
+}
+
+void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
+ if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
+ gpr_tls_set(&g_cached_event, (intptr_t)0);
+ gpr_tls_set(&g_cached_cq, (intptr_t)cq);
+ }
+}
-static void cq_event_queue_init(grpc_cq_event_queue *q) {
+int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
+ void** tag, int* ok) {
+ grpc_cq_completion* storage =
+ (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
+ int ret = 0;
+ if (storage != NULL &&
+ (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
+ *tag = storage->tag;
+ ExecCtx _local_exec_ctx;
+ *ok = (storage->next & (uintptr_t)(1)) == 1;
+ storage->done(storage->done_arg, storage);
+ ret = 1;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ }
+ grpc_exec_ctx_finish();
+ }
+ gpr_tls_set(&g_cached_event, (intptr_t)0);
+ gpr_tls_set(&g_cached_cq, (intptr_t)0);
+
+ return ret;
+}
+
+static void cq_event_queue_init(grpc_cq_event_queue* q) {
gpr_mpscq_init(&q->queue);
q->queue_lock = GPR_SPINLOCK_INITIALIZER;
gpr_atm_no_barrier_store(&q->num_queue_items, 0);
}
-static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
+static void cq_event_queue_destroy(grpc_cq_event_queue* q) {
gpr_mpscq_destroy(&q->queue);
}
-static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) {
- gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c);
+static bool cq_event_queue_push(grpc_cq_event_queue* q, grpc_cq_completion* c) {
+ gpr_mpscq_push(&q->queue, (gpr_mpscq_node*)c);
return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0;
}
-static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
- grpc_cq_completion *c = NULL;
+static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
+ grpc_cq_completion* c = NULL;
ExecCtx _local_exec_ctx;
if (gpr_spinlock_trylock(&q->queue_lock)) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
bool is_empty = false;
- c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
+ c = (grpc_cq_completion*)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty);
gpr_spinlock_unlock(&q->queue_lock);
if (c == NULL && !is_empty) {
@@ -373,14 +423,14 @@ static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) {
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
-static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
+static long cq_event_queue_num_items(grpc_cq_event_queue* q) {
return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
}
-grpc_completion_queue *grpc_completion_queue_create_internal(
+grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type,
grpc_cq_polling_type polling_type) {
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
@@ -389,17 +439,17 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
"polling_type=%d)",
2, (completion_type, polling_type));
- const cq_vtable *vtable = &g_cq_vtable[completion_type];
- const cq_poller_vtable *poller_vtable =
+ const cq_vtable* vtable = &g_cq_vtable[completion_type];
+ const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];
ExecCtx _local_exec_ctx;
GRPC_STATS_INC_CQS_CREATED();
grpc_exec_ctx_finish();
- cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) +
- vtable->data_size +
- poller_vtable->size());
+ cq = (grpc_completion_queue*)gpr_zalloc(sizeof(grpc_completion_queue) +
+ vtable->data_size +
+ poller_vtable->size());
cq->vtable = vtable;
cq->poller_vtable = poller_vtable;
@@ -418,8 +468,8 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
return cq;
}
-static void cq_init_next(void *ptr) {
- cq_next_data *cqd = (cq_next_data *)ptr;
+static void cq_init_next(void* ptr) {
+ cq_next_data* cqd = (cq_next_data*)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
@@ -427,14 +477,14 @@ static void cq_init_next(void *ptr) {
cq_event_queue_init(&cqd->queue);
}
-static void cq_destroy_next(void *ptr) {
- cq_next_data *cqd = (cq_next_data *)ptr;
+static void cq_destroy_next(void* ptr) {
+ cq_next_data* cqd = (cq_next_data*)ptr;
GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0);
cq_event_queue_destroy(&cqd->queue);
}
-static void cq_init_pluck(void *ptr) {
- cq_pluck_data *cqd = (cq_pluck_data *)ptr;
+static void cq_init_pluck(void* ptr) {
+ cq_pluck_data* cqd = (cq_pluck_data*)ptr;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
@@ -445,16 +495,16 @@ static void cq_init_pluck(void *ptr) {
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
-static void cq_destroy_pluck(void *ptr) {
- cq_pluck_data *cqd = (cq_pluck_data *)ptr;
+static void cq_destroy_pluck(void* ptr) {
+ cq_pluck_data* cqd = (cq_pluck_data*)ptr;
GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head);
}
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) {
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
return cq->vtable->cq_completion_type;
}
-int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
+int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
int cur_num_polls;
gpr_mu_lock(cq->mu);
cur_num_polls = cq->num_polls;
@@ -463,8 +513,8 @@ int grpc_get_cq_poll_num(grpc_completion_queue *cq) {
}
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
- const char *file, int line) {
+void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
+ const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -472,19 +522,19 @@ void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason,
reason);
}
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cq) {
+void grpc_cq_internal_ref(grpc_completion_queue* cq) {
#endif
gpr_ref(&cq->owning_refs);
}
-static void on_pollset_shutdown_done(void *arg, grpc_error *error) {
- grpc_completion_queue *cq = (grpc_completion_queue *)arg;
+static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
+ grpc_completion_queue* cq = (grpc_completion_queue*)arg;
GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
}
#ifndef NDEBUG
-void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason,
- const char *file, int line) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
+ const char* file, int line) {
if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) {
gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -492,7 +542,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cq, const char *reason,
reason);
}
#else
-void grpc_cq_internal_unref(grpc_completion_queue *cq) {
+void grpc_cq_internal_unref(grpc_completion_queue* cq) {
#endif
if (gpr_unref(&cq->owning_refs)) {
cq->vtable->destroy(DATA_FROM_CQ(cq));
@@ -505,7 +555,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cq) {
}
#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
+static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
gpr_mu_lock(cq->mu);
@@ -514,7 +564,7 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
if (cq->outstanding_tags[i] == tag) {
cq->outstanding_tag_count--;
- GPR_SWAP(void *, cq->outstanding_tags[i],
+ GPR_SWAP(void*, cq->outstanding_tags[i],
cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
@@ -528,12 +578,12 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
GPR_ASSERT(found);
}
#else
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
+static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
#endif
/* Atomically increments a counter only if the counter is not zero. Returns
* true if the increment was successful; false if the counter is zero */
-static bool atm_inc_if_nonzero(gpr_atm *counter) {
+static bool atm_inc_if_nonzero(gpr_atm* counter) {
while (true) {
gpr_atm count = gpr_atm_acq_load(counter);
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
@@ -549,22 +599,22 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) {
return true;
}
-static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
-static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events);
}
-bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags = (void **)gpr_realloc(
+ cq->outstanding_tags = (void**)gpr_realloc(
cq->outstanding_tags,
sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity);
}
@@ -577,28 +627,27 @@ bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
-static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
if (GRPC_TRACER_ON(grpc_api_trace) ||
(GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char *errmsg = grpc_error_string(error);
+ const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_next(=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
}
-
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag;
@@ -608,44 +657,50 @@ static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
cq_check_tag(cq, tag, true); /* Used in debug builds only */
- /* Add the completion to the queue */
- bool is_first = cq_event_queue_push(&cqd->queue, storage);
- gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
-
- /* Since we do not hold the cq lock here, it is important to do an 'acquire'
- load here (instead of a 'no_barrier' load) to match with the release store
- (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
- */
- bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
-
- if (!will_definitely_shutdown) {
- /* Only kick if this is the first item queued */
- if (is_first) {
- gpr_mu_lock(cq->mu);
- grpc_error *kick_error =
- cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
- gpr_mu_unlock(cq->mu);
+ if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
+ (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
+ gpr_tls_set(&g_cached_event, (intptr_t)storage);
+ } else {
+ /* Add the completion to the queue */
+ bool is_first = cq_event_queue_push(&cqd->queue, storage);
+ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+
+ /* Since we do not hold the cq lock here, it is important to do an 'acquire'
+ load here (instead of a 'no_barrier' load) to match with the release
+ store
+ (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
+ */
+ bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
+
+ if (!will_definitely_shutdown) {
+ /* Only kick if this is the first item queued */
+ if (is_first) {
+ gpr_mu_lock(cq->mu);
+ grpc_error* kick_error =
+ cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL);
+ gpr_mu_unlock(cq->mu);
- if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
- gpr_log(GPR_ERROR, "Kick failed: %s", msg);
- GRPC_ERROR_UNREF(kick_error);
+ if (kick_error != GRPC_ERROR_NONE) {
+ const char* msg = grpc_error_string(kick_error);
+ gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+ GRPC_ERROR_UNREF(kick_error);
+ }
}
- }
- if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_mu_lock(cq->mu);
+ cq_finish_shutdown_next(cq);
+ gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
+ }
+ } else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+ gpr_atm_rel_store(&cqd->pending_events, 0);
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
- } else {
- GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
- gpr_atm_rel_store(&cqd->pending_events, 0);
- gpr_mu_lock(cq->mu);
- cq_finish_shutdown_next(cq);
- gpr_mu_unlock(cq->mu);
- GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
GPR_TIMER_END("cq_end_op_for_next", 0);
@@ -656,12 +711,12 @@ static void cq_end_op_for_next(grpc_completion_queue *cq, void *tag,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_PLUCK) */
-static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
- grpc_error *error,
- void (*done)(void *done_arg,
- grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
+ grpc_error* error,
+ void (*done)(void* done_arg,
+ grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@@ -669,11 +724,11 @@ static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
if (GRPC_TRACER_ON(grpc_api_trace) ||
(GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE)) {
- const char *errmsg = grpc_error_string(error);
+ const char* errmsg = grpc_error_string(error);
GRPC_API_TRACE(
"cq_end_op_for_pluck(=%p, cq=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)",
- 7, (&exec_ctx, cq, tag, errmsg, done, done_arg, storage));
+ 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage));
if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
@@ -698,7 +753,7 @@ static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
- grpc_pollset_worker *pluck_worker = NULL;
+ grpc_pollset_worker* pluck_worker = NULL;
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag) {
pluck_worker = *cqd->pluckers[i].worker;
@@ -706,13 +761,13 @@ static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
}
}
- grpc_error *kick_error =
+ grpc_error* kick_error =
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
gpr_mu_unlock(cq->mu);
if (kick_error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(kick_error);
+ const char* msg = grpc_error_string(kick_error);
gpr_log(GPR_ERROR, "Kick failed: %s", msg);
GRPC_ERROR_UNREF(kick_error);
@@ -724,25 +779,25 @@ static void cq_end_op_for_pluck(grpc_completion_queue *cq, void *tag,
GRPC_ERROR_UNREF(error);
}
-void grpc_cq_end_op(grpc_completion_queue *cq, void *tag, grpc_error *error,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage) {
+void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage) {
cq->vtable->end_op(cq, tag, error, done, done_arg, storage);
}
typedef struct {
gpr_atm last_seen_things_queued_ever;
- grpc_completion_queue *cq;
+ grpc_completion_queue* cq;
grpc_millis deadline;
- grpc_cq_completion *stolen_completion;
- void *tag; /* for pluck */
+ grpc_cq_completion* stolen_completion;
+ void* tag; /* for pluck */
bool first_loop;
} cq_is_finished_arg;
-static bool cq_is_next_finished(void *arg) {
- cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
- grpc_completion_queue *cq = a->cq;
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static bool cq_is_next_finished(void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -766,7 +821,7 @@ static bool cq_is_next_finished(void *arg) {
}
#ifndef NDEBUG
-static void dump_pending_tags(grpc_completion_queue *cq) {
+static void dump_pending_tags(grpc_completion_queue* cq) {
if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return;
gpr_strvec v;
@@ -774,24 +829,24 @@ static void dump_pending_tags(grpc_completion_queue *cq) {
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
gpr_mu_lock(cq->mu);
for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
- char *s;
+ char* s;
gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
gpr_mu_unlock(cq->mu);
- char *out = gpr_strvec_flatten(&v, NULL);
+ char* out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);
gpr_free(out);
}
#else
-static void dump_pending_tags(grpc_completion_queue *cq) {}
+static void dump_pending_tags(grpc_completion_queue* cq) {}
#endif
-static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- void *reserved) {
+static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
+ void* reserved) {
grpc_event ret;
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@@ -801,8 +856,9 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
- reserved));
+ 5,
+ (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
GPR_ASSERT(!reserved);
dump_pending_tags(cq);
@@ -822,7 +878,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_millis iteration_deadline = deadline_millis;
if (is_finished_arg.stolen_completion != NULL) {
- grpc_cq_completion *c = is_finished_arg.stolen_completion;
+ grpc_cq_completion* c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
@@ -831,7 +887,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
break;
}
- grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue);
+ grpc_cq_completion* c = cq_event_queue_pop(&cqd->queue);
if (c != NULL) {
ret.type = GRPC_OP_COMPLETE;
@@ -877,12 +933,12 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock(cq->mu);
cq->num_polls++;
- grpc_error *err =
+ grpc_error* err =
cq->poller_vtable->work(POLLSET_FROM_CQ(cq), NULL, iteration_deadline);
gpr_mu_unlock(cq->mu);
if (err != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(err);
+ const char* msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
GRPC_ERROR_UNREF(err);
@@ -917,8 +973,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
- Must be called only once in completion queue's lifetime
- grpc_completion_queue_shutdown() MUST have been called before calling
this function */
-static void cq_finish_shutdown_next(grpc_completion_queue *cq) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
@@ -926,8 +982,8 @@ static void cq_finish_shutdown_next(grpc_completion_queue *cq) {
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
-static void cq_shutdown_next(grpc_completion_queue *cq) {
- cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+static void cq_shutdown_next(grpc_completion_queue* cq) {
+ cq_next_data* cqd = (cq_next_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_next() below, that would call pollset shutdown.
@@ -953,14 +1009,14 @@ static void cq_shutdown_next(grpc_completion_queue *cq) {
GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
}
-grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
- gpr_timespec deadline, void *reserved) {
+grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
+ gpr_timespec deadline, void* reserved) {
return cq->vtable->next(cq, deadline, reserved);
}
-static int add_plucker(grpc_completion_queue *cq, void *tag,
- grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static int add_plucker(grpc_completion_queue* cq, void* tag,
+ grpc_pollset_worker** worker) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0;
}
@@ -970,9 +1026,9 @@ static int add_plucker(grpc_completion_queue *cq, void *tag,
return 1;
}
-static void del_plucker(grpc_completion_queue *cq, void *tag,
- grpc_pollset_worker **worker) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void del_plucker(grpc_completion_queue* cq, void* tag,
+ grpc_pollset_worker** worker) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--;
@@ -983,10 +1039,10 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
GPR_UNREACHABLE_CODE(return );
}
-static bool cq_is_pluck_finished(void *arg) {
- cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
- grpc_completion_queue *cq = a->cq;
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static bool cq_is_pluck_finished(void* arg) {
+ cq_is_finished_arg* a = (cq_is_finished_arg*)arg;
+ grpc_completion_queue* cq = a->cq;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever =
@@ -995,9 +1051,9 @@ static bool cq_is_pluck_finished(void *arg) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cqd->things_queued_ever);
- grpc_cq_completion *c;
- grpc_cq_completion *prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev = &cqd->completed_head;
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
&cqd->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
@@ -1015,13 +1071,13 @@ static bool cq_is_pluck_finished(void *arg) {
return !a->first_loop && a->deadline < grpc_exec_ctx_now();
}
-static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved) {
+static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved) {
grpc_event ret;
- grpc_cq_completion *c;
- grpc_cq_completion *prev;
- grpc_pollset_worker *worker = NULL;
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+ grpc_cq_completion* c;
+ grpc_cq_completion* prev;
+ grpc_pollset_worker* worker = NULL;
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@@ -1032,8 +1088,9 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
"deadline=gpr_timespec { tv_sec: %" PRId64
", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 6,
+ (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
}
GPR_ASSERT(!reserved);
@@ -1062,7 +1119,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
break;
}
prev = &cqd->completed_head;
- while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
+ while ((c = (grpc_cq_completion*)(prev->next & ~(uintptr_t)1)) !=
&cqd->completed_head) {
if (c->tag == tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
@@ -1105,12 +1162,12 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
break;
}
cq->num_polls++;
- grpc_error *err =
+ grpc_error* err =
cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
if (err != GRPC_ERROR_NONE) {
del_plucker(cq, tag, &worker);
gpr_mu_unlock(cq->mu);
- const char *msg = grpc_error_string(err);
+ const char* msg = grpc_error_string(err);
gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
GRPC_ERROR_UNREF(err);
@@ -1133,13 +1190,13 @@ done:
return ret;
}
-grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
- gpr_timespec deadline, void *reserved) {
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline, void* reserved) {
return cq->vtable->pluck(cq, tag, deadline, reserved);
}
-static void cq_finish_shutdown_pluck(grpc_completion_queue *cq) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
@@ -1150,8 +1207,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue *cq) {
/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
* merging them is a bit tricky and probably not worth it */
-static void cq_shutdown_pluck(grpc_completion_queue *cq) {
- cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
+static void cq_shutdown_pluck(grpc_completion_queue* cq) {
+ cq_pluck_data* cqd = (cq_pluck_data*)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because:
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
@@ -1176,7 +1233,7 @@ static void cq_shutdown_pluck(grpc_completion_queue *cq) {
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
+void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
ExecCtx _local_exec_ctx;
GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
@@ -1185,7 +1242,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) {
GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
}
-void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
+void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
grpc_completion_queue_shutdown(cq);
@@ -1196,10 +1253,10 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_TIMER_END("grpc_completion_queue_destroy", 0);
}
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) {
+grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL;
}
-bool grpc_cq_can_listen(grpc_completion_queue *cq) {
+bool grpc_cq_can_listen(grpc_completion_queue* cq) {
return cq->poller_vtable->can_listen;
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 304afb17f7..5ddafd9eb5 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -44,52 +44,55 @@ typedef struct grpc_cq_completion {
gpr_mpscq_node node;
/** user supplied tag */
- void *tag;
+ void* tag;
/** done callback - called when this queue element is no longer
needed by the completion queue */
- void (*done)(void *done_arg, struct grpc_cq_completion *c);
- void *done_arg;
+ void (*done)(void* done_arg, struct grpc_cq_completion* c);
+ void* done_arg;
/** next pointer; low bit is used to indicate success or not */
uintptr_t next;
} grpc_cq_completion;
#ifndef NDEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
- const char *file, int line);
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
- const char *file, int line);
+void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason,
+ const char* file, int line);
+void grpc_cq_internal_unref(grpc_completion_queue* cc, const char* reason,
+ const char* file, int line);
#define GRPC_CQ_INTERNAL_REF(cc, reason) \
grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__)
#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \
grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__)
#else
-void grpc_cq_internal_ref(grpc_completion_queue *cc);
-void grpc_cq_internal_unref(grpc_completion_queue *cc);
+void grpc_cq_internal_ref(grpc_completion_queue* cc);
+void grpc_cq_internal_unref(grpc_completion_queue* cc);
#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc)
#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc)
#endif
+/* Initializes global variables used by completion queues */
+void grpc_cq_global_init();
+
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. Return true on success, and
false if completion_queue has been shutdown. */
-bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+bool grpc_cq_begin_op(grpc_completion_queue* cc, void* tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
-void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_error *error,
- void (*done)(void *done_arg, grpc_cq_completion *storage),
- void *done_arg, grpc_cq_completion *storage);
+void grpc_cq_end_op(grpc_completion_queue* cc, void* tag, grpc_error* error,
+ void (*done)(void* done_arg, grpc_cq_completion* storage),
+ void* done_arg, grpc_cq_completion* storage);
-grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cc);
-bool grpc_cq_can_listen(grpc_completion_queue *cc);
+bool grpc_cq_can_listen(grpc_completion_queue* cc);
-grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cc);
-int grpc_get_cq_poll_num(grpc_completion_queue *cc);
+int grpc_get_cq_poll_num(grpc_completion_queue* cc);
-grpc_completion_queue *grpc_completion_queue_create_internal(
+grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
#ifdef __cplusplus
diff --git a/src/core/lib/surface/event_string.cc b/src/core/lib/surface/event_string.cc
index f236272e2a..b92ee6ad07 100644
--- a/src/core/lib/surface/event_string.cc
+++ b/src/core/lib/surface/event_string.cc
@@ -24,22 +24,22 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/support/string.h"
-static void addhdr(gpr_strvec *buf, grpc_event *ev) {
- char *tmp;
+static void addhdr(gpr_strvec* buf, grpc_event* ev) {
+ char* tmp;
gpr_asprintf(&tmp, "tag:%p", ev->tag);
gpr_strvec_add(buf, tmp);
}
-static const char *errstr(int success) { return success ? "OK" : "ERROR"; }
+static const char* errstr(int success) { return success ? "OK" : "ERROR"; }
-static void adderr(gpr_strvec *buf, int success) {
- char *tmp;
+static void adderr(gpr_strvec* buf, int success) {
+ char* tmp;
gpr_asprintf(&tmp, " %s", errstr(success));
gpr_strvec_add(buf, tmp);
}
-char *grpc_event_string(grpc_event *ev) {
- char *out;
+char* grpc_event_string(grpc_event* ev) {
+ char* out;
gpr_strvec buf;
if (ev == NULL) return gpr_strdup("null");
diff --git a/src/core/lib/surface/event_string.h b/src/core/lib/surface/event_string.h
index 2d53cf0fac..4bdb11f35e 100644
--- a/src/core/lib/surface/event_string.h
+++ b/src/core/lib/surface/event_string.h
@@ -26,7 +26,7 @@ extern "C" {
#endif
/* Returns a string describing an event. Must be later freed with gpr_free() */
-char *grpc_event_string(grpc_event *ev);
+char* grpc_event_string(grpc_event* ev);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 16d2cd189d..66c8c3b6da 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -64,17 +64,18 @@ static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
grpc_register_built_in_plugins();
+ grpc_cq_global_init();
g_initializations = 0;
}
-static bool append_filter(grpc_channel_stack_builder *builder, void *arg) {
+static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_append_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
+ builder, (const grpc_channel_filter*)arg, NULL, NULL);
}
-static bool prepend_filter(grpc_channel_stack_builder *builder, void *arg) {
+static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_prepend_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
+ builder, (const grpc_channel_filter*)arg, NULL, NULL);
}
static void register_builtin_channel_init() {
@@ -89,9 +90,9 @@ static void register_builtin_channel_init() {
grpc_add_connected_filter, NULL);
grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- append_filter, (void *)&grpc_lame_filter);
+ append_filter, (void*)&grpc_lame_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
- (void *)&grpc_server_top_filter);
+ (void*)&grpc_server_top_filter);
}
typedef struct grpc_plugin {
@@ -104,7 +105,7 @@ static int g_number_of_plugins = 0;
void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) {
GRPC_API_TRACE("grpc_register_plugin(init=%p, destroy=%p)", 2,
- ((void *)(intptr_t)init, (void *)(intptr_t)destroy));
+ ((void*)(intptr_t)init, (void*)(intptr_t)destroy));
GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS);
g_all_of_the_plugins[g_number_of_plugins].init = init;
g_all_of_the_plugins[g_number_of_plugins].destroy = destroy;
diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc
index 9fc721b5e4..722670ebce 100644
--- a/src/core/lib/surface/init_secure.cc
+++ b/src/core/lib/surface/init_secure.cc
@@ -47,8 +47,8 @@ void grpc_security_pre_init(void) {
}
static bool maybe_prepend_client_auth_filter(
- grpc_channel_stack_builder *builder, void *arg) {
- const grpc_channel_args *args =
+ grpc_channel_stack_builder* builder, void* arg) {
+ const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {
for (size_t i = 0; i < args->num_args; i++) {
@@ -62,8 +62,8 @@ static bool maybe_prepend_client_auth_filter(
}
static bool maybe_prepend_server_auth_filter(
- grpc_channel_stack_builder *builder, void *arg) {
- const grpc_channel_args *args =
+ grpc_channel_stack_builder* builder, void* arg) {
+ const grpc_channel_args* args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (args) {
for (size_t i = 0; i < args->num_args; i++) {
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index 79131a7b68..5cd8c1fd89 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,7 +40,7 @@ namespace grpc_core {
namespace {
struct CallData {
- grpc_call_combiner *call_combiner;
+ grpc_call_combiner* call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -48,18 +48,18 @@ struct CallData {
struct ChannelData {
grpc_status_code error_code;
- const char *error_message;
+ const char* error_message;
};
-static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+static void fill_metadata(grpc_call_element* elem, grpc_metadata_batch* mdb) {
+ CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
+ ChannelData* chand = reinterpret_cast<ChannelData*>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -77,8 +77,8 @@ static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) {
}
static void lame_start_transport_stream_op_batch(
- grpc_call_element *elem, grpc_transport_stream_op_batch *op) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
+ CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -91,11 +91,11 @@ static void lame_start_transport_stream_op_batch(
calld->call_combiner);
}
-static void lame_get_channel_info(grpc_channel_element *elem,
- const grpc_channel_info *channel_info) {}
+static void lame_get_channel_info(grpc_channel_element* elem,
+ const grpc_channel_info* channel_info) {}
-static void lame_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op) {
+static void lame_start_transport_op(grpc_channel_element* elem,
+ grpc_transport_op* op) {
if (op->on_connectivity_state_change) {
GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN);
*op->connectivity_state = GRPC_CHANNEL_SHUTDOWN;
@@ -111,27 +111,27 @@ static void lame_start_transport_op(grpc_channel_element *elem,
}
}
-static grpc_error *init_call_elem(grpc_call_element *elem,
- const grpc_call_element_args *args) {
- CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+static grpc_error* init_call_elem(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ CallData* calld = reinterpret_cast<CallData*>(elem->call_data);
calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
-static void destroy_call_elem(grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *then_schedule_closure) {
+static void destroy_call_elem(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure) {
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static grpc_error *init_channel_elem(grpc_channel_element *elem,
- grpc_channel_element_args *args) {
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
return GRPC_ERROR_NONE;
}
-static void destroy_channel_elem(grpc_channel_element *elem) {}
+static void destroy_channel_elem(grpc_channel_element* elem) {}
} // namespace
@@ -151,14 +151,14 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
"lame-client",
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
-grpc_channel *grpc_lame_client_channel_create(const char *target,
+grpc_channel* grpc_lame_client_channel_create(const char* target,
grpc_status_code error_code,
- const char *error_message) {
+ const char* error_message) {
ExecCtx _local_exec_ctx;
- grpc_channel_element *elem;
- grpc_channel *channel =
+ grpc_channel_element* elem;
+ grpc_channel* channel =
grpc_channel_create(target, NULL, GRPC_CLIENT_LAME_CHANNEL, NULL);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GRPC_API_TRACE(
@@ -166,7 +166,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = reinterpret_cast<grpc_core::ChannelData*>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish();
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index cf6883bd7e..6e3ce005a2 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -33,7 +33,8 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/stack_lockfree.h"
+#include "src/core/lib/support/mpscq.h"
+#include "src/core/lib/support/spinlock.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -44,11 +45,11 @@
#include "src/core/lib/transport/static_metadata.h"
typedef struct listener {
- void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
+ void* arg;
+ void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
size_t pollset_count);
- void (*destroy)(grpc_server *server, void *arg, grpc_closure *closure);
- struct listener *next;
+ void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
+ struct listener* next;
grpc_closure destroy_done;
} listener;
@@ -62,28 +63,29 @@ grpc_tracer_flag grpc_server_channel_trace =
GRPC_TRACER_INITIALIZER(false, "server_channel");
typedef struct requested_call {
+ gpr_mpscq_node request_link; /* must be first */
requested_call_type type;
size_t cq_idx;
- void *tag;
- grpc_server *server;
- grpc_completion_queue *cq_bound_to_call;
- grpc_call **call;
+ void* tag;
+ grpc_server* server;
+ grpc_completion_queue* cq_bound_to_call;
+ grpc_call** call;
grpc_cq_completion completion;
- grpc_metadata_array *initial_metadata;
+ grpc_metadata_array* initial_metadata;
union {
struct {
- grpc_call_details *details;
+ grpc_call_details* details;
} batch;
struct {
- registered_method *method;
- gpr_timespec *deadline;
- grpc_byte_buffer **optional_payload;
+ registered_method* method;
+ gpr_timespec* deadline;
+ grpc_byte_buffer** optional_payload;
} registered;
} data;
} requested_call;
typedef struct channel_registered_method {
- registered_method *server_registered_method;
+ registered_method* server_registered_method;
uint32_t flags;
bool has_host;
grpc_slice method;
@@ -91,14 +93,14 @@ typedef struct channel_registered_method {
} channel_registered_method;
struct channel_data {
- grpc_server *server;
+ grpc_server* server;
grpc_connectivity_state connectivity_state;
- grpc_channel *channel;
+ grpc_channel* channel;
size_t cq_idx;
/* linked list of all channels on a server */
- channel_data *next;
- channel_data *prev;
- channel_registered_method *registered_methods;
+ channel_data* next;
+ channel_data* prev;
+ channel_registered_method* registered_methods;
uint32_t registered_method_slots;
uint32_t registered_method_max_probes;
grpc_closure finish_destroy_channel_closure;
@@ -106,8 +108,8 @@ struct channel_data {
};
typedef struct shutdown_tag {
- void *tag;
- grpc_completion_queue *cq;
+ void* tag;
+ grpc_completion_queue* cq;
grpc_cq_completion completion;
} shutdown_tag;
@@ -125,12 +127,9 @@ typedef enum {
typedef struct request_matcher request_matcher;
struct call_data {
- grpc_call *call;
+ grpc_call* call;
- /** protects state */
- gpr_mu mu_state;
- /** the current state of a call - see call_state */
- call_state state;
+ gpr_atm state;
bool path_set;
bool host_set;
@@ -138,52 +137,52 @@ struct call_data {
grpc_slice host;
grpc_millis deadline;
- grpc_completion_queue *cq_new;
+ grpc_completion_queue* cq_new;
- grpc_metadata_batch *recv_initial_metadata;
+ grpc_metadata_batch* recv_initial_metadata;
uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata;
- request_matcher *matcher;
- grpc_byte_buffer *payload;
+ request_matcher* matcher;
+ grpc_byte_buffer* payload;
grpc_closure got_initial_metadata;
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
- grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure* on_done_recv_initial_metadata;
grpc_closure publish;
- call_data *pending_next;
+ call_data* pending_next;
};
struct request_matcher {
- grpc_server *server;
- call_data *pending_head;
- call_data *pending_tail;
- gpr_stack_lockfree **requests_per_cq;
+ grpc_server* server;
+ call_data* pending_head;
+ call_data* pending_tail;
+ gpr_locked_mpscq* requests_per_cq;
};
struct registered_method {
- char *method;
- char *host;
+ char* method;
+ char* host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
/* one request matcher per method */
request_matcher matcher;
- registered_method *next;
+ registered_method* next;
};
typedef struct {
- grpc_channel **channels;
+ grpc_channel** channels;
size_t num_channels;
} channel_broadcaster;
struct grpc_server {
- grpc_channel_args *channel_args;
+ grpc_channel_args* channel_args;
- grpc_completion_queue **cqs;
- grpc_pollset **pollsets;
+ grpc_completion_queue** cqs;
+ grpc_pollset** pollsets;
size_t cq_count;
size_t pollset_count;
bool started;
@@ -203,23 +202,18 @@ struct grpc_server {
bool starting;
gpr_cv starting_cv;
- registered_method *registered_methods;
+ registered_method* registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls_per_cq indices */
- gpr_stack_lockfree **request_freelist_per_cq;
- /** requested call backing data */
- requested_call **requested_calls_per_cq;
- int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
size_t num_shutdown_tags;
- shutdown_tag *shutdown_tags;
+ shutdown_tag* shutdown_tags;
channel_data root_channel_data;
- listener *listeners;
+ listener* listeners;
int listeners_destroyed;
gpr_refcount internal_refcount;
@@ -228,29 +222,29 @@ struct grpc_server {
};
#define SERVER_FROM_CALL_ELEM(elem) \
- (((channel_data *)(elem)->channel_data)->server)
+ (((channel_data*)(elem)->channel_data)->server)
-static void publish_new_rpc(void *calld, grpc_error *error);
-static void fail_call(grpc_server *server, size_t cq_idx, requested_call *rc,
- grpc_error *error);
+static void publish_new_rpc(void* calld, grpc_error* error);
+static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
+ grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
-static void maybe_finish_shutdown(grpc_server *server);
+static void maybe_finish_shutdown(grpc_server* server);
/*
* channel broadcaster
*/
/* assumes server locked */
-static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
- channel_data *c;
+static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
+ channel_data* c;
size_t count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
count++;
}
cb->num_channels = count;
cb->channels =
- (grpc_channel **)gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ (grpc_channel**)gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
count = 0;
for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
cb->channels[count++] = c->channel;
@@ -263,20 +257,20 @@ struct shutdown_cleanup_args {
grpc_slice slice;
};
-static void shutdown_cleanup(void *arg, grpc_error *error) {
- struct shutdown_cleanup_args *a = (struct shutdown_cleanup_args *)arg;
+static void shutdown_cleanup(void* arg, grpc_error* error) {
+ struct shutdown_cleanup_args* a = (struct shutdown_cleanup_args*)arg;
grpc_slice_unref_internal(a->slice);
gpr_free(a);
}
-static void send_shutdown(grpc_channel *channel, bool send_goaway,
- grpc_error *send_disconnect) {
- struct shutdown_cleanup_args *sc =
- (struct shutdown_cleanup_args *)gpr_malloc(sizeof(*sc));
+static void send_shutdown(grpc_channel* channel, bool send_goaway,
+ grpc_error* send_disconnect) {
+ struct shutdown_cleanup_args* sc =
+ (struct shutdown_cleanup_args*)gpr_malloc(sizeof(*sc));
GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
grpc_schedule_on_exec_ctx);
- grpc_transport_op *op = grpc_make_transport_op(&sc->closure);
- grpc_channel_element *elem;
+ grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
+ grpc_channel_element* elem;
op->goaway_error =
send_goaway ? grpc_error_set_int(
@@ -291,9 +285,9 @@ static void send_shutdown(grpc_channel *channel, bool send_goaway,
elem->filter->start_transport_op(elem, op);
}
-static void channel_broadcaster_shutdown(channel_broadcaster *cb,
+static void channel_broadcaster_shutdown(channel_broadcaster* cb,
bool send_goaway,
- grpc_error *force_disconnect) {
+ grpc_error* force_disconnect) {
size_t i;
for (i = 0; i < cb->num_channels; i++) {
@@ -309,36 +303,33 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
* request_matcher
*/
-static void request_matcher_init(request_matcher *rm, size_t entries,
- grpc_server *server) {
+static void request_matcher_init(request_matcher* rm, grpc_server* server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
- rm->requests_per_cq = (gpr_stack_lockfree **)gpr_malloc(
+ rm->requests_per_cq = (gpr_locked_mpscq*)gpr_malloc(
sizeof(*rm->requests_per_cq) * server->cq_count);
for (size_t i = 0; i < server->cq_count; i++) {
- rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
+ gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
}
}
-static void request_matcher_destroy(request_matcher *rm) {
+static void request_matcher_destroy(request_matcher* rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) {
- GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
- gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
+ GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
+ gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
}
gpr_free(rm->requests_per_cq);
}
-static void kill_zombie(void *elem, grpc_error *error) {
- grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem));
+static void kill_zombie(void* elem, grpc_error* error) {
+ grpc_call_unref(grpc_call_from_top_element((grpc_call_element*)elem));
}
-static void request_matcher_zombify_all_pending_calls(request_matcher *rm) {
+static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
while (rm->pending_head) {
- call_data *calld = rm->pending_head;
+ call_data* calld = rm->pending_head;
rm->pending_head = calld->pending_next;
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@@ -347,15 +338,20 @@ static void request_matcher_zombify_all_pending_calls(request_matcher *rm) {
}
}
-static void request_matcher_kill_requests(grpc_server *server,
- request_matcher *rm,
- grpc_error *error) {
- int request_id;
+static void request_matcher_kill_requests(grpc_server* server,
+ request_matcher* rm,
+ grpc_error* error) {
+ requested_call* rc;
for (size_t i = 0; i < server->cq_count; i++) {
- while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
- -1) {
- fail_call(server, i, &server->requested_calls_per_cq[i][request_id],
- GRPC_ERROR_REF(error));
+ /* Here we know:
+ 1. no requests are being added (since the server is shut down)
+ 2. no other threads are pulling (since the shut down process is single
+ threaded)
+ So, we can ignore the queue lock and just pop, with the guarantee that a
+ NULL returned here truly means that the queue is empty */
+ while ((rc = (requested_call*)gpr_mpscq_pop(
+ &rm->requests_per_cq[i].queue)) != NULL) {
+ fail_call(server, i, rc, GRPC_ERROR_REF(error));
}
}
GRPC_ERROR_UNREF(error);
@@ -365,12 +361,12 @@ static void request_matcher_kill_requests(grpc_server *server,
* server proper
*/
-static void server_ref(grpc_server *server) {
+static void server_ref(grpc_server* server) {
gpr_ref(&server->internal_refcount);
}
-static void server_delete(grpc_server *server) {
- registered_method *rm;
+static void server_delete(grpc_server* server) {
+ registered_method* rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
@@ -390,43 +386,37 @@ static void server_delete(grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
- if (server->started) {
- gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
- gpr_free(server->requested_calls_per_cq[i]);
- }
}
- gpr_free(server->request_freelist_per_cq);
- gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
gpr_free(server);
}
-static void server_unref(grpc_server *server) {
+static void server_unref(grpc_server* server) {
if (gpr_unref(&server->internal_refcount)) {
server_delete(server);
}
}
-static int is_channel_orphaned(channel_data *chand) {
+static int is_channel_orphaned(channel_data* chand) {
return chand->next == chand;
}
-static void orphan_channel(channel_data *chand) {
+static void orphan_channel(channel_data* chand) {
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, grpc_error *error) {
- channel_data *chand = (channel_data *)cd;
- grpc_server *server = chand->server;
+static void finish_destroy_channel(void* cd, grpc_error* error) {
+ channel_data* chand = (channel_data*)cd;
+ grpc_server* server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
server_unref(server);
}
-static void destroy_channel(channel_data *chand, grpc_error *error) {
+static void destroy_channel(channel_data* chand, grpc_error* error) {
if (is_channel_orphaned(chand)) return;
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
@@ -436,12 +426,12 @@ static void destroy_channel(channel_data *chand, grpc_error *error) {
finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
if (GRPC_TRACER_ON(grpc_server_channel_trace) && error != GRPC_ERROR_NONE) {
- const char *msg = grpc_error_string(error);
+ const char* msg = grpc_error_string(error);
gpr_log(GPR_INFO, "Disconnected client: %s", msg);
}
GRPC_ERROR_UNREF(error);
- grpc_transport_op *op =
+ grpc_transport_op* op =
grpc_make_transport_op(&chand->finish_destroy_channel_closure);
op->set_accept_stream = true;
grpc_channel_next_op(grpc_channel_stack_element(
@@ -449,28 +439,14 @@ static void destroy_channel(channel_data *chand, grpc_error *error) {
op);
}
-static void done_request_event(void *req, grpc_cq_completion *c) {
- requested_call *rc = (requested_call *)req;
- grpc_server *server = rc->server;
-
- if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
- rc < server->requested_calls_per_cq[rc->cq_idx] +
- server->max_requested_calls_per_cq) {
- GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
- gpr_stack_lockfree_push(
- server->request_freelist_per_cq[rc->cq_idx],
- (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
- } else {
- gpr_free(req);
- }
-
- server_unref(server);
+static void done_request_event(void* req, grpc_cq_completion* c) {
+ gpr_free(req);
}
-static void publish_call(grpc_server *server, call_data *calld, size_t cq_idx,
- requested_call *rc) {
+static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
+ requested_call* rc) {
grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
- grpc_call *call = calld->call;
+ grpc_call* call = calld->call;
*rc->call = call;
calld->cq_new = server->cqs[cq_idx];
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
@@ -496,25 +472,19 @@ static void publish_call(grpc_server *server, call_data *calld, size_t cq_idx,
GPR_UNREACHABLE_CODE(return );
}
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- channel_data *chand = (channel_data *)elem->channel_data;
- server_ref(chand->server);
grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
rc, &rc->completion);
}
-static void publish_new_rpc(void *arg, grpc_error *error) {
- grpc_call_element *call_elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)call_elem->call_data;
- channel_data *chand = (channel_data *)call_elem->channel_data;
- request_matcher *rm = calld->matcher;
- grpc_server *server = rm->server;
+static void publish_new_rpc(void* arg, grpc_error* error) {
+ grpc_call_element* call_elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)call_elem->call_data;
+ channel_data* chand = (channel_data*)call_elem->channel_data;
+ request_matcher* rm = calld->matcher;
+ grpc_server* server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
@@ -525,16 +495,14 @@ static void publish_new_rpc(void *arg, grpc_error *error) {
for (size_t i = 0; i < server->cq_count; i++) {
size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
- int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
- if (request_id == -1) {
+ requested_call* rc =
+ (requested_call*)gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) {
continue;
} else {
GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
- gpr_mu_lock(&calld->mu_state);
- calld->state = ACTIVATED;
- gpr_mu_unlock(&calld->mu_state);
- publish_call(server, calld, cq_idx,
- &server->requested_calls_per_cq[cq_idx][request_id]);
+ gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+ publish_call(server, calld, cq_idx, rc);
return; /* early out */
}
}
@@ -542,9 +510,27 @@ static void publish_new_rpc(void *arg, grpc_error *error) {
/* no cq to take the request found: queue it on the slow list */
GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
gpr_mu_lock(&server->mu_call);
- gpr_mu_lock(&calld->mu_state);
- calld->state = PENDING;
- gpr_mu_unlock(&calld->mu_state);
+
+ // We need to ensure that all the queues are empty. We do this under
+ // the server mu_call lock to ensure that if something is added to
+ // an empty request queue, it will block until the call is actually
+ // added to the pending list.
+ for (size_t i = 0; i < server->cq_count; i++) {
+ size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
+ requested_call* rc =
+ (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) {
+ continue;
+ } else {
+ gpr_mu_unlock(&server->mu_call);
+ GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
+ gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
+ publish_call(server, calld, cq_idx, rc);
+ return; /* early out */
+ }
+ }
+
+ gpr_atm_no_barrier_store(&calld->state, PENDING);
if (rm->pending_head == NULL) {
rm->pending_tail = rm->pending_head = calld;
} else {
@@ -556,14 +542,12 @@ static void publish_new_rpc(void *arg, grpc_error *error) {
}
static void finish_start_new_rpc(
- grpc_server *server, grpc_call_element *elem, request_matcher *rm,
+ grpc_server* server, grpc_call_element* elem, request_matcher* rm,
grpc_server_register_method_payload_handling payload_handling) {
- call_data *calld = (call_data *)elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- gpr_mu_lock(&calld->mu_state);
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
@@ -589,13 +573,13 @@ static void finish_start_new_rpc(
}
}
-static void start_new_rpc(grpc_call_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
- grpc_server *server = chand->server;
+static void start_new_rpc(grpc_call_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
+ grpc_server* server = chand->server;
uint32_t i;
uint32_t hash;
- channel_registered_method *rm;
+ channel_registered_method* rm;
if (chand->registered_methods && calld->path_set && calld->host_set) {
/* TODO(ctiller): unify these two searches */
@@ -640,8 +624,8 @@ static void start_new_rpc(grpc_call_element *elem) {
GRPC_SRM_PAYLOAD_NONE);
}
-static int num_listeners(grpc_server *server) {
- listener *l;
+static int num_listeners(grpc_server* server) {
+ listener* l;
int n = 0;
for (l = server->listeners; l; l = l->next) {
n++;
@@ -649,12 +633,12 @@ static int num_listeners(grpc_server *server) {
return n;
}
-static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
- server_unref((grpc_server *)server);
+static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
+ server_unref((grpc_server*)server);
}
-static int num_channels(grpc_server *server) {
- channel_data *chand;
+static int num_channels(grpc_server* server) {
+ channel_data* chand;
int n = 0;
for (chand = server->root_channel_data.next;
chand != &server->root_channel_data; chand = chand->next) {
@@ -663,13 +647,13 @@ static int num_channels(grpc_server *server) {
return n;
}
-static void kill_pending_work_locked(grpc_server *server, grpc_error *error) {
+static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
if (server->started) {
request_matcher_kill_requests(server, &server->unregistered_request_matcher,
GRPC_ERROR_REF(error));
request_matcher_zombify_all_pending_calls(
&server->unregistered_request_matcher);
- for (registered_method *rm = server->registered_methods; rm;
+ for (registered_method* rm = server->registered_methods; rm;
rm = rm->next) {
request_matcher_kill_requests(server, &rm->matcher,
GRPC_ERROR_REF(error));
@@ -679,7 +663,7 @@ static void kill_pending_work_locked(grpc_server *server, grpc_error *error) {
GRPC_ERROR_UNREF(error);
}
-static void maybe_finish_shutdown(grpc_server *server) {
+static void maybe_finish_shutdown(grpc_server* server) {
size_t i;
if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
return;
@@ -712,9 +696,9 @@ static void maybe_finish_shutdown(grpc_server *server) {
}
}
-static void server_on_recv_initial_metadata(void *ptr, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)ptr;
- call_data *calld = (call_data *)elem->call_data;
+static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)ptr;
+ call_data* calld = (call_data*)elem->call_data;
grpc_millis op_deadline;
if (error == GRPC_ERROR_NONE) {
@@ -741,7 +725,7 @@ static void server_on_recv_initial_metadata(void *ptr, grpc_error *error) {
if (calld->host_set && calld->path_set) {
/* do nothing */
} else {
- grpc_error *src_error = error;
+ grpc_error* src_error = error;
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Missing :authority or :path", &error, 1);
GRPC_ERROR_UNREF(src_error);
@@ -750,9 +734,9 @@ static void server_on_recv_initial_metadata(void *ptr, grpc_error *error) {
GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
}
-static void server_mutate_op(grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- call_data *calld = (call_data *)elem->call_data;
+static void server_mutate_op(grpc_call_element* elem,
+ grpc_transport_stream_op_batch* op) {
+ call_data* calld = (call_data*)elem->call_data;
if (op->recv_initial_metadata) {
GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL);
@@ -768,54 +752,47 @@ static void server_mutate_op(grpc_call_element *elem,
}
static void server_start_transport_stream_op_batch(
- grpc_call_element *elem, grpc_transport_stream_op_batch *op) {
+ grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void got_initial_metadata(void *ptr, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)ptr;
- call_data *calld = (call_data *)elem->call_data;
+static void got_initial_metadata(void* ptr, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)ptr;
+ call_data* calld = (call_data*)elem->call_data;
if (error == GRPC_ERROR_NONE) {
start_new_rpc(elem);
} else {
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == NOT_STARTED) {
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
- } else if (calld->state == PENDING) {
- calld->state = ZOMBIED;
- gpr_mu_unlock(&calld->mu_state);
+ } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
/* zombied call will be destroyed when it's removed from the pending
queue... later */
- } else {
- gpr_mu_unlock(&calld->mu_state);
}
}
}
-static void accept_stream(void *cd, grpc_transport *transport,
- const void *transport_server_data) {
- channel_data *chand = (channel_data *)cd;
+static void accept_stream(void* cd, grpc_transport* transport,
+ const void* transport_server_data) {
+ channel_data* chand = (channel_data*)cd;
/* create a call */
grpc_call_create_args args;
memset(&args, 0, sizeof(args));
args.channel = chand->channel;
args.server_transport_data = transport_server_data;
args.send_deadline = GRPC_MILLIS_INF_FUTURE;
- grpc_call *call;
- grpc_error *error = grpc_call_create(&args, &call);
- grpc_call_element *elem =
+ grpc_call* call;
+ grpc_error* error = grpc_call_create(&args, &call);
+ grpc_call_element* elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
if (error != GRPC_ERROR_NONE) {
got_initial_metadata(elem, error);
GRPC_ERROR_UNREF(error);
return;
}
- call_data *calld = (call_data *)elem->call_data;
+ call_data* calld = (call_data*)elem->call_data;
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
@@ -826,11 +803,11 @@ static void accept_stream(void *cd, grpc_transport *transport,
grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
}
-static void channel_connectivity_changed(void *cd, grpc_error *error) {
- channel_data *chand = (channel_data *)cd;
- grpc_server *server = chand->server;
+static void channel_connectivity_changed(void* cd, grpc_error* error) {
+ channel_data* chand = (channel_data*)cd;
+ grpc_server* server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
op->on_connectivity_state_change = &chand->channel_connectivity_changed,
op->connectivity_state = &chand->connectivity_state;
grpc_channel_next_op(grpc_channel_stack_element(
@@ -844,14 +821,13 @@ static void channel_connectivity_changed(void *cd, grpc_error *error) {
}
}
-static grpc_error *init_call_elem(grpc_call_element *elem,
- const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static grpc_error* init_call_elem(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
memset(calld, 0, sizeof(call_data));
calld->deadline = GRPC_MILLIS_INF_FUTURE;
calld->call = grpc_call_from_top_element(elem);
- gpr_mu_init(&calld->mu_state);
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
@@ -861,11 +837,11 @@ static grpc_error *init_call_elem(grpc_call_element *elem,
return GRPC_ERROR_NONE;
}
-static void destroy_call_elem(grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void destroy_call_elem(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
GPR_ASSERT(calld->state != PENDING);
@@ -878,14 +854,12 @@ static void destroy_call_elem(grpc_call_element *elem,
grpc_metadata_array_destroy(&calld->initial_metadata);
grpc_byte_buffer_destroy(calld->payload);
- gpr_mu_destroy(&calld->mu_state);
-
server_unref(chand->server);
}
-static grpc_error *init_channel_elem(grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static grpc_error* init_channel_elem(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(!args->is_last);
chand->server = NULL;
@@ -899,9 +873,9 @@ static grpc_error *init_channel_elem(grpc_channel_element *elem,
return GRPC_ERROR_NONE;
}
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_channel_element* elem) {
size_t i;
- channel_data *chand = (channel_data *)elem->channel_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (chand->registered_methods) {
for (i = 0; i < chand->registered_method_slots; i++) {
grpc_slice_unref_internal(chand->registered_methods[i].method);
@@ -936,9 +910,9 @@ const grpc_channel_filter grpc_server_top_filter = {
"server",
};
-static void register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq,
- void *reserved) {
+static void register_completion_queue(grpc_server* server,
+ grpc_completion_queue* cq,
+ void* reserved) {
size_t i, n;
GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
@@ -947,14 +921,14 @@ static void register_completion_queue(grpc_server *server,
GRPC_CQ_INTERNAL_REF(cq, "server");
n = server->cq_count++;
- server->cqs = (grpc_completion_queue **)gpr_realloc(
- server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
+ server->cqs = (grpc_completion_queue**)gpr_realloc(
+ server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
server->cqs[n] = cq;
}
-void grpc_server_register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq,
- void *reserved) {
+void grpc_server_register_completion_queue(grpc_server* server,
+ grpc_completion_queue* cq,
+ void* reserved) {
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
@@ -970,10 +944,10 @@ void grpc_server_register_completion_queue(grpc_server *server,
register_completion_queue(server, cq, reserved);
}
-grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
+grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- grpc_server *server = (grpc_server *)gpr_zalloc(sizeof(grpc_server));
+ grpc_server* server = (grpc_server*)gpr_zalloc(sizeof(grpc_server));
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
@@ -984,25 +958,23 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server->root_channel_data.next = server->root_channel_data.prev =
&server->root_channel_data;
- /* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
}
-static int streq(const char *a, const char *b) {
+static int streq(const char* a, const char* b) {
if (a == NULL && b == NULL) return 1;
if (a == NULL) return 0;
if (b == NULL) return 0;
return 0 == strcmp(a, b);
}
-void *grpc_server_register_method(
- grpc_server *server, const char *method, const char *host,
+void* grpc_server_register_method(
+ grpc_server* server, const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) {
- registered_method *m;
+ registered_method* m;
GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, "
"flags=0x%08x)",
@@ -1024,7 +996,7 @@ void *grpc_server_register_method(
flags);
return NULL;
}
- m = (registered_method *)gpr_zalloc(sizeof(registered_method));
+ m = (registered_method*)gpr_zalloc(sizeof(registered_method));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -1034,9 +1006,9 @@ void *grpc_server_register_method(
return m;
}
-static void start_listeners(void *s, grpc_error *error) {
- grpc_server *server = (grpc_server *)s;
- for (listener *l = server->listeners; l; l = l->next) {
+static void start_listeners(void* s, grpc_error* error) {
+ grpc_server* server = (grpc_server*)s;
+ for (listener* l = server->listeners; l; l = l->next) {
l->start(server, l->arg, server->pollsets, server->pollset_count);
}
@@ -1048,7 +1020,7 @@ static void start_listeners(void *s, grpc_error *error) {
server_unref(server);
}
-void grpc_server_start(grpc_server *server) {
+void grpc_server_start(grpc_server* server) {
size_t i;
ExecCtx _local_exec_ctx;
@@ -1057,30 +1029,16 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
server->pollset_count = 0;
server->pollsets =
- (grpc_pollset **)gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
- server->request_freelist_per_cq = (gpr_stack_lockfree **)gpr_malloc(
- sizeof(*server->request_freelist_per_cq) * server->cq_count);
- server->requested_calls_per_cq = (requested_call **)gpr_malloc(
- sizeof(*server->requested_calls_per_cq) * server->cq_count);
+ (grpc_pollset**)gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (grpc_cq_can_listen(server->cqs[i])) {
server->pollsets[server->pollset_count++] =
grpc_cq_pollset(server->cqs[i]);
}
- server->request_freelist_per_cq[i] =
- gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
- for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
- gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
- }
- server->requested_calls_per_cq[i] = (requested_call *)gpr_malloc(
- (size_t)server->max_requested_calls_per_cq *
- sizeof(*server->requested_calls_per_cq[i]));
}
- request_matcher_init(&server->unregistered_request_matcher,
- (size_t)server->max_requested_calls_per_cq, server);
- for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->matcher,
- (size_t)server->max_requested_calls_per_cq, server);
+ request_matcher_init(&server->unregistered_request_matcher, server);
+ for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
+ request_matcher_init(&rm->matcher, server);
}
server_ref(server);
@@ -1093,29 +1051,29 @@ void grpc_server_start(grpc_server *server) {
grpc_exec_ctx_finish();
}
-void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
- size_t *pollset_count) {
+void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
+ size_t* pollset_count) {
*pollset_count = server->pollset_count;
*pollsets = server->pollsets;
}
-void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
- grpc_pollset *accepting_pollset,
- const grpc_channel_args *args) {
+void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
+ grpc_pollset* accepting_pollset,
+ const grpc_channel_args* args) {
size_t num_registered_methods;
size_t alloc;
- registered_method *rm;
- channel_registered_method *crm;
- grpc_channel *channel;
- channel_data *chand;
+ registered_method* rm;
+ channel_registered_method* crm;
+ grpc_channel* channel;
+ channel_data* chand;
uint32_t hash;
size_t slots;
uint32_t probes;
uint32_t max_probes = 0;
- grpc_transport_op *op = NULL;
+ grpc_transport_op* op = NULL;
channel = grpc_channel_create(NULL, args, GRPC_SERVER_CHANNEL, transport);
- chand = (channel_data *)grpc_channel_stack_element(
+ chand = (channel_data*)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;
chand->server = s;
@@ -1141,7 +1099,7 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
if (num_registered_methods > 0) {
slots = 2 * num_registered_methods;
alloc = sizeof(channel_registered_method) * slots;
- chand->registered_methods = (channel_registered_method *)gpr_zalloc(alloc);
+ chand->registered_methods = (channel_registered_method*)gpr_zalloc(alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
grpc_slice host;
bool has_host;
@@ -1194,23 +1152,23 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, op);
}
-void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
+void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
(void)done_arg;
gpr_free(storage);
}
-static void listener_destroy_done(void *s, grpc_error *error) {
- grpc_server *server = (grpc_server *)s;
+static void listener_destroy_done(void* s, grpc_error* error) {
+ grpc_server* server = (grpc_server*)s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu_global);
}
-void grpc_server_shutdown_and_notify(grpc_server *server,
- grpc_completion_queue *cq, void *tag) {
- listener *l;
- shutdown_tag *sdt;
+void grpc_server_shutdown_and_notify(grpc_server* server,
+ grpc_completion_queue* cq, void* tag) {
+ listener* l;
+ shutdown_tag* sdt;
channel_broadcaster broadcaster;
ExecCtx _local_exec_ctx;
@@ -1227,13 +1185,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* stay locked, and gather up some stuff to do */
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
- grpc_cq_end_op(
- cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
- (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
+ grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
+ (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
gpr_mu_unlock(&server->mu_global);
goto done;
}
- server->shutdown_tags = (shutdown_tag *)gpr_realloc(
+ server->shutdown_tags = (shutdown_tag*)gpr_realloc(
server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
sdt = &server->shutdown_tags[server->num_shutdown_tags++];
@@ -1273,7 +1230,7 @@ done:
grpc_exec_ctx_finish();
}
-void grpc_server_cancel_all_calls(grpc_server *server) {
+void grpc_server_cancel_all_calls(grpc_server* server) {
channel_broadcaster broadcaster;
ExecCtx _local_exec_ctx;
@@ -1289,8 +1246,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
grpc_exec_ctx_finish();
}
-void grpc_server_destroy(grpc_server *server) {
- listener *l;
+void grpc_server_destroy(grpc_server* server) {
+ listener* l;
ExecCtx _local_exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@@ -1311,13 +1268,13 @@ void grpc_server_destroy(grpc_server *server) {
grpc_exec_ctx_finish();
}
-void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset **pollsets,
+void grpc_server_add_listener(grpc_server* server, void* arg,
+ void (*start)(grpc_server* server, void* arg,
+ grpc_pollset** pollsets,
size_t pollset_count),
- void (*destroy)(grpc_server *server, void *arg,
- grpc_closure *on_done)) {
- listener *l = (listener *)gpr_malloc(sizeof(listener));
+ void (*destroy)(grpc_server* server, void* arg,
+ grpc_closure* on_done)) {
+ listener* l = (listener*)gpr_malloc(sizeof(listener));
l->arg = arg;
l->start = start;
l->destroy = destroy;
@@ -1325,25 +1282,15 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
server->listeners = l;
}
-static grpc_call_error queue_call_request(grpc_server *server, size_t cq_idx,
- requested_call *rc) {
- call_data *calld = NULL;
- request_matcher *rm = NULL;
- int request_id;
+static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
+ requested_call* rc) {
+ call_data* calld = NULL;
+ request_matcher* rm = NULL;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(server, cq_idx, rc,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
- if (request_id == -1) {
- /* out of request ids: just fail this one */
- fail_call(server, cq_idx, rc,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
- GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
- return GRPC_CALL_OK;
- }
switch (rc->type) {
case BATCH_CALL:
rm = &server->unregistered_request_matcher;
@@ -1352,31 +1299,24 @@ static grpc_call_error queue_call_request(grpc_server *server, size_t cq_idx,
rm = &rc->data.registered.method->matcher;
break;
}
- server->requested_calls_per_cq[cq_idx][request_id] = *rc;
- gpr_free(rc);
- if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+ if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock(&server->mu_call);
while ((calld = rm->pending_head) != NULL) {
- request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
- if (request_id == -1) break;
+ rc = (requested_call*)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+ if (rc == NULL) break;
rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call);
- gpr_mu_lock(&calld->mu_state);
- if (calld->state == ZOMBIED) {
- gpr_mu_unlock(&calld->mu_state);
+ if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
+ // Zombied Call
GRPC_CLOSURE_INIT(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
} else {
- GPR_ASSERT(calld->state == PENDING);
- calld->state = ACTIVATED;
- gpr_mu_unlock(&calld->mu_state);
- publish_call(server, calld, cq_idx,
- &server->requested_calls_per_cq[cq_idx][request_id]);
+ publish_call(server, calld, cq_idx, rc);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1386,20 +1326,21 @@ static grpc_call_error queue_call_request(grpc_server *server, size_t cq_idx,
}
grpc_call_error grpc_server_request_call(
- grpc_server *server, grpc_call **call, grpc_call_details *details,
- grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq_bound_to_call,
- grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_server* server, grpc_call** call, grpc_call_details* details,
+ grpc_metadata_array* initial_metadata,
+ grpc_completion_queue* cq_bound_to_call,
+ grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
ExecCtx _local_exec_ctx;
- requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc));
+ requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc));
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_call("
"server=%p, call=%p, details=%p, initial_metadata=%p, "
"cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
- 7, (server, call, details, initial_metadata, cq_bound_to_call,
- cq_for_notification, tag));
+ 7,
+ (server, call, details, initial_metadata, cq_bound_to_call,
+ cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
if (server->cqs[cq_idx] == cq_for_notification) {
@@ -1432,22 +1373,23 @@ done:
}
grpc_call_error grpc_server_request_registered_call(
- grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
- grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_bound_to_call,
- grpc_completion_queue *cq_for_notification, void *tag) {
+ grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
+ grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
+ grpc_completion_queue* cq_bound_to_call,
+ grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
ExecCtx _local_exec_ctx;
- requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc));
- registered_method *rm = (registered_method *)rmp;
+ requested_call* rc = (requested_call*)gpr_malloc(sizeof(*rc));
+ registered_method* rm = (registered_method*)rmp;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
"optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
"tag=%p)",
- 9, (server, rmp, call, deadline, initial_metadata, optional_payload,
- cq_bound_to_call, cq_for_notification, tag));
+ 9,
+ (server, rmp, call, deadline, initial_metadata, optional_payload,
+ cq_bound_to_call, cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
@@ -1487,22 +1429,21 @@ done:
return error;
}
-static void fail_call(grpc_server *server, size_t cq_idx, requested_call *rc,
- grpc_error *error) {
+static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
+ grpc_error* error) {
*rc->call = NULL;
rc->initial_metadata->count = 0;
GPR_ASSERT(error != GRPC_ERROR_NONE);
- server_ref(server);
grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
&rc->completion);
}
-const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
+const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
return server->channel_args;
}
-int grpc_server_has_open_connections(grpc_server *server) {
+int grpc_server_has_open_connections(grpc_server* server) {
int r;
gpr_mu_lock(&server->mu_global);
r = server->root_channel_data.next != &server->root_channel_data;
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 604e038b80..dfd777e8a5 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -35,27 +35,27 @@ extern grpc_tracer_flag grpc_server_channel_trace;
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
-void grpc_server_add_listener(grpc_server *server, void *listener,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset **pollsets,
+void grpc_server_add_listener(grpc_server* server, void* listener,
+ void (*start)(grpc_server* server, void* arg,
+ grpc_pollset** pollsets,
size_t npollsets),
- void (*destroy)(grpc_server *server, void *arg,
- grpc_closure *on_done));
+ void (*destroy)(grpc_server* server, void* arg,
+ grpc_closure* on_done));
/* Setup a transport - creates a channel stack, binds the transport to the
server */
-void grpc_server_setup_transport(grpc_server *server, grpc_transport *transport,
- grpc_pollset *accepting_pollset,
- const grpc_channel_args *args);
+void grpc_server_setup_transport(grpc_server* server, grpc_transport* transport,
+ grpc_pollset* accepting_pollset,
+ const grpc_channel_args* args);
-const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);
+const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server);
-int grpc_server_has_open_connections(grpc_server *server);
+int grpc_server_has_open_connections(grpc_server* server);
/* Do not call this before grpc_server_start. Returns the pollsets and the
* number of pollsets via 'pollsets' and 'pollset_count'. */
-void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
- size_t *pollset_count);
+void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
+ size_t* pollset_count);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/validate_metadata.cc b/src/core/lib/surface/validate_metadata.cc
index 81d07fae44..fc94ea7dbe 100644
--- a/src/core/lib/surface/validate_metadata.cc
+++ b/src/core/lib/surface/validate_metadata.cc
@@ -28,17 +28,17 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/validate_metadata.h"
-static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits,
- const char *err_desc) {
- const uint8_t *p = GRPC_SLICE_START_PTR(slice);
- const uint8_t *e = GRPC_SLICE_END_PTR(slice);
+static grpc_error* conforms_to(grpc_slice slice, const uint8_t* legal_bits,
+ const char* err_desc) {
+ const uint8_t* p = GRPC_SLICE_START_PTR(slice);
+ const uint8_t* e = GRPC_SLICE_END_PTR(slice);
for (; p != e; p++) {
int idx = *p;
int byte = idx / 8;
int bit = idx % 8;
if ((legal_bits[byte] & (1 << bit)) == 0) {
- char *dump = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- grpc_error *error = grpc_error_set_str(
+ char* dump = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ grpc_error* error = grpc_error_set_str(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(err_desc),
GRPC_ERROR_INT_OFFSET,
p - GRPC_SLICE_START_PTR(slice)),
@@ -50,13 +50,13 @@ static grpc_error *conforms_to(grpc_slice slice, const uint8_t *legal_bits,
return GRPC_ERROR_NONE;
}
-static int error2int(grpc_error *error) {
+static int error2int(grpc_error* error) {
int r = (error == GRPC_ERROR_NONE);
GRPC_ERROR_UNREF(error);
return r;
}
-grpc_error *grpc_validate_header_key_is_legal(grpc_slice slice) {
+grpc_error* grpc_validate_header_key_is_legal(grpc_slice slice) {
static const uint8_t legal_header_bits[256 / 8] = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0x03, 0x00, 0x00, 0x00,
0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
@@ -76,7 +76,7 @@ int grpc_header_key_is_legal(grpc_slice slice) {
return error2int(grpc_validate_header_key_is_legal(slice));
}
-grpc_error *grpc_validate_header_nonbin_value_is_legal(grpc_slice slice) {
+grpc_error* grpc_validate_header_nonbin_value_is_legal(grpc_slice slice) {
static const uint8_t legal_header_bits[256 / 8] = {
0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
diff --git a/src/core/lib/surface/validate_metadata.h b/src/core/lib/surface/validate_metadata.h
index afc8be6dfd..9ca20692b5 100644
--- a/src/core/lib/surface/validate_metadata.h
+++ b/src/core/lib/surface/validate_metadata.h
@@ -26,8 +26,8 @@
extern "C" {
#endif
-grpc_error *grpc_validate_header_key_is_legal(grpc_slice slice);
-grpc_error *grpc_validate_header_nonbin_value_is_legal(grpc_slice slice);
+grpc_error* grpc_validate_header_key_is_legal(grpc_slice slice);
+grpc_error* grpc_validate_header_nonbin_value_is_legal(grpc_slice slice);
#ifdef __cplusplus
}
diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc
index 6cb8e7e1a0..f4feadc640 100644
--- a/src/core/lib/surface/version.cc
+++ b/src/core/lib/surface/version.cc
@@ -21,6 +21,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "5.0.0-dev"; }
+const char* grpc_version_string(void) { return "5.0.0-dev"; }
-const char *grpc_g_stands_for(void) { return "generous"; }
+const char* grpc_g_stands_for(void) { return "generous"; }