diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 99 | ||||
-rw-r--r-- | src/core/surface/call.h | 4 | ||||
-rw-r--r-- | src/core/surface/channel.c | 19 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 5 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 |
5 files changed, 115 insertions, 16 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 327a096ffb..d3e66e9c4c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -143,6 +143,8 @@ typedef enum { struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; + grpc_call *parent; + grpc_call *first_child; grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -176,6 +178,8 @@ struct grpc_call { gpr_uint8 cancel_alarm; /** bitmask of allocated completion events in completions */ gpr_uint8 allocated_completions; + /** flag indicating that cancellation is inherited */ + gpr_uint8 cancellation_is_inherited; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -267,6 +271,11 @@ struct grpc_call { /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; + + /** siblings: children of the same parent form a list, and this list is protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -290,7 +299,9 @@ static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 propagation_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, @@ -306,9 +317,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; - if (cq) { + if (cq != NULL) { GRPC_CQ_INTERNAL_REF(cq, "bind"); } + call->parent = parent_call; call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; @@ -347,6 +359,46 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); + if (parent_call != NULL) { + GRPC_CALL_INTERNAL_REF(parent_call, "child"); + GPR_ASSERT(call->is_client); + GPR_ASSERT(!parent_call->is_client); + + gpr_mu_lock(&parent_call->mu); + + if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { + send_deadline = gpr_time_min( + gpr_convert_clock_type(send_deadline, + parent_call->send_deadline.clock_type), + parent_call->send_deadline); + } + /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with + * GRPC_PROPAGATE_STATS_CONTEXT */ + /* TODO(ctiller): This should change to use the appropriate census start_op + * call. */ + if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + parent_call->context[GRPC_CONTEXT_TRACING].value, + NULL); + } else { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + } + if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { + call->cancellation_is_inherited = 1; + } + + if (parent_call->first_child == NULL) { + parent_call->first_child = call; + call->sibling_next = call->sibling_prev = call; + } else { + call->sibling_next = parent_call->first_child; + call->sibling_prev = parent_call->first_child->sibling_prev; + call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; + } + + gpr_mu_unlock(&parent_call->mu); + } if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { set_deadline_alarm(call, send_deadline); @@ -404,6 +456,20 @@ void grpc_call_internal_ref(grpc_call *c) { static void destroy_call(void *call, int ignored_success) { size_t i; grpc_call *c = call; + grpc_call *parent = c->parent; + if (parent) { + gpr_mu_lock(&parent->mu); + if (call == parent->first_child) { + parent->first_child = c->sibling_next; + if (c == parent->first_child) { + parent->first_child = NULL; + } + c->sibling_prev->sibling_next = c->sibling_next; + c->sibling_next->sibling_prev = c->sibling_prev; + } + gpr_mu_unlock(&parent->mu); + GRPC_CALL_INTERNAL_UNREF(parent, "child", 1); + } grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); @@ -870,6 +936,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; + grpc_call *child_call; + grpc_call *next_child_call; size_t i; GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); lock(call); @@ -903,6 +971,19 @@ static void call_on_done_recv(void *pc, int success) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; call->cancel_alarm |= call->have_alarm; + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call); + GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); } finish_read_ops(call); @@ -1283,9 +1364,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC)); + call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call, + gpr_now(GPR_CLOCK_MONOTONIC)); } /* we offset status by a small amount when storing it into transport metadata @@ -1377,7 +1458,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0) { + 0 && + !call->is_client) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { @@ -1465,6 +1547,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (!are_write_flags_valid(op->flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } + if (op->data.send_message == NULL) { + return GRPC_CALL_ERROR_INVALID_MESSAGE; + } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; @@ -1514,6 +1599,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; + req->data.recv_metadata->count = 0; req->flags = op->flags; break; case GRPC_OP_RECV_MESSAGE: @@ -1545,6 +1631,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; req->data.recv_metadata = op->data.recv_status_on_client.trailing_metadata; + req->data.recv_metadata->count = 0; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 265638d519..75bdbce980 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -85,7 +85,9 @@ typedef struct { typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 propagation_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index c10547133e..c87937f669 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -147,7 +147,8 @@ char *grpc_channel_get_target(grpc_channel *channel) { } static grpc_call *grpc_channel_create_call_internal( - grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, + grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_mdelem *send_metadata[2]; int num_metadata = 0; @@ -159,16 +160,18 @@ static grpc_call *grpc_channel_create_call_internal( send_metadata[num_metadata++] = authority_mdelem; } - return grpc_call_create(channel, cq, NULL, send_metadata, - num_metadata, deadline); + return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL, + send_metadata, num_metadata, deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_call *parent_call, + gpr_uint32 propagation_mask, grpc_completion_queue *cq, const char *method, const char *host, gpr_timespec deadline) { return grpc_channel_create_call_internal( - channel, cq, + channel, parent_call, propagation_mask, cq, grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)), @@ -196,11 +199,13 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, } grpc_call *grpc_channel_create_registered_call( - grpc_channel *channel, grpc_completion_queue *completion_queue, - void *registered_call_handle, gpr_timespec deadline) { + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, + grpc_completion_queue *completion_queue, void *registered_call_handle, + gpr_timespec deadline) { registered_call *rc = registered_call_handle; return grpc_channel_create_call_internal( - channel, completion_queue, GRPC_MDELEM_REF(rc->path), + channel, parent_call, propagation_mask, completion_queue, + GRPC_MDELEM_REF(rc->path), rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 00429fac19..36d69cfe5f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -114,6 +114,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } void grpc_cq_begin_op(grpc_completion_queue *cc) { +#ifndef NDEBUG + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + GPR_ASSERT(!cc->shutdown_called); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +#endif gpr_ref(&cc->pending_events); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 29d893db71..cd1dc589e1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -644,8 +644,8 @@ static void accept_stream(void *cd, grpc_transport *transport, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, - gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL, + 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { |