aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Abhishek Kumar <abhikumar@google.com>2015-04-29 11:16:38 -0700
committerGravatar Abhishek Kumar <abhikumar@google.com>2015-04-29 11:16:38 -0700
commitd7ec8a3dd0ad389a37ce0dd7ed140483785b293b (patch)
treed950eafaa380dccbdfed70c0ac985328116cacbf /src
parent44347960733562f938b8e51e1c1a0ba90335dea3 (diff)
parent1d73b7bffd8f4036a782689ccb3a255953af9f4c (diff)
Merge pull request #1369 from ctiller/one-pass
Unify transport & channel op interfaces
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/census_filter.c108
-rw-r--r--src/core/channel/channel_stack.c48
-rw-r--r--src/core/channel/channel_stack.h84
-rw-r--r--src/core/channel/child_channel.c36
-rw-r--r--src/core/channel/child_channel.h5
-rw-r--r--src/core/channel/client_channel.c258
-rw-r--r--src/core/channel/connected_channel.c305
-rw-r--r--src/core/channel/http_client_filter.c85
-rw-r--r--src/core/channel/http_filter.c137
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c185
-rw-r--r--src/core/channel/noop_filter.c39
-rw-r--r--src/core/security/auth.c77
-rw-r--r--src/core/security/server_secure_chttp2.c5
-rw-r--r--src/core/surface/call.c574
-rw-r--r--src/core/surface/call.h24
-rw-r--r--src/core/surface/channel.c27
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_create.c5
-rw-r--r--src/core/surface/client.c36
-rw-r--r--src/core/surface/completion_queue.c4
-rw-r--r--src/core/surface/lame_client.c61
-rw-r--r--src/core/surface/secure_channel_create.c5
-rw-r--r--src/core/surface/server.c147
-rw-r--r--src/core/surface/server_chttp2.c5
-rw-r--r--src/core/transport/chttp2/stream_encoder.c5
-rw-r--r--src/core/transport/chttp2_transport.c499
-rw-r--r--src/core/transport/stream_op.c38
-rw-r--r--src/core/transport/stream_op.h46
-rw-r--r--src/core/transport/transport.c42
-rw-r--r--src/core/transport/transport.h107
-rw-r--r--src/core/transport/transport_impl.h16
-rw-r--r--src/core/transport/transport_op_string.c164
-rw-r--r--src/ruby/ext/grpc/rb_call.c122
34 files changed, 1573 insertions, 1772 deletions
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 9c0c20af22..7e393a01a6 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
+
+ /* recv callback */
+ grpc_stream_op_buffer* recv_ops;
+ void (*on_done_recv)(void* user_data, int success);
+ void* recv_user_data;
} call_data;
typedef struct channel_data {
@@ -60,57 +65,68 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
+ call_data* calld,
channel_data* chand) {
grpc_linked_mdelem* m;
- for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
- if (m->md->key == chand->path_str) {
- gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
- census_add_method_tag(
- calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ size_t i;
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op* op = &sopb->ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s",
+ (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
+ m->md->value->slice));
+ }
}
}
}
-static void client_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_SEND_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_RECV_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->send_ops) {
+ extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+}
+
+static void client_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ client_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void server_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void* ptr, int success) {
+ grpc_call_element* elem = ptr;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_RECV_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_SEND_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ if (op->recv_ops) {
+ /* substitute our callback for the op callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_done_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -128,12 +144,14 @@ static void channel_op(grpc_channel_element* elem,
}
static void client_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_call_element* elem) {
@@ -144,12 +162,14 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
}
static void server_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void server_destroy_call_elem(grpc_call_element* elem) {
@@ -180,11 +200,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
- client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "census-client"};
+ client_start_transport_op, channel_op, sizeof(call_data),
+ client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
- server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "census-server"};
+ server_start_transport_op, channel_op, sizeof(call_data),
+ server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3a3a3a75b7..311f4f08ce 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
+#include <string.h>
int grpc_trace_channel = 0;
@@ -147,6 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@@ -164,7 +166,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
+ call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
+ initial_op);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
@@ -181,12 +184,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
- grpc_call_element *next_elem = elem + op->dir;
- if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- }
- next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -205,39 +205,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_CANCEL_OP;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- cancel_op.flags = 0;
- cancel_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &cancel_op);
-}
-
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op finish_op;
- finish_op.type = GRPC_SEND_FINISH;
- finish_op.dir = GRPC_CALL_DOWN;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- finish_op.flags = 0;
- finish_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &finish_op);
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
- grpc_call_op op;
- op.type = GRPC_RECV_SYNTHETIC_STATUS;
- op.dir = GRPC_CALL_UP;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.synthetic_status.status = status;
- op.data.synthetic_status.message = message;
- grpc_call_next_op(cur_elem, &op);
+ abort();
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index addc92b272..de0e4e4518 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,78 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* Call operations - things that can be sent and received.
-
- Threading:
- SEND, RECV, and CANCEL ops can be active on a call at the same time, but
- only one SEND, one RECV, and one CANCEL can be active at a time.
-
- If state is shared between send/receive/cancel operations, it is up to
- filters to provide their own protection around that. */
-typedef enum {
- /* send metadata to the channels peer */
- GRPC_SEND_METADATA,
- /* send a message to the channels peer */
- GRPC_SEND_MESSAGE,
- /* send a pre-formatted message to the channels peer */
- GRPC_SEND_PREFORMATTED_MESSAGE,
- /* send half-close to the channels peer */
- GRPC_SEND_FINISH,
- /* request that more data be allowed through flow control */
- GRPC_REQUEST_DATA,
- /* metadata was received from the channels peer */
- GRPC_RECV_METADATA,
- /* a message was received from the channels peer */
- GRPC_RECV_MESSAGE,
- /* half-close was received from the channels peer */
- GRPC_RECV_HALF_CLOSE,
- /* full close was received from the channels peer */
- GRPC_RECV_FINISH,
- /* a status has been sythesized locally */
- GRPC_RECV_SYNTHETIC_STATUS,
- /* the call has been abnormally terminated */
- GRPC_CANCEL_OP
-} grpc_call_op_type;
-
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-/* A single filterable operation to be performed on a call */
-typedef struct {
- /* The type of operation we're performing */
- grpc_call_op_type type;
- /* The directionality of this call - does the operation begin at the bottom
- of the stack and flow up, or does the operation start at the top of the
- stack and flow down through the filters. */
- grpc_call_dir dir;
-
- /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
- gpr_uint32 flags;
-
- /* Argument data, matching up with grpc_call_op_type names */
- union {
- grpc_byte_buffer *message;
- grpc_metadata_batch metadata;
- struct {
- grpc_status_code status;
- const char *message;
- } synthetic_status;
- } data;
-
- grpc_pollset *bind_pollset;
-
- /* Must be called when processing of this call-op is complete.
- Signature chosen to match transport flow control callbacks */
- void (*done_cb)(void *user_data, grpc_op_error error);
- /* User data to be passed into done_cb */
- void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@@ -170,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op);
+ void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -189,7 +121,8 @@ typedef struct {
transport and is on the server. Most filters want to ignore this
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
- const void *server_transport_data);
+ const void *server_transport_data,
+ grpc_transport_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@@ -268,12 +201,13 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -285,13 +219,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op);
+ grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
-void grpc_call_element_recv_status(grpc_call_element *cur_elem,
- grpc_status_code status,
- const char *message);
extern int grpc_trace_channel;
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb03829c7..a2f3c54290 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -60,23 +60,11 @@ typedef struct {
gpr_uint8 sent_farewell;
} lb_channel_data;
-typedef struct {
- grpc_call_element *back;
- grpc_child_channel *channel;
-} lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
+typedef struct { grpc_child_channel *channel; } lb_call_data;
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
+static void lb_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -132,7 +120,8 @@ static void lb_channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {}
/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}
@@ -165,9 +154,10 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_call_op, lb_channel_op, sizeof(lb_call_data),
- lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
- lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
+ lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
+ lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
+ lb_init_channel_elem, lb_destroy_channel_elem, "child-channel",
+};
/* grpc_child_channel proper */
@@ -272,17 +262,17 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
}
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent) {
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op) {
grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
grpc_call_element *lbelem;
lb_call_data *lbcalld;
lb_channel_data *lbchand;
- grpc_call_stack_init(channel, NULL, stk);
+ grpc_call_stack_init(channel, NULL, initial_op, stk);
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
- lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 38695402ab..556a1c731c 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -57,8 +57,9 @@ void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent);
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op);
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
void grpc_child_call_destroy(grpc_child_call *call);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e59ca..78f8d06d89 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -58,6 +58,7 @@ typedef struct {
/* the sending child (may be null) */
grpc_child_channel *active_child;
+ grpc_mdctx *mdctx;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -82,8 +83,6 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
- gpr_uint8 got_first_send;
-
call_state state;
gpr_timespec deadline;
union {
@@ -91,7 +90,11 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
- grpc_call_op waiting_op;
+ grpc_transport_op waiting_op;
+ struct {
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+ } cancelled;
} s;
};
@@ -105,14 +108,14 @@ static int prepare_activate(grpc_call_element *elem,
calld->state = CALL_ACTIVE;
/* create a child call */
- calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
+ /* TODO(ctiller): pass the waiting op down here */
+ calld->s.active.child_call =
+ grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,57 +124,7 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
-}
-
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&chand->mu);
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- return;
- }
- GPR_ASSERT(calld->state == CALL_CREATED);
- calld->state = CALL_WAITING;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not already),
- but don't do so until outside the lock to avoid re-entrancy problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children =
- gpr_realloc(chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
+ child_elem->filter->start_transport_op(child_elem, op);
}
static void remove_waiting_child(channel_data *chand, call_data *calld) {
@@ -186,85 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
-static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
- /* send up a synthesized status */
- grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+static void handle_op_after_cancellation(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->s.cancelled.status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->s.cancelled.details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+ calld->s.cancelled.status.next = &calld->s.cancelled.details;
+ calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+ mdb.list.head = &calld->s.cancelled.status;
+ mdb.list.tail = &calld->s.cancelled.details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
+ }
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
+ grpc_transport_op waiting_op;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
- return; /* early out */
- case CALL_WAITING:
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
- return; /* early out */
+ child_elem->filter->start_transport_op(child_elem, op);
+ break;
case CALL_CREATED:
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- return; /* early out */
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- return; /* early out */
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- call_data *calld = elem->call_data;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->got_first_send) {
- /* filter out the start event to find which child to send on */
- calld->got_first_send = 1;
- start_rpc(elem, op);
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
} else {
- grpc_call_next_op(elem, op);
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(elem, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(elem, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not
+ already),
+ but don't do so until outside the lock to avoid re-entrancy
+ problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children = gpr_realloc(
+ chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting_op = *op;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
}
break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
- break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
+ case CALL_WAITING:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op = calld->s.waiting_op;
+ remove_waiting_child(chand, calld);
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, &waiting_op);
+ handle_op_after_cancellation(elem, op);
} else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops) {
+ calld->s.waiting_op.send_ops = op->send_ops;
+ calld->s.waiting_op.is_last_send = op->is_last_send;
+ calld->s.waiting_op.on_done_send = op->on_done_send;
+ calld->s.waiting_op.send_user_data = op->send_user_data;
+ }
+ if (op->recv_ops) {
+ calld->s.waiting_op.recv_ops = op->recv_ops;
+ calld->s.waiting_op.recv_state = op->recv_state;
+ calld->s.waiting_op.on_done_recv = op->on_done_recv;
+ calld->s.waiting_op.recv_user_data = op->recv_user_data;
+ }
+ gpr_mu_unlock(&chand->mu);
}
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
break;
}
}
@@ -351,15 +347,18 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->got_first_send = 0;
}
/* Destructor for call_data */
@@ -372,9 +371,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
- if (calld->state == CALL_WAITING) {
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- }
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -396,6 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
+ chand->mdctx = metadata_context;
}
/* Destructor for channel_data */
@@ -417,9 +415,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "client-channel",
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -436,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274bfe1..14dda88698 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,25 +45,12 @@
#include <grpc/support/slice_buffer.h>
#define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
-typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
-} call_data;
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -72,91 +59,17 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
- grpc_stream_op_buffer *sopb) {
- size_t i;
-
- switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
- gpr_slice_ref(slice);
- grpc_sopb_add_slice(sopb, slice);
- }
- break;
- }
-}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
-
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->bind_pollset) {
- grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
- }
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -182,23 +95,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data);
+ server_transport_data, initial_op);
GPR_ASSERT(r == 0);
}
@@ -207,8 +113,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -218,28 +122,10 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
-
- cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
- if (args) {
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else if (args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else {
- cd->max_message_length = args->args[i].value.integer;
- }
- }
- }
- }
}
/* Destructor for channel_data */
@@ -250,15 +136,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "connected",
};
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
-
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@@ -276,168 +158,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
- calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
- grpc_metadata_batch metadata) {
- grpc_call_op op;
- op.type = GRPC_RECV_METADATA;
- op.dir = GRPC_CALL_UP;
- op.flags = 0;
- op.data.metadata = metadata;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
- grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- got_metadata(elem, stream_op->data.metadata);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -470,8 +190,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
- transport_goaway, transport_closed,
+ accept_stream, transport_goaway, transport_closed,
};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 56e12342d7..9805f325a6 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -39,6 +39,12 @@ typedef struct call_data {
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
+ int sent_initial_metadata;
+
+ int got_initial_metadata;
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -64,22 +70,37 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hc_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
+ size_t i;
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
/* Send : prefixed headers, which have to be before any application
- * layer headers. */
+ layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@@ -88,19 +109,27 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_mdelem_ref(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
- grpc_call_next_op(elem, op);
- break;
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hc_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -120,7 +149,13 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
+ call_data *calld = elem->call_data;
+ calld->sent_initial_metadata = 0;
+ calld->got_initial_metadata = 0;
+ if (initial_op) hc_mutate_op(elem, initial_op);
+}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@@ -181,6 +216,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "http-client"};
+ hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a0422d8..0000000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
- path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad61f..0000000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
- transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 0bfe2f2e30..1f64df68e3 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,12 +38,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- grpc_mdelem *path;
- grpc_mdelem *content_type;
- grpc_byte_buffer *content;
-} gettable;
-
typedef struct call_data {
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
@@ -52,6 +46,10 @@ typedef struct call_data {
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_linked_mdelem status;
+
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -69,9 +67,6 @@ typedef struct channel_data {
grpc_mdstr *host_key;
grpc_mdctx *mdctx;
-
- size_t gettable_count;
- gettable *gettables;
} channel_data;
/* used to silence 'variable not used' warnings */
@@ -143,68 +138,82 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
+static void hs_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (!calld->got_initial_metadata) {
- calld->got_initial_metadata = 1;
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
- grpc_call_next_op(elem, op);
- } else {
- if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- }
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ /* do nothing */
} else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_SEND_METADATA:
- /* If we haven't sent status 200 yet, we need to so so because it needs to
- come before any non : prefixed metadata. */
- if (!calld->sent_status) {
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- grpc_mdelem_ref(channeld->status_ok));
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(elem);
}
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ size_t i;
+
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hs_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hs_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hs_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -224,15 +233,13 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
/* initialize members */
memset(calld, 0, sizeof(*calld));
+ if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -242,9 +249,6 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- size_t i;
- size_t gettable_capacity = 0;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -270,46 +274,13 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
-
- /* initialize http download support */
- channeld->gettable_count = 0;
- channeld->gettables = NULL;
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) {
- gettable *g;
- gpr_slice slice;
- grpc_http_server_page *p = args->args[i].value.pointer.p;
- if (channeld->gettable_count == gettable_capacity) {
- gettable_capacity =
- GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1);
- channeld->gettables = gpr_realloc(channeld->gettables,
- gettable_capacity * sizeof(gettable));
- }
- g = &channeld->gettables[channeld->gettable_count++];
- g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path);
- g->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", p->content_type);
- slice = gpr_slice_from_copied_string(p->content);
- g->content = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- }
- }
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
- size_t i;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- for (i = 0; i < channeld->gettable_count; i++) {
- grpc_mdelem_unref(channeld->gettables[i].path);
- grpc_mdelem_unref(channeld->gettables[i].content_type);
- grpc_byte_buffer_destroy(channeld->gettables[i].content);
- }
- gpr_free(channeld->gettables);
-
grpc_mdelem_unref(channeld->te_trailers);
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
@@ -324,6 +295,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "http-server"};
+ hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2bc1..1d2be716d7 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,13 +45,7 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -59,12 +53,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
+ /* do nothing */
+}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void noop_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ noop_mutate_op(elem, op);
+
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -86,13 +88,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
+
+ if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -131,6 +136,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "no-op"};
+ noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "no-op"};
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 4af2c67d83..2322c12aa5 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -51,7 +51,9 @@ typedef struct {
grpc_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
- grpc_call_op op;
+ grpc_transport_op op;
+ size_t op_md_idx;
+ int sent_initial_metadata;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
} call_data;
@@ -65,24 +67,23 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
-static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
- grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
- grpc_call_element_send_cancel(elem);
-}
-
static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
- grpc_call_op op = calld->op;
+ grpc_transport_op *op = &calld->op;
+ grpc_metadata_batch *mdb;
size_t i;
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
+ GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx &&
+ op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
+ mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata;
for (i = 0; i < num_md; i++) {
- grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i],
+ grpc_metadata_batch_add_tail(mdb, &calld->md_links[i],
grpc_mdelem_ref(md_elems[i]));
}
- grpc_call_next_op(elem, &op);
+ grpc_call_next_op(elem, op);
}
static char *build_service_url(const char *url_scheme, call_data *calld) {
@@ -105,7 +106,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
return service_url;
}
-static void send_security_metadata(grpc_call_element *elem, grpc_call_op *op) {
+static void send_security_metadata(grpc_call_element *elem,
+ grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -136,6 +138,7 @@ static void send_security_metadata(grpc_call_element *elem, grpc_call_op *op) {
static void on_host_checked(void *user_data, grpc_security_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
if (status == GRPC_SECURITY_OK) {
send_security_metadata(elem, &calld->op);
@@ -143,10 +146,11 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubbleup_error(elem, error_msg);
- grpc_metadata_batch_destroy(&calld->op.data.metadata);
+ grpc_transport_op_add_cancellation(
+ &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+ grpc_mdstr_from_string(chand->md_ctx, error_msg));
gpr_free(error_msg);
- calld->op.done_cb(calld->op.user_data, GRPC_OP_ERROR);
+ grpc_call_next_op(elem, &calld->op);
}
}
@@ -155,16 +159,23 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void auth_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
grpc_linked_mdelem *l;
+ size_t i;
- switch (op->type) {
- case GRPC_SEND_METADATA:
- for (l = op->data.metadata.list.head; l != NULL; l = l->next) {
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *sop = &ops[i];
+ if (sop->type != GRPC_OP_METADATA) continue;
+ calld->op_md_idx = i;
+ calld->sent_initial_metadata = 1;
+ for (l = sop->data.metadata.list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
*/
@@ -188,21 +199,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- bubbleup_error(elem, error_msg);
- grpc_metadata_batch_destroy(&calld->op.data.metadata);
+ grpc_transport_op_add_cancellation(
+ &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+ grpc_mdstr_from_string(channeld->md_ctx, error_msg));
gpr_free(error_msg);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_call_next_op(elem, &calld->op);
}
- break;
+ return; /* early exit */
}
}
send_security_metadata(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
+ return; /* early exit */
+ }
}
+
+ /* pass control up or down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -214,13 +226,17 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* TODO(jboeuf):
Find a way to pass-in the credentials from the caller here. */
call_data *calld = elem->call_data;
calld->creds = NULL;
calld->host = NULL;
calld->method = NULL;
+ calld->sent_initial_metadata = 0;
+
+ GPR_ASSERT(!initial_op || !initial_op->send_ops);
}
/* Destructor for call_data */
@@ -288,5 +304,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_auth_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "auth"};
+ auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "auth"};
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 5b7d99ce40..db9d545c0e 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,7 +35,6 @@
#include <string.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
@@ -73,8 +72,8 @@ static void state_unref(grpc_server_secure_state *state) {
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2949805622..6ca1b4e9a1 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -81,9 +81,9 @@ typedef struct {
grpc_ioreq_completion_func on_complete;
void *user_data;
/* a bit mask of which request ops are needed (1u << opid) */
- gpr_uint32 need_mask;
+ gpr_uint16 need_mask;
/* a bit mask of which request ops are now completed */
- gpr_uint32 complete_mask;
+ gpr_uint16 complete_mask;
} reqinfo_master;
/* Status data for a request can come from several sources; this
@@ -144,12 +144,17 @@ struct grpc_call {
gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending;
+ /* are we currently performing a recv operation */
+ gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
- /* flag that we need to request more data */
- gpr_uint8 need_more_data;
+ /* are we currently reading a message? */
+ gpr_uint8 reading_message;
+ /* flags with bits corresponding to write states allowing us to determine
+ what was sent */
+ gpr_uint16 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -214,6 +219,13 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
+ grpc_stream_op_buffer send_ops;
+ grpc_stream_op_buffer recv_ops;
+ grpc_stream_state recv_state;
+
+ gpr_slice_buffer incoming_message;
+ gpr_uint32 incoming_message_length;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -234,9 +246,13 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -244,6 +260,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
size_t add_initial_metadata_count,
gpr_timespec send_deadline) {
size_t i;
+ grpc_transport_op initial_op;
+ grpc_transport_op *initial_op_ptr = NULL;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -267,10 +285,24 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
- /* one ref is dropped in response to destroy, the other in
- stream_closed */
- gpr_ref_init(&call->internal_refcount, 2);
- grpc_call_stack_init(channel_stack, server_transport_data,
+ grpc_sopb_init(&call->send_ops);
+ grpc_sopb_init(&call->recv_ops);
+ gpr_slice_buffer_init(&call->incoming_message);
+ /* dropped in destroy */
+ gpr_ref_init(&call->internal_refcount, 1);
+ /* server hack: start reads immediately so we can get initial metadata.
+ TODO(ctiller): figure out a cleaner solution */
+ if (!call->is_client) {
+ memset(&initial_op, 0, sizeof(initial_op));
+ initial_op.recv_ops = &call->recv_ops;
+ initial_op.recv_state = &call->recv_state;
+ initial_op.on_done_recv = call_on_done_recv;
+ initial_op.recv_user_data = call;
+ call->receiving = 1;
+ GRPC_CALL_INTERNAL_REF(call, "receiving");
+ initial_op_ptr = &initial_op;
+ }
+ grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
set_deadline_alarm(call, send_deadline);
@@ -287,7 +319,15 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
-void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
+#ifdef GRPC_CALL_REF_COUNT_DEBUG
+void grpc_call_internal_ref(grpc_call *c, const char *reason) {
+ gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
+ c->internal_refcount.count, c->internal_refcount.count + 1, reason);
+#else
+void grpc_call_internal_ref(grpc_call *c) {
+#endif
+ gpr_ref(&c->internal_refcount);
+}
static void destroy_call(void *call, int ignored_success) {
size_t i;
@@ -310,14 +350,24 @@ static void destroy_call(void *call, int ignored_success) {
for (i = 0; i < c->send_initial_metadata_count; i++) {
grpc_mdelem_unref(c->send_initial_metadata[i].md);
}
+ grpc_sopb_destroy(&c->send_ops);
+ grpc_sopb_destroy(&c->recv_ops);
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
+ gpr_slice_buffer_destroy(&c->incoming_message);
gpr_free(c);
}
+#ifdef GRPC_CALL_REF_COUNT_DEBUG
+void grpc_call_internal_unref(grpc_call *c, const char *reason,
+ int allow_immediate_deletion) {
+ gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
+ c->internal_refcount.count, c->internal_refcount.count - 1, reason);
+#else
void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
+#endif
if (gpr_unref(&c->internal_refcount)) {
if (allow_immediate_deletion) {
destroy_call(c, 1);
@@ -359,20 +409,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
-static void request_more_data(grpc_call *call) {
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- grpc_call_execute_op(call, &op);
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -383,17 +419,43 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
+static int need_more_data(grpc_call *call) {
+ return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+ is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+ (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
+ grpc_bbq_empty(&call->incoming_queue)) ||
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
+ call->read_state != READ_STATE_STREAM_CLOSED);
+}
+
static void unlock(grpc_call *call) {
- send_action sa = SEND_NOTHING;
+ grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
- int need_more_data =
- call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+ int start_op = 0;
int i;
- if (need_more_data) {
- call->need_more_data = 0;
+ memset(&op, 0, sizeof(op));
+
+ if (!call->receiving && need_more_data(call)) {
+ op.recv_ops = &call->recv_ops;
+ op.recv_state = &call->recv_state;
+ op.on_done_recv = call_on_done_recv;
+ op.recv_user_data = call;
+ call->receiving = 1;
+ GRPC_CALL_INTERNAL_REF(call, "receiving");
+ start_op = 1;
+ }
+
+ if (!call->sending) {
+ if (fill_send_ops(call, &op)) {
+ call->sending = 1;
+ GRPC_CALL_INTERNAL_REF(call, "sending");
+ start_op = 1;
+ }
}
if (!call->completing && call->num_completed_requests != 0) {
@@ -402,25 +464,13 @@ static void unlock(grpc_call *call) {
sizeof(completed_requests));
call->num_completed_requests = 0;
call->completing = 1;
- grpc_call_internal_ref(call);
- }
-
- if (!call->sending) {
- sa = choose_send_action(call);
- if (sa != SEND_NOTHING) {
- call->sending = 1;
- grpc_call_internal_ref(call);
- }
+ GRPC_CALL_INTERNAL_REF(call, "completing");
}
gpr_mu_unlock(&call->mu);
- if (need_more_data) {
- request_more_data(call);
- }
-
- if (sa != SEND_NOTHING) {
- enact_send_action(call, sa);
+ if (start_op) {
+ execute_op(call, &op);
}
if (completing_requests > 0) {
@@ -431,7 +481,7 @@ static void unlock(grpc_call *call) {
lock(call);
call->completing = 0;
unlock(call);
- grpc_call_internal_unref(call, 0);
+ GRPC_CALL_INTERNAL_UNREF(call, "completing", 0);
}
}
@@ -495,7 +545,6 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
master->complete_mask |= 1u << op;
if (status != GRPC_OP_OK) {
master->status = status;
- master->complete_mask = master->need_mask;
}
if (master->complete_mask == master->need_mask) {
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
@@ -554,64 +603,144 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws,
- grpc_op_error error) {
+static void call_on_done_send(void *pc, int success) {
+ grpc_call *call = pc;
+ grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
lock(call);
- finish_ioreq_op(call, op, error);
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+ }
+ call->last_send_contains = 0;
call->sending = 0;
- call->write_state = ws;
unlock(call);
- grpc_call_internal_unref(call, 0);
+ GRPC_CALL_INTERNAL_UNREF(call, "sending", 0);
}
-static void finish_write_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error);
+static void finish_message(grpc_call *call) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+ GPR_ASSERT(call->incoming_message.count == 0);
+ call->reading_message = 0;
}
-static void finish_finish_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error);
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+ /* can't begin a message when we're still reading a message */
+ if (call->reading_message) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Message terminated early; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ }
+ /* stash away parameters, and prepare for incoming slices */
+ if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message,
+ "Maximum message length of %d exceeded by a message of length %d",
+ grpc_channel_get_max_message_length(call->channel), msg.length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (msg.length > 0) {
+ call->reading_message = 1;
+ call->incoming_message_length = msg.length;
+ return 1;
+ } else {
+ finish_message(call);
+ return 1;
+ }
}
-static void finish_start_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED,
- error);
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+ if (GPR_SLICE_LENGTH(slice) == 0) {
+ gpr_slice_unref(slice);
+ return 1;
+ }
+ /* we have to be reading a message to know what to do here */
+ if (!call->reading_message) {
+ grpc_call_cancel_with_status(
+ call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message");
+ return 0;
+ }
+ /* append the slice to the incoming buffer */
+ gpr_slice_buffer_add(&call->incoming_message, slice);
+ if (call->incoming_message.length > call->incoming_message_length) {
+ /* if we got too many bytes, complain */
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Receiving message overflow; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (call->incoming_message.length == call->incoming_message_length) {
+ finish_message(call);
+ return 1;
+ } else {
+ return 1;
+ }
}
-static send_action choose_send_action(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_INITIAL:
- if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
- is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_INITIAL_METADATA;
- } else {
- return SEND_INITIAL_METADATA;
- }
- }
- return SEND_NOTHING;
- case WRITE_STATE_STARTED:
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_MESSAGE;
- } else {
- return SEND_MESSAGE;
- }
- } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
- if (call->is_client) {
- return SEND_FINISH;
- } else {
- return SEND_TRAILING_METADATA_AND_FINISH;
- }
+static void call_on_done_recv(void *pc, int success) {
+ grpc_call *call = pc;
+ size_t i;
+ lock(call);
+ call->receiving = 0;
+ if (success) {
+ for (i = 0; success && i < call->recv_ops.nops; i++) {
+ grpc_stream_op *op = &call->recv_ops.ops[i];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ break;
+ case GRPC_OP_METADATA:
+ recv_metadata(call, &op->data.metadata);
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ success = begin_message(call, op->data.begin_message);
+ break;
+ case GRPC_OP_SLICE:
+ success = add_slice_to_message(call, op->data.slice);
+ break;
}
- return SEND_NOTHING;
- case WRITE_STATE_WRITE_CLOSED:
- return SEND_NOTHING;
+ }
+ if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+ call->read_state = READ_STATE_READ_CLOSED;
+ }
+ if (call->recv_state == GRPC_STREAM_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+ call->read_state = READ_STATE_STREAM_CLOSED;
+ }
+ finish_read_ops(call);
+ } else {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
}
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- return SEND_NOTHING;
+ call->recv_ops.nops = 0;
+ unlock(call);
+
+ GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@@ -639,97 +768,102 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
return out;
}
-static void enact_send_action(grpc_call *call, send_action sa) {
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+ grpc_stream_op_buffer *sopb) {
+ size_t i;
+
+ switch (byte_buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ gpr_slice_ref(slice);
+ grpc_sopb_add_slice(sopb, slice);
+ }
+ break;
+ }
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
- grpc_call_op op;
+ grpc_metadata_batch mdb;
size_t i;
- gpr_uint32 flags = 0;
char status_str[GPR_LTOA_MIN_BUFSIZE];
+ GPR_ASSERT(op->send_ops == NULL);
- switch (sa) {
- case SEND_NOTHING:
- abort();
- break;
- case SEND_BUFFERED_INITIAL_METADATA:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_INITIAL_METADATA:
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ break;
+ }
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&op.data.metadata,
- &call->send_initial_metadata[i]);
+ grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
}
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ op->send_ops = &call->send_ops;
+ op->bind_pollset = grpc_cq_pollset(call->cq);
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
+ call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
- op.done_cb = finish_start_step;
- op.user_data = call;
- op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_call_execute_op(call, &op);
- break;
- case SEND_BUFFERED_MESSAGE:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_MESSAGE:
- data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.message = data.send_message;
- op.done_cb = finish_write_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
- break;
- case SEND_TRAILING_METADATA_AND_FINISH:
- /* send trailing metadata */
- data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
- op.bind_pollset = NULL;
- /* send status */
- /* TODO(ctiller): cache common status values */
- data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
- if (data.send_status.details) {
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->details_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ /* fall through intended */
+ case WRITE_STATE_STARTED:
+ if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ grpc_sopb_add_begin_message(
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
+ }
+ if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
+ op->is_last_send = 1;
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ if (!call->is_client) {
+ /* send trailing metadata */
+ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ /* send status */
+ /* TODO(ctiller): cache common status values */
+ data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+ gpr_ltoa(data.send_status.code, status_str);
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->status_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context, status_str)));
+ if (data.send_status.details) {
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->details_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
+ }
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ }
}
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- /* fallthrough: see choose_send_action for details */
- case SEND_FINISH:
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = finish_finish_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
+ break;
+ case WRITE_STATE_WRITE_CLOSED:
break;
}
+ if (op->send_ops) {
+ op->on_done_send = call_on_done_send;
+ op->send_user_data = call;
+ }
+ return op->send_ops != NULL;
}
static grpc_call_error start_ioreq_error(grpc_call *call,
@@ -838,10 +972,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion;
master->user_data = user_data;
- if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
- call->need_more_data = 1;
- }
-
finish_read_ops(call);
early_out_write_ops(call);
@@ -868,44 +998,37 @@ void grpc_call_destroy(grpc_call *c) {
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
if (cancel) grpc_call_cancel(c);
- grpc_call_internal_unref(c, 1);
+ GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
}
-grpc_call_error grpc_call_cancel(grpc_call *c) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- op.type = GRPC_CANCEL_OP;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- elem = CALL_ELEM_FROM_CALL(c, 0);
- elem->filter->call_op(elem, NULL, &op);
-
- return GRPC_CALL_OK;
+grpc_call_error grpc_call_cancel(grpc_call *call) {
+ return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
}
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
+ grpc_transport_op op;
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = status;
+
lock(c);
set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
unlock(c);
- return grpc_call_cancel(c);
+
+ execute_op(c, &op);
+
+ return GRPC_CALL_OK;
}
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, op);
+ elem->filter->start_transport_op(elem, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -922,40 +1045,20 @@ static void call_alarm(void *arg, int success) {
grpc_call_cancel(call);
}
}
- grpc_call_internal_unref(call, 1);
+ GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
}
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
+ assert(0);
+ return;
}
- grpc_call_internal_ref(call);
+ GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state_locked(grpc_call *call, read_state state) {
- GPR_ASSERT(call->read_state < state);
- call->read_state = state;
- finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
- set_read_state_locked(call, state);
- unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
- set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- set_read_state(call, READ_STATE_STREAM_CLOSED);
- grpc_call_internal_unref(call, 0);
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -979,35 +1082,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- finish_read_ops(call);
- unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- set_status_code(call, STATUS_FROM_CORE, status);
- set_status_details(call, STATUS_FROM_CORE,
- grpc_mdstr_from_string(call->metadata_context, message));
- unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
grpc_mdctx *mdctx = call->metadata_context;
- lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
@@ -1043,9 +1124,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
- set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
}
- unlock(call);
grpc_mdctx_lock(mdctx);
for (l = md->list.head; l; l = l->next) {
@@ -1055,8 +1135,6 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
grpc_mdctx_unlock(mdctx);
-
- return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915349..2d4c7f61e3 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -93,30 +93,24 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
+#ifdef GRPC_CALL_REF_COUNT_DEBUG
+void grpc_call_internal_ref(grpc_call *call, const char *reason);
+void grpc_call_internal_unref(grpc_call *call, const char *reason, int allow_immediate_deletion);
+#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call, reason)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, reason, allow_immediate_deletion)
+#else
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
+#define GRPC_CALL_INTERNAL_REF(call, reason) grpc_call_internal_ref(call)
+#define GRPC_CALL_INTERNAL_UNREF(call, reason, allow_immediate_deletion) grpc_call_internal_unref(call, allow_immediate_deletion)
+#endif
-/* Helpers for grpc_client, grpc_server filters to publish received data to
- the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
- grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message);
-
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e7c1..78f9144c19 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
gpr_refcount refs;
+ gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@ struct grpc_channel {
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters(
CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+
+ channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else if (args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else {
+ channel->max_message_length = args->args[i].value.integer;
+ }
+ }
+ }
+ }
+
return channel;
}
@@ -219,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
+
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
+ return channel->max_message_length;
+}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e51185ee..388be35711 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,10 +44,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
void grpc_client_channel_closed(grpc_channel_element *elem);
void grpc_channel_internal_ref(grpc_channel *channel);
void grpc_channel_internal_unref(grpc_channel *channel);
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d00d..daa8d3a7c6 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,8 @@ static void done_setup(void *sp) {
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff7d7..8ac4dd1e0e 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,10 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, &op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- grpc_call_read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- grpc_call_stream_closed(elem);
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
- }
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -90,7 +68,8 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data,
+ grpc_transport_op *initial_op) {}
static void destroy_call_elem(grpc_call_element *elem) {}
@@ -104,6 +83,7 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
+ client_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client",
};
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index e0135d9fb9..c1c97af337 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -155,7 +155,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
grpc_completion_type type) {
gpr_ref(&cc->refs);
- if (call) grpc_call_internal_ref(call);
+ if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
#ifndef NDEBUG
gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
#endif
@@ -422,7 +422,7 @@ void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) {
- grpc_call_internal_unref(ev->base.call, 1);
+ GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1);
}
gpr_free(ev);
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 78170806f1..3186292a02 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -42,26 +42,40 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct { void *unused; } call_data;
+typedef struct {
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+} call_data;
-typedef struct { void *unused; } channel_data;
+typedef struct { grpc_mdctx *mdctx; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_batch_destroy(&op->data.metadata);
- grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
- "Rpc sent on a lame channel.");
- grpc_call_stream_closed(elem);
- break;
- default:
- break;
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_UNKNOWN, tmp);
+ calld->status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp);
+ calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message",
+ "Rpc sent on a lame channel.");
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
}
-
- op->done_cb(op->user_data, GRPC_OP_ERROR);
}
static void channel_op(grpc_channel_element *elem,
@@ -79,23 +93,30 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data,
+ grpc_transport_op *initial_op) {
+ if (initial_op) {
+ grpc_transport_op_finish_with_failure(initial_op);
+ }
+}
static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
+ channel_data *chand = elem->channel_data;
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
+ chand->mdctx = mdctx;
}
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "lame-client",
+ lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index e7d223bfda..3e331293b5 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth.h"
@@ -193,7 +192,7 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
- &grpc_client_auth_filter, &grpc_http_client_filter, &grpc_http_filter};
+ &grpc_client_auth_filter, &grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
@@ -211,7 +210,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
- grpc_channel_security_connector* connector;
+ grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a2e94d5598..83caefcbc6 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -375,46 +381,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -429,33 +395,75 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN:
break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
+ case GRPC_STREAM_SEND_CLOSED:
break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ server_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -506,7 +514,8 @@ static void shutdown_channel(channel_data *chand) {
}
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -518,6 +527,8 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
+
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void destroy_call_elem(grpc_call_element *elem) {
@@ -596,8 +607,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "server",
};
static void addcq(grpc_server *server, grpc_completion_queue *cq) {
@@ -918,6 +930,8 @@ void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
size_t i;
+ call_data *calld;
+
gpr_mu_lock(&server->mu);
if (!server->shutdown) {
gpr_mu_unlock(&server->mu);
@@ -942,6 +956,15 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l);
}
+ while ((calld = call_list_remove_head(&server->lists[PENDING_START],
+ PENDING_START)) != NULL) {
+ gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(
+ kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ }
+
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
@@ -1114,7 +1137,7 @@ static void begin_call(grpc_server *server, call_data *calld,
break;
}
- grpc_call_internal_ref(calld->call);
+ GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
rc->tag);
}
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219f8b..7b5c2f227b 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
#include <grpc/grpc.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,8 @@
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5ca31d6bc7..cf1e66bf8b 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -481,7 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
break;
case GRPC_OP_METADATA:
grpc_metadata_batch_assert_ok(&op->data.metadata);
- case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -567,10 +566,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
GPR_ERROR,
"These stream ops should be filtered out by grpc_chttp2_preencode");
abort();
- case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
case GRPC_OP_METADATA:
/* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee284e0..26c550c1f1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,9 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
+ /* streams that have finished reading: we wait until unlock to coalesce
+ all changes into one callback */
+ FINISHED_READ_OP,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -141,6 +140,12 @@ typedef enum {
DTS_FRAME
} deframe_transport_state;
+typedef enum {
+ WRITE_STATE_OPEN,
+ WRITE_STATE_QUEUED_CLOSE,
+ WRITE_STATE_SENT_CLOSE
+} WRITE_STATE;
+
typedef struct {
stream *head;
stream *tail;
@@ -182,6 +187,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int success;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +219,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -281,13 +302,13 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- gpr_uint8 queued_write_closed;
- gpr_uint8 sending_write_closed;
- gpr_uint8 sent_write_closed;
+ WRITE_STATE write_state;
+ gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- gpr_uint8 allow_window_updates;
- gpr_uint8 published_close;
+
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -296,10 +317,14 @@ struct stream {
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
+ grpc_linked_mdelem *old_incoming_metadata;
gpr_timespec incoming_deadline;
/* sops from application */
- grpc_stream_op_buffer outgoing_sopb;
+ grpc_stream_op_buffer *outgoing_sopb;
+ grpc_stream_op_buffer *incoming_sopb;
+ grpc_stream_state *publish_state;
+ grpc_stream_state published_state;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb;
@@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
@@ -348,6 +374,14 @@ static void become_skip_parser(transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
+static void schedule_cb(transport *t, op_closure closure, int success);
+static void maybe_finish_read(transport *t, stream *s);
+static void maybe_join_window_updates(transport *t, stream *s);
+static void finish_reads(transport *t);
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
+static void add_metadata_batch(transport *t, stream *s);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -387,6 +421,9 @@ static void destruct_transport(transport *t) {
}
gpr_free(t->pings);
+ gpr_free(t->pending_callbacks.callbacks);
+ gpr_free(t->executing_callbacks.callbacks);
+
for (i = 0; i < t->num_pending_goaways; i++) {
gpr_slice_unref(t->pending_goaways[i].debug);
}
@@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
@@ -573,16 +600,19 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
- const void *server_data) {
+ const void *server_data, grpc_transport_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
+ memset(s, 0, sizeof(*s));
+
ref_transport(t);
if (!server_data) {
lock(t);
s->id = 0;
} else {
+ /* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data;
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@@ -592,24 +622,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->queued_write_closed = 0;
- s->sending_write_closed = 0;
- s->sent_write_closed = 0;
- s->read_closed = 0;
- s->cancelled = 0;
- s->allow_window_updates = 0;
- s->published_close = 0;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
- s->incoming_metadata = NULL;
s->incoming_deadline = gpr_inf_future;
- memset(&s->links, 0, sizeof(s->links));
- memset(&s->included, 0, sizeof(s->included));
- grpc_sopb_init(&s->outgoing_sopb);
grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
+ if (initial_op) perform_op_locked(t, s, initial_op);
+
if (!server_data) {
unlock(t);
}
@@ -642,10 +661,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
- grpc_sopb_destroy(&s->outgoing_sopb);
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ GPR_ASSERT(s->incoming_sopb == NULL);
grpc_sopb_destroy(&s->writing_sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
+ for (i = 0; i < s->incoming_metadata_count; i++) {
+ grpc_mdelem_unref(s->incoming_metadata[i].md);
+ }
+ gpr_free(s->incoming_metadata);
+ gpr_free(s->old_incoming_metadata);
unref_transport(t);
}
@@ -708,8 +733,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -718,6 +741,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
+ t->is_client ? "CLI" : "SVR", s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@@ -762,6 +787,8 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
+ finish_reads(t);
+
/* gather any callbacks that need to be made */
if (!t->calling_back && cb) {
perform_callbacks = prepare_callbacks(t);
@@ -865,21 +892,24 @@ static int prepare_write(transport *t) {
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+ s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
- s->sending_write_closed =
- s->queued_write_closed && s->outgoing_sopb.nops == 0;
- if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+ if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
+ s->outgoing_sopb->nops == 0) {
+ s->send_closed = 1;
+ }
+ if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
}
- /* if there are still writes to do and the stream still has window
- available, then schedule a further write */
- if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
- GPR_ASSERT(!t->outgoing_window);
+ /* we should either exhaust window or have no ops left, but not both */
+ if (s->outgoing_sopb->nops == 0) {
+ s->outgoing_sopb = NULL;
+ schedule_cb(t, s->send_done_closure, 1);
+ } else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
}
@@ -912,10 +942,9 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->sending_write_closed, s->id, &t->hpack_compressor,
- &t->outbuf);
+ s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->sending_write_closed) {
+ if (s->send_closed) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -929,8 +958,10 @@ static void finish_write_common(transport *t, int success) {
drop_connection(t);
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
- s->sent_write_closed = 1;
- if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ if (1||!s->cancelled) {
+ maybe_finish_read(t, s);
+ }
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -980,6 +1011,9 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
+ t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@@ -988,43 +1022,63 @@ static void maybe_start_some_streams(transport *t) {
}
}
-static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
- size_t ops_count, int is_last) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-
- lock(t);
-
- if (is_last) {
- s->queued_write_closed = 1;
- }
- if (!s->cancelled) {
- grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
- if (s->id == 0) {
- stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
- maybe_start_some_streams(t);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(
+ t, s, op->cancel_with_status,
+ grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
+ op->cancel_message, 1);
+ }
+
+ if (op->send_ops) {
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ s->send_done_closure.cb = op->on_done_send;
+ s->send_done_closure.user_data = op->send_user_data;
+ if (!s->cancelled) {
+ s->outgoing_sopb = op->send_ops;
+ if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
+ s->write_state = WRITE_STATE_QUEUED_CLOSE;
+ }
+ if (s->id == 0) {
+ IF_TRACING(gpr_log(GPR_DEBUG,
+ "HTTP:%s: New stream %p waiting for concurrency",
+ t->is_client ? "CLI" : "SVR", s));
+ stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+ maybe_start_some_streams(t);
+ } else if (s->outgoing_window > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
} else {
- stream_list_join(t, s, WRITABLE);
+ schedule_nuke_sopb(t, op->send_ops);
+ schedule_cb(t, s->send_done_closure, 0);
}
- } else {
- grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
- if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
- !s->published_close) {
- stream_list_join(t, s, PENDING_CALLBACKS);
+
+ if (op->recv_ops) {
+ GPR_ASSERT(s->incoming_sopb == NULL);
+ s->recv_done_closure.cb = op->on_done_recv;
+ s->recv_done_closure.user_data = op->recv_user_data;
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ gpr_free(s->old_incoming_metadata);
+ s->old_incoming_metadata = NULL;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
}
- unlock(t);
+ if (op->bind_pollset) {
+ add_to_pollset_locked(t, op->bind_pollset);
+ }
}
-static void abort_stream(grpc_transport *gt, grpc_stream *gs,
- grpc_status_code status) {
+static void perform_op(grpc_transport *gt, grpc_stream *gs,
+ grpc_transport_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
lock(t);
- cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
- 1);
+ perform_op_locked(t, s, op);
unlock(t);
}
@@ -1063,8 +1117,8 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
- s->sent_write_closed = 1;
- stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ maybe_finish_read(t, s);
}
}
@@ -1082,18 +1136,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
- int send_rst) {
+ grpc_mdstr *optional_message, int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
- had_outgoing = s->outgoing_sopb.nops != 0;
+ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- schedule_nuke_sopb(t, &s->outgoing_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
if (s->cancelled) {
send_rst = 0;
- } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
+ } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
+ had_outgoing) {
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
@@ -1101,17 +1161,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
}
-
- stream_list_join(t, s, PENDING_CALLBACKS);
+ add_metadata_batch(t, s);
+ maybe_finish_read(t, s);
}
}
if (!id) send_rst = 0;
@@ -1119,24 +1188,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
+ if (optional_message) {
+ grpc_mdstr_unref(optional_message);
+ }
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- send_rst);
+ NULL, send_rst);
}
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
+ send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
- GRPC_CHTTP2_INTERNAL_ERROR, 0);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
@@ -1150,8 +1224,14 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
+static void maybe_finish_read(transport *t, stream *s) {
+ if (s->incoming_sopb) {
+ stream_list_join(t, s, FINISHED_READ_OP);
+ }
+}
+
static void maybe_join_window_updates(transport *t, stream *s) {
- if (s->allow_window_updates &&
+ if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
@@ -1160,21 +1240,6 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
-static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
- int allow) {
- transport *t = (transport *)tp;
- stream *s = (stream *)sp;
-
- lock(t);
- s->allow_window_updates = allow;
- if (allow) {
- maybe_join_window_updates(t, s);
- } else {
- stream_list_remove(t, s, WINDOW_UPDATE);
- }
- unlock(t);
-}
-
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
if (t->incoming_frame_size > t->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
@@ -1248,7 +1313,7 @@ static int init_data_frame_parser(transport *t) {
case GRPC_CHTTP2_STREAM_ERROR:
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
return init_skip_frame(t, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(t);
@@ -1267,11 +1332,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
- IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
- grpc_mdstr_as_c_string(md->key),
- grpc_mdstr_as_c_string(md->value)));
+ IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
- stream_list_join(t, s, PENDING_CALLBACKS);
if (md->key == t->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -1290,6 +1354,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
+ maybe_finish_read(t, s);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1464,33 +1529,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
- size_t i;
- b.list.head = &s->incoming_metadata[0];
- b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
+ b.list.head = NULL;
+ /* Store away the last element of the list, so that in patch_metadata_ops
+ we can reconstitute the list.
+ We can't do list building here as later incoming metadata may reallocate
+ the underlying array. */
+ b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
b.garbage.head = b.garbage.tail = NULL;
b.deadline = s->incoming_deadline;
-
- for (i = 1; i < s->incoming_metadata_count; i++) {
- s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
- s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
- }
- s->incoming_metadata[0].prev = NULL;
- s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
+ s->incoming_deadline = gpr_inf_future;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
- grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
- s->incoming_metadata);
-
- /* reset */
- s->incoming_deadline = gpr_inf_future;
- s->incoming_metadata = NULL;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
}
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
@@ -1501,14 +1553,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.need_flush_reads) {
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
@@ -1545,11 +1597,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
- stream *s = (stream*)(t->stream_map.values[i]);
+ stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
s->outgoing_window += st.initial_window_update;
- if (was_window_empty && s->outgoing_window > 0 &&
- s->outgoing_sopb.nops > 0) {
+ if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1563,12 +1615,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
- if (was_window_empty && s->outgoing_sopb.nops) {
+ if (was_window_empty && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1830,53 +1883,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static int prepare_callbacks(transport *t) {
- stream *s;
- int n = 0;
- while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
- int execute = 1;
-
- s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
- if (s->callback_state == GRPC_STREAM_CLOSED) {
- remove_from_stream_map(t, s);
- if (s->published_close) {
- execute = 0;
- } else if (s->incoming_metadata_count) {
- add_metadata_batch(t, s);
- }
- s->published_close = 1;
+static void patch_metadata_ops(stream *s) {
+ grpc_stream_op *ops = s->incoming_sopb->ops;
+ size_t nops = s->incoming_sopb->nops;
+ size_t i;
+ size_t j;
+ size_t mdidx = 0;
+ size_t last_mdidx;
+ int found_metadata = 0;
+
+ /* rework the array of metadata into a linked list, making use
+ of the breadcrumbs we left in metadata batches during
+ add_metadata_batch */
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ found_metadata = 1;
+ /* we left a breadcrumb indicating where the end of this list is,
+ and since we add sequentially, we know from the end of the last
+ segment where this segment begins */
+ last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
+ GPR_ASSERT(last_mdidx > mdidx);
+ GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
+ /* turn the array into a doubly linked list */
+ op->data.metadata.list.head = &s->incoming_metadata[mdidx];
+ op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
+ for (j = mdidx + 1; j < last_mdidx; j++) {
+ s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
+ s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
+ }
+ s->incoming_metadata[mdidx].prev = NULL;
+ s->incoming_metadata[last_mdidx-1].next = NULL;
+ /* track where we're up to */
+ mdidx = last_mdidx;
+ }
+ if (found_metadata) {
+ s->old_incoming_metadata = s->incoming_metadata;
+ if (mdidx != s->incoming_metadata_count) {
+ /* we have a partially read metadata batch still in incoming_metadata */
+ size_t new_count = s->incoming_metadata_count - mdidx;
+ size_t copy_bytes = sizeof(*s->incoming_metadata) * new_count;
+ GPR_ASSERT(mdidx < s->incoming_metadata_count);
+ s->incoming_metadata = gpr_malloc(copy_bytes);
+ memcpy(s->old_incoming_metadata + mdidx, s->incoming_metadata, copy_bytes);
+ s->incoming_metadata_count = s->incoming_metadata_capacity = new_count;
+ } else {
+ s->incoming_metadata = NULL;
+ s->incoming_metadata_count = 0;
+ s->incoming_metadata_capacity = 0;
}
+ }
+}
- grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
+static void finish_reads(transport *t) {
+ stream *s;
- if (execute) {
- stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
- n = 1;
+ while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+ int publish = 0;
+ GPR_ASSERT(s->incoming_sopb);
+ *s->publish_state =
+ compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
+ if (*s->publish_state != s->published_state) {
+ s->published_state = *s->publish_state;
+ publish = 1;
+ if (s->published_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
+ }
+ }
+ if (s->parser.incoming_sopb.nops > 0) {
+ grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+ publish = 1;
+ }
+ if (publish) {
+ if (s->incoming_metadata_count > 0) {
+ patch_metadata_ops(s);
+ }
+ s->incoming_sopb = NULL;
+ schedule_cb(t, s->recv_done_closure, 1);
}
}
- return n;
+
+}
+
+static void schedule_cb(transport *t, op_closure closure, int success) {
+ if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
+ t->pending_callbacks.capacity =
+ GPR_MAX(t->pending_callbacks.capacity * 2, 8);
+ t->pending_callbacks.callbacks =
+ gpr_realloc(t->pending_callbacks.callbacks,
+ t->pending_callbacks.capacity *
+ sizeof(*t->pending_callbacks.callbacks));
+ }
+ closure.success = success;
+ t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
+}
+
+static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.success);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
cb->closed(t->cb_user_data, &t->base);
}
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
- transport *t = (transport *)gt;
- lock(t);
+/*
+ * POLLSET STUFF
+ */
+
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
+}
+
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ add_to_pollset_locked(t, pollset);
unlock(t);
}
@@ -1885,9 +2020,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
- send_ping, destroy_transport};
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway,
+ close_transport, send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 882c078d51..e1a75adcb6 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
case GRPC_OP_METADATA:
grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
- case GRPC_OP_FLOW_CTL_CB:
- ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
- break;
case GRPC_NO_OP:
case GRPC_OP_BEGIN_MESSAGE:
break;
@@ -91,34 +88,20 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
}
}
-static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) {
-#ifndef NDEBUG
- size_t i;
- for (i = 0; i < nops; i++) {
- if (ops[i].type == GRPC_OP_METADATA) {
- grpc_metadata_batch_assert_ok(&ops[i].data.metadata);
- }
- }
-#endif /* NDEBUG */
-}
-
static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) {
sopb->capacity = new_capacity;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (sopb->ops == sopb->inlined_ops) {
sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity);
memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op));
} else {
sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity);
}
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
grpc_stream_op *out;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
-
+ GPR_ASSERT(sopb->nops <= sopb->capacity);
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -129,7 +112,6 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) {
add(sopb)->type = GRPC_NO_OP;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
@@ -138,34 +120,19 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
op->type = GRPC_OP_BEGIN_MESSAGE;
op->data.begin_message.length = length;
op->data.begin_message.flags = flags;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
grpc_metadata_batch b) {
grpc_stream_op *op = add(sopb);
- grpc_metadata_batch_assert_ok(&b);
op->type = GRPC_OP_METADATA;
op->data.metadata = b;
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_SLICE;
op->data.slice = slice;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
-}
-
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_FLOW_CTL_CB;
- op->data.flow_ctl_cb.cb = cb;
- op->data.flow_ctl_cb.arg = arg;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
@@ -173,15 +140,12 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t orig_nops = sopb->nops;
size_t new_nops = orig_nops + nops;
- assert_contained_metadata_ok(ops, nops);
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
if (new_nops > sopb->capacity) {
expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops));
}
memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
sopb->nops = new_nops;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
static void assert_valid_list(grpc_mdelem_list *list) {
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 20146b9af2..95497a3cc8 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -55,9 +55,7 @@ typedef enum grpc_stream_op_code {
GRPC_OP_BEGIN_MESSAGE,
/* Add a slice of data to the current message/metadata element/status.
Must not overflow the forward declared length. */
- GRPC_OP_SLICE,
- /* Call some function once this operation has passed flow control. */
- GRPC_OP_FLOW_CTL_CB
+ GRPC_OP_SLICE
} grpc_stream_op_code;
/* Arguments for GRPC_OP_BEGIN */
@@ -68,12 +66,6 @@ typedef struct grpc_begin_message {
gpr_uint32 flags;
} grpc_begin_message;
-/* Arguments for GRPC_OP_FLOW_CTL_CB */
-typedef struct grpc_flow_ctl_cb {
- void (*cb)(void *arg, grpc_op_error error);
- void *arg;
-} grpc_flow_ctl_cb;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -94,29 +86,31 @@ typedef struct grpc_metadata_batch {
void grpc_metadata_batch_init(grpc_metadata_batch *comd);
void grpc_metadata_batch_destroy(grpc_metadata_batch *comd);
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
- grpc_metadata_batch *add);
+ grpc_metadata_batch *add);
void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage);
+ grpc_linked_mdelem *storage);
void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage);
+ grpc_linked_mdelem *storage);
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage,
- grpc_mdelem *elem_to_add);
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage,
- grpc_mdelem *elem_to_add);
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
- grpc_mdelem *(*filter)(void *user_data,
- grpc_mdelem *elem),
- void *user_data);
+ grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *elem),
+ void *user_data);
#ifndef NDEBUG
void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
#else
-#define grpc_metadata_batch_assert_ok(comd) do {} while (0)
+#define grpc_metadata_batch_assert_ok(comd) \
+ do { \
+ } while (0)
#endif
/* Represents a single operation performed on a stream/transport */
@@ -129,7 +123,6 @@ typedef struct grpc_stream_op {
grpc_begin_message begin_message;
grpc_metadata_batch metadata;
gpr_slice slice;
- grpc_flow_ctl_cb flow_ctl_cb;
} data;
} grpc_stream_op;
@@ -160,15 +153,14 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
/* Append a GRPC_OP_BEGIN to a buffer */
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
gpr_uint32 flags);
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
+void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
+ grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index ef0020dc58..d9a1319c42 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -52,18 +52,15 @@ void grpc_transport_destroy(grpc_transport *transport) {
}
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data) {
- return transport->vtable->init_stream(transport, stream, server_data);
+ const void *server_data,
+ grpc_transport_op *initial_op) {
+ return transport->vtable->init_stream(transport, stream, server_data,
+ initial_op);
}
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t nops, int is_last) {
- transport->vtable->send_batch(transport, stream, ops, nops, is_last);
-}
-
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow) {
- transport->vtable->set_allow_window_updates(transport, stream, allow);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, stream, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
@@ -76,11 +73,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status) {
- transport->vtable->abort_stream(transport, stream, status);
-}
-
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);
@@ -93,3 +85,23 @@ void grpc_transport_setup_cancel(grpc_transport_setup *setup) {
void grpc_transport_setup_initiate(grpc_transport_setup *setup) {
setup->vtable->initiate(setup);
}
+
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ op->on_done_recv(op->recv_user_data, 0);
+ }
+}
+
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message) {
+ if (op->cancel_with_status == GRPC_STATUS_OK) {
+ op->cancel_with_status = status;
+ op->cancel_message = message;
+ } else if (message) {
+ grpc_mdstr_unref(message);
+ }
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index ce8c17c322..cdea0b9a0b 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -60,26 +60,26 @@ typedef enum grpc_stream_state {
GRPC_STREAM_CLOSED
} grpc_stream_state;
-/* Callbacks made from the transport to the upper layers of grpc. */
-struct grpc_transport_callbacks {
- /* Allocate a buffer to receive data into.
- It's safe to call grpc_slice_new() to do this, but performance minded
- proxies may want to carefully place data into optimal locations for
- transports.
- This function must return a valid, non-empty slice.
+/* Transport op: a set of operations to perform on a transport */
+typedef struct grpc_transport_op {
+ grpc_stream_op_buffer *send_ops;
+ int is_last_send;
+ void (*on_done_send)(void *user_data, int success);
+ void *send_user_data;
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the grpc_stream instance the buffer will be used for, or
- NULL if this is not known
- size_hint - how big of a buffer would the transport optimally like?
- the actual returned buffer can be smaller or larger than
- size_hint as the implementation finds convenient */
- struct gpr_slice (*alloc_recv_buffer)(void *user_data,
- grpc_transport *transport,
- grpc_stream *stream, size_t size_hint);
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+ grpc_pollset *bind_pollset;
+
+ grpc_status_code cancel_with_status;
+ grpc_mdstr *cancel_message;
+} grpc_transport_op;
+
+/* Callbacks made from the transport to the upper layers of grpc. */
+struct grpc_transport_callbacks {
/* Initialize a new stream on behalf of the transport.
Must result in a call to
grpc_transport_init_stream(transport, ..., request) in the same call
@@ -96,28 +96,6 @@ struct grpc_transport_callbacks {
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
- /* Process a set of stream ops that have been received by the transport.
- Called by network threads, so must be careful not to block on network
- activity.
-
- If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to
- call grpc_transport_destroy_stream.
-
- Ownership of any objects contained in ops is transferred to the callee.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the stream this data was received for
- ops - stream operations that are part of this batch
- ops_count - the number of stream operations in this batch
- final_state - the state of the stream as of the final operation in this
- batch */
- void (*recv_batch)(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops, size_t ops_count,
- grpc_stream_state final_state);
-
- /* The transport received a goaway */
void (*goaway)(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug);
@@ -139,7 +117,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
server_data - either NULL for a client initiated stream, or a pointer
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data);
+ const void *server_data,
+ grpc_transport_op *initial_op);
/* Destroy transport data for a stream.
@@ -154,20 +133,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
-/* Enable/disable incoming data for a stream.
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
- This effectively disables new window becoming available for a given stream,
- but does not prevent existing window from being consumed by a sender: the
- caller must still be prepared to receive some additional data after this
- call.
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message);
- Arguments:
- transport - the transport on which to create this stream
- stream - the grpc_stream to destroy (memory is still owned by the
- caller, but any child memory must be cleaned up)
- allow - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow);
+/* TODO(ctiller): remove this */
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
@@ -177,13 +153,9 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport,
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
- ops - an array of operations to apply to the stream - can be NULL
- if ops_count == 0.
- ops_count - the number of elements in ops
- is_last - is this the last batch of operations to be sent out */
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count,
- int is_last);
+ op - a grpc_transport_op specifying the op to perform */
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op);
/* Send a ping on a transport
@@ -193,19 +165,6 @@ void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
-/* Abort a stream
-
- Terminate reading and writing for a stream. A final recv_batch with no
- operations and final_state == GRPC_STREAM_CLOSED will be received locally,
- and no more data will be presented to the up-layer.
-
- TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
@@ -254,4 +213,4 @@ void grpc_transport_setup_initiate(grpc_transport_setup *setup);
used as a destruction call by setup). */
void grpc_transport_setup_cancel(grpc_transport_setup *setup);
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ac275c7560..479e15338f 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,15 +43,11 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
- const void *server_data);
+ const void *server_data, grpc_transport_op *initial_op);
/* implementation of grpc_transport_send_batch */
- void (*send_batch)(grpc_transport *self, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count, int is_last);
-
- /* implementation of grpc_transport_set_allow_window_updates */
- void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
- int allow);
+ void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_op *op);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
@@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_abort_stream */
- void (*abort_stream)(grpc_transport *self, grpc_stream *stream,
- grpc_status_code status);
-
/* implementation of grpc_transport_goaway */
void (*goaway)(grpc_transport *self, grpc_status_code status,
gpr_slice debug_data);
@@ -84,4 +76,4 @@ struct grpc_transport {
const grpc_transport_vtable *vtable;
};
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000000..7bbe8276c3
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+/* These routines are here to facilitate debugging - they produce string
+ representations of various transport data structures */
+
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup("key="));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
+}
+
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
+ char *out;
+ char *tmp;
+ size_t i;
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op *op = &sopb->ops[i];
+ if (i > 0) gpr_strvec_add(&b, gpr_strdup(", "));
+ switch (op->type) {
+ case GRPC_NO_OP:
+ gpr_strvec_add(&b, gpr_strdup("NO_OP"));
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_SLICE:
+ gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("METADATA{"));
+ put_metadata_list(&b, op->data.metadata);
+ gpr_strvec_add(&b, gpr_strdup("}"));
+ break;
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+char *grpc_transport_op_string(grpc_transport_op *op) {
+ char *tmp;
+ char *out;
+ int first = 1;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ if (op->send_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("SEND"));
+ if (op->is_last_send) {
+ gpr_strvec_add(&b, gpr_strdup("_LAST"));
+ }
+ gpr_strvec_add(&b, gpr_strdup("["));
+ gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
+ gpr_strvec_add(&b, gpr_strdup("]"));
+ }
+
+ if (op->recv_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("RECV"));
+ }
+
+ if (op->bind_pollset) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("BIND"));
+ }
+
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+ gpr_strvec_add(&b, tmp);
+ if (op->cancel_message) {
+ gpr_asprintf(&tmp, ";msg='%s'",
+ grpc_mdstr_as_c_string(op->cancel_message));
+ gpr_strvec_add(&b, tmp);
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_transport_op *op) {
+ char *str = grpc_transport_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f4ae6fab84..e76bb930ee 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -118,35 +118,36 @@ static void grpc_rb_call_destroy(void *p) {
}
static size_t md_ary_datasize(const void *p) {
- const grpc_metadata_array* const ary = (grpc_metadata_array*)p;
- size_t i, datasize = sizeof(grpc_metadata_array);
- for (i = 0; i < ary->count; ++i) {
- const grpc_metadata* const md = &ary->metadata[i];
- datasize += strlen(md->key);
- datasize += md->value_length;
- }
- datasize += ary->capacity * sizeof(grpc_metadata);
- return datasize;
+ const grpc_metadata_array *const ary = (grpc_metadata_array *)p;
+ size_t i, datasize = sizeof(grpc_metadata_array);
+ for (i = 0; i < ary->count; ++i) {
+ const grpc_metadata *const md = &ary->metadata[i];
+ datasize += strlen(md->key);
+ datasize += md->value_length;
+ }
+ datasize += ary->capacity * sizeof(grpc_metadata);
+ return datasize;
}
static const rb_data_type_t grpc_rb_md_ary_data_type = {
"grpc_metadata_array",
{GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize},
- NULL, NULL,
- 0
-};
+ NULL,
+ NULL,
+ 0};
/* Describes grpc_call struct for RTypedData */
static const rb_data_type_t grpc_call_data_type = {
"grpc_call",
{GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE},
- NULL, NULL,
- /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because grpc_rb_call_destroy
+ NULL,
+ NULL,
+ /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
+ * grpc_rb_call_destroy
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
- 0
-};
+ 0};
/* Error code details is a hash containing text strings describing errors */
VALUE rb_error_code_details;
@@ -250,7 +251,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
}
md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
md_ary->metadata[md_ary->count].value_length =
- RSTRING_LEN(rb_ary_entry(val, i));
+ RSTRING_LEN(rb_ary_entry(val, i));
md_ary->count += 1;
}
} else {
@@ -290,10 +291,11 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
-static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
+static void grpc_rb_md_ary_convert(VALUE md_ary_hash,
+ grpc_metadata_array *md_ary) {
VALUE md_ary_obj = Qnil;
if (md_ary_hash == Qnil) {
- return; /* Do nothing if the expected has value is nil */
+ return; /* Do nothing if the expected has value is nil */
}
if (TYPE(md_ary_hash) != T_HASH) {
rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
@@ -303,8 +305,8 @@ static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ar
/* Initialize the array, compute it's capacity, then fill it. */
grpc_metadata_array_init(md_ary);
- md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type,
- md_ary);
+ md_ary_obj =
+ TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
@@ -327,16 +329,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
rb_hash_aset(result, key, value);
} else if (TYPE(value) == T_ARRAY) {
/* Add the string to the returned array */
- rb_ary_push(value,
- rb_str_new(md_ary->metadata[i].value,
- md_ary->metadata[i].value_length));
+ rb_ary_push(value, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
} else {
/* Add the current value with this key and the new one to an array */
new_ary = rb_ary_new();
rb_ary_push(new_ary, value);
- rb_ary_push(new_ary,
- rb_str_new(md_ary->metadata[i].value,
- md_ary->metadata[i].value_length));
+ rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
rb_hash_aset(result, key, new_ary);
}
}
@@ -355,7 +355,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_obj_classname(key));
return ST_STOP;
}
- switch(NUM2INT(key)) {
+ switch (NUM2INT(key)) {
case GRPC_OP_SEND_INITIAL_METADATA:
case GRPC_OP_SEND_MESSAGE:
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@@ -367,8 +367,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_ary_push(ops_ary, key);
return ST_CONTINUE;
default:
- rb_raise(rb_eTypeError, "invalid operation : bad value %d",
- NUM2INT(key));
+ rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
};
return ST_STOP;
}
@@ -377,8 +376,8 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
struct to the 'send_status_from_server' portion of an op.
*/
static void grpc_rb_op_update_status_from_server(grpc_op *op,
- grpc_metadata_array* md_ary,
- VALUE status) {
+ grpc_metadata_array *md_ary,
+ VALUE status) {
VALUE code = rb_struct_aref(status, sym_code);
VALUE details = rb_struct_aref(status, sym_details);
VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
@@ -405,8 +404,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op,
* grpc_rb_call_run_batch function */
typedef struct run_batch_stack {
/* The batch ops */
- grpc_op ops[8]; /* 8 is the maximum number of operations */
- size_t op_num; /* tracks the last added operation */
+ grpc_op ops[8]; /* 8 is the maximum number of operations */
+ size_t op_num; /* tracks the last added operation */
/* Data being sent */
grpc_metadata_array send_metadata;
@@ -424,7 +423,7 @@ typedef struct run_batch_stack {
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack* st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
@@ -435,7 +434,7 @@ static void grpc_run_batch_stack_init(run_batch_stack* st) {
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
* cleaned up */
-static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
+static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
grpc_metadata_array_destroy(&st->send_metadata);
grpc_metadata_array_destroy(&st->send_trailing_metadata);
grpc_metadata_array_destroy(&st->recv_metadata);
@@ -447,7 +446,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
* ops_hash */
-static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
+static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
VALUE this_op = Qnil;
VALUE this_value = Qnil;
VALUE ops_ary = rb_ary_new();
@@ -460,7 +459,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
- switch(NUM2INT(this_op)) {
+ switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
@@ -471,18 +470,16 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
st->send_metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
- st->ops[st->op_num].data.send_message =
- grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
- RSTRING_LEN(this_value));
+ st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
+ RSTRING_PTR(this_value), RSTRING_LEN(this_value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
- grpc_rb_op_update_status_from_server(&st->ops[st->op_num],
- &st->send_trailing_metadata,
- this_value);
+ grpc_rb_op_update_status_from_server(
+ &st->ops[st->op_num], &st->send_trailing_metadata, this_value);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
@@ -516,12 +513,12 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
after the results have run */
-static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
+static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
size_t i = 0;
VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
Qnil, Qnil, Qnil, Qnil, NULL);
for (i = 0; i < st->op_num; i++) {
- switch(st->ops[i].op) {
+ switch (st->ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
rb_struct_aset(result, sym_send_metadata, Qtrue);
break;
@@ -544,13 +541,11 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
rb_struct_aset(
- result,
- sym_status,
- rb_struct_new(grpc_rb_sStatus,
- UINT2NUM(st->recv_status),
+ result, sym_status,
+ rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
(st->recv_status_details == NULL
- ? Qnil
- : rb_str_new2(st->recv_status_details)),
+ ? Qnil
+ : rb_str_new2(st->recv_status_details)),
grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
NULL));
break;
@@ -682,8 +677,7 @@ static void Init_grpc_error_codes() {
static void Init_grpc_op_codes() {
/* Constants representing operation type codes in grpc.h */
- VALUE grpc_rb_mCallOps =
- rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
+ VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
@@ -709,14 +703,14 @@ void Init_grpc_call() {
grpc_rb_eOutOfTime =
rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
- grpc_rb_cMdAry = rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray",
- rb_cObject);
+ grpc_rb_cMdAry =
+ rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
/* Prevent allocation or inialization of the Call class */
rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
- rb_define_method(grpc_rb_cCall, "initialize_copy",
- grpc_rb_cannot_init_copy, 1);
+ rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
@@ -746,16 +740,8 @@ void Init_grpc_call() {
/* The Struct used to return the run_batch result. */
grpc_rb_sBatchResult = rb_struct_define(
- "BatchResult",
- "send_message",
- "send_metadata",
- "send_close",
- "send_status",
- "message",
- "metadata",
- "status",
- "cancelled",
- NULL);
+ "BatchResult", "send_message", "send_metadata", "send_close",
+ "send_status", "message", "metadata", "status", "cancelled", NULL);
/* The hash for reference counting calls, to ensure they can't be destroyed
* more than once */