aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_stack.c2
-rw-r--r--src/core/lib/iomgr/error.c14
-rw-r--r--src/core/lib/iomgr/error.h16
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c3
-rw-r--r--src/core/lib/surface/call.c105
-rw-r--r--src/core/lib/surface/completion_queue.c2
-rw-r--r--src/core/lib/transport/transport.c65
-rw-r--r--src/core/lib/transport/transport.h9
-rw-r--r--src/core/lib/transport/transport_op_string.c15
9 files changed, 139 insertions, 92 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index bbba85d80b..42075b127b 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -263,6 +263,6 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
grpc_transport_stream_op op;
memset(&op, 0, sizeof(op));
- op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ op.cancel_error = GRPC_ERROR_CANCELLED;
grpc_call_next_op(exec_ctx, cur_elem, &op);
}
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 540fb4fa7e..da0d697236 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -37,6 +37,7 @@
#include <stdbool.h>
#include <string.h>
+#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
#include <grpc/support/log.h>
@@ -115,6 +116,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "wsa_error";
case GRPC_ERROR_INT_HTTP_STATUS:
return "http_status";
+ case GRPC_ERROR_INT_LIMIT:
+ return "limit";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -271,6 +274,12 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
void *pp;
+ if (is_special(err)) {
+ if (err == GRPC_ERROR_CANCELLED && which == GRPC_ERROR_INT_GRPC_STATUS) {
+ return GRPC_STATUS_CANCELLED;
+ }
+ return false;
+ }
if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
if (p != NULL) *p = (intptr_t)pp;
return true;
@@ -286,6 +295,11 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
return new;
}
+const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
+ if (is_special(err)) return NULL;
+ return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
+}
+
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
grpc_error *new = copy_error_and_unref(src);
new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 69cdf3028e..13f898e31a 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -92,6 +92,8 @@ typedef enum {
GRPC_ERROR_INT_FD,
/// HTTP status (i.e. 404)
GRPC_ERROR_INT_HTTP_STATUS,
+ /// context sensitive limit associated with the error
+ GRPC_ERROR_INT_LIMIT,
} grpc_error_ints;
typedef enum {
@@ -163,23 +165,25 @@ void grpc_error_unref(grpc_error *err);
#endif
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
- intptr_t value);
+ intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
- gpr_timespec value);
+ gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
- const char *value);
+ const char *value) GRPC_MUST_USE_RESULT;
+const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them.
-grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child);
+grpc_error *grpc_error_add_child(grpc_error *src,
+ grpc_error *child) GRPC_MUST_USE_RESULT;
grpc_error *grpc_os_error(const char *file, int line, int err,
- const char *call_name);
+ const char *call_name) GRPC_MUST_USE_RESULT;
/// create an error associated with errno!=0 (an 'operating system' error)
#define GRPC_OS_ERROR(err, call_name) \
grpc_os_error(__FILE__, __LINE__, err, call_name)
grpc_error *grpc_wsa_error(const char *file, int line, int err,
- const char *call_name);
+ const char *call_name) GRPC_MUST_USE_RESULT;
/// windows only: create an error associated with WSAGetLastError()!=0
#define GRPC_WSA_ERROR(err, call_name) \
grpc_wsa_error(__FILE__, __LINE__, err, call_name)
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 76be2acd72..e053afc745 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -220,8 +220,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
- if (calld->security_context_set == 0 &&
- op->cancel_with_status == GRPC_STATUS_OK) {
+ if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) {
calld->security_context_set = 1;
GPR_ASSERT(op->context);
if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 04291b0ee0..77c17a4975 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -402,8 +402,50 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = (grpc_status_code)status;
+}
+
+static void set_status_details(grpc_call *call, status_source source,
+ grpc_mdstr *status) {
+ if (call->status[source].details != NULL) {
+ GRPC_MDSTR_UNREF(call->status[source].details);
+ }
+ call->status[source].details = status;
+}
+
+static void get_final_status(grpc_call *call,
+ void (*set_value)(grpc_status_code code,
+ void *user_data),
+ void *set_value_user_data) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ set_value(call->status[i].code, set_value_user_data);
+ return;
+ }
+ }
+ if (call->is_client) {
+ set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
+ } else {
+ set_value(GRPC_STATUS_OK, set_value_user_data);
+ }
+}
- /* TODO(ctiller): what to do about the flush that was previously here */
+static void set_status_from_error(grpc_call *call, status_source source,
+ grpc_error *error) {
+ intptr_t status;
+ if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_code(call, source, (uint32_t)status);
+ } else {
+ set_status_code(call, source, GRPC_STATUS_INTERNAL);
+ }
+ const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ bool free_msg = false;
+ if (msg == NULL) {
+ free_msg = true;
+ msg = grpc_error_string(error);
+ }
+ set_status_details(call, source, grpc_mdstr_from_string(msg));
+ if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@@ -492,32 +534,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
return encodings_accepted_by_peer;
}
-static void set_status_details(grpc_call *call, status_source source,
- grpc_mdstr *status) {
- if (call->status[source].details != NULL) {
- GRPC_MDSTR_UNREF(call->status[source].details);
- }
- call->status[source].details = status;
-}
-
-static void get_final_status(grpc_call *call,
- void (*set_value)(grpc_status_code code,
- void *user_data),
- void *set_value_user_data) {
- int i;
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (call->status[i].is_set) {
- set_value(call->status[i].code, set_value_user_data);
- return;
- }
- }
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
static void get_final_details(grpc_call *call, char **out_details,
size_t *out_details_capacity) {
int i;
@@ -741,8 +757,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
- grpc_status_code status;
- gpr_slice optional_message;
+ grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
} termination_closure;
@@ -758,7 +773,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close");
break;
}
- gpr_slice_unref(tc->optional_message);
+ GRPC_ERROR_UNREF(tc->error);
grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@@ -767,7 +782,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- op.cancel_with_status = tc->status;
+ op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
op.on_complete = &tc->closure;
@@ -778,8 +793,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
grpc_transport_stream_op op;
termination_closure *tc = tcp;
memset(&op, 0, sizeof(op));
- tc->optional_message = gpr_slice_ref(tc->optional_message);
- grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message);
+ op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
tc->op_closure = op.on_complete;
@@ -789,14 +803,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
termination_closure *tc) {
- grpc_mdstr *details = NULL;
- if (GPR_SLICE_LENGTH(tc->optional_message) > 0) {
- tc->optional_message = gpr_slice_ref(tc->optional_message);
- details = grpc_mdstr_from_slice(tc->optional_message);
- }
-
- set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status);
- set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details);
+ set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error);
if (tc->type == TC_CANCEL) {
grpc_closure_init(&tc->closure, send_cancel, tc);
@@ -812,13 +819,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CANCEL;
tc->call = c;
- tc->optional_message = gpr_slice_from_copied_string(description);
- GPR_ASSERT(status != GRPC_STATUS_OK);
- tc->status = status;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}
@@ -826,13 +835,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(termination_closure));
tc->type = TC_CLOSE;
tc->call = c;
- tc->optional_message = gpr_slice_from_copied_string(description);
- GPR_ASSERT(status != GRPC_STATUS_OK);
- tc->status = status;
+ tc->error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE(description),
+ GRPC_ERROR_STR_GRPC_MESSAGE, description),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
return terminate_with_status(exec_ctx, tc);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 2cc6aa74e0..db8010ef9a 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -240,7 +240,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
"grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
"done_arg=%p, storage=%p)",
7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
- if (grpc_trace_operation_failures) {
+ if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
}
grpc_error_free_string(errmsg);
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 1105494a85..67920b0527 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -36,6 +36,7 @@
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -162,55 +163,63 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL);
}
-void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status) {
- GPR_ASSERT(status != GRPC_STATUS_OK);
- if (op->cancel_with_status == GRPC_STATUS_OK) {
- op->cancel_with_status = status;
- }
- if (op->close_with_status != GRPC_STATUS_OK) {
- op->close_with_status = GRPC_STATUS_OK;
- if (op->optional_close_message != NULL) {
- gpr_slice_unref(*op->optional_close_message);
- op->optional_close_message = NULL;
- }
- }
-}
-
typedef struct {
- gpr_slice message;
+ grpc_error *error;
grpc_closure *then_call;
grpc_closure closure;
} close_message_data;
static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
close_message_data *cmd = p;
- gpr_slice_unref(cmd->message);
+ GRPC_ERROR_UNREF(cmd->error);
if (cmd->then_call != NULL) {
cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, GRPC_ERROR_REF(error));
}
gpr_free(cmd);
}
+static void add_error(grpc_transport_stream_op *op, grpc_error **which,
+ grpc_error *error) {
+ close_message_data *cmd;
+ cmd = gpr_malloc(sizeof(*cmd));
+ cmd->error = error;
+ cmd->then_call = op->on_complete;
+ grpc_closure_init(&cmd->closure, free_message, cmd);
+ op->on_complete = &cmd->closure;
+ *which = error;
+}
+
+void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
+ grpc_status_code status) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_error == GRPC_STATUS_OK) {
+ op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED,
+ GRPC_ERROR_INT_GRPC_STATUS, status);
+ op->close_error = GRPC_ERROR_NONE;
+ }
+}
+
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
grpc_status_code status,
gpr_slice *optional_message) {
- close_message_data *cmd;
GPR_ASSERT(status != GRPC_STATUS_OK);
- if (op->cancel_with_status != GRPC_STATUS_OK ||
- op->close_with_status != GRPC_STATUS_OK) {
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
if (optional_message) {
gpr_slice_unref(*optional_message);
}
return;
}
- if (optional_message) {
- cmd = gpr_malloc(sizeof(*cmd));
- cmd->message = *optional_message;
- cmd->then_call = op->on_complete;
- grpc_closure_init(&cmd->closure, free_message, cmd);
- op->on_complete = &cmd->closure;
- op->optional_close_message = &cmd->message;
+ grpc_error *error;
+ if (optional_message != NULL) {
+ char *msg = gpr_dump_slice(*optional_message, GPR_DUMP_ASCII);
+ error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
+ GRPC_ERROR_STR_GRPC_MESSAGE, msg);
+ gpr_free(msg);
+ gpr_slice_unref(*optional_message);
+ } else {
+ error = GRPC_ERROR_CREATE("Call force closed");
}
- op->close_with_status = status;
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
+ add_error(op, &op->close_error, error);
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index a46ccb643c..d2f6344ee3 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -135,13 +135,12 @@ typedef struct grpc_transport_stream_op {
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
- /** If != GRPC_STATUS_OK, cancel this stream */
- grpc_status_code cancel_with_status;
+ /** If != GRPC_ERROR_NONE, cancel this stream */
+ grpc_error *cancel_error;
- /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this
+ /** If != GRPC_ERROR, send grpc-status, grpc-message, and close this
stream for both reading and writing */
- grpc_status_code close_with_status;
- gpr_slice *optional_close_message;
+ grpc_error *close_error;
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index df04c61127..a862401df2 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -119,10 +119,21 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA"));
}
- if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (op->cancel_error != GRPC_STATUS_OK) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+ const char *msg = grpc_error_string(op->cancel_error);
+ gpr_asprintf(&tmp, "CANCEL:%s", msg);
+ grpc_error_free_string(msg);
+ gpr_strvec_add(&b, tmp);
+ }
+
+ if (op->close_error != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ const char *msg = grpc_error_string(op->close_error);
+ gpr_asprintf(&tmp, "CLOSE:%s", msg);
+ grpc_error_free_string(msg);
gpr_strvec_add(&b, tmp);
}