diff options
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r-- | src/core/lib/transport/transport.c | 99 |
1 files changed, 87 insertions, 12 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 004e748f25..055edbb39f 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -40,7 +40,6 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> -#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" @@ -70,16 +69,6 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) { - /* Ick. - The thread we're running on MAY be owned (indirectly) by a call-stack. - If that's the case, destroying the call-stack MAY try to destroy the - thread, which is a tangled mess that we just don't want to ever have to - cope with. - Throw this over to the executor (on a core-owned thread) and process it - there. */ - refcount->destroy.scheduler = grpc_executor_scheduler; - } grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } } @@ -184,7 +173,93 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, GRPC_ERROR_REF(error)); grpc_closure_sched(exec_ctx, op->on_complete, error); - GRPC_ERROR_UNREF(op->cancel_error); +} + +typedef struct { + grpc_error *error; + grpc_closure *then_call; + grpc_closure closure; +} close_message_data; + +static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { + close_message_data *cmd = p; + GRPC_ERROR_UNREF(cmd->error); + if (cmd->then_call != NULL) { + cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, error); + } + gpr_free(cmd); +} + +static void add_error(grpc_transport_stream_op *op, grpc_error **which, + grpc_error *error) { + close_message_data *cmd; + cmd = gpr_malloc(sizeof(*cmd)); + cmd->error = error; + cmd->then_call = op->on_complete; + grpc_closure_init(&cmd->closure, free_message, cmd, + grpc_schedule_on_exec_ctx); + op->on_complete = &cmd->closure; + *which = error; +} + +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status) { + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_error == GRPC_ERROR_NONE) { + op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED, + GRPC_ERROR_INT_GRPC_STATUS, status); + op->close_error = GRPC_ERROR_NONE; + } +} + +void grpc_transport_stream_op_add_cancellation_with_message( + grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, + grpc_status_code status, grpc_slice *optional_message) { + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_error != GRPC_ERROR_NONE) { + if (optional_message) { + grpc_slice_unref_internal(exec_ctx, *optional_message); + } + return; + } + grpc_error *error; + if (optional_message != NULL) { + char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII); + error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), + GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); + grpc_slice_unref_internal(exec_ctx, *optional_message); + } else { + error = GRPC_ERROR_CREATE("Call cancelled"); + } + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); + add_error(op, &op->cancel_error, error); +} + +void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_status_code status, + grpc_slice *optional_message) { + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { + if (optional_message) { + grpc_slice_unref_internal(exec_ctx, *optional_message); + } + return; + } + grpc_error *error; + if (optional_message != NULL) { + char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII); + error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), + GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); + grpc_slice_unref_internal(exec_ctx, *optional_message); + } else { + error = GRPC_ERROR_CREATE("Call force closed"); + } + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); + add_error(op, &op->close_error, error); } typedef struct { |