aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/byte_buffer_reader.c10
-rw-r--r--src/core/lib/surface/call.c115
-rw-r--r--src/core/lib/surface/call.h1
-rw-r--r--src/core/lib/surface/channel.c18
-rw-r--r--src/core/lib/surface/completion_queue.c38
-rw-r--r--src/core/lib/surface/completion_queue.h4
-rw-r--r--src/core/lib/surface/init.c7
-rw-r--r--src/core/lib/surface/server.c76
-rw-r--r--src/core/lib/surface/surface_trace.h48
-rw-r--r--src/core/lib/surface/version.c2
10 files changed, 162 insertions, 157 deletions
diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c
index c97079f638..310bacb2c9 100644
--- a/src/core/lib/surface/byte_buffer_reader.c
+++ b/src/core/lib/surface/byte_buffer_reader.c
@@ -54,8 +54,8 @@ static int is_compressed(grpc_byte_buffer *buffer) {
return 1 /* GPR_TRUE */;
}
-void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer) {
+int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+ grpc_byte_buffer *buffer) {
gpr_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
@@ -67,9 +67,10 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
&decompressed_slices_buffer) == 0) {
gpr_log(GPR_ERROR,
"Unexpected error decompressing data for algorithm with enum "
- "value '%d'. Reading data as if it were uncompressed.",
+ "value '%d'.",
reader->buffer_in->data.raw.compression);
- reader->buffer_out = reader->buffer_in;
+ memset(reader, 0, sizeof(*reader));
+ return 0;
} else { /* all fine */
reader->buffer_out =
grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
@@ -82,6 +83,7 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
reader->current.index = 0;
break;
}
+ return 1;
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 04291b0ee0..fc9df76dc1 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -259,7 +259,8 @@ grpc_call *grpc_call_create(
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
- call->send_deadline = send_deadline;
+ call->send_deadline =
+ gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
@@ -402,8 +403,51 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = (grpc_status_code)status;
+}
- /* TODO(ctiller): what to do about the flush that was previously here */
+static void set_status_details(grpc_call *call, status_source source,
+ grpc_mdstr *status) {
+ if (call->status[source].details != NULL) {
+ GRPC_MDSTR_UNREF(status);
+ } else {
+ call->status[source].details = status;
+ }
+}
+
+static void get_final_status(grpc_call *call,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ set_value(call->status[i].code, set_value_user_data);
+ return;
+ }
+ }
+ if (call->is_client) {
+ set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
+ } else {
+ set_value(GRPC_STATUS_OK, set_value_user_data);
+ }
+}
+
+static void set_status_from_error(grpc_call *call, status_source source,
+ grpc_error *error) {
+ intptr_t status;
+ if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_code(call, source, (uint32_t)status);
+ } else {
+ set_status_code(call, source, GRPC_STATUS_INTERNAL);
+ }
+ const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ bool free_msg = false;
+ if (msg == NULL) {
+ free_msg = true;
+ msg = grpc_error_string(error);
+ }
+ set_status_details(call, source, grpc_mdstr_from_string(msg));
+ if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@@ -492,32 +536,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
return encodings_accepted_by_peer;
}
-static void set_status_details(grpc_call *call, status_source source,
- grpc_mdstr *status) {
- if (call->status[source].details != NULL) {
- GRPC_MDSTR_UNREF(call->status[source].details);
- }
- call->status[source].details = status;
-}
-
-static void get_final_status(grpc_call *call,
- void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data) {
- int i;
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- set_value(call->status[i].code, set_value_user_data);
- return;
- }
- }
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
static void get_final_details(grpc_call *call, char **out_details,
size_t *out_details_capacity) {
int i;
@@ -741,8 +759,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
- grpc_status_code status;
- gpr_slice optional_message;
+ grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
} termination_closure;
@@ -758,7 +775,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close");
break;
}
- gpr_slice_unref(tc->optional_message);
+ GRPC_ERROR_UNREF(tc->error);
grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@@ -767,7 +784,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- op.cancel_with_status = tc->status;
+ op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
op.on_complete = &tc->closure;
@@ -778,8 +795,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- tc->optional_message = gpr_slice_ref(tc->optional_message);
- grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message);
+ op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
tc->op_closure = op.on_complete;
@@ -789,14 +805,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
termination_closure *tc) {
- grpc_mdstr *details = NULL;
- if (GPR_SLICE_LENGTH(tc->optional_message) > 0) {
- tc->optional_message = gpr_slice_ref(tc->optional_message);
- details = grpc_mdstr_from_slice(tc->optional_message);
- }
-
- set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status);
- set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details);
+ set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error);
if (tc->type == TC_CANCEL) {
grpc_closure_init(&tc->closure, send_cancel, tc);
@@ -812,13 +821,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CANCEL;
tc->call = c;
- tc->optional_message = gpr_slice_from_copied_string(description);
- GPR_ASSERT(status != GRPC_STATUS_OK);
- tc->status = status;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}
@@ -826,13 +837,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CLOSE;
tc->call = c;
- tc->optional_message = gpr_slice_from_copied_string(description);
- GPR_ASSERT(status != GRPC_STATUS_OK);
- tc->status = status;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}
@@ -1194,7 +1207,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- char *algo_name;
+ char *algo_name = NULL;
grpc_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
@@ -1213,7 +1226,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
call->incoming_compression_algorithm)) {
extern int grpc_compression_trace;
if (grpc_compression_trace) {
- char *algo_name;
+ char *algo_name = NULL;
grpc_compression_algorithm_name(call->incoming_compression_algorithm,
&algo_name);
gpr_log(GPR_ERROR,
@@ -1414,7 +1427,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_compression_algorithm calgo =
compression_algorithm_for_level_locked(
call, effective_compression_level);
- char *calgo_name;
+ char *calgo_name = NULL;
grpc_compression_algorithm_name(calgo, &calgo_name);
// the following will be picked up by the compress filter and used as
// the call's compression algorithm.
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b640345c21..3a78fe3aa3 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -37,7 +37,6 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/surface/api_trace.h"
-#include "src/core/lib/surface/surface_trace.h"
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 8936e7164f..6d2b1c4935 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -81,7 +81,7 @@ struct grpc_channel {
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
@@ -223,11 +223,12 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
"grpc_channel_create_call("
"channel=%p, parent_call=%p, propagation_mask=%x, cq=%p, method=%s, "
"host=%s, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 10, (channel, parent_call, (unsigned)propagation_mask, cq, method, host,
- (long long)deadline.tv_sec, (int)deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 10,
+ (channel, parent_call, (unsigned)propagation_mask, cq, method, host,
+ deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq, NULL,
@@ -282,11 +283,12 @@ grpc_call *grpc_channel_create_registered_call(
"grpc_channel_create_registered_call("
"channel=%p, parent_call=%p, propagation_mask=%x, completion_queue=%p, "
"registered_call_handle=%p, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
9, (channel, parent_call, (unsigned)propagation_mask, completion_queue,
- registered_call_handle, (long long)deadline.tv_sec,
- (int)deadline.tv_nsec, (int)deadline.clock_type, reserved));
+ registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue, NULL,
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 2cc6aa74e0..5978884db8 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -48,7 +48,6 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/event_string.h"
-#include "src/core/lib/surface/surface_trace.h"
int grpc_trace_operation_failures;
@@ -93,6 +92,17 @@ struct grpc_completion_queue {
static gpr_mu g_freelist_mu;
static grpc_completion_queue *g_freelist;
+int grpc_cq_pluck_trace;
+int grpc_cq_event_timeout_trace;
+
+#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
+ if (grpc_api_trace && \
+ (grpc_cq_pluck_trace || (event)->type != GRPC_QUEUE_TIMEOUT)) { \
+ char *_ev = grpc_event_string(event); \
+ gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
+ gpr_free(_ev); \
+ }
+
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
@@ -240,7 +250,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
"grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
"done_arg=%p, storage=%p)",
7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
- if (grpc_trace_operation_failures) {
+ if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
grpc_error_free_string(errmsg);
@@ -316,10 +326,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_API_TRACE(
"grpc_completion_queue_next("
"cc=%p, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
"reserved=%p)",
- 5, (cc, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
+ reserved));
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -425,13 +436,16 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
- GRPC_API_TRACE(
- "grpc_completion_queue_pluck("
- "cc=%p, tag=%p, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
- "reserved=%p)",
- 6, (cc, tag, (long long)deadline.tv_sec, (int)deadline.tv_nsec,
- (int)deadline.clock_type, reserved));
+ if (grpc_cq_pluck_trace) {
+ GRPC_API_TRACE(
+ "grpc_completion_queue_pluck("
+ "cc=%p, tag=%p, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
+ "reserved=%p)",
+ 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, reserved));
+ }
GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index b9dd1092b6..3049284f68 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -39,6 +39,10 @@
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/pollset.h"
+/* These trace flags default to 1. The corresponding lines are only traced
+ if grpc_api_trace is also truthy */
+extern int grpc_cq_pluck_trace;
+extern int grpc_cq_event_timeout_trace;
extern int grpc_trace_operation_failures;
typedef struct grpc_cq_completion {
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index f07039cb94..5397913a21 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -57,7 +57,6 @@
#include "src/core/lib/surface/init.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/surface/server.h"
-#include "src/core/lib/surface/surface_trace.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -165,6 +164,12 @@ void grpc_init(void) {
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
grpc_register_tracer("compression", &grpc_compression_trace);
+ grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
+ // Default pluck trace to 1
+ grpc_cq_pluck_trace = 1;
+ grpc_register_tracer("queue_timeout", &grpc_cq_event_timeout_trace);
+ // Default timeout trace to 1
+ grpc_cq_event_timeout_trace = 1;
grpc_register_tracer("op_failure", &grpc_trace_operation_failures);
grpc_security_pre_init();
grpc_iomgr_init();
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b..2f108af48a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
+ size_t cq_idx;
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
@@ -206,11 +207,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls indices */
- gpr_stack_lockfree *request_freelist;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
- requested_call *requested_calls;
- size_t max_requested_calls;
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) {
- fail_call(exec_ctx, server, i, &server->requested_calls[request_id],
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error));
}
}
@@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
- gpr_stack_lockfree_destroy(server->request_freelist);
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
- gpr_free(server->requested_calls);
gpr_free(server);
}
@@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
requested_call *rc = req;
grpc_server *server = rc->server;
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}
@@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue(
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- size_t i;
-
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
&server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls = 32768;
- server->request_freelist =
- gpr_stack_lockfree_create(server->max_requested_calls);
- for (i = 0; i < (size_t)server->max_requested_calls; i++) {
- gpr_stack_lockfree_push(server->request_freelist, (int)i);
- }
- server->requested_calls = gpr_malloc(server->max_requested_calls *
- sizeof(*server->requested_calls));
-
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
size_t pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server->max_requested_calls,
- server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
for (l = server->listeners; l; l = l->next) {
@@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_CREATE("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist);
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
- GRPC_ERROR_CREATE("Server Shutdown"));
+ grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT,
+ server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- server->requested_calls[request_id] = *rc;
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
@@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
+ rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
@@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call(
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
+ rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/lib/surface/surface_trace.h b/src/core/lib/surface/surface_trace.h
deleted file mode 100644
index a69a0fff57..0000000000
--- a/src/core/lib/surface/surface_trace.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_SURFACE_SURFACE_TRACE_H
-#define GRPC_CORE_LIB_SURFACE_SURFACE_TRACE_H
-
-#include <grpc/support/log.h>
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/surface/api_trace.h"
-
-#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
- if (grpc_api_trace) { \
- char *_ev = grpc_event_string(event); \
- gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
- gpr_free(_ev); \
- }
-
-#endif /* GRPC_CORE_LIB_SURFACE_SURFACE_TRACE_H */
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index aca76d2bb7..1942075054 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.15.0-dev"; }
+const char *grpc_version_string(void) { return "1.1.0-dev"; }