aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c99
-rw-r--r--src/core/surface/call.h4
-rw-r--r--src/core/surface/channel.c19
-rw-r--r--src/core/surface/completion_queue.c5
-rw-r--r--src/core/surface/server.c4
5 files changed, 115 insertions, 16 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 327a096ffb..d3e66e9c4c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -143,6 +143,8 @@ typedef enum {
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
+ grpc_call *parent;
+ grpc_call *first_child;
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@@ -176,6 +178,8 @@ struct grpc_call {
gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions;
+ /** flag indicating that cancellation is inherited */
+ gpr_uint8 cancellation_is_inherited;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -267,6 +271,11 @@ struct grpc_call {
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
+
+ /** siblings: children of the same parent form a list, and this list is protected under
+ parent->mu */
+ grpc_call *sibling_next;
+ grpc_call *sibling_prev;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -290,7 +299,9 @@ static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
+grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
@@ -306,9 +317,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
- if (cq) {
+ if (cq != NULL) {
GRPC_CQ_INTERNAL_REF(cq, "bind");
}
+ call->parent = parent_call;
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
@@ -347,6 +359,46 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
+ if (parent_call != NULL) {
+ GRPC_CALL_INTERNAL_REF(parent_call, "child");
+ GPR_ASSERT(call->is_client);
+ GPR_ASSERT(!parent_call->is_client);
+
+ gpr_mu_lock(&parent_call->mu);
+
+ if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
+ send_deadline = gpr_time_min(
+ gpr_convert_clock_type(send_deadline,
+ parent_call->send_deadline.clock_type),
+ parent_call->send_deadline);
+ }
+ /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
+ * GRPC_PROPAGATE_STATS_CONTEXT */
+ /* TODO(ctiller): This should change to use the appropriate census start_op
+ * call. */
+ if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
+ GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
+ parent_call->context[GRPC_CONTEXT_TRACING].value,
+ NULL);
+ } else {
+ GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ }
+ if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
+ call->cancellation_is_inherited = 1;
+ }
+
+ if (parent_call->first_child == NULL) {
+ parent_call->first_child = call;
+ call->sibling_next = call->sibling_prev = call;
+ } else {
+ call->sibling_next = parent_call->first_child;
+ call->sibling_prev = parent_call->first_child->sibling_prev;
+ call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call;
+ }
+
+ gpr_mu_unlock(&parent_call->mu);
+ }
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) {
set_deadline_alarm(call, send_deadline);
@@ -404,6 +456,20 @@ void grpc_call_internal_ref(grpc_call *c) {
static void destroy_call(void *call, int ignored_success) {
size_t i;
grpc_call *c = call;
+ grpc_call *parent = c->parent;
+ if (parent) {
+ gpr_mu_lock(&parent->mu);
+ if (call == parent->first_child) {
+ parent->first_child = c->sibling_next;
+ if (c == parent->first_child) {
+ parent->first_child = NULL;
+ }
+ c->sibling_prev->sibling_next = c->sibling_next;
+ c->sibling_next->sibling_prev = c->sibling_prev;
+ }
+ gpr_mu_unlock(&parent->mu);
+ GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
+ }
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
@@ -870,6 +936,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
+ grpc_call *child_call;
+ grpc_call *next_child_call;
size_t i;
GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
lock(call);
@@ -903,6 +971,19 @@ static void call_on_done_recv(void *pc, int success) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
call->cancel_alarm |= call->have_alarm;
+ /* propagate cancellation to any interested children */
+ child_call = call->first_child;
+ if (child_call != NULL) {
+ do {
+ next_child_call = child_call->sibling_next;
+ if (child_call->cancellation_is_inherited) {
+ GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
+ grpc_call_cancel(child_call);
+ GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
+ }
+ child_call = next_child_call;
+ } while (child_call != call->first_child);
+ }
GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
@@ -1283,9 +1364,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm,
- gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC));
+ call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
+ gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata
@@ -1377,7 +1458,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
}
}
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0) {
+ 0 &&
+ !call->is_client) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
@@ -1465,6 +1547,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
if (!are_write_flags_valid(op->flags)) {
return GRPC_CALL_ERROR_INVALID_FLAGS;
}
+ if (op->data.send_message == NULL) {
+ return GRPC_CALL_ERROR_INVALID_MESSAGE;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
@@ -1514,6 +1599,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->data.recv_metadata->count = 0;
req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1545,6 +1631,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
+ req->data.recv_metadata->count = 0;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 265638d519..75bdbce980 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -85,7 +85,9 @@ typedef struct {
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
void *user_data);
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
+grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index c10547133e..c87937f669 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -147,7 +147,8 @@ char *grpc_channel_get_target(grpc_channel *channel) {
}
static grpc_call *grpc_channel_create_call_internal(
- grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
+ grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
int num_metadata = 0;
@@ -159,16 +160,18 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[num_metadata++] = authority_mdelem;
}
- return grpc_call_create(channel, cq, NULL, send_metadata,
- num_metadata, deadline);
+ return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL,
+ send_metadata, num_metadata, deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
+ grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call_internal(
- channel, cq,
+ channel, parent_call, propagation_mask, cq,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0)),
@@ -196,11 +199,13 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
}
grpc_call *grpc_channel_create_registered_call(
- grpc_channel *channel, grpc_completion_queue *completion_queue,
- void *registered_call_handle, gpr_timespec deadline) {
+ grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
+ grpc_completion_queue *completion_queue, void *registered_call_handle,
+ gpr_timespec deadline) {
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
- channel, completion_queue, GRPC_MDELEM_REF(rc->path),
+ channel, parent_call, propagation_mask, completion_queue,
+ GRPC_MDELEM_REF(rc->path),
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 00429fac19..36d69cfe5f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -114,6 +114,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
}
void grpc_cq_begin_op(grpc_completion_queue *cc) {
+#ifndef NDEBUG
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ GPR_ASSERT(!cc->shutdown_called);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+#endif
gpr_ref(&cc->pending_events);
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 29d893db71..cd1dc589e1 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -644,8 +644,8 @@ static void accept_stream(void *cd, grpc_transport *transport,
const void *transport_server_data) {
channel_data *chand = cd;
/* create a call */
- grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
+ 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {