aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c143
-rw-r--r--src/core/lib/surface/completion_queue.h2
-rw-r--r--src/core/lib/surface/init.c7
3 files changed, 57 insertions, 95 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 979e0aeaea..5690bcab1e 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -126,8 +126,6 @@ struct grpc_call {
/* client or server call */
bool is_client;
- /* is the alarm set */
- bool have_alarm;
/** has grpc_call_destroy been called */
bool destroy_called;
/** flag indicating that cancellation is inherited */
@@ -170,9 +168,6 @@ struct grpc_call {
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
- /* Deadline alarm - if have_alarm is non-zero */
- grpc_timer alarm;
-
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
@@ -215,8 +210,6 @@ struct grpc_call {
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline);
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
@@ -264,39 +257,8 @@ grpc_call *grpc_call_create(
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
- call->send_deadline =
- gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CHANNEL_INTERNAL_REF(channel, "call");
- /* initial refcount dropped by grpc_call_destroy */
- grpc_error *error = grpc_call_stack_init(
- &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
- server_transport_data, CALL_STACK_FROM_CALL(call));
- if (error != GRPC_ERROR_NONE) {
- intptr_t status;
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status))
- status = GRPC_STATUS_UNKNOWN;
- const char *error_str =
- grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
- close_with_status(&exec_ctx, call, (grpc_status_code)status,
- error_str == NULL ? "unknown error" : error_str);
- GRPC_ERROR_UNREF(error);
- }
- if (cq != NULL) {
- GPR_ASSERT(
- pollset_set_alternative == NULL &&
- "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
- GRPC_CQ_INTERNAL_REF(cq, "bind");
- call->pollent =
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
- }
- if (pollset_set_alternative != NULL) {
- call->pollent =
- grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
- }
- if (!grpc_polling_entity_is_empty(&call->pollent)) {
- grpc_call_stack_set_pollset_or_pollset_set(
- &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
- }
+ send_deadline = gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
+
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -338,10 +300,38 @@ grpc_call *grpc_call_create(
gpr_mu_unlock(&parent_call->mu);
}
- if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
- 0) {
- set_deadline_alarm(&exec_ctx, call, send_deadline);
+
+ call->send_deadline = send_deadline;
+
+ GRPC_CHANNEL_INTERNAL_REF(channel, "call");
+ /* initial refcount dropped by grpc_call_destroy */
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *error_str;
+ grpc_error_get_status(error, &status, &error_str);
+ close_with_status(&exec_ctx, call, status, error_str);
+ GRPC_ERROR_UNREF(error);
}
+ if (cq != NULL) {
+ GPR_ASSERT(
+ pollset_set_alternative == NULL &&
+ "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
+ GRPC_CQ_INTERNAL_REF(cq, "bind");
+ call->pollent =
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ }
+ if (pollset_set_alternative != NULL) {
+ call->pollent =
+ grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+ }
+ if (!grpc_polling_entity_is_empty(&call->pollent)) {
+ grpc_call_stack_set_pollset_or_pollset_set(
+ &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+ }
+
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return call;
@@ -454,20 +444,11 @@ static void set_status_details(grpc_call *call, status_source source,
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
- intptr_t status;
- if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
- set_status_code(call, source, (uint32_t)status);
- } else {
- set_status_code(call, source, GRPC_STATUS_INTERNAL);
- }
- const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
- bool free_msg = false;
- if (msg == NULL) {
- free_msg = true;
- msg = grpc_error_string(error);
- }
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ set_status_code(call, source, (uint32_t)status);
set_status_details(call, source, grpc_mdstr_from_string(msg));
- if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@@ -740,9 +721,6 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- if (c->have_alarm) {
- grpc_timer_cancel(&exec_ctx, &c->alarm);
- }
cancel = !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) grpc_call_cancel(c, NULL);
@@ -780,7 +758,6 @@ typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
- grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
grpc_transport_stream_op op;
} termination_closure;
@@ -797,7 +774,6 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
break;
}
GRPC_ERROR_UNREF(tc->error);
- grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@@ -817,7 +793,6 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = tc->op.on_complete;
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -900,32 +875,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call *call = arg;
- gpr_mu_lock(&call->mu);
- call->have_alarm = 0;
- if (error != GRPC_ERROR_CANCELLED) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded");
- }
- gpr_mu_unlock(&call->mu);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
-}
-
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline) {
- if (call->have_alarm) {
- gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
- assert(0);
- return;
- }
- GRPC_CALL_INTERNAL_REF(call, "alarm");
- call->have_alarm = 1;
- call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
- gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -1267,9 +1216,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
+ call->send_deadline =
+ gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
}
}
@@ -1298,9 +1246,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
GRPC_ERROR_REF(error);
gpr_mu_lock(&call->mu);
+
+ // If the error has an associated status code, set the call's status.
+ intptr_t status;
+ if (error != GRPC_ERROR_NONE &&
+ grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_from_error(call, STATUS_FROM_CORE, error);
+ }
+
if (bctl->send_initial_metadata) {
if (error != GRPC_ERROR_NONE) {
- set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ set_status_from_error(call, STATUS_FROM_CORE, error);
}
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
@@ -1318,9 +1274,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
call->received_final_op = true;
- if (call->have_alarm) {
- grpc_timer_cancel(exec_ctx, &call->alarm);
- }
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 3049284f68..4dbf3aae63 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -57,6 +57,8 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
+//#define GRPC_CQ_REF_COUNT_DEBUG
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 64bdfc3446..289f4ce8e8 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -43,6 +43,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
@@ -100,6 +101,12 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ prepend_filter, (void *)&grpc_client_deadline_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_server_deadline_filter);
+ grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);
grpc_channel_init_register_stage(