aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/census_filter.c70
-rw-r--r--src/core/channel/channel_stack.c44
-rw-r--r--src/core/channel/channel_stack.h77
-rw-r--r--src/core/channel/child_channel.c18
-rw-r--r--src/core/channel/client_channel.c93
-rw-r--r--src/core/channel/connected_channel.c261
-rw-r--r--src/core/channel/http_client_filter.c72
-rw-r--r--src/core/channel/http_filter.c137
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c116
-rw-r--r--src/core/channel/noop_filter.c13
-rw-r--r--src/core/surface/call.c178
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/transport/chttp2_transport.c4
-rw-r--r--src/core/transport/stream_op.h28
-rw-r--r--src/core/transport/transport.h21
-rw-r--r--src/core/transport/transport_op_string.c141
17 files changed, 461 insertions, 856 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 9c0c20af22..3e0fc39fc9 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
+
+ /* recv callback */
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -60,55 +65,58 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld,
channel_data* chand) {
grpc_linked_mdelem* m;
- for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
- if (m->md->key == chand->path_str) {
- gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
- census_add_method_tag(
- calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ size_t i;
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op * op = &sopb->ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(
+ calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ }
}
}
}
-static void client_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_SEND_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_RECV_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->send_ops) {
+ extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
}
-static void server_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
+ call_data* calld = elem->call_data;
+ channel_data* chand = elem->channel_data;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_RECV_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_SEND_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->recv_ops) {
+ /* substitute our callback for the op callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_done_recv;
+ op->recv_user_data = elem;
}
/* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
@@ -180,11 +188,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
+ client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem,
client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
+ server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem,
server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3a3a3a75b7..c121e27005 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
+#include <string.h>
int grpc_trace_channel = 0;
@@ -181,12 +182,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
- grpc_call_element *next_elem = elem + op->dir;
- if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- }
- next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -205,39 +203,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_CANCEL_OP;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- cancel_op.flags = 0;
- cancel_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &cancel_op);
-}
-
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op finish_op;
- finish_op.type = GRPC_SEND_FINISH;
- finish_op.dir = GRPC_CALL_DOWN;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- finish_op.flags = 0;
- finish_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &finish_op);
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
- grpc_call_op op;
- op.type = GRPC_RECV_SYNTHETIC_STATUS;
- op.dir = GRPC_CALL_UP;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.synthetic_status.status = status;
- op.data.synthetic_status.message = message;
- grpc_call_next_op(cur_elem, &op);
+ abort();
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index addc92b272..75897ff651 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,78 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* Call operations - things that can be sent and received.
-
- Threading:
- SEND, RECV, and CANCEL ops can be active on a call at the same time, but
- only one SEND, one RECV, and one CANCEL can be active at a time.
-
- If state is shared between send/receive/cancel operations, it is up to
- filters to provide their own protection around that. */
-typedef enum {
- /* send metadata to the channels peer */
- GRPC_SEND_METADATA,
- /* send a message to the channels peer */
- GRPC_SEND_MESSAGE,
- /* send a pre-formatted message to the channels peer */
- GRPC_SEND_PREFORMATTED_MESSAGE,
- /* send half-close to the channels peer */
- GRPC_SEND_FINISH,
- /* request that more data be allowed through flow control */
- GRPC_REQUEST_DATA,
- /* metadata was received from the channels peer */
- GRPC_RECV_METADATA,
- /* a message was received from the channels peer */
- GRPC_RECV_MESSAGE,
- /* half-close was received from the channels peer */
- GRPC_RECV_HALF_CLOSE,
- /* full close was received from the channels peer */
- GRPC_RECV_FINISH,
- /* a status has been sythesized locally */
- GRPC_RECV_SYNTHETIC_STATUS,
- /* the call has been abnormally terminated */
- GRPC_CANCEL_OP
-} grpc_call_op_type;
-
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-/* A single filterable operation to be performed on a call */
-typedef struct {
- /* The type of operation we're performing */
- grpc_call_op_type type;
- /* The directionality of this call - does the operation begin at the bottom
- of the stack and flow up, or does the operation start at the top of the
- stack and flow down through the filters. */
- grpc_call_dir dir;
-
- /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
- gpr_uint32 flags;
-
- /* Argument data, matching up with grpc_call_op_type names */
- union {
- grpc_byte_buffer *message;
- grpc_metadata_batch metadata;
- struct {
- grpc_status_code status;
- const char *message;
- } synthetic_status;
- } data;
-
- grpc_pollset *bind_pollset;
-
- /* Must be called when processing of this call-op is complete.
- Signature chosen to match transport flow control callbacks */
- void (*done_cb)(void *user_data, grpc_op_error error);
- /* User data to be passed into done_cb */
- void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@@ -170,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op);
+ void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -272,8 +204,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -285,10 +217,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op);
+ grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message);
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb03829c7..244417384a 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -61,22 +61,11 @@ typedef struct {
} lb_channel_data;
typedef struct {
- grpc_call_element *back;
grpc_child_channel *channel;
} lb_call_data;
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
+static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -165,7 +154,7 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_call_op, lb_channel_op, sizeof(lb_call_data),
+ lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
@@ -282,7 +271,6 @@ grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
- lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e59ca..6ad50cb944 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -82,7 +82,7 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
- gpr_uint8 got_first_send;
+ gpr_uint8 got_first_op;
call_state state;
gpr_timespec deadline;
@@ -91,7 +91,7 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
- grpc_call_op waiting_op;
+ grpc_transport_op waiting_op;
} s;
};
@@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem,
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
}
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
return;
}
GPR_ASSERT(calld->state == CALL_CREATED);
@@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
}
static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
- /* send up a synthesized status */
- grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ abort();
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
@@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */
case CALL_WAITING:
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
@@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort();
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->got_first_send) {
- /* filter out the start event to find which child to send on */
- calld->got_first_send = 1;
- start_rpc(elem, op);
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
- break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
- } else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- }
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ GPR_ASSERT(op->send_ops == NULL);
+ GPR_ASSERT(op->recv_ops == NULL);
+
+ cancel_rpc(elem, op);
+ return;
+ }
+
+ if (!calld->got_first_op) {
+ calld->got_first_op = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_element *child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
+ child_elem->filter->start_transport_op(child_elem, op);
}
}
@@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->got_first_send = 0;
+ calld->got_first_op = 0;
}
/* Destructor for call_data */
@@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
- if (calld->state == CALL_WAITING) {
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- }
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"client-channel",
};
@@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274bfe1..d0b834a10a 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -50,19 +50,10 @@
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
+ void *unused;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@@ -72,6 +63,7 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
+#if 0
/* Copy the contents of a byte buffer into stream ops */
static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
grpc_stream_op_buffer *sopb) {
@@ -87,76 +79,17 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
break;
}
}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
+#endif
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->bind_pollset) {
- grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
- }
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -188,14 +121,6 @@ static void init_call_elem(grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data);
@@ -207,8 +132,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -218,12 +141,12 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
+#if 0
cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
if (args) {
for (i = 0; i < args->num_args; i++) {
@@ -240,6 +163,7 @@ static void init_channel_elem(grpc_channel_element *elem,
}
}
}
+#endif
}
/* Destructor for channel_data */
@@ -250,15 +174,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
};
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
-
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@@ -276,168 +195,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
- calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
- grpc_metadata_batch metadata) {
- grpc_call_op op;
- op.type = GRPC_RECV_METADATA;
- op.dir = GRPC_CALL_UP;
- op.flags = 0;
- op.data.metadata = metadata;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
- grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- got_metadata(elem, stream_op->data.metadata);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -470,7 +227,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
+ accept_stream,
transport_goaway, transport_closed,
};
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index bc014b15ff..fb2ce4d34a 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -39,6 +39,12 @@ typedef struct call_data {
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
+ int sent_initial_metadata;
+
+ int got_initial_metadata;
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -64,22 +70,39 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hc_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
/* Send : prefixed headers, which have to be before any application
- * layer headers. */
+ layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@@ -88,17 +111,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_mdelem_ref(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
- grpc_call_next_op(elem, op);
- break;
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
break;
+ }
}
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hc_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -120,7 +146,11 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+ calld->sent_initial_metadata = 0;
+ calld->got_initial_metadata = 0;
+}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@@ -181,6 +211,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a0422d8..0000000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
- path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad61f..0000000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
- transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 85224f28e5..d18c38d93c 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -52,6 +52,10 @@ typedef struct call_data {
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_linked_mdelem status;
+
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -143,63 +147,73 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hs_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ /* do nothing */
+ } else {
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(elem);
+ }
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (!calld->got_initial_metadata) {
- calld->got_initial_metadata = 1;
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
- grpc_call_next_op(elem, op);
- } else {
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- }
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_SEND_METADATA:
- /* If we haven't sent status 200 yet, we need to so so because it needs to
- come before any non : prefixed metadata. */
- if (!calld->sent_status) {
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- grpc_mdelem_ref(channeld->status_ok));
- }
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
break;
+ }
}
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hs_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -321,6 +335,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2bc1..403b60901b 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -50,8 +50,7 @@ static void ignore_unused(void *ignored) {}
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -59,12 +58,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -131,6 +126,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
- call_op, channel_op, sizeof(call_data),
+ noop_start_transport_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "no-op"};
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 608442c008..709ca0b397 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -150,6 +150,9 @@ struct grpc_call {
gpr_uint8 num_completed_requests;
/* flag that we need to request more data */
gpr_uint8 need_more_data;
+ /* flags with bits corresponding to write states allowing us to determine
+ what was sent */
+ gpr_uint8 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -214,6 +217,10 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
+ grpc_stream_op_buffer send_ops;
+ grpc_stream_op_buffer recv_ops;
+ grpc_stream_state recv_state;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -234,9 +241,11 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -359,20 +368,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
-static void request_more_data(grpc_call *call) {
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- grpc_call_execute_op(call, &op);
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -384,16 +379,31 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) {
- send_action sa = SEND_NOTHING;
+ grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
- int need_more_data =
- call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+ int start_op = 0;
int i;
- if (need_more_data) {
+ memset(&op, 0, sizeof(op));
+
+ if (call->need_more_data &&
+ (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) {
+ op.recv_ops = &call->recv_ops;
+ op.recv_state = &call->recv_state;
+ op.on_done_recv = call_on_done_recv;
+ op.recv_user_data = call;
call->need_more_data = 0;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
+
+ if (!call->sending) {
+ if (fill_send_ops(call, &op)) {
+ call->sending = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
}
if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +415,10 @@ static void unlock(grpc_call *call) {
grpc_call_internal_ref(call);
}
- if (!call->sending) {
- sa = choose_send_action(call);
- if (sa != SEND_NOTHING) {
- call->sending = 1;
- grpc_call_internal_ref(call);
- }
- }
-
gpr_mu_unlock(&call->mu);
- if (need_more_data) {
- request_more_data(call);
- }
-
- if (sa != SEND_NOTHING) {
- enact_send_action(call, sa);
+ if (start_op) {
+ execute_op(call, &op);
}
if (completing_requests > 0) {
@@ -577,6 +575,64 @@ static void finish_start_step(void *pc, grpc_op_error error) {
error);
}
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
+ size_t i;
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
+ for (i = 0; i < count; i++) {
+ grpc_metadata *md = &metadata[i];
+ grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
+ grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
+ l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value,
+ md->value_length);
+ l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
+ l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
+ }
+ out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
+ out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
+ return out;
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
+ grpc_ioreq_data data;
+ grpc_metadata_batch mdb;
+ size_t i;
+ GPR_ASSERT(op->send_ops == NULL);
+
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ break;
+ }
+ data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
+ mdb.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
+ for (i = 0; i < call->send_initial_metadata_count; i++) {
+ grpc_metadata_batch_link_head(&mdb,
+ &call->send_initial_metadata[i]);
+ }
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ op->send_ops = &call->send_ops;
+ op->bind_pollset = grpc_cq_pollset(call->cq);
+ call->last_send_contains |= 1 << WRITE_STATE_INITIAL;
+ /* fall through intended */
+ case WRITE_STATE_STARTED:
+ if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ grpc_sopb_add_message(data.send_message);
+abort();
+ }
+}
+
static send_action choose_send_action(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_INITIAL:
@@ -614,33 +670,7 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
-static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
- grpc_metadata *metadata) {
- size_t i;
- grpc_mdelem_list out;
- if (count == 0) {
- out.head = out.tail = NULL;
- return out;
- }
- for (i = 0; i < count; i++) {
- grpc_metadata *md = &metadata[i];
- grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
- grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
- assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
- l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
- (const gpr_uint8 *)md->value,
- md->value_length);
- l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
- l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
- }
- out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
- out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
- return out;
-}
-
static void enact_send_action(grpc_call *call, send_action sa) {
- grpc_ioreq_data data;
grpc_call_op op;
size_t i;
gpr_uint32 flags = 0;
@@ -654,37 +684,11 @@ static void enact_send_action(grpc_call *call, send_action sa) {
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_INITIAL_METADATA:
- data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
- for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&op.data.metadata,
- &call->send_initial_metadata[i]);
- }
- call->send_initial_metadata_count = 0;
- op.done_cb = finish_start_step;
- op.user_data = call;
- op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_MESSAGE:
- data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.message = data.send_message;
- op.done_cb = finish_write_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915349..358e5560a3 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -106,7 +106,6 @@ void grpc_call_recv_message(grpc_call_element *surface_element,
void grpc_call_read_closed(grpc_call_element *surface_element);
void grpc_call_stream_closed(grpc_call_element *surface_element);
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee284e0..86bd5d2098 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -503,7 +503,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +515,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index c3901bf608..dabe68f3bd 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -50,22 +50,9 @@ typedef enum grpc_stream_op_code {
Must be ignored by receivers */
GRPC_NO_OP,
GRPC_OP_METADATA,
- /* Begin a message/metadata element/status - as defined by
- grpc_message_type. */
- GRPC_OP_BEGIN_MESSAGE,
- /* Add a slice of data to the current message/metadata element/status.
- Must not overflow the forward declared length. */
- GRPC_OP_SLICE
+ GRPC_OP_MESSAGE
} grpc_stream_op_code;
-/* Arguments for GRPC_OP_BEGIN */
-typedef struct grpc_begin_message {
- /* How many bytes of data will this message contain */
- gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
- gpr_uint32 flags;
-} grpc_begin_message;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -118,9 +105,8 @@ typedef struct grpc_stream_op {
/* the arguments to this operation. union fields are named according to the
associated op-code */
union {
- grpc_begin_message begin_message;
+ grpc_byte_buffer *message;
grpc_metadata_batch metadata;
- gpr_slice slice;
} data;
} grpc_stream_op;
@@ -148,16 +134,8 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops);
/* Append a GRPC_NO_OP to a buffer */
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
-/* Append a GRPC_OP_BEGIN to a buffer */
-void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
- gpr_uint32 flags);
+void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb);
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
-/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
-void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6dd0fdaf46..f31011e56a 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -78,6 +78,8 @@ struct grpc_transport_callbacks {
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
+ void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug);
+
/* The transport has been closed */
void (*closed)(void *user_data, grpc_transport *transport);
};
@@ -139,8 +141,14 @@ typedef struct grpc_transport_op {
void *recv_user_data;
grpc_pollset *bind_pollset;
+
+ grpc_status_code cancel_with_status;
} grpc_transport_op;
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
+
/* Send a batch of operations on a transport
Takes ownership of any objects contained in ops.
@@ -161,19 +169,6 @@ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
-/* Abort a stream
-
- Terminate reading and writing for a stream. A final recv_batch with no
- operations and final_state == GRPC_STREAM_CLOSED will be received locally,
- and no more data will be presented to the up-layer.
-
- TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000000..5f7e1be268
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup(" key="));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
+}
+
+char *grpc_call_op_string(grpc_call_op *op) {
+ char *tmp;
+ char *out;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ switch (op->dir) {
+ case GRPC_CALL_DOWN:
+ gpr_strvec_add(&b, gpr_strdup(">"));
+ break;
+ case GRPC_CALL_UP:
+ gpr_strvec_add(&b, gpr_strdup("<"));
+ break;
+ }
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ case GRPC_SEND_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
+ break;
+ case GRPC_SEND_PREFORMATTED_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
+ break;
+ case GRPC_SEND_FINISH:
+ gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
+ break;
+ case GRPC_REQUEST_DATA:
+ gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
+ break;
+ case GRPC_RECV_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ case GRPC_RECV_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
+ break;
+ case GRPC_RECV_HALF_CLOSE:
+ gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
+ break;
+ case GRPC_RECV_FINISH:
+ gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
+ break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
+ op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_CANCEL_OP:
+ gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
+ break;
+ }
+ gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
+ gpr_strvec_add(&b, tmp);
+ if (op->bind_pollset) {
+ gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_call_op *op) {
+ char *str = grpc_call_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}