aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-08 01:45:38 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-08 01:45:38 +0200
commit9d72b149a9e3462c2fa13afa27a1e52bfe7bf186 (patch)
treeeedff1af6f56fc97e61c3bee236b109b6d007d69 /src/core/surface/call.c
parentf75df57a8ffaddb11f064dfa5e54ec8404a81e08 (diff)
parent95a98ca768683f3864b1aefc9d6f266b22705b2a (diff)
Merge branch 'master' of github.com:grpc/grpc into the-ultimate-showdown
Conflicts: include/grpc/grpc.h src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/completion_queue.c src/cpp/client/channel.cc src/cpp/client/insecure_credentials.cc src/csharp/ext/grpc_csharp_ext.c src/node/ext/call.cc src/node/ext/channel.cc src/php/ext/grpc/call.c src/php/ext/grpc/channel.c src/python/grpcio/grpc/_adapter/_c/types/channel.c src/ruby/ext/grpc/rb_channel.c test/core/end2end/dualstack_socket_test.c test/core/end2end/fixtures/chttp2_fullstack.c test/core/end2end/fixtures/chttp2_fullstack_compression.c test/core/end2end/fixtures/chttp2_fullstack_uds_posix.c test/core/end2end/fixtures/chttp2_fullstack_with_poll.c test/core/end2end/multiple_server_queues_test.c test/core/end2end/no_server_test.c test/core/end2end/tests/bad_hostname.c test/core/end2end/tests/cancel_after_accept.c test/core/end2end/tests/cancel_after_accept_and_writes_closed.c test/core/end2end/tests/cancel_after_invoke.c test/core/end2end/tests/cancel_before_invoke.c test/core/end2end/tests/cancel_in_a_vacuum.c test/core/end2end/tests/census_simple_request.c test/core/end2end/tests/disappearing_server.c test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c test/core/end2end/tests/empty_batch.c test/core/end2end/tests/graceful_server_shutdown.c test/core/end2end/tests/invoke_large_request.c test/core/end2end/tests/max_concurrent_streams.c test/core/end2end/tests/max_message_length.c test/core/end2end/tests/ping_pong_streaming.c test/core/end2end/tests/registered_call.c test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c test/core/end2end/tests/request_response_with_metadata_and_payload.c test/core/end2end/tests/request_response_with_payload.c test/core/end2end/tests/request_response_with_payload_and_call_creds.c test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c test/core/end2end/tests/request_with_compressed_payload.c test/core/end2end/tests/request_with_flags.c test/core/end2end/tests/request_with_large_metadata.c test/core/end2end/tests/request_with_payload.c test/core/end2end/tests/server_finishes_request.c test/core/end2end/tests/simple_delayed_request.c test/core/end2end/tests/simple_request.c test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c test/core/fling/client.c test/core/fling/server.c test/core/surface/lame_client_test.c
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c122
1 files changed, 110 insertions, 12 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e7d6c7d88c..c0ebd508b1 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -40,7 +40,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/census/grpc_context.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h"
@@ -144,6 +143,8 @@ typedef enum {
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
+ grpc_call *parent;
+ grpc_call *first_child;
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@@ -177,6 +178,8 @@ struct grpc_call {
gpr_uint8 cancel_alarm;
/** bitmask of allocated completion events in completions */
gpr_uint8 allocated_completions;
+ /** flag indicating that cancellation is inherited */
+ gpr_uint8 cancellation_is_inherited;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -268,6 +271,11 @@ struct grpc_call {
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
+
+ /** siblings: children of the same parent form a list, and this list is protected under
+ parent->mu */
+ grpc_call *sibling_next;
+ grpc_call *sibling_prev;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -291,7 +299,9 @@ static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
+grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
@@ -307,9 +317,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_mu_init(&call->completion_mu);
call->channel = channel;
call->cq = cq;
- if (cq) {
+ if (cq != NULL) {
GRPC_CQ_INTERNAL_REF(cq, "bind");
}
+ call->parent = parent_call;
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
@@ -348,7 +359,48 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
- if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) {
+ if (parent_call != NULL) {
+ GRPC_CALL_INTERNAL_REF(parent_call, "child");
+ GPR_ASSERT(call->is_client);
+ GPR_ASSERT(!parent_call->is_client);
+
+ gpr_mu_lock(&parent_call->mu);
+
+ if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
+ send_deadline = gpr_time_min(
+ gpr_convert_clock_type(send_deadline,
+ parent_call->send_deadline.clock_type),
+ parent_call->send_deadline);
+ }
+ /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
+ * GRPC_PROPAGATE_STATS_CONTEXT */
+ /* TODO(ctiller): This should change to use the appropriate census start_op
+ * call. */
+ if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
+ GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
+ parent_call->context[GRPC_CONTEXT_TRACING].value,
+ NULL);
+ } else {
+ GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+ }
+ if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
+ call->cancellation_is_inherited = 1;
+ }
+
+ if (parent_call->first_child == NULL) {
+ parent_call->first_child = call;
+ call->sibling_next = call->sibling_prev = call;
+ } else {
+ call->sibling_next = parent_call->first_child;
+ call->sibling_prev = parent_call->first_child->sibling_prev;
+ call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call;
+ }
+
+ gpr_mu_unlock(&parent_call->mu);
+ }
+ if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
+ 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@@ -870,6 +922,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
static void call_on_done_recv(void *pc, int success) {
grpc_call *call = pc;
+ grpc_call *child_call;
+ grpc_call *next_child_call;
size_t i;
GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
lock(call);
@@ -903,6 +957,19 @@ static void call_on_done_recv(void *pc, int success) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
call->cancel_alarm |= call->have_alarm;
+ /* propagate cancellation to any interested children */
+ child_call = call->first_child;
+ if (child_call != NULL) {
+ do {
+ next_child_call = child_call->sibling_next;
+ if (child_call->cancellation_is_inherited) {
+ GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
+ grpc_call_cancel(child_call, NULL);
+ GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
+ }
+ child_call = next_child_call;
+ } while (child_call != call->first_child);
+ }
GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
@@ -932,7 +999,7 @@ static int prepare_application_metadata(grpc_call *call, size_t count,
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
- md->value_length);
+ md->value_length, 1);
if (!grpc_mdstr_is_legal_header(l->md->key)) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key");
return 0;
@@ -1176,6 +1243,22 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) {
int cancel;
+ grpc_call *parent = c->parent;
+
+ if (parent) {
+ gpr_mu_lock(&parent->mu);
+ if (c == parent->first_child) {
+ parent->first_child = c->sibling_next;
+ if (c == parent->first_child) {
+ parent->first_child = NULL;
+ }
+ c->sibling_prev->sibling_next = c->sibling_next;
+ c->sibling_next->sibling_prev = c->sibling_prev;
+ }
+ gpr_mu_unlock(&parent->mu);
+ GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
+ }
+
lock(c);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
@@ -1207,7 +1290,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description) {
grpc_mdstr *details =
- description ? grpc_mdstr_from_string(c->metadata_context, description)
+ description ? grpc_mdstr_from_string(c->metadata_context, description, 0)
: NULL;
GPR_ASSERT(status != GRPC_STATUS_OK);
@@ -1257,6 +1340,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
elem->filter->start_transport_stream_op(elem, op);
}
+char *grpc_call_get_peer(grpc_call *call) {
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ return elem->filter->get_peer(elem);
+}
+
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
@@ -1282,7 +1370,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
- grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call,
+ call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}
@@ -1318,15 +1407,17 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
grpc_compression_algorithm algorithm;
void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
- algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ algorithm =
+ ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
assert(0);
}
- grpc_mdelem_set_user_data(md, destroy_compression,
- (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
+ grpc_mdelem_set_user_data(
+ md, destroy_compression,
+ (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
}
return algorithm;
}
@@ -1372,7 +1463,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
l->md = 0;
}
}
- if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
@@ -1462,6 +1555,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
if (!are_write_flags_valid(op->flags)) {
return GRPC_CALL_ERROR_INVALID_FLAGS;
}
+ if (op->data.send_message == NULL) {
+ return GRPC_CALL_ERROR_INVALID_MESSAGE;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
@@ -1497,7 +1593,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.send_status_from_server.status_details != NULL
? grpc_mdstr_from_string(
call->metadata_context,
- op->data.send_status_from_server.status_details)
+ op->data.send_status_from_server.status_details, 0)
: NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
@@ -1511,6 +1607,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->data.recv_metadata->count = 0;
req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1542,6 +1639,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
+ req->data.recv_metadata->count = 0;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;