aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/transport/transport.c')
-rw-r--r--src/core/lib/transport/transport.c99
1 files changed, 12 insertions, 87 deletions
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 055edbb39f..004e748f25 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -40,6 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -69,6 +70,16 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
+ if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
+ /* Ick.
+ The thread we're running on MAY be owned (indirectly) by a call-stack.
+ If that's the case, destroying the call-stack MAY try to destroy the
+ thread, which is a tangled mess that we just don't want to ever have to
+ cope with.
+ Throw this over to the executor (on a core-owned thread) and process it
+ there. */
+ refcount->destroy.scheduler = grpc_executor_scheduler;
+ }
grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
}
}
@@ -173,93 +184,7 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
grpc_closure_sched(exec_ctx, op->on_complete, error);
-}
-
-typedef struct {
- grpc_error *error;
- grpc_closure *then_call;
- grpc_closure closure;
-} close_message_data;
-
-static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
- close_message_data *cmd = p;
- GRPC_ERROR_UNREF(cmd->error);
- if (cmd->then_call != NULL) {
- cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, error);
- }
- gpr_free(cmd);
-}
-
-static void add_error(grpc_transport_stream_op *op, grpc_error **which,
- grpc_error *error) {
- close_message_data *cmd;
- cmd = gpr_malloc(sizeof(*cmd));
- cmd->error = error;
- cmd->then_call = op->on_complete;
- grpc_closure_init(&cmd->closure, free_message, cmd,
- grpc_schedule_on_exec_ctx);
- op->on_complete = &cmd->closure;
- *which = error;
-}
-
-void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
- grpc_status_code status) {
- GPR_ASSERT(status != GRPC_STATUS_OK);
- if (op->cancel_error == GRPC_ERROR_NONE) {
- op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED,
- GRPC_ERROR_INT_GRPC_STATUS, status);
- op->close_error = GRPC_ERROR_NONE;
- }
-}
-
-void grpc_transport_stream_op_add_cancellation_with_message(
- grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op,
- grpc_status_code status, grpc_slice *optional_message) {
- GPR_ASSERT(status != GRPC_STATUS_OK);
- if (op->cancel_error != GRPC_ERROR_NONE) {
- if (optional_message) {
- grpc_slice_unref_internal(exec_ctx, *optional_message);
- }
- return;
- }
- grpc_error *error;
- if (optional_message != NULL) {
- char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII);
- error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
- GRPC_ERROR_STR_GRPC_MESSAGE, msg);
- gpr_free(msg);
- grpc_slice_unref_internal(exec_ctx, *optional_message);
- } else {
- error = GRPC_ERROR_CREATE("Call cancelled");
- }
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
- add_error(op, &op->cancel_error, error);
-}
-
-void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx,
- grpc_transport_stream_op *op,
- grpc_status_code status,
- grpc_slice *optional_message) {
- GPR_ASSERT(status != GRPC_STATUS_OK);
- if (op->cancel_error != GRPC_ERROR_NONE ||
- op->close_error != GRPC_ERROR_NONE) {
- if (optional_message) {
- grpc_slice_unref_internal(exec_ctx, *optional_message);
- }
- return;
- }
- grpc_error *error;
- if (optional_message != NULL) {
- char *msg = grpc_dump_slice(*optional_message, GPR_DUMP_ASCII);
- error = grpc_error_set_str(GRPC_ERROR_CREATE(msg),
- GRPC_ERROR_STR_GRPC_MESSAGE, msg);
- gpr_free(msg);
- grpc_slice_unref_internal(exec_ctx, *optional_message);
- } else {
- error = GRPC_ERROR_CREATE("Call force closed");
- }
- error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
- add_error(op, &op->close_error, error);
+ GRPC_ERROR_UNREF(op->cancel_error);
}
typedef struct {