aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c43
-rw-r--r--src/core/surface/channel.c5
-rw-r--r--src/core/surface/channel_create.c3
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/secure_channel_create.c3
-rw-r--r--src/core/surface/server.c46
6 files changed, 70 insertions, 36 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e3995a407b..88ff5cfbce 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -31,6 +31,7 @@
*
*/
+#include "src/core/census/grpc_context.h"
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
@@ -226,6 +227,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ grpc_iomgr_closure destroy_closure;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -242,9 +244,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op);
static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
-static grpc_call_error cancel_with_status(
- grpc_call *c, grpc_status_code status, const char *description,
- gpr_uint8 locked);
+static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+ const char *description,
+ gpr_uint8 locked);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -268,6 +270,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (call->is_client) {
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
+ call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create();
+ call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
@@ -367,7 +371,9 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
if (allow_immediate_deletion) {
destroy_call(c, 1);
} else {
- grpc_iomgr_add_callback(destroy_call, c);
+ c->destroy_closure.cb = destroy_call;
+ c->destroy_closure.cb_arg = c;
+ grpc_iomgr_add_callback(&c->destroy_closure);
}
}
}
@@ -403,7 +409,8 @@ static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
- (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
+ (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
+ grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
@@ -556,13 +563,13 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
break;
case GRPC_IOREQ_RECV_INITIAL_METADATA:
GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
- *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
- .recv_metadata);
+ *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
+ .recv_metadata);
break;
case GRPC_IOREQ_RECV_TRAILING_METADATA:
GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
- *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
- .recv_metadata);
+ *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
+ .recv_metadata);
break;
case GRPC_IOREQ_OP_COUNT:
abort();
@@ -676,9 +683,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
}
/* we have to be reading a message to know what to do here */
if (!call->reading_message) {
- cancel_with_status(
- call, GRPC_STATUS_INVALID_ARGUMENT,
- "Received payload data while not reading a message", 1);
+ cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message", 1);
return 0;
}
/* append the slice to the incoming buffer */
@@ -1025,9 +1031,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return cancel_with_status(c, status, description, 0);
}
-static grpc_call_error cancel_with_status(
- grpc_call *c, grpc_status_code status, const char *description,
- gpr_uint8 locked) {
+static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
+ const char *description,
+ gpr_uint8 locked) {
grpc_transport_op op;
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
@@ -1294,12 +1300,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
- tag);
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
-void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value,
- void (*destroy)(void *value)) {
+void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
+ void *value, void (*destroy)(void *value)) {
if (call->context[elem].destroy) {
call->context[elem].destroy(call->context[elem].value);
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index be9da2b7f9..947011c613 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -61,6 +61,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
+ grpc_iomgr_closure destroy_closure;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -193,7 +194,9 @@ static void destroy_channel(void *p, int ok) {
void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
- grpc_iomgr_add_callback(destroy_channel, channel);
+ channel->destroy_closure.cb = destroy_channel;
+ channel->destroy_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&channel->destroy_closure);
}
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index daa8d3a7c6..9fa6696bf6 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -195,9 +195,10 @@ grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_filter *filters[MAX_FILTERS];
int n = 0;
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index d6eb9b2c24..ac6871c6f2 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -31,11 +31,11 @@
*
*/
+#include <grpc/census.h>
#include <grpc/grpc.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/statistics/census_interface.h"
#include "src/core/profiling/timers.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
@@ -64,7 +64,9 @@ void grpc_init(void) {
grpc_security_pre_init();
grpc_iomgr_init();
grpc_tracer_init("GRPC_TRACE");
- census_init();
+ if (census_initialize(CENSUS_NONE)) {
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
grpc_timers_global_init();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index a71d12291e..8ef121dc48 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -234,9 +234,10 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg);
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 71b1dd0630..f4aa6d5b2a 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -122,6 +122,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
+ grpc_iomgr_closure finish_shutdown_channel_closure;
+ grpc_iomgr_closure finish_destroy_channel_closure;
};
typedef struct shutdown_tag {
@@ -183,6 +185,8 @@ struct call_data {
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
+ grpc_iomgr_closure kill_zombie_closure;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -312,7 +316,9 @@ static void destroy_channel(channel_data *chand) {
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
- grpc_iomgr_add_callback(finish_destroy_channel, chand);
+ chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
+ chand->finish_destroy_channel_closure.cb_arg = chand;
+ grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
static void finish_start_new_rpc_and_unlock(grpc_server *server,
@@ -444,7 +450,8 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -452,11 +459,14 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+
}
if (call_list_remove(calld, ALL_CALLS)) {
maybe_finish_shutdown(chand->server);
@@ -533,7 +543,9 @@ static void finish_shutdown_channel(void *cd, int success) {
static void shutdown_channel(channel_data *chand) {
grpc_channel_internal_ref(chand->channel);
- grpc_iomgr_add_callback(finish_shutdown_channel, chand);
+ chand->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
+ chand->finish_shutdown_channel_closure.cb_arg = chand;
+ grpc_iomgr_add_callback(&chand->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
@@ -621,9 +633,15 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
- destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "server",
+ server_start_transport_op,
+ channel_op,
+ sizeof(call_data),
+ init_call_elem,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ "server",
};
void grpc_server_register_completion_queue(grpc_server *server,
@@ -643,7 +661,9 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
- int census_enabled = grpc_channel_args_is_census_enabled(args);
+ /* TODO(census): restore this once we finalize census filter etc.
+ int census_enabled = grpc_channel_args_is_census_enabled(args); */
+ int census_enabled = 0;
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -668,9 +688,10 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server->channel_filters =
gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
server->channel_filters[0] = &server_surface_filter;
+ /* TODO(census): restore this once we rework census filter
if (census_enabled) {
server->channel_filters[1] = &grpc_server_census_filter;
- }
+ } */
for (i = 0; i < filter_count; i++) {
server->channel_filters[i + 1 + census_enabled] = filters[i];
}
@@ -969,9 +990,10 @@ void grpc_server_destroy(grpc_server *server) {
while ((calld = call_list_remove_head(&server->lists[PENDING_START],
PENDING_START)) != NULL) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(
- kill_zombie,
+ grpc_iomgr_closure_init(
+ &calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
for (c = server->root_channel_data.next; c != &server->root_channel_data;