diff options
Diffstat (limited to 'src/core/surface/lame_client.c')
-rw-r--r-- | src/core/surface/lame_client.c | 82 |
1 files changed, 33 insertions, 49 deletions
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index e72264fbcd..a60e9d20da 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,49 +49,41 @@ typedef struct { } call_data; typedef struct { - grpc_mdctx *mdctx; - grpc_channel *master; grpc_status_code error_code; const char *error_message; } channel_data; +static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + char tmp[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(chand->error_code, tmp); + calld->status.md = grpc_mdelem_from_strings("grpc-status", tmp); + calld->details.md = + grpc_mdelem_from_strings("grpc-message", chand->error_message); + calld->status.prev = calld->details.next = NULL; + calld->status.next = &calld->details; + calld->details.prev = &calld->status; + mdb->list.head = &calld->status; + mdb->list.tail = &calld->details; + mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); +} + static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->send_ops != NULL) { - grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0); - } - if (op->recv_ops != NULL) { - char tmp[GPR_LTOA_MIN_BUFSIZE]; - grpc_metadata_batch mdb; - gpr_ltoa(chand->error_code, tmp); - calld->status.md = - grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp); - calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message", - chand->error_message); - calld->status.prev = calld->details.next = NULL; - calld->status.next = &calld->details; - calld->details.prev = &calld->status; - mdb.list.head = &calld->status; - mdb.list.tail = &calld->details; - mdb.garbage.head = mdb.garbage.tail = NULL; - mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - grpc_sopb_add_metadata(op->recv_ops, mdb); - *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1); - } - if (op->on_consumed != NULL) { - op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0); + if (op->recv_initial_metadata != NULL) { + fill_metadata(elem, op->recv_initial_metadata); + } else if (op->recv_trailing_metadata != NULL) { + fill_metadata(elem, op->recv_trailing_metadata); } + grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_channel_get_target(chand->master); + return NULL; } static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -109,25 +101,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *transport_server_data, - grpc_transport_stream_op *initial_op) { - if (initial_op) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, initial_op); - } -} + grpc_call_element_args *args) {} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {} static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { - channel_data *chand = elem->channel_data; - GPR_ASSERT(is_first); - GPR_ASSERT(is_last); - chand->mdctx = mdctx; - chand->master = master; + grpc_channel_element *elem, + grpc_channel_element_args *args) { + GPR_ASSERT(args->is_first); + GPR_ASSERT(args->is_last); } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, @@ -135,8 +118,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, static const grpc_channel_filter lame_filter = { lame_start_transport_stream_op, lame_start_transport_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, lame_get_peer, "lame-client", + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + lame_get_peer, "lame-client", }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -149,8 +133,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, channel_data *chand; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; static const grpc_channel_filter *filters[] = {&lame_filter}; - channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, - NULL, grpc_mdctx_create(), 1); + channel = + grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, NULL, 1); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); GRPC_API_TRACE( "grpc_lame_client_channel_create(target=%s, error_code=%d, " |