aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/call_op_string.c51
-rw-r--r--src/core/channel/census_filter.c106
-rw-r--r--src/core/channel/channel_stack.c75
-rw-r--r--src/core/channel/channel_stack.h91
-rw-r--r--src/core/channel/child_channel.c36
-rw-r--r--src/core/channel/child_channel.h5
-rw-r--r--src/core/channel/client_channel.c313
-rw-r--r--src/core/channel/connected_channel.c332
-rw-r--r--src/core/channel/http_client_filter.c144
-rw-r--r--src/core/channel/http_filter.c137
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c346
-rw-r--r--src/core/channel/metadata_buffer.c149
-rw-r--r--src/core/channel/metadata_buffer.h70
-rw-r--r--src/core/channel/noop_filter.c39
15 files changed, 519 insertions, 1418 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 08f2e95deb..5f7e1be268 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -43,12 +43,27 @@
static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
gpr_strvec_add(b, gpr_strdup(" key="));
- gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
- GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
gpr_strvec_add(b, gpr_strdup(" value="));
gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
- GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT));
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
}
char *grpc_call_op_string(grpc_call_op *op) {
@@ -69,16 +84,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
switch (op->type) {
case GRPC_SEND_METADATA:
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
- put_metadata(&b, op->data.metadata);
- break;
- case GRPC_SEND_DEADLINE:
- gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
- op->data.deadline.tv_nsec);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_SEND_START:
- gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset);
- gpr_strvec_add(&b, tmp);
+ put_metadata_list(&b, op->data.metadata);
break;
case GRPC_SEND_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
@@ -94,15 +100,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
break;
case GRPC_RECV_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
- put_metadata(&b, op->data.metadata);
- break;
- case GRPC_RECV_DEADLINE:
- gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
- op->data.deadline.tv_nsec);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
@@ -113,12 +111,21 @@ char *grpc_call_op_string(grpc_call_op *op) {
case GRPC_RECV_FINISH:
gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
+ op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
+ gpr_strvec_add(&b, tmp);
+ break;
case GRPC_CANCEL_OP:
gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
gpr_strvec_add(&b, tmp);
+ if (op->bind_pollset) {
+ gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ }
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index ba7b7ba59c..7e393a01a6 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
+
+ /* recv callback */
+ grpc_stream_op_buffer* recv_ops;
+ void (*on_done_recv)(void* user_data, int success);
+ void* recv_user_data;
} call_data;
typedef struct channel_data {
@@ -60,55 +65,68 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
+ call_data* calld,
channel_data* chand) {
- if (op->data.metadata->key == chand->path_str) {
- gpr_log(GPR_DEBUG,
- (const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice));
- census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
- op->data.metadata->value->slice));
+ grpc_linked_mdelem* m;
+ size_t i;
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op* op = &sopb->ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s",
+ (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
+ m->md->value->slice));
+ }
+ }
}
}
-static void client_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_SEND_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_RECV_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->send_ops) {
+ extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+}
+
+static void client_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ client_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void server_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void* ptr, int success) {
+ grpc_call_element* elem = ptr;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_RECV_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_SEND_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ if (op->recv_ops) {
+ /* substitute our callback for the op callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_done_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -126,12 +144,14 @@ static void channel_op(grpc_channel_element* elem,
}
static void client_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_call_element* elem) {
@@ -142,12 +162,14 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
}
static void server_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void server_destroy_call_elem(grpc_call_element* elem) {
@@ -178,11 +200,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data),
+ client_start_transport_op, channel_op, sizeof(call_data),
client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-client"};
+ init_channel_elem, destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data),
+ server_start_transport_op, channel_op, sizeof(call_data),
server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "census-server"};
+ init_channel_elem, destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 21df9771ce..311f4f08ce 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
+#include <string.h>
int grpc_trace_channel = 0;
@@ -77,9 +78,9 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
return size;
}
-#define CHANNEL_ELEMS_FROM_STACK(stk) \
- ((grpc_channel_element *)( \
- (char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack))))
+#define CHANNEL_ELEMS_FROM_STACK(stk) \
+ ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \
+ sizeof(grpc_channel_stack))))
#define CALL_ELEMS_FROM_STACK(stk) \
((grpc_call_element *)((char *)(stk) + \
@@ -147,6 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@@ -164,7 +166,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
+ call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
+ initial_op);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
@@ -181,9 +184,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
- grpc_call_element *next_elem = elem + op->dir;
- next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -193,58 +196,24 @@ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_channel_element *elem) {
- return (grpc_channel_stack *)((char *)(elem) -
- ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_channel_stack)));
+ return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_channel_stack)));
}
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
- return (grpc_call_stack *)((char *)(elem) - ROUND_UP_TO_ALIGNMENT_SIZE(
- sizeof(grpc_call_stack)));
-}
-
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
-void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *mdelem) {
- grpc_call_op metadata_op;
- metadata_op.type = GRPC_RECV_METADATA;
- metadata_op.dir = GRPC_CALL_UP;
- metadata_op.done_cb = do_nothing;
- metadata_op.user_data = NULL;
- metadata_op.flags = 0;
- metadata_op.data.metadata = mdelem;
- grpc_call_next_op(cur_elem, &metadata_op);
-}
-
-void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *mdelem) {
- grpc_call_op metadata_op;
- metadata_op.type = GRPC_SEND_METADATA;
- metadata_op.dir = GRPC_CALL_DOWN;
- metadata_op.done_cb = do_nothing;
- metadata_op.user_data = NULL;
- metadata_op.flags = 0;
- metadata_op.data.metadata = mdelem;
- grpc_call_next_op(cur_elem, &metadata_op);
+ return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
+ sizeof(grpc_call_stack)));
}
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_CANCEL_OP;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- cancel_op.flags = 0;
- grpc_call_next_op(cur_elem, &cancel_op);
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(cur_elem, &op);
}
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op finish_op;
- finish_op.type = GRPC_SEND_FINISH;
- finish_op.dir = GRPC_CALL_DOWN;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- finish_op.flags = 0;
- grpc_call_next_op(cur_elem, &finish_op);
+void grpc_call_element_recv_status(grpc_call_element *cur_elem,
+ grpc_status_code status,
+ const char *message) {
+ abort();
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index ef1da7b33b..de0e4e4518 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,82 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* Call operations - things that can be sent and received.
-
- Threading:
- SEND, RECV, and CANCEL ops can be active on a call at the same time, but
- only one SEND, one RECV, and one CANCEL can be active at a time.
-
- If state is shared between send/receive/cancel operations, it is up to
- filters to provide their own protection around that. */
-typedef enum {
- /* send metadata to the channels peer */
- GRPC_SEND_METADATA,
- /* send a deadline */
- GRPC_SEND_DEADLINE,
- /* start a connection (corresponds to start_invoke/accept) */
- GRPC_SEND_START,
- /* send a message to the channels peer */
- GRPC_SEND_MESSAGE,
- /* send a pre-formatted message to the channels peer */
- GRPC_SEND_PREFORMATTED_MESSAGE,
- /* send half-close to the channels peer */
- GRPC_SEND_FINISH,
- /* request that more data be allowed through flow control */
- GRPC_REQUEST_DATA,
- /* metadata was received from the channels peer */
- GRPC_RECV_METADATA,
- /* receive a deadline */
- GRPC_RECV_DEADLINE,
- /* the end of the first batch of metadata was received */
- GRPC_RECV_END_OF_INITIAL_METADATA,
- /* a message was received from the channels peer */
- GRPC_RECV_MESSAGE,
- /* half-close was received from the channels peer */
- GRPC_RECV_HALF_CLOSE,
- /* full close was received from the channels peer */
- GRPC_RECV_FINISH,
- /* the call has been abnormally terminated */
- GRPC_CANCEL_OP
-} grpc_call_op_type;
-
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-/* A single filterable operation to be performed on a call */
-typedef struct {
- /* The type of operation we're performing */
- grpc_call_op_type type;
- /* The directionality of this call - does the operation begin at the bottom
- of the stack and flow up, or does the operation start at the top of the
- stack and flow down through the filters. */
- grpc_call_dir dir;
-
- /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
- gpr_uint32 flags;
-
- /* Argument data, matching up with grpc_call_op_type names */
- union {
- struct {
- grpc_pollset *pollset;
- } start;
- grpc_byte_buffer *message;
- grpc_mdelem *metadata;
- gpr_timespec deadline;
- } data;
-
- /* Must be called when processing of this call-op is complete.
- Signature chosen to match transport flow control callbacks */
- void (*done_cb)(void *user_data, grpc_op_error error);
- /* User data to be passed into done_cb */
- void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@@ -174,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op);
+ void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -193,7 +121,8 @@ typedef struct {
transport and is on the server. Most filters want to ignore this
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
- const void *server_transport_data);
+ const void *server_transport_data,
+ grpc_transport_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@@ -272,12 +201,13 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -289,18 +219,13 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op);
+ grpc_call_element *elem, grpc_transport_op *op);
-void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *elem);
-void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
- grpc_mdelem *elem);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
if (grpc_trace_channel) grpc_call_log_op(sev, elem, op)
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_STACK_H */
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb03829c7..a2f3c54290 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -60,23 +60,11 @@ typedef struct {
gpr_uint8 sent_farewell;
} lb_channel_data;
-typedef struct {
- grpc_call_element *back;
- grpc_child_channel *channel;
-} lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
+typedef struct { grpc_child_channel *channel; } lb_call_data;
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
+static void lb_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -132,7 +120,8 @@ static void lb_channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {}
/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}
@@ -165,9 +154,10 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_call_op, lb_channel_op, sizeof(lb_call_data),
- lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
- lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
+ lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
+ lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
+ lb_init_channel_elem, lb_destroy_channel_elem, "child-channel",
+};
/* grpc_child_channel proper */
@@ -272,17 +262,17 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
}
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent) {
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op) {
grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
grpc_call_element *lbelem;
lb_call_data *lbcalld;
lb_channel_data *lbchand;
- grpc_call_stack_init(channel, NULL, stk);
+ grpc_call_stack_init(channel, NULL, initial_op, stk);
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
- lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 38695402ab..556a1c731c 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -57,8 +57,9 @@ void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent);
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op);
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
void grpc_child_call_destroy(grpc_child_call *call);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9791f98be8..78f8d06d89 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,7 +38,6 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
@@ -59,6 +58,7 @@ typedef struct {
/* the sending child (may be null) */
grpc_child_channel *active_child;
+ grpc_mdctx *mdctx;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -70,9 +70,6 @@ typedef struct {
int transport_setup_initiated;
grpc_channel_args *args;
-
- /* metadata cache */
- grpc_mdelem *cancel_status;
} channel_data;
typedef enum {
@@ -87,19 +84,17 @@ struct call_data {
grpc_call_element *elem;
call_state state;
- grpc_metadata_buffer pending_metadata;
gpr_timespec deadline;
union {
struct {
/* our child call stack */
grpc_child_call *child_call;
} active;
+ grpc_transport_op waiting_op;
struct {
- void (*on_complete)(void *user_data, grpc_op_error error);
- void *on_complete_user_data;
- gpr_uint32 start_flags;
- grpc_pollset *pollset;
- } waiting;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+ } cancelled;
} s;
};
@@ -113,89 +108,23 @@ static int prepare_activate(grpc_call_element *elem,
calld->state = CALL_ACTIVE;
/* create a child call */
- calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
+ /* TODO(ctiller): pass the waiting op down here */
+ calld->s.active.child_call =
+ grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
GPR_ASSERT(calld->state == CALL_ACTIVE);
- /* sending buffered metadata down the stack before the start call */
- grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
-
- if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
- grpc_call_op dop;
- dop.type = GRPC_SEND_DEADLINE;
- dop.dir = GRPC_CALL_DOWN;
- dop.flags = 0;
- dop.data.deadline = calld->deadline;
- dop.done_cb = do_nothing;
- dop.user_data = NULL;
- child_elem->filter->call_op(child_elem, elem, &dop);
- }
-
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
-}
-
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&chand->mu);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- return;
- }
- GPR_ASSERT(calld->state == CALL_CREATED);
- calld->state = CALL_WAITING;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not already),
- but don't do so until outside the lock to avoid re-entrancy problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children =
- gpr_realloc(chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting.on_complete = op->done_cb;
- calld->s.waiting.on_complete_user_data = op->user_data;
- calld->s.waiting.start_flags = op->flags;
- calld->s.waiting.pollset = op->data.start.pollset;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
+ child_elem->filter->start_transport_op(child_elem, op);
}
static void remove_waiting_child(channel_data *chand, call_data *calld) {
@@ -210,94 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
-static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
+static void handle_op_after_cancellation(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- /* send up a synthesized status */
- finish_op.type = GRPC_RECV_METADATA;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->s.cancelled.status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->s.cancelled.details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+ calld->s.cancelled.status.next = &calld->s.cancelled.details;
+ calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+ mdb.list.head = &calld->s.cancelled.status;
+ mdb.list.tail = &calld->s.cancelled.details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
+ }
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
+ grpc_transport_op waiting_op;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
- return; /* early out */
- case CALL_WAITING:
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data,
- GRPC_OP_ERROR);
- return; /* early out */
- case CALL_CREATED:
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- return; /* early out */
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- return; /* early out */
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- call_data *calld = elem->call_data;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_buffer_queue(&calld->pending_metadata, op);
- break;
- case GRPC_SEND_DEADLINE:
- calld->deadline = op->data.deadline;
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_SEND_START:
- /* filter out the start event to find which child to send on */
- start_rpc(elem, op);
+ child_elem->filter->start_transport_op(child_elem, op);
break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
+ case CALL_CREATED:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
+ } else {
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(elem, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(elem, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not
+ already),
+ but don't do so until outside the lock to avoid re-entrancy
+ problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children = gpr_realloc(
+ chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting_op = *op;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
+ }
break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
+ case CALL_WAITING:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op = calld->s.waiting_op;
+ remove_waiting_child(chand, calld);
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, &waiting_op);
+ handle_op_after_cancellation(elem, op);
} else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops) {
+ calld->s.waiting_op.send_ops = op->send_ops;
+ calld->s.waiting_op.is_last_send = op->is_last_send;
+ calld->s.waiting_op.on_done_send = op->on_done_send;
+ calld->s.waiting_op.send_user_data = op->send_user_data;
+ }
+ if (op->recv_ops) {
+ calld->s.waiting_op.recv_ops = op->recv_ops;
+ calld->s.waiting_op.recv_state = op->recv_state;
+ calld->s.waiting_op.on_done_recv = op->on_done_recv;
+ calld->s.waiting_op.recv_user_data = op->recv_user_data;
+ }
+ gpr_mu_unlock(&chand->mu);
}
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
break;
}
}
@@ -382,39 +345,33 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void error_bad_on_complete(void *arg, grpc_op_error error) {
- gpr_log(GPR_ERROR,
- "Waiting finished but not started? Bad on_complete callback");
- abort();
-}
-
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->s.waiting.on_complete = error_bad_on_complete;
- calld->s.waiting.on_complete_user_data = NULL;
- grpc_metadata_buffer_init(&calld->pending_metadata);
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
- /* if the metadata buffer is not flushed, destroy it here. */
- grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -423,7 +380,6 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
- char temp[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
@@ -437,10 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
-
- gpr_ltoa(GRPC_STATUS_CANCELLED, temp);
- chand->cancel_status =
- grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
+ chand->mdctx = metadata_context;
}
/* Destructor for channel_data */
@@ -455,7 +408,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
grpc_channel_args_destroy(chand->args);
- grpc_mdelem_unref(chand->cancel_status);
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);
@@ -463,9 +415,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client-channel", };
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client-channel",
+};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
grpc_channel_stack *channel_stack, grpc_transport *transport,
@@ -481,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -517,19 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i].type = GRPC_SEND_START;
- call_ops[i].dir = GRPC_CALL_DOWN;
- call_ops[i].flags = waiting_children[i]->s.waiting.start_flags;
- call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete;
- call_ops[i].user_data =
- waiting_children[i]->s.waiting.on_complete_user_data;
- call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset;
+ call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 62611e08f3..14dda88698 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,26 +45,12 @@
#include <grpc/support/slice_buffer.h>
#define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
-typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_metadata_boundary;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
-} call_data;
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -73,98 +59,17 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
- grpc_stream_op_buffer *sopb) {
- size_t i;
-
- switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
- gpr_slice_ref(slice);
- grpc_sopb_add_slice(sopb, slice);
- }
- break;
- }
-}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
-
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_DEADLINE:
- grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
- op->user_data);
- break;
- case GRPC_SEND_START:
- grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
- grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -190,24 +95,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_metadata_boundary = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data);
+ server_transport_data, initial_op);
GPR_ASSERT(r == 0);
}
@@ -216,8 +113,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -227,28 +122,10 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
-
- cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
- if (args) {
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else if (args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else {
- cd->max_message_length = args->args[i].value.integer;
- }
- }
- }
- }
}
/* Destructor for channel_data */
@@ -259,14 +136,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "connected", };
-
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "connected",
+};
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
@@ -285,183 +158,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(
- calld->incoming_message.slices, calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- gpr_log(GPR_ERROR,
- "should not receive flow control ops from transport");
- abort();
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- call_op.type = GRPC_RECV_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.metadata = stream_op->data.metadata;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_DEADLINE:
- call_op.type = GRPC_RECV_DEADLINE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.data.deadline = stream_op->data.deadline;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- break;
- case GRPC_OP_METADATA_BOUNDARY:
- if (!calld->got_metadata_boundary) {
- calld->got_metadata_boundary = 1;
- call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -494,8 +190,8 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
- transport_goaway, transport_closed, };
+ accept_stream, transport_goaway, transport_closed,
+};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
grpc_channel_stack *channel_stack, grpc_transport *transport) {
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 3ccc39b717..9805f325a6 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -35,7 +35,16 @@
#include <grpc/support/log.h>
typedef struct call_data {
- int sent_headers;
+ grpc_linked_mdelem method;
+ grpc_linked_mdelem scheme;
+ grpc_linked_mdelem te_trailers;
+ grpc_linked_mdelem content_type;
+ int sent_initial_metadata;
+
+ int got_initial_metadata;
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -49,62 +58,78 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
+static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ if (md == channeld->status) {
+ return NULL;
+ } else if (md->key == channeld->status->key) {
+ grpc_call_element_send_cancel(elem);
+ return NULL;
+ }
+ return md;
+}
- ignore_unused(calld);
+static void hc_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->sent_headers) {
- /* Send : prefixed headers, which have to be before any application
- * layer headers. */
- calld->sent_headers = 1;
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
- }
- grpc_call_next_op(elem, op);
- break;
- case GRPC_SEND_START:
- if (!calld->sent_headers) {
- /* Send : prefixed headers, if we haven't already */
- calld->sent_headers = 1;
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
- }
- /* Send non : prefixed headers */
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->te_trailers));
- grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type));
- grpc_call_next_op(elem, op);
- break;
- case GRPC_RECV_METADATA:
- if (op->data.metadata == channeld->status) {
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->status->key) {
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- } else {
- grpc_call_next_op(elem, op);
- }
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
+static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ size_t i;
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
+ /* Send : prefixed headers, which have to be before any application
+ layer headers. */
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
+ grpc_mdelem_ref(channeld->method));
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
+ grpc_mdelem_ref(channeld->scheme));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
+ grpc_mdelem_ref(channeld->te_trailers));
+ grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
+ grpc_mdelem_ref(channeld->content_type));
break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hc_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -124,15 +149,12 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- /* initialize members */
- calld->sent_headers = 0;
+ calld->sent_initial_metadata = 0;
+ calld->got_initial_metadata = 0;
+ if (initial_op) hc_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -194,6 +216,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http-client"};
+ hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a0422d8..0000000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
- path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad61f..0000000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
- transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index f565cbf3ae..1f64df68e3 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,25 +38,22 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef enum { NOT_RECEIVED, POST, GET } known_method_type;
-
-typedef struct {
- grpc_mdelem *path;
- grpc_mdelem *content_type;
- grpc_byte_buffer *content;
-} gettable;
-
typedef struct call_data {
- known_method_type seen_method;
+ gpr_uint8 got_initial_metadata;
+ gpr_uint8 seen_path;
+ gpr_uint8 seen_post;
gpr_uint8 sent_status;
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
- grpc_mdelem *path;
+ grpc_linked_mdelem status;
+
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
grpc_mdelem *te_trailers;
- grpc_mdelem *method_get;
grpc_mdelem *method_post;
grpc_mdelem *http_scheme;
grpc_mdelem *https_scheme;
@@ -70,148 +67,100 @@ typedef struct channel_data {
grpc_mdstr *host_key;
grpc_mdctx *mdctx;
-
- size_t gettable_count;
- gettable *gettables;
} channel_data;
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Handle 'GET': not technically grpc, so probably a web browser hitting
- us */
-static void payload_done(void *elem, grpc_op_error error) {
- if (error == GRPC_OP_OK) {
- grpc_call_element_send_finish(elem);
- }
-}
-
-static void handle_get(grpc_call_element *elem) {
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_call_op op;
- size_t i;
- for (i = 0; i < channeld->gettable_count; i++) {
- if (channeld->gettables[i].path == calld->path) {
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_ok));
- grpc_call_element_send_metadata(
- elem, grpc_mdelem_ref(channeld->gettables[i].content_type));
- op.type = GRPC_SEND_PREFORMATTED_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.message = channeld->gettables[i].content;
- op.done_cb = payload_done;
- op.user_data = elem;
- grpc_call_next_op(elem, &op);
+ /* Check if it is one of the headers we care about. */
+ if (md == channeld->te_trailers || md == channeld->method_post ||
+ md == channeld->http_scheme || md == channeld->https_scheme ||
+ md == channeld->grpc_scheme || md == channeld->content_type) {
+ /* swallow it */
+ if (md == channeld->method_post) {
+ calld->seen_post = 1;
+ } else if (md->key == channeld->http_scheme->key) {
+ calld->seen_scheme = 1;
+ } else if (md == channeld->te_trailers) {
+ calld->seen_te_trailers = 1;
}
+ /* TODO(klempner): Track that we've seen all the headers we should
+ require */
+ return NULL;
+ } else if (md->key == channeld->content_type->key) {
+ if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
+ 0) {
+ /* Although the C implementation doesn't (currently) generate them,
+ any custom +-suffix is explicitly valid. */
+ /* TODO(klempner): We should consider preallocating common values such
+ as +proto or +json, or at least stashing them if we see them. */
+ /* TODO(klempner): Should we be surfacing this to application code? */
+ } else {
+ /* TODO(klempner): We're currently allowing this, but we shouldn't
+ see it without a proxy so log for now. */
+ gpr_log(GPR_INFO, "Unexpected content-type %s",
+ channeld->content_type->key);
+ }
+ return NULL;
+ } else if (md->key == channeld->te_trailers->key ||
+ md->key == channeld->method_post->key ||
+ md->key == channeld->http_scheme->key ||
+ md->key == channeld->content_type->key) {
+ gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
+ /* swallow it and error everything out. */
+ /* TODO(klempner): We ought to generate more descriptive error messages
+ on the wire here. */
+ grpc_call_element_send_cancel(elem);
+ return NULL;
+ } else if (md->key == channeld->path_key) {
+ if (calld->seen_path) {
+ gpr_log(GPR_ERROR, "Received :path twice");
+ return NULL;
+ }
+ calld->seen_path = 1;
+ return md;
+ } else if (md->key == channeld->host_key) {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
+ channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
+ grpc_mdstr_ref(md->value));
+ grpc_mdelem_unref(md);
+ return authority;
+ } else {
+ return md;
}
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_not_found));
- grpc_call_element_send_finish(elem);
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
+static void hs_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- /* Check if it is one of the headers we care about. */
- if (op->data.metadata == channeld->te_trailers ||
- op->data.metadata == channeld->method_get ||
- op->data.metadata == channeld->method_post ||
- op->data.metadata == channeld->http_scheme ||
- op->data.metadata == channeld->https_scheme ||
- op->data.metadata == channeld->grpc_scheme ||
- op->data.metadata == channeld->content_type) {
- /* swallow it */
- if (op->data.metadata == channeld->method_get) {
- calld->seen_method = GET;
- } else if (op->data.metadata == channeld->method_post) {
- calld->seen_method = POST;
- } else if (op->data.metadata->key == channeld->http_scheme->key) {
- calld->seen_scheme = 1;
- } else if (op->data.metadata == channeld->te_trailers) {
- calld->seen_te_trailers = 1;
- }
- /* TODO(klempner): Track that we've seen all the headers we should
- require */
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->content_type->key) {
- if (strncmp(grpc_mdstr_as_c_string(op->data.metadata->value),
- "application/grpc+", 17) == 0) {
- /* Although the C implementation doesn't (currently) generate them,
- any
- custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
- } else {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- gpr_log(GPR_INFO, "Unexpected content-type %s",
- channeld->content_type->key);
- }
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->te_trailers->key ||
- op->data.metadata->key == channeld->method_post->key ||
- op->data.metadata->key == channeld->http_scheme->key ||
- op->data.metadata->key == channeld->content_type->key) {
- gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
- grpc_mdstr_as_c_string(op->data.metadata->key),
- grpc_mdstr_as_c_string(op->data.metadata->value));
- /* swallow it and error everything out. */
- /* TODO(klempner): We ought to generate more descriptive error messages
- on the wire here. */
- grpc_mdelem_unref(op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- } else if (op->data.metadata->key == channeld->path_key) {
- if (calld->path != NULL) {
- gpr_log(GPR_ERROR, "Received :path twice");
- grpc_mdelem_unref(calld->path);
- }
- calld->path = op->data.metadata;
- op->done_cb(op->user_data, GRPC_OP_OK);
- } else if (op->data.metadata->key == channeld->host_key) {
- /* translate host to :authority since :authority may be
- omitted */
- grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- channeld->mdctx, channeld->authority_key, op->data.metadata->value);
- grpc_mdelem_unref(op->data.metadata);
- op->data.metadata = authority;
- /* pass the event up */
- grpc_call_next_op(elem, op);
- } else {
- /* pass the event up */
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
- if (calld->seen_method == POST && calld->seen_scheme &&
- calld->seen_te_trailers && calld->path) {
- grpc_call_element_recv_metadata(elem, calld->path);
- calld->path = NULL;
- grpc_call_next_op(elem, op);
- } else if (calld->seen_method == GET) {
- handle_get(elem);
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ /* do nothing */
} else {
- if (calld->seen_method == NOT_RECEIVED) {
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
if (!calld->seen_scheme) {
@@ -221,29 +170,50 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
gpr_log(GPR_ERROR, "Missing te trailers header");
}
/* Error this call out */
- op->done_cb(op->user_data, GRPC_OP_OK);
+ success = 0;
grpc_call_element_send_cancel(elem);
}
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ size_t i;
+
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
break;
- case GRPC_SEND_START:
- case GRPC_SEND_METADATA:
- /* If we haven't sent status 200 yet, we need to so so because it needs to
- come before any non : prefixed metadata. */
- if (!calld->sent_status) {
- calld->sent_status = 1;
- /* status is reffed by grpc_call_element_send_metadata */
- grpc_call_element_send_metadata(elem,
- grpc_mdelem_ref(channeld->status_ok));
- }
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hs_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hs_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hs_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -263,41 +233,22 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
/* initialize members */
- calld->path = NULL;
- calld->sent_status = 0;
- calld->seen_scheme = 0;
- calld->seen_method = NOT_RECEIVED;
- calld->seen_te_trailers = 0;
+ memset(calld, 0, sizeof(*calld));
+ if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- if (calld->path) {
- grpc_mdelem_unref(calld->path);
- }
-}
+static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- size_t i;
- size_t gettable_capacity = 0;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -313,7 +264,6 @@ static void init_channel_elem(grpc_channel_element *elem,
channeld->status_not_found =
grpc_mdelem_from_strings(mdctx, ":status", "404");
channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->method_get = grpc_mdelem_from_strings(mdctx, ":method", "GET");
channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
@@ -324,51 +274,17 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
-
- /* initialize http download support */
- channeld->gettable_count = 0;
- channeld->gettables = NULL;
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) {
- gettable *g;
- gpr_slice slice;
- grpc_http_server_page *p = args->args[i].value.pointer.p;
- if (channeld->gettable_count == gettable_capacity) {
- gettable_capacity =
- GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1);
- channeld->gettables = gpr_realloc(channeld->gettables,
- gettable_capacity * sizeof(gettable));
- }
- g = &channeld->gettables[channeld->gettable_count++];
- g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path);
- g->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", p->content_type);
- slice = gpr_slice_from_copied_string(p->content);
- g->content = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- }
- }
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
- size_t i;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- for (i = 0; i < channeld->gettable_count; i++) {
- grpc_mdelem_unref(channeld->gettables[i].path);
- grpc_mdelem_unref(channeld->gettables[i].content_type);
- grpc_byte_buffer_destroy(channeld->gettables[i].content);
- }
- gpr_free(channeld->gettables);
-
grpc_mdelem_unref(channeld->te_trailers);
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
grpc_mdelem_unref(channeld->method_post);
- grpc_mdelem_unref(channeld->method_get);
grpc_mdelem_unref(channeld->http_scheme);
grpc_mdelem_unref(channeld->https_scheme);
grpc_mdelem_unref(channeld->grpc_scheme);
@@ -379,6 +295,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "http-server"};
+ hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/metadata_buffer.c b/src/core/channel/metadata_buffer.c
deleted file mode 100644
index eac852e4a4..0000000000
--- a/src/core/channel/metadata_buffer.c
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/metadata_buffer.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include <string.h>
-
-#define INITIAL_ELEM_CAP 8
-
-/* One queued call; we track offsets to string data in a shared buffer to
- reduce allocations. See grpc_metadata_buffer_impl for the memory use
- strategy */
-typedef struct {
- grpc_mdelem *md;
- void (*cb)(void *user_data, grpc_op_error error);
- void *user_data;
- gpr_uint32 flags;
-} qelem;
-
-/* Memory layout:
-
- grpc_metadata_buffer_impl
- followed by an array of qelem */
-struct grpc_metadata_buffer_impl {
- /* number of elements in q */
- size_t elems;
- /* capacity of q */
- size_t elem_cap;
-};
-
-#define ELEMS(buffer) ((qelem *)((buffer) + 1))
-
-void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) {
- /* start buffer as NULL, indicating no elements */
- *buffer = NULL;
-}
-
-void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
- grpc_op_error error) {
- size_t i;
- qelem *qe;
- if (*buffer) {
- for (i = 0; i < (*buffer)->elems; i++) {
- qe = &ELEMS(*buffer)[i];
- grpc_mdelem_unref(qe->md);
- qe->cb(qe->user_data, error);
- }
- gpr_free(*buffer);
- }
-}
-
-void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer,
- grpc_call_op *op) {
- grpc_metadata_buffer_impl *impl = *buffer;
- qelem *qe;
- size_t bytes;
-
- GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA);
-
- if (!impl) {
- /* this is the first element: allocate enough space to hold the
- header object and the initial element capacity of qelems */
- bytes =
- sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem);
- impl = gpr_malloc(bytes);
- /* initialize the header object */
- impl->elems = 0;
- impl->elem_cap = INITIAL_ELEM_CAP;
- } else if (impl->elems == impl->elem_cap) {
- /* more qelems than what we can deal with: grow by doubling size */
- impl->elem_cap *= 2;
- bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem);
- impl = gpr_realloc(impl, bytes);
- }
-
- /* append an element to the queue */
- qe = &ELEMS(impl)[impl->elems];
- impl->elems++;
-
- qe->md = op->data.metadata;
- qe->cb = op->done_cb;
- qe->user_data = op->user_data;
- qe->flags = op->flags;
-
- /* header object may have changed location: store it back */
- *buffer = impl;
-}
-
-void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
- grpc_call_element *elem) {
- grpc_metadata_buffer_impl *impl = *buffer;
- grpc_call_op op;
- qelem *qe;
- size_t i;
-
- if (!impl) {
- /* nothing to send */
- return;
- }
-
- /* construct call_op's, and push them down the stack */
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- for (i = 0; i < impl->elems; i++) {
- qe = &ELEMS(impl)[i];
- op.done_cb = qe->cb;
- op.user_data = qe->user_data;
- op.flags = qe->flags;
- op.data.metadata = qe->md;
- grpc_call_next_op(elem, &op);
- }
-
- /* free data structures and reset to NULL: we can only flush once */
- gpr_free(impl);
- *buffer = NULL;
-}
diff --git a/src/core/channel/metadata_buffer.h b/src/core/channel/metadata_buffer.h
deleted file mode 100644
index b7cc5170d1..0000000000
--- a/src/core/channel/metadata_buffer.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack
- all at once at some otherwise-determined time. Useful for implementing
- filters that want to queue metadata until a START event chooses some
- underlying filter stack to send an rpc on. */
-
-/* Clients should declare a member of grpc_metadata_buffer. This may at some
- point become a typedef for a struct, but for now a pointer suffices */
-typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl;
-typedef grpc_metadata_buffer_impl *grpc_metadata_buffer;
-
-/* Initializes the metadata buffer. Allocates no memory. */
-void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer);
-/* Destroy the metadata buffer. */
-void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
- grpc_op_error error);
-/* Append a call to the end of a metadata buffer: may allocate memory */
-void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op);
-/* Flush all queued operations from the metadata buffer to the element below
- self */
-void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
- grpc_call_element *self);
-/* Count the number of queued elements in the buffer. */
-size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer);
-/* Extract elements as a grpc_metadata*, for presentation to applications.
- The returned buffer must be freed with
- grpc_metadata_buffer_cleanup_elements.
- Clears the metadata buffer (this is a one-shot operation) */
-grpc_metadata *grpc_metadata_buffer_extract_elements(
- grpc_metadata_buffer *buffer);
-void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error);
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H */
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2bc1..1d2be716d7 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,13 +45,7 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -59,12 +53,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
+ /* do nothing */
+}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void noop_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ noop_mutate_op(elem, op);
+
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -86,13 +88,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
+
+ if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -131,6 +136,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "no-op"};
+ noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "no-op"};