aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c99
1 files changed, 87 insertions, 12 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 004e748f25..055edbb39f 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -40,7 +40,6 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
-#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -70,16 +69,6 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
- /* Ick.
- The thread we're running on MAY be owned (indirectly) by a call-stack.
- If that's the case, destroying the call-stack MAY try to destroy the
- thread, which is a tangled mess that we just don't want to ever have to
- cope with.
- Throw this over to the executor (on a core-owned thread) and process it
- there. */
- refcount->destroy.scheduler = grpc_executor_scheduler;
- }
grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -184,7 +173,93 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
grpc_closure_sched(exec_ctx, op->on_complete, error);
- GRPC_ERROR_UNREF(op->cancel_error);
+}
+
+typedef struct {
+ grpc_error *error;
+ grpc_closure *then_call;
+ grpc_closure closure;
+} close_message_data;
+
+static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
+ close_message_data *cmd = p;
+ GRPC_ERROR_UNREF(cmd->error);
+ if (cmd->then_call != NULL) {
+ cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, error);
+ }
+ gpr_free(cmd);
+}
+
+static void add_error(grpc_transport_stream_op *op, grpc_error **which,
+ grpc_error *error) {
+ close_message_data *cmd;
+ cmd = gpr_malloc(sizeof(*cmd));
+ cmd->error = error;
+ cmd->then_call = op->on_complete;
+ grpc_closure_init(&cmd->closure, free_message, cmd,
+ grpc_schedule_on_exec_ctx);
+ op->on_complete = &cmd->closure;
+ *which = error;
+}
+
+void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
+ grpc_status_code status) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_error == GRPC_ERROR_NONE) {
+ op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED,
+ GRPC_ERROR_INT_GRPC_STATUS, status);
+ op->close_error = GRPC_ERROR_NONE;
+ }
+}
+
+void grpc_transport_stream_op_add_cancellation_with_message(
+ grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op,
+ grpc_status_code status, grpc_slice *optional_message) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_error != GRPC_ERROR_NONE) {
+ if (optional_message) {
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
+ }
+ return;
+ }
+ grpc_error *error;
+ if (optional_message != NULL) {
+ char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII);
+ error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
+ GRPC_ERROR_STR_GRPC_MESSAGE, msg);
+ gpr_free(msg);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
+ } else {
+ error = GRPC_ERROR_CREATE("Call cancelled");
+ }
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
+ add_error(op, &op->cancel_error, error);
+}
+
+void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx,
+ grpc_transport_stream_op *op,
+ grpc_status_code status,
+ grpc_slice *optional_message) {
+ GPR_ASSERT(status != GRPC_STATUS_OK);
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ if (optional_message) {
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
+ }
+ return;
+ }
+ grpc_error *error;
+ if (optional_message != NULL) {
+ char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII);
+ error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
+ GRPC_ERROR_STR_GRPC_MESSAGE, msg);
+ gpr_free(msg);
+ grpc_slice_unref_internal(exec_ctx, *optional_message);
+ } else {
+ error = GRPC_ERROR_CREATE("Call force closed");
+ }
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
+ add_error(op, &op->close_error, error);
}
typedef struct {