aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/census_filter.c70
-rw-r--r--src/core/channel/channel_stack.c44
-rw-r--r--src/core/channel/channel_stack.h77
-rw-r--r--src/core/channel/child_channel.c18
-rw-r--r--src/core/channel/client_channel.c93
-rw-r--r--src/core/channel/connected_channel.c292
-rw-r--r--src/core/channel/http_client_filter.c72
-rw-r--r--src/core/channel/http_filter.c137
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c122
-rw-r--r--src/core/channel/noop_filter.c13
-rw-r--r--src/core/surface/call.c504
-rw-r--r--src/core/surface/call.h15
-rw-r--r--src/core/surface/channel.c27
-rw-r--r--src/core/surface/channel.h1
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/client.c29
-rw-r--r--src/core/surface/lame_client.c19
-rw-r--r--src/core/surface/server.c124
-rw-r--r--src/core/surface/server_chttp2.c4
-rw-r--r--src/core/transport/chttp2/stream_encoder.c13
-rw-r--r--src/core/transport/chttp2_transport.c306
-rw-r--r--src/core/transport/stream_op.c14
-rw-r--r--src/core/transport/stream_op.h17
-rw-r--r--src/core/transport/transport.c16
-rw-r--r--src/core/transport/transport.h99
-rw-r--r--src/core/transport/transport_impl.h12
-rw-r--r--src/core/transport/transport_op_string.c152
28 files changed, 954 insertions, 1383 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 9c0c20af22..3e0fc39fc9 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
+
+ /* recv callback */
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -60,55 +65,58 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld,
channel_data* chand) {
grpc_linked_mdelem* m;
- for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
- if (m->md->key == chand->path_str) {
- gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
- census_add_method_tag(
- calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ size_t i;
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op * op = &sopb->ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(
+ calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ }
}
}
}
-static void client_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_SEND_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_RECV_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->send_ops) {
+ extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
}
-static void server_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
+ call_data* calld = elem->call_data;
+ channel_data* chand = elem->channel_data;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_RECV_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_SEND_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->recv_ops) {
+ /* substitute our callback for the op callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_done_recv;
+ op->recv_user_data = elem;
}
/* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
@@ -180,11 +188,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
+ client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem,
client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
+ server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem,
server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3a3a3a75b7..c121e27005 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
+#include <string.h>
int grpc_trace_channel = 0;
@@ -181,12 +182,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
- grpc_call_element *next_elem = elem + op->dir;
- if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- }
- next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -205,39 +203,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_CANCEL_OP;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- cancel_op.flags = 0;
- cancel_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &cancel_op);
-}
-
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op finish_op;
- finish_op.type = GRPC_SEND_FINISH;
- finish_op.dir = GRPC_CALL_DOWN;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- finish_op.flags = 0;
- finish_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &finish_op);
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
- grpc_call_op op;
- op.type = GRPC_RECV_SYNTHETIC_STATUS;
- op.dir = GRPC_CALL_UP;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.synthetic_status.status = status;
- op.data.synthetic_status.message = message;
- grpc_call_next_op(cur_elem, &op);
+ abort();
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index addc92b272..75897ff651 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,78 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* Call operations - things that can be sent and received.
-
- Threading:
- SEND, RECV, and CANCEL ops can be active on a call at the same time, but
- only one SEND, one RECV, and one CANCEL can be active at a time.
-
- If state is shared between send/receive/cancel operations, it is up to
- filters to provide their own protection around that. */
-typedef enum {
- /* send metadata to the channels peer */
- GRPC_SEND_METADATA,
- /* send a message to the channels peer */
- GRPC_SEND_MESSAGE,
- /* send a pre-formatted message to the channels peer */
- GRPC_SEND_PREFORMATTED_MESSAGE,
- /* send half-close to the channels peer */
- GRPC_SEND_FINISH,
- /* request that more data be allowed through flow control */
- GRPC_REQUEST_DATA,
- /* metadata was received from the channels peer */
- GRPC_RECV_METADATA,
- /* a message was received from the channels peer */
- GRPC_RECV_MESSAGE,
- /* half-close was received from the channels peer */
- GRPC_RECV_HALF_CLOSE,
- /* full close was received from the channels peer */
- GRPC_RECV_FINISH,
- /* a status has been sythesized locally */
- GRPC_RECV_SYNTHETIC_STATUS,
- /* the call has been abnormally terminated */
- GRPC_CANCEL_OP
-} grpc_call_op_type;
-
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-/* A single filterable operation to be performed on a call */
-typedef struct {
- /* The type of operation we're performing */
- grpc_call_op_type type;
- /* The directionality of this call - does the operation begin at the bottom
- of the stack and flow up, or does the operation start at the top of the
- stack and flow down through the filters. */
- grpc_call_dir dir;
-
- /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
- gpr_uint32 flags;
-
- /* Argument data, matching up with grpc_call_op_type names */
- union {
- grpc_byte_buffer *message;
- grpc_metadata_batch metadata;
- struct {
- grpc_status_code status;
- const char *message;
- } synthetic_status;
- } data;
-
- grpc_pollset *bind_pollset;
-
- /* Must be called when processing of this call-op is complete.
- Signature chosen to match transport flow control callbacks */
- void (*done_cb)(void *user_data, grpc_op_error error);
- /* User data to be passed into done_cb */
- void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@@ -170,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op);
+ void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -272,8 +204,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -285,10 +217,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op);
+ grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message);
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb03829c7..244417384a 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -61,22 +61,11 @@ typedef struct {
} lb_channel_data;
typedef struct {
- grpc_call_element *back;
grpc_child_channel *channel;
} lb_call_data;
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
-
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
+static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -165,7 +154,7 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_call_op, lb_channel_op, sizeof(lb_call_data),
+ lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
@@ -282,7 +271,6 @@ grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
- lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e59ca..6ad50cb944 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -82,7 +82,7 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
- gpr_uint8 got_first_send;
+ gpr_uint8 got_first_op;
call_state state;
gpr_timespec deadline;
@@ -91,7 +91,7 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
- grpc_call_op waiting_op;
+ grpc_transport_op waiting_op;
} s;
};
@@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem,
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
}
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
return;
}
GPR_ASSERT(calld->state == CALL_CREATED);
@@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
}
static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
- /* send up a synthesized status */
- grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ abort();
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
@@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */
case CALL_WAITING:
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
- calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
@@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort();
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->got_first_send) {
- /* filter out the start event to find which child to send on */
- calld->got_first_send = 1;
- start_rpc(elem, op);
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
- break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
- } else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- }
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ GPR_ASSERT(op->send_ops == NULL);
+ GPR_ASSERT(op->recv_ops == NULL);
+
+ cancel_rpc(elem, op);
+ return;
+ }
+
+ if (!calld->got_first_op) {
+ calld->got_first_op = 1;
+ start_rpc(elem, op);
+ } else {
+ grpc_call_element *child_elem =
+ grpc_child_call_get_top_element(calld->s.active.child_call);
+ child_elem->filter->start_transport_op(child_elem, op);
}
}
@@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->got_first_send = 0;
+ calld->got_first_op = 0;
}
/* Destructor for call_data */
@@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
- if (calld->state == CALL_WAITING) {
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- }
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"client-channel",
};
@@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274bfe1..9e2d92ffbc 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,24 +45,13 @@
#include <grpc/support/slice_buffer.h>
#define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
+ void *unused;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@@ -72,91 +61,15 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
- grpc_stream_op_buffer *sopb) {
- size_t i;
-
- switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
- gpr_slice_ref(slice);
- grpc_sopb_add_slice(sopb, slice);
- }
- break;
- }
-}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
-
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->bind_pollset) {
- grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
- }
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -188,14 +101,6 @@ static void init_call_elem(grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data);
@@ -207,8 +112,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -218,28 +121,10 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
-
- cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
- if (args) {
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else if (args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else {
- cd->max_message_length = args->args[i].value.integer;
- }
- }
- }
- }
}
/* Destructor for channel_data */
@@ -250,15 +135,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
};
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
-
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@@ -276,168 +156,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
- calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
- grpc_metadata_batch metadata) {
- grpc_call_op op;
- op.type = GRPC_RECV_METADATA;
- op.dir = GRPC_CALL_UP;
- op.flags = 0;
- op.data.metadata = metadata;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
- grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- got_metadata(elem, stream_op->data.metadata);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -470,7 +188,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
+ accept_stream,
transport_goaway, transport_closed,
};
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 56e12342d7..7287408401 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -39,6 +39,12 @@ typedef struct call_data {
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
+ int sent_initial_metadata;
+
+ int got_initial_metadata;
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -64,22 +70,39 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hc_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
/* Send : prefixed headers, which have to be before any application
- * layer headers. */
+ layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@@ -88,17 +111,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_mdelem_ref(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
- grpc_call_next_op(elem, op);
- break;
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
break;
+ }
}
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hc_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -120,7 +146,11 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data) {
+ call_data *calld = elem->call_data;
+ calld->sent_initial_metadata = 0;
+ calld->got_initial_metadata = 0;
+}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@@ -181,6 +211,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a0422d8..0000000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
- path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad61f..0000000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
- transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 0bfe2f2e30..31ed8a3393 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -52,6 +52,10 @@ typedef struct call_data {
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_linked_mdelem status;
+
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -143,66 +147,76 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hs_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ /* do nothing */
+ } else {
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(elem);
+ }
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+ size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (!calld->got_initial_metadata) {
- calld->got_initial_metadata = 1;
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
- grpc_call_next_op(elem, op);
- } else {
- if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- }
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_SEND_METADATA:
- /* If we haven't sent status 200 yet, we need to so so because it needs to
- come before any non : prefixed metadata. */
- if (!calld->sent_status) {
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- grpc_mdelem_ref(channeld->status_ok));
- }
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
break;
+ }
}
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hs_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -324,6 +338,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2bc1..403b60901b 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -50,8 +50,7 @@ static void ignore_unused(void *ignored) {}
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -59,12 +58,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -131,6 +126,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
- call_op, channel_op, sizeof(call_data),
+ noop_start_transport_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "no-op"};
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2949805622..18be81308d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -81,9 +81,9 @@ typedef struct {
grpc_ioreq_completion_func on_complete;
void *user_data;
/* a bit mask of which request ops are needed (1u << opid) */
- gpr_uint32 need_mask;
+ gpr_uint16 need_mask;
/* a bit mask of which request ops are now completed */
- gpr_uint32 complete_mask;
+ gpr_uint16 complete_mask;
} reqinfo_master;
/* Status data for a request can come from several sources; this
@@ -144,12 +144,17 @@ struct grpc_call {
gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending;
+ /* are we currently performing a recv operation */
+ gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
- /* flag that we need to request more data */
- gpr_uint8 need_more_data;
+ /* are we currently reading a message? */
+ gpr_uint8 reading_message;
+ /* flags with bits corresponding to write states allowing us to determine
+ what was sent */
+ gpr_uint16 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -214,6 +219,13 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
+ grpc_stream_op_buffer send_ops;
+ grpc_stream_op_buffer recv_ops;
+ grpc_stream_state recv_state;
+
+ gpr_slice_buffer incoming_message;
+ gpr_uint32 incoming_message_length;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -234,9 +246,13 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -267,6 +283,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
+ grpc_sopb_init(&call->send_ops);
+ grpc_sopb_init(&call->recv_ops);
/* one ref is dropped in response to destroy, the other in
stream_closed */
gpr_ref_init(&call->internal_refcount, 2);
@@ -314,6 +332,8 @@ static void destroy_call(void *call, int ignored_success) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
+ grpc_sopb_destroy(&c->send_ops);
+ grpc_sopb_destroy(&c->recv_ops);
gpr_free(c);
}
@@ -359,20 +379,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
-static void request_more_data(grpc_call *call) {
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- grpc_call_execute_op(call, &op);
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -383,17 +389,42 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
+static int need_more_data(grpc_call *call) {
+ return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+ is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_CLOSE);
+}
+
static void unlock(grpc_call *call) {
- send_action sa = SEND_NOTHING;
+ grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
- int need_more_data =
- call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+ int start_op = 0;
int i;
- if (need_more_data) {
- call->need_more_data = 0;
+ memset(&op, 0, sizeof(op));
+
+ if (!call->receiving &&
+ (call->write_state >= WRITE_STATE_STARTED || !call->is_client) &&
+ need_more_data(call)) {
+ op.recv_ops = &call->recv_ops;
+ op.recv_state = &call->recv_state;
+ op.on_done_recv = call_on_done_recv;
+ op.recv_user_data = call;
+ call->receiving = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
+
+ if (!call->sending) {
+ if (fill_send_ops(call, &op)) {
+ call->sending = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
}
if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +436,10 @@ static void unlock(grpc_call *call) {
grpc_call_internal_ref(call);
}
- if (!call->sending) {
- sa = choose_send_action(call);
- if (sa != SEND_NOTHING) {
- call->sending = 1;
- grpc_call_internal_ref(call);
- }
- }
-
gpr_mu_unlock(&call->mu);
- if (need_more_data) {
- request_more_data(call);
- }
-
- if (sa != SEND_NOTHING) {
- enact_send_action(call, sa);
+ if (start_op) {
+ execute_op(call, &op);
}
if (completing_requests > 0) {
@@ -554,64 +573,137 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws,
- grpc_op_error error) {
+static void call_on_done_send(void *pc, int success) {
+ grpc_call *call = pc;
+ grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
lock(call);
- finish_ioreq_op(call, op, error);
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error);
+ }
call->sending = 0;
- call->write_state = ws;
unlock(call);
grpc_call_internal_unref(call, 0);
}
-static void finish_write_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error);
+static void finish_message(grpc_call *call) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+ GPR_ASSERT(call->incoming_message.count == 0);
+ call->reading_message = 0;
}
-static void finish_finish_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error);
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+ /* can't begin a message when we're still reading a message */
+ if (call->reading_message) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Message terminated early; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ }
+ /* stash away parameters, and prepare for incoming slices */
+ if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message,
+ "Maximum message length of %d exceeded by a message of length %d",
+ grpc_channel_get_max_message_length(call->channel), msg.length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (msg.length > 0) {
+ call->reading_message = 1;
+ call->incoming_message_length = msg.length;
+ return 1;
+ } else {
+ finish_message(call);
+ return 1;
+ }
}
-static void finish_start_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED,
- error);
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+ if (GPR_SLICE_LENGTH(slice) == 0) {
+ gpr_slice_unref(slice);
+ return 1;
+ }
+ /* we have to be reading a message to know what to do here */
+ if (!call->reading_message) {
+ grpc_call_cancel_with_status(
+ call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message");
+ return 0;
+ }
+ /* append the slice to the incoming buffer */
+ gpr_slice_buffer_add(&call->incoming_message, slice);
+ if (call->incoming_message.length > call->incoming_message_length) {
+ /* if we got too many bytes, complain */
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Receiving message overflow; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (call->incoming_message.length == call->incoming_message_length) {
+ finish_message(call);
+ return 1;
+ } else {
+ return 1;
+ }
}
-static send_action choose_send_action(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_INITIAL:
- if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
- is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_INITIAL_METADATA;
- } else {
- return SEND_INITIAL_METADATA;
- }
- }
- return SEND_NOTHING;
- case WRITE_STATE_STARTED:
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_MESSAGE;
- } else {
- return SEND_MESSAGE;
- }
- } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
- if (call->is_client) {
- return SEND_FINISH;
- } else {
- return SEND_TRAILING_METADATA_AND_FINISH;
- }
- }
- return SEND_NOTHING;
- case WRITE_STATE_WRITE_CLOSED:
- return SEND_NOTHING;
+static void call_on_done_recv(void *pc, int success) {
+ grpc_call *call = pc;
+ size_t i;
+ int unref = 0;
+ lock(call);
+ for (i = 0; success && i < call->recv_ops.nops; i++) {
+ grpc_stream_op *op = &call->recv_ops.ops[i];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ break;
+ case GRPC_OP_METADATA:
+ recv_metadata(call, &op->data.metadata);
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ success = begin_message(call, op->data.begin_message);
+ break;
+ case GRPC_OP_SLICE:
+ success = add_slice_to_message(call, op->data.slice);
+ break;
+ }
+ }
+ if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+ call->read_state = READ_STATE_READ_CLOSED;
+ }
+ if (call->recv_state == GRPC_STREAM_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+ call->read_state = READ_STATE_STREAM_CLOSED;
+ unref = 1;
+ }
+ if (!success) {
+ abort();
+ }
+ finish_read_ops(call);
+ unlock(call);
+
+ if (unref) {
+ grpc_call_internal_unref(call, 0);
}
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- return SEND_NOTHING;
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@@ -639,97 +731,100 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
return out;
}
-static void enact_send_action(grpc_call *call, send_action sa) {
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+ grpc_stream_op_buffer *sopb) {
+ size_t i;
+
+ switch (byte_buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ gpr_slice_ref(slice);
+ grpc_sopb_add_slice(sopb, slice);
+ }
+ break;
+ }
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
- grpc_call_op op;
+ grpc_metadata_batch mdb;
size_t i;
- gpr_uint32 flags = 0;
char status_str[GPR_LTOA_MIN_BUFSIZE];
+ GPR_ASSERT(op->send_ops == NULL);
- switch (sa) {
- case SEND_NOTHING:
- abort();
- break;
- case SEND_BUFFERED_INITIAL_METADATA:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_INITIAL_METADATA:
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ break;
+ }
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&op.data.metadata,
- &call->send_initial_metadata[i]);
+ grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
}
- call->send_initial_metadata_count = 0;
- op.done_cb = finish_start_step;
- op.user_data = call;
- op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_call_execute_op(call, &op);
- break;
- case SEND_BUFFERED_MESSAGE:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_MESSAGE:
- data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.message = data.send_message;
- op.done_cb = finish_write_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
- break;
- case SEND_TRAILING_METADATA_AND_FINISH:
- /* send trailing metadata */
- data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
- op.bind_pollset = NULL;
- /* send status */
- /* TODO(ctiller): cache common status values */
- data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
- if (data.send_status.details) {
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->details_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ op->send_ops = &call->send_ops;
+ op->bind_pollset = grpc_cq_pollset(call->cq);
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
+ call->write_state = WRITE_STATE_STARTED;
+ /* fall through intended */
+ case WRITE_STATE_STARTED:
+ if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ grpc_sopb_add_begin_message(
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
+ }
+ if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
+ op->is_last_send = 1;
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ if (!call->is_client) {
+ /* send trailing metadata */
+ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
+ /* send status */
+ /* TODO(ctiller): cache common status values */
+ data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+ gpr_ltoa(data.send_status.code, status_str);
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->status_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context, status_str)));
+ if (data.send_status.details) {
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->details_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
+ }
+ }
}
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- /* fallthrough: see choose_send_action for details */
- case SEND_FINISH:
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = finish_finish_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
break;
+ case WRITE_STATE_WRITE_CLOSED:
+ break;
+ }
+ if (op->send_ops) {
+ op->on_done_send = call_on_done_send;
+ op->send_user_data = call;
}
+ return op->send_ops != NULL;
}
static grpc_call_error start_ioreq_error(grpc_call *call,
@@ -838,10 +933,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion;
master->user_data = user_data;
- if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
- call->need_more_data = 1;
- }
-
finish_read_ops(call);
early_out_write_ops(call);
@@ -871,19 +962,12 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c, 1);
}
-grpc_call_error grpc_call_cancel(grpc_call *c) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- op.type = GRPC_CANCEL_OP;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
+grpc_call_error grpc_call_cancel(grpc_call *call) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
- elem = CALL_ELEM_FROM_CALL(c, 0);
- elem->filter->call_op(elem, NULL, &op);
+ execute_op(call, &op);
return GRPC_CALL_OK;
}
@@ -901,11 +985,10 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return grpc_call_cancel(c);
}
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, op);
+ elem->filter->start_transport_op(elem, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -934,28 +1017,6 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state_locked(grpc_call *call, read_state state) {
- GPR_ASSERT(call->read_state < state);
- call->read_state = state;
- finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
- set_read_state_locked(call, state);
- unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
- set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- set_read_state(call, READ_STATE_STREAM_CLOSED);
- grpc_call_internal_unref(call, 0);
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -979,35 +1040,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- finish_read_ops(call);
- unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- set_status_code(call, STATUS_FROM_CORE, status);
- set_status_details(call, STATUS_FROM_CORE,
- grpc_mdstr_from_string(call->metadata_context, message));
- unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
grpc_mdctx *mdctx = call->metadata_context;
- lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
@@ -1043,9 +1082,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
- set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
}
- unlock(call);
grpc_mdctx_lock(mdctx);
for (l = md->list.head; l; l = l->next) {
@@ -1055,8 +1093,6 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
grpc_mdctx_unlock(mdctx);
-
- return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915349..199beb1738 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -96,27 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
-/* Helpers for grpc_client, grpc_server filters to publish received data to
- the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
- grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message);
-
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e7c1..78f9144c19 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
gpr_refcount refs;
+ gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@ struct grpc_channel {
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters(
CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+
+ channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else if (args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else {
+ channel->max_message_length = args->args[i].value.integer;
+ }
+ }
+ }
+ }
+
return channel;
}
@@ -219,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
+
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
+ return channel->max_message_length;
+}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e51185ee..05d57a905b 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,6 +44,7 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
void grpc_client_channel_closed(grpc_channel_element *elem);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d00d..5f3e9bec0c 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,7 @@ static void done_setup(void *sp) {
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff7d7..4669c59ee4 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, &op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- grpc_call_read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- grpc_call_stream_closed(elem);
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
- }
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -104,6 +81,6 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 78170806f1..4e9eb808d7 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -46,22 +46,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_batch_destroy(&op->data.metadata);
- grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
- "Rpc sent on a lame channel.");
- grpc_call_stream_closed(elem);
- break;
- default:
- break;
- }
-
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
}
static void channel_op(grpc_channel_element *elem,
@@ -93,7 +80,7 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"lame-client",
};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929870..82d1323a4b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -425,33 +391,69 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN: break;
+ case GRPC_STREAM_SEND_CLOSED: break;
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -592,7 +594,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219f8b..ebde5095a9 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
#include <grpc/grpc.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,7 @@
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5ca31d6bc7..9b62aefd0b 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -481,11 +481,10 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
break;
case GRPC_OP_METADATA:
grpc_metadata_batch_assert_ok(&op->data.metadata);
- case GRPC_OP_FLOW_CTL_CB:
- /* these just get copied as they don't impact the number of flow
- controlled bytes */
- grpc_sopb_append(outops, op, 1);
- curop++;
+ /* these just get copied as they don't impact the number of flow
+ controlled bytes */
+ grpc_sopb_append(outops, op, 1);
+ curop++;
break;
case GRPC_OP_BEGIN_MESSAGE:
/* begin op: for now we just convert the op to a slice and fall
@@ -567,10 +566,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
GPR_ERROR,
"These stream ops should be filtered out by grpc_chttp2_preencode");
abort();
- case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
case GRPC_OP_METADATA:
/* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee284e0..9c2af560c1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,9 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
+ /* streams that have finished reading: we wait until unlock to coalesce
+ all changes into one callback */
+ FINISHED_READ_OP,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -141,6 +140,12 @@ typedef enum {
DTS_FRAME
} deframe_transport_state;
+typedef enum {
+ WRITE_STATE_OPEN,
+ WRITE_STATE_QUEUED_CLOSE,
+ WRITE_STATE_SENT_CLOSE
+} WRITE_STATE;
+
typedef struct {
stream *head;
stream *tail;
@@ -182,6 +187,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int success;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +219,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -281,14 +302,15 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- gpr_uint8 queued_write_closed;
- gpr_uint8 sending_write_closed;
- gpr_uint8 sent_write_closed;
+ WRITE_STATE write_state;
+ gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- gpr_uint8 allow_window_updates;
gpr_uint8 published_close;
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
+
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -299,7 +321,10 @@ struct stream {
gpr_timespec incoming_deadline;
/* sops from application */
- grpc_stream_op_buffer outgoing_sopb;
+ grpc_stream_op_buffer *outgoing_sopb;
+ grpc_stream_op_buffer *incoming_sopb;
+ grpc_stream_state *publish_state;
+ grpc_stream_state published_state;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb;
@@ -348,6 +373,13 @@ static void become_skip_parser(transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
+static void schedule_cb(transport *t, op_closure closure, int success);
+static void maybe_finish_read(transport *t, stream *s);
+static void maybe_join_window_updates(transport *t, stream *s);
+static void finish_reads(transport *t);
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
+
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -416,6 +448,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +461,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +485,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -503,7 +525,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +537,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
@@ -577,6 +599,8 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
transport *t = (transport *)gt;
stream *s = (stream *)gs;
+ memset(s, 0, sizeof(*s));
+
ref_transport(t);
if (!server_data) {
@@ -592,20 +616,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->queued_write_closed = 0;
- s->sending_write_closed = 0;
- s->sent_write_closed = 0;
- s->read_closed = 0;
- s->cancelled = 0;
- s->allow_window_updates = 0;
- s->published_close = 0;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
- s->incoming_metadata = NULL;
s->incoming_deadline = gpr_inf_future;
- memset(&s->links, 0, sizeof(s->links));
- memset(&s->included, 0, sizeof(s->included));
- grpc_sopb_init(&s->outgoing_sopb);
grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
@@ -642,7 +653,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
- grpc_sopb_destroy(&s->outgoing_sopb);
+ GPR_ASSERT(s->outgoing_sopb == NULL);
grpc_sopb_destroy(&s->writing_sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
@@ -708,8 +719,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -762,6 +771,8 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
+ finish_reads(t);
+
/* gather any callbacks that need to be made */
if (!t->calling_back && cb) {
perform_callbacks = prepare_callbacks(t);
@@ -865,22 +876,23 @@ static int prepare_write(transport *t) {
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+ s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
- s->sending_write_closed =
- s->queued_write_closed && s->outgoing_sopb.nops == 0;
- if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+ if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) {
+ s->send_closed = 1;
+ }
+ if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
}
- /* if there are still writes to do and the stream still has window
- available, then schedule a further write */
- if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
- GPR_ASSERT(!t->outgoing_window);
- stream_list_add_tail(t, s, WRITABLE);
+ /* we should either exhaust window or have no ops left, but not both */
+ GPR_ASSERT(s->outgoing_sopb->nops == 0 || s->outgoing_window <= 0);
+ if (s->outgoing_sopb->nops == 0) {
+ s->outgoing_sopb = NULL;
+ schedule_cb(t, s->send_done_closure, 1);
}
}
@@ -912,10 +924,10 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->sending_write_closed, s->id, &t->hpack_compressor,
+ s->send_closed, s->id, &t->hpack_compressor,
&t->outbuf);
s->writing_sopb.nops = 0;
- if (s->sending_write_closed) {
+ if (s->send_closed) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -929,8 +941,10 @@ static void finish_write_common(transport *t, int success) {
drop_connection(t);
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
- s->sent_write_closed = 1;
- if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ if (!s->cancelled) {
+ maybe_finish_read(t, s);
+ }
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -988,43 +1002,57 @@ static void maybe_start_some_streams(transport *t) {
}
}
-static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
- size_t ops_count, int is_last) {
+static void perform_op(grpc_transport *gt, grpc_stream *gs, grpc_transport_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
lock(t);
- if (is_last) {
- s->queued_write_closed = 1;
- }
- if (!s->cancelled) {
- grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
- if (s->id == 0) {
- stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
- maybe_start_some_streams(t);
+ if (op->send_ops) {
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ s->send_done_closure.cb = op->on_done_send;
+ s->send_done_closure.user_data = op->send_user_data;
+ if (!s->cancelled) {
+ s->outgoing_sopb = op->send_ops;
+ if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
+ s->write_state = WRITE_STATE_QUEUED_CLOSE;
+ }
+ if (s->id == 0) {
+ stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+ maybe_start_some_streams(t);
+ } else if (s->outgoing_window > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
} else {
- stream_list_join(t, s, WRITABLE);
+ schedule_nuke_sopb(t, op->send_ops);
+ schedule_cb(t, s->send_done_closure, 0);
}
- } else {
- grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
- if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
- !s->published_close) {
- stream_list_join(t, s, PENDING_CALLBACKS);
+
+ if (op->recv_ops) {
+ GPR_ASSERT(s->incoming_sopb == NULL);
+ s->recv_done_closure.cb = op->on_done_recv;
+ s->recv_done_closure.user_data = op->recv_user_data;
+ if (!s->cancelled) {
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
+ } else {
+ schedule_cb(t, s->recv_done_closure, 0);
+ }
}
- unlock(t);
-}
+ if (op->bind_pollset) {
+ add_to_pollset_locked(t, op->bind_pollset);
+ }
-static void abort_stream(grpc_transport *gt, grpc_stream *gs,
- grpc_status_code status) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(t, s, op->cancel_with_status, grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
+ 1);
+ }
- lock(t);
- cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
- 1);
unlock(t);
}
@@ -1063,8 +1091,8 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
- s->sent_write_closed = 1;
- stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ maybe_finish_read(t, s);
}
}
@@ -1088,12 +1116,15 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
- had_outgoing = s->outgoing_sopb.nops != 0;
+ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- schedule_nuke_sopb(t, &s->outgoing_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
if (s->cancelled) {
send_rst = 0;
- } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
+ } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE || had_outgoing) {
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
@@ -1111,7 +1142,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
break;
}
- stream_list_join(t, s, PENDING_CALLBACKS);
+ maybe_finish_read(t, s);
}
}
if (!id) send_rst = 0;
@@ -1150,8 +1181,14 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
+static void maybe_finish_read(transport *t, stream *s) {
+ if (s->incoming_sopb) {
+ stream_list_join(t, s, FINISHED_READ_OP);
+ }
+}
+
static void maybe_join_window_updates(transport *t, stream *s) {
- if (s->allow_window_updates &&
+ if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
@@ -1160,6 +1197,7 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
+#if 0
static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
int allow) {
transport *t = (transport *)tp;
@@ -1174,6 +1212,7 @@ static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
}
unlock(t);
}
+#endif
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
if (t->incoming_frame_size > t->incoming_window) {
@@ -1271,7 +1310,6 @@ static void on_header(void *tp, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value)));
- stream_list_join(t, s, PENDING_CALLBACKS);
if (md->key == t->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -1290,6 +1328,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
+ maybe_finish_read(t, s);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1464,8 +1503,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
size_t i;
@@ -1483,8 +1520,7 @@ static void add_metadata_batch(transport *t, stream *s) {
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
- grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
- s->incoming_metadata);
+ /* TODO(ctiller): don't leak incoming_metadata */
/* reset */
s->incoming_deadline = gpr_inf_future;
@@ -1501,14 +1537,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.need_flush_reads) {
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
@@ -1549,7 +1585,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
int was_window_empty = s->outgoing_window <= 0;
s->outgoing_window += st.initial_window_update;
if (was_window_empty && s->outgoing_window > 0 &&
- s->outgoing_sopb.nops > 0) {
+ s->outgoing_sopb && s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1568,7 +1604,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
- if (was_window_empty && s->outgoing_sopb.nops) {
+ if (was_window_empty && s->outgoing_sopb && s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1830,53 +1866,71 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static int prepare_callbacks(transport *t) {
+static void finish_reads(transport *t) {
stream *s;
- int n = 0;
- while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
- int execute = 1;
-
- s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
- if (s->callback_state == GRPC_STREAM_CLOSED) {
- remove_from_stream_map(t, s);
- if (s->published_close) {
- execute = 0;
- } else if (s->incoming_metadata_count) {
- add_metadata_batch(t, s);
- }
- s->published_close = 1;
- }
- grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
-
- if (execute) {
- stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
- n = 1;
+ while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+ int publish = 0;
+ GPR_ASSERT(s->incoming_sopb);
+ *s->publish_state = compute_state(s->write_state == WRITE_STATE_SENT_CLOSE,
+ s->read_closed);
+ if (*s->publish_state != s->published_state) {
+ s->published_state = *s->publish_state;
+ publish = 1;
+ }
+ if (s->parser.incoming_sopb.nops > 0) {
+ grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+ publish = 1;
}
+ if (publish) {
+ schedule_cb(t, s->recv_done_closure, 1);
+ }
+ }
+}
+
+static void schedule_cb(transport *t, op_closure closure, int success) {
+ if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
+ t->pending_callbacks.capacity = GPR_MAX(t->pending_callbacks.capacity * 2, 8);
+ t->pending_callbacks.callbacks = gpr_realloc(t->pending_callbacks.callbacks, t->pending_callbacks.capacity * sizeof(*t->pending_callbacks.callbacks));
}
- return n;
+ closure.success = success;
+ t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
+}
+
+static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.success);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
cb->closed(t->cb_user_data, &t->base);
}
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
- transport *t = (transport *)gt;
- lock(t);
+/*
+ * POLLSET STUFF
+ */
+
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
+}
+
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ add_to_pollset_locked(t, pollset);
unlock(t);
}
@@ -1885,8 +1939,8 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway, close_transport,
send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 882c078d51..ea22b0e1c8 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
case GRPC_OP_METADATA:
grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
- case GRPC_OP_FLOW_CTL_CB:
- ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
- break;
case GRPC_NO_OP:
case GRPC_OP_BEGIN_MESSAGE:
break;
@@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
+ GPR_ASSERT(sopb->nops <= sopb->capacity);
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_FLOW_CTL_CB;
- op->data.flow_ctl_cb.cb = cb;
- op->data.flow_ctl_cb.arg = arg;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
-}
-
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops) {
size_t orig_nops = sopb->nops;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 20146b9af2..f5de64d583 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -55,9 +55,7 @@ typedef enum grpc_stream_op_code {
GRPC_OP_BEGIN_MESSAGE,
/* Add a slice of data to the current message/metadata element/status.
Must not overflow the forward declared length. */
- GRPC_OP_SLICE,
- /* Call some function once this operation has passed flow control. */
- GRPC_OP_FLOW_CTL_CB
+ GRPC_OP_SLICE
} grpc_stream_op_code;
/* Arguments for GRPC_OP_BEGIN */
@@ -68,12 +66,6 @@ typedef struct grpc_begin_message {
gpr_uint32 flags;
} grpc_begin_message;
-/* Arguments for GRPC_OP_FLOW_CTL_CB */
-typedef struct grpc_flow_ctl_cb {
- void (*cb)(void *arg, grpc_op_error error);
- void *arg;
-} grpc_flow_ctl_cb;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -129,7 +121,6 @@ typedef struct grpc_stream_op {
grpc_begin_message begin_message;
grpc_metadata_batch metadata;
gpr_slice slice;
- grpc_flow_ctl_cb flow_ctl_cb;
} data;
} grpc_stream_op;
@@ -163,12 +154,10 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index ef0020dc58..35195348e7 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -56,14 +56,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
return transport->vtable->init_stream(transport, stream, server_data);
}
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t nops, int is_last) {
- transport->vtable->send_batch(transport, stream, ops, nops, is_last);
-}
-
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow) {
- transport->vtable->set_allow_window_updates(transport, stream, allow);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, stream, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
@@ -76,11 +71,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status) {
- transport->vtable->abort_stream(transport, stream, status);
-}
-
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index ce8c17c322..d0007680e3 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -62,24 +62,6 @@ typedef enum grpc_stream_state {
/* Callbacks made from the transport to the upper layers of grpc. */
struct grpc_transport_callbacks {
- /* Allocate a buffer to receive data into.
- It's safe to call grpc_slice_new() to do this, but performance minded
- proxies may want to carefully place data into optimal locations for
- transports.
- This function must return a valid, non-empty slice.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the grpc_stream instance the buffer will be used for, or
- NULL if this is not known
- size_hint - how big of a buffer would the transport optimally like?
- the actual returned buffer can be smaller or larger than
- size_hint as the implementation finds convenient */
- struct gpr_slice (*alloc_recv_buffer)(void *user_data,
- grpc_transport *transport,
- grpc_stream *stream, size_t size_hint);
-
/* Initialize a new stream on behalf of the transport.
Must result in a call to
grpc_transport_init_stream(transport, ..., request) in the same call
@@ -96,30 +78,7 @@ struct grpc_transport_callbacks {
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
- /* Process a set of stream ops that have been received by the transport.
- Called by network threads, so must be careful not to block on network
- activity.
-
- If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to
- call grpc_transport_destroy_stream.
-
- Ownership of any objects contained in ops is transferred to the callee.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the stream this data was received for
- ops - stream operations that are part of this batch
- ops_count - the number of stream operations in this batch
- final_state - the state of the stream as of the final operation in this
- batch */
- void (*recv_batch)(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops, size_t ops_count,
- grpc_stream_state final_state);
-
- /* The transport received a goaway */
- void (*goaway)(void *user_data, grpc_transport *transport,
- grpc_status_code status, gpr_slice debug);
+ void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug);
/* The transport has been closed */
void (*closed)(void *user_data, grpc_transport *transport);
@@ -154,20 +113,29 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
-/* Enable/disable incoming data for a stream.
+/* Transport op: a set of operations to perform on a transport */
+typedef struct grpc_transport_op {
+ grpc_stream_op_buffer *send_ops;
+ int is_last_send;
+ void (*on_done_send)(void *user_data, int success);
+ void *send_user_data;
- This effectively disables new window becoming available for a given stream,
- but does not prevent existing window from being consumed by a sender: the
- caller must still be prepared to receive some additional data after this
- call.
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
- Arguments:
- transport - the transport on which to create this stream
- stream - the grpc_stream to destroy (memory is still owned by the
- caller, but any child memory must be cleaned up)
- allow - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow);
+ grpc_pollset *bind_pollset;
+
+ grpc_status_code cancel_with_status;
+} grpc_transport_op;
+
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+
+/* TODO(ctiller): remove this */
+void grpc_transport_add_to_pollset(grpc_transport *transport, grpc_pollset *pollset);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
@@ -177,13 +145,9 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport,
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
- ops - an array of operations to apply to the stream - can be NULL
- if ops_count == 0.
- ops_count - the number of elements in ops
- is_last - is this the last batch of operations to be sent out */
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count,
- int is_last);
+ op - a grpc_transport_op specifying the op to perform */
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op);
/* Send a ping on a transport
@@ -193,19 +157,6 @@ void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
-/* Abort a stream
-
- Terminate reading and writing for a stream. A final recv_batch with no
- operations and final_state == GRPC_STREAM_CLOSED will be received locally,
- and no more data will be presented to the up-layer.
-
- TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ac275c7560..ef79ac0741 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -46,12 +46,8 @@ typedef struct grpc_transport_vtable {
const void *server_data);
/* implementation of grpc_transport_send_batch */
- void (*send_batch)(grpc_transport *self, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count, int is_last);
-
- /* implementation of grpc_transport_set_allow_window_updates */
- void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
- int allow);
+ void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_op *op);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
@@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_abort_stream */
- void (*abort_stream)(grpc_transport *self, grpc_stream *stream,
- grpc_status_code status);
-
/* implementation of grpc_transport_goaway */
void (*goaway)(grpc_transport *self, grpc_status_code status,
gpr_slice debug_data);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000000..e886690234
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,152 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup(" key="));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
+}
+
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
+ char *out;
+ char *tmp;
+ size_t i;
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op *op = &sopb->ops[i];
+ if (i) gpr_strvec_add(&b, gpr_strdup(", "));
+ switch (op->type) {
+ case GRPC_NO_OP:
+ gpr_strvec_add(&b, gpr_strdup("NO_OP"));
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_SLICE:
+ gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
+ break;
+ case GRPC_OP_METADATA:
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+char *grpc_transport_op_string(grpc_transport_op *op) {
+ char *tmp;
+ char *out;
+ int first = 1;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ if (op->send_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("SEND"));
+ if (op->is_last_send) {
+ gpr_strvec_add(&b, gpr_strdup("_LAST"));
+ }
+ gpr_strvec_add(&b, gpr_strdup("["));
+ gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
+ gpr_strvec_add(&b, gpr_strdup("]"));
+ }
+
+ if (op->recv_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("RECV"));
+ }
+
+ if (op->bind_pollset) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("BIND"));
+ }
+
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+ gpr_strvec_add(&b, tmp);
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_transport_op *op) {
+ char *str = grpc_transport_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}