aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/lame_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/lame_client.c')
-rw-r--r--src/core/surface/lame_client.c82
1 files changed, 33 insertions, 49 deletions
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index e72264fbcd..a60e9d20da 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,49 +49,41 @@ typedef struct {
} call_data;
typedef struct {
- grpc_mdctx *mdctx;
- grpc_channel *master;
grpc_status_code error_code;
const char *error_message;
} channel_data;
+static void fill_metadata(grpc_call_element *elem, grpc_metadata_batch *mdb) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(chand->error_code, tmp);
+ calld->status.md = grpc_mdelem_from_strings("grpc-status", tmp);
+ calld->details.md =
+ grpc_mdelem_from_strings("grpc-message", chand->error_message);
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb->list.head = &calld->status;
+ mdb->list.tail = &calld->details;
+ mdb->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+}
+
static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->send_ops != NULL) {
- grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
- }
- if (op->recv_ops != NULL) {
- char tmp[GPR_LTOA_MIN_BUFSIZE];
- grpc_metadata_batch mdb;
- gpr_ltoa(chand->error_code, tmp);
- calld->status.md =
- grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp);
- calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message",
- chand->error_message);
- calld->status.prev = calld->details.next = NULL;
- calld->status.next = &calld->details;
- calld->details.prev = &calld->status;
- mdb.list.head = &calld->status;
- mdb.list.tail = &calld->details;
- mdb.garbage.head = mdb.garbage.tail = NULL;
- mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- grpc_sopb_add_metadata(op->recv_ops, mdb);
- *op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
- }
- if (op->on_consumed != NULL) {
- op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
+ if (op->recv_initial_metadata != NULL) {
+ fill_metadata(elem, op->recv_initial_metadata);
+ } else if (op->recv_trailing_metadata != NULL) {
+ fill_metadata(elem, op->recv_trailing_metadata);
}
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_channel_get_target(chand->master);
+ return NULL;
}
static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -109,25 +101,16 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const void *transport_server_data,
- grpc_transport_stream_op *initial_op) {
- if (initial_op) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, initial_op);
- }
-}
+ grpc_call_element_args *args) {}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- channel_data *chand = elem->channel_data;
- GPR_ASSERT(is_first);
- GPR_ASSERT(is_last);
- chand->mdctx = mdctx;
- chand->master = master;
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
+ GPR_ASSERT(args->is_first);
+ GPR_ASSERT(args->is_last);
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
@@ -135,8 +118,9 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
static const grpc_channel_filter lame_filter = {
lame_start_transport_stream_op, lame_start_transport_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, lame_get_peer, "lame-client",
+ init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ lame_get_peer, "lame-client",
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -149,8 +133,8 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
channel_data *chand;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
static const grpc_channel_filter *filters[] = {&lame_filter};
- channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, 1,
- NULL, grpc_mdctx_create(), 1);
+ channel =
+ grpc_channel_create_from_filters(&exec_ctx, target, filters, 1, NULL, 1);
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
GRPC_API_TRACE(
"grpc_lame_client_channel_create(target=%s, error_code=%d, "