aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD8
-rw-r--r--Makefile12
-rw-r--r--build.json6
-rw-r--r--src/core/channel/census_filter.c108
-rw-r--r--src/core/channel/channel_stack.c48
-rw-r--r--src/core/channel/channel_stack.h84
-rw-r--r--src/core/channel/child_channel.c36
-rw-r--r--src/core/channel/child_channel.h5
-rw-r--r--src/core/channel/client_channel.c258
-rw-r--r--src/core/channel/connected_channel.c305
-rw-r--r--src/core/channel/http_client_filter.c85
-rw-r--r--src/core/channel/http_filter.c137
-rw-r--r--src/core/channel/http_filter.h43
-rw-r--r--src/core/channel/http_server_filter.c185
-rw-r--r--src/core/channel/noop_filter.c39
-rw-r--r--src/core/security/auth.c77
-rw-r--r--src/core/security/server_secure_chttp2.c5
-rw-r--r--src/core/surface/call.c542
-rw-r--r--src/core/surface/call.h15
-rw-r--r--src/core/surface/channel.c27
-rw-r--r--src/core/surface/channel.h3
-rw-r--r--src/core/surface/channel_create.c5
-rw-r--r--src/core/surface/client.c36
-rw-r--r--src/core/surface/lame_client.c31
-rw-r--r--src/core/surface/secure_channel_create.c5
-rw-r--r--src/core/surface/server.c134
-rw-r--r--src/core/surface/server_chttp2.c5
-rw-r--r--src/core/transport/chttp2/stream_encoder.c5
-rw-r--r--src/core/transport/chttp2_transport.c409
-rw-r--r--src/core/transport/stream_op.c14
-rw-r--r--src/core/transport/stream_op.h46
-rw-r--r--src/core/transport/transport.c42
-rw-r--r--src/core/transport/transport.h107
-rw-r--r--src/core/transport/transport_impl.h16
-rw-r--r--src/core/transport/transport_op_string.c160
-rw-r--r--src/ruby/ext/grpc/rb_call.c122
-rw-r--r--test/core/channel/channel_stack_test.c13
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack.c1
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack_uds.c1
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair.c11
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c11
-rw-r--r--test/core/transport/chttp2/stream_encoder_test.c5
-rw-r--r--test/core/transport/stream_op_test.c13
-rw-r--r--vsprojects/grpc/grpc.vcxproj7
-rw-r--r--vsprojects/grpc/grpc.vcxproj.filters12
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj7
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters12
47 files changed, 1457 insertions, 1801 deletions
diff --git a/BUILD b/BUILD
index 2a93161db2..470458f239 100644
--- a/BUILD
+++ b/BUILD
@@ -147,7 +147,6 @@ cc_library(
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
- "src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@@ -247,7 +246,6 @@ cc_library(
"src/core/tsi/fake_transport_security.c",
"src/core/tsi/ssl_transport_security.c",
"src/core/tsi/transport_security.c",
- "src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@@ -256,7 +254,6 @@ cc_library(
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
- "src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@@ -344,6 +341,7 @@ cc_library(
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c",
+ "src/core/transport/transport_op_string.c",
],
hdrs = [
"include/grpc/grpc_security.h",
@@ -375,7 +373,6 @@ cc_library(
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
- "src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@@ -456,7 +453,6 @@ cc_library(
"src/core/transport/transport.h",
"src/core/transport/transport_impl.h",
"src/core/surface/init_unsecure.c",
- "src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@@ -465,7 +461,6 @@ cc_library(
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
- "src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@@ -553,6 +548,7 @@ cc_library(
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c",
+ "src/core/transport/transport_op_string.c",
],
hdrs = [
"include/grpc/byte_buffer.h",
diff --git a/Makefile b/Makefile
index 3e7930b9c3..4a8d55b28e 100644
--- a/Makefile
+++ b/Makefile
@@ -3184,7 +3184,6 @@ LIBGRPC_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/ssl_transport_security.c \
src/core/tsi/transport_security.c \
- src/core/channel/call_op_string.c \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
@@ -3193,7 +3192,6 @@ LIBGRPC_SRC = \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
- src/core/channel/http_filter.c \
src/core/channel/http_server_filter.c \
src/core/channel/noop_filter.c \
src/core/compression/algorithm.c \
@@ -3281,6 +3279,7 @@ LIBGRPC_SRC = \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
+ src/core/transport/transport_op_string.c \
PUBLIC_HEADERS_C += \
include/grpc/grpc_security.h \
@@ -3371,7 +3370,6 @@ $(OBJDIR)/$(CONFIG)/src/core/surface/secure_channel_create.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/fake_transport_security.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/ssl_transport_security.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/transport_security.o:
-$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o:
@@ -3380,7 +3378,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o:
-$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o:
@@ -3468,6 +3465,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport.o:
+$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o:
LIBGRPC_TEST_UTIL_SRC = \
@@ -3572,7 +3570,6 @@ $(OBJDIR)/$(CONFIG)/test/core/util/slice_splitter.o:
LIBGRPC_UNSECURE_SRC = \
src/core/surface/init_unsecure.c \
- src/core/channel/call_op_string.c \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
@@ -3581,7 +3578,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
- src/core/channel/http_filter.c \
src/core/channel/http_server_filter.c \
src/core/channel/noop_filter.c \
src/core/compression/algorithm.c \
@@ -3669,6 +3665,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
+ src/core/transport/transport_op_string.c \
PUBLIC_HEADERS_C += \
include/grpc/byte_buffer.h \
@@ -3713,7 +3710,6 @@ ifneq ($(NO_DEPS),true)
endif
$(OBJDIR)/$(CONFIG)/src/core/surface/init_unsecure.o:
-$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o:
@@ -3722,7 +3718,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o:
-$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o:
@@ -3810,6 +3805,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport.o:
+$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o:
LIBGRPC++_SRC = \
diff --git a/build.json b/build.json
index 8838d3a630..b9deb63582 100644
--- a/build.json
+++ b/build.json
@@ -99,7 +99,6 @@
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
- "src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@@ -181,7 +180,6 @@
"src/core/transport/transport_impl.h"
],
"src": [
- "src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@@ -190,7 +188,6 @@
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
- "src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@@ -277,7 +274,8 @@
"src/core/transport/chttp2_transport.c",
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
- "src/core/transport/transport.c"
+ "src/core/transport/transport.c",
+ "src/core/transport/transport_op_string.c"
]
},
{
diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c
index 9c0c20af22..7e393a01a6 100644
--- a/src/core/channel/census_filter.c
+++ b/src/core/channel/census_filter.c
@@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
+
+ /* recv callback */
+ grpc_stream_op_buffer* recv_ops;
+ void (*on_done_recv)(void* user_data, int success);
+ void* recv_user_data;
} call_data;
typedef struct channel_data {
@@ -60,57 +65,68 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
-static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
+static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb,
+ call_data* calld,
channel_data* chand) {
grpc_linked_mdelem* m;
- for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
- if (m->md->key == chand->path_str) {
- gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
- census_add_method_tag(
- calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ size_t i;
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op* op = &sopb->ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s",
+ (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
+ census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
+ m->md->value->slice));
+ }
}
}
}
-static void client_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void client_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_SEND_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_RECV_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (op->send_ops) {
+ extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+}
+
+static void client_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ client_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void server_call_op(grpc_call_element* elem,
- grpc_call_element* from_elem, grpc_call_op* op) {
+static void server_on_done_recv(void* ptr, int success) {
+ grpc_call_element* elem = ptr;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- GPR_ASSERT(calld != NULL);
- GPR_ASSERT(chand != NULL);
- GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
- switch (op->type) {
- case GRPC_RECV_METADATA:
- extract_and_annotate_method_tag(op, calld, chand);
- break;
- case GRPC_SEND_FINISH:
- /* Should we stop timing the rpc here? */
- break;
- default:
- break;
+ if (success) {
+ extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
}
- /* Always pass control up or down the stack depending on op->dir */
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element* elem, grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ if (op->recv_ops) {
+ /* substitute our callback for the op callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_done_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element* elem,
+ grpc_transport_op* op) {
+ call_data* calld = elem->call_data;
+ GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
+ server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
@@ -128,12 +144,14 @@ static void channel_op(grpc_channel_element* elem,
}
static void client_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_call_element* elem) {
@@ -144,12 +162,14 @@ static void client_destroy_call_elem(grpc_call_element* elem) {
}
static void server_init_call_elem(grpc_call_element* elem,
- const void* server_transport_data) {
+ const void* server_transport_data,
+ grpc_transport_op* initial_op) {
call_data* d = elem->call_data;
GPR_ASSERT(d != NULL);
init_rpc_stats(&d->stats);
d->start_ts = gpr_now();
d->op_id = census_tracing_start_op();
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void server_destroy_call_elem(grpc_call_element* elem) {
@@ -180,11 +200,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
- client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
- client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "census-client"};
+ client_start_transport_op, channel_op, sizeof(call_data),
+ client_init_call_elem, client_destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
- server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
- destroy_channel_elem, "census-server"};
+ server_start_transport_op, channel_op, sizeof(call_data),
+ server_init_call_elem, server_destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "census-server"};
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index 3a3a3a75b7..311f4f08ce 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
+#include <string.h>
int grpc_trace_channel = 0;
@@ -147,6 +148,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack) {
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count;
@@ -164,7 +166,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data);
+ call_elems[i].filter->init_call_elem(&call_elems[i], transport_server_data,
+ initial_op);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
@@ -181,12 +184,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
- grpc_call_element *next_elem = elem + op->dir;
- if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
- grpc_metadata_batch_assert_ok(&op->data.metadata);
- }
- next_elem->filter->call_op(next_elem, elem, op);
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
+ grpc_call_element *next_elem = elem + 1;
+ next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@@ -205,39 +205,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
-static void do_nothing(void *user_data, grpc_op_error error) {}
-
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
- grpc_call_op cancel_op;
- cancel_op.type = GRPC_CANCEL_OP;
- cancel_op.dir = GRPC_CALL_DOWN;
- cancel_op.done_cb = do_nothing;
- cancel_op.user_data = NULL;
- cancel_op.flags = 0;
- cancel_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &cancel_op);
-}
-
-void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
- grpc_call_op finish_op;
- finish_op.type = GRPC_SEND_FINISH;
- finish_op.dir = GRPC_CALL_DOWN;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- finish_op.flags = 0;
- finish_op.bind_pollset = NULL;
- grpc_call_next_op(cur_elem, &finish_op);
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
- grpc_call_op op;
- op.type = GRPC_RECV_SYNTHETIC_STATUS;
- op.dir = GRPC_CALL_UP;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.data.synthetic_status.status = status;
- op.data.synthetic_status.message = message;
- grpc_call_next_op(cur_elem, &op);
+ abort();
}
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index addc92b272..de0e4e4518 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -51,78 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
-/* Call operations - things that can be sent and received.
-
- Threading:
- SEND, RECV, and CANCEL ops can be active on a call at the same time, but
- only one SEND, one RECV, and one CANCEL can be active at a time.
-
- If state is shared between send/receive/cancel operations, it is up to
- filters to provide their own protection around that. */
-typedef enum {
- /* send metadata to the channels peer */
- GRPC_SEND_METADATA,
- /* send a message to the channels peer */
- GRPC_SEND_MESSAGE,
- /* send a pre-formatted message to the channels peer */
- GRPC_SEND_PREFORMATTED_MESSAGE,
- /* send half-close to the channels peer */
- GRPC_SEND_FINISH,
- /* request that more data be allowed through flow control */
- GRPC_REQUEST_DATA,
- /* metadata was received from the channels peer */
- GRPC_RECV_METADATA,
- /* a message was received from the channels peer */
- GRPC_RECV_MESSAGE,
- /* half-close was received from the channels peer */
- GRPC_RECV_HALF_CLOSE,
- /* full close was received from the channels peer */
- GRPC_RECV_FINISH,
- /* a status has been sythesized locally */
- GRPC_RECV_SYNTHETIC_STATUS,
- /* the call has been abnormally terminated */
- GRPC_CANCEL_OP
-} grpc_call_op_type;
-
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
-/* A single filterable operation to be performed on a call */
-typedef struct {
- /* The type of operation we're performing */
- grpc_call_op_type type;
- /* The directionality of this call - does the operation begin at the bottom
- of the stack and flow up, or does the operation start at the top of the
- stack and flow down through the filters. */
- grpc_call_dir dir;
-
- /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
- gpr_uint32 flags;
-
- /* Argument data, matching up with grpc_call_op_type names */
- union {
- grpc_byte_buffer *message;
- grpc_metadata_batch metadata;
- struct {
- grpc_status_code status;
- const char *message;
- } synthetic_status;
- } data;
-
- grpc_pollset *bind_pollset;
-
- /* Must be called when processing of this call-op is complete.
- Signature chosen to match transport flow control callbacks */
- void (*done_cb)(void *user_data, grpc_op_error error);
- /* User data to be passed into done_cb */
- void *user_data;
-} grpc_call_op;
-
-/* returns a string representation of op, that can be destroyed with gpr_free */
-char *grpc_call_op_string(grpc_call_op *op);
-
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@@ -170,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
- void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op);
+ void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@@ -189,7 +121,8 @@ typedef struct {
transport and is on the server. Most filters want to ignore this
argument.*/
void (*init_call_elem)(grpc_call_element *elem,
- const void *server_transport_data);
+ const void *server_transport_data,
+ grpc_transport_op *initial_op);
/* Destroy per call data.
The filter does not need to do any chaining */
void (*destroy_call_elem)(grpc_call_element *elem);
@@ -268,12 +201,13 @@ void grpc_channel_stack_destroy(grpc_channel_stack *stack);
server. */
void grpc_call_stack_init(grpc_channel_stack *channel_stack,
const void *transport_server_data,
+ grpc_transport_op *initial_op,
grpc_call_stack *call_stack);
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
-/* Call the next operation (depending on call directionality) in a call stack */
-void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
+/* Call the next operation in a call stack */
+void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@@ -285,13 +219,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op);
+ grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
-void grpc_call_element_send_finish(grpc_call_element *cur_elem);
-void grpc_call_element_recv_status(grpc_call_element *cur_elem,
- grpc_status_code status,
- const char *message);
extern int grpc_trace_channel;
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index 2cb03829c7..a2f3c54290 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -60,23 +60,11 @@ typedef struct {
gpr_uint8 sent_farewell;
} lb_channel_data;
-typedef struct {
- grpc_call_element *back;
- grpc_child_channel *channel;
-} lb_call_data;
-
-static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- lb_call_data *calld = elem->call_data;
+typedef struct { grpc_child_channel *channel; } lb_call_data;
- switch (op->dir) {
- case GRPC_CALL_UP:
- calld->back->filter->call_op(calld->back, elem, op);
- break;
- case GRPC_CALL_DOWN:
- grpc_call_next_op(elem, op);
- break;
- }
+static void lb_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -132,7 +120,8 @@ static void lb_channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void lb_init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {}
/* Destructor for call_data */
static void lb_destroy_call_elem(grpc_call_element *elem) {}
@@ -165,9 +154,10 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
- lb_call_op, lb_channel_op, sizeof(lb_call_data),
- lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
- lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
+ lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
+ lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
+ lb_init_channel_elem, lb_destroy_channel_elem, "child-channel",
+};
/* grpc_child_channel proper */
@@ -272,17 +262,17 @@ void grpc_child_channel_handle_op(grpc_child_channel *channel,
}
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent) {
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op) {
grpc_call_stack *stk = gpr_malloc((channel)->call_stack_size);
grpc_call_element *lbelem;
lb_call_data *lbcalld;
lb_channel_data *lbchand;
- grpc_call_stack_init(channel, NULL, stk);
+ grpc_call_stack_init(channel, NULL, initial_op, stk);
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
- lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);
diff --git a/src/core/channel/child_channel.h b/src/core/channel/child_channel.h
index 38695402ab..556a1c731c 100644
--- a/src/core/channel/child_channel.h
+++ b/src/core/channel/child_channel.h
@@ -57,8 +57,9 @@ void grpc_child_channel_destroy(grpc_child_channel *channel,
int wait_for_callbacks);
grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
- grpc_call_element *parent);
+ grpc_call_element *parent,
+ grpc_transport_op *initial_op);
grpc_call_element *grpc_child_call_get_top_element(grpc_child_call *call);
void grpc_child_call_destroy(grpc_child_call *call);
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHILD_CHANNEL_H */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index bc481e59ca..78f8d06d89 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -58,6 +58,7 @@ typedef struct {
/* the sending child (may be null) */
grpc_child_channel *active_child;
+ grpc_mdctx *mdctx;
/* calls waiting for a channel to be ready */
call_data **waiting_children;
@@ -82,8 +83,6 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
- gpr_uint8 got_first_send;
-
call_state state;
gpr_timespec deadline;
union {
@@ -91,7 +90,11 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
- grpc_call_op waiting_op;
+ grpc_transport_op waiting_op;
+ struct {
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+ } cancelled;
} s;
};
@@ -105,14 +108,14 @@ static int prepare_activate(grpc_call_element *elem,
calld->state = CALL_ACTIVE;
/* create a child call */
- calld->s.active.child_call = grpc_child_channel_create_call(on_child, elem);
+ /* TODO(ctiller): pass the waiting op down here */
+ calld->s.active.child_call =
+ grpc_child_channel_create_call(on_child, elem, NULL);
return 1;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
-static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
+static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@@ -121,57 +124,7 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
- child_elem->filter->call_op(child_elem, elem, op);
-}
-
-static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->mu);
- if (calld->state == CALL_CANCELLED) {
- gpr_mu_unlock(&chand->mu);
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
- return;
- }
- GPR_ASSERT(calld->state == CALL_CREATED);
- calld->state = CALL_WAITING;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not already),
- but don't do so until outside the lock to avoid re-entrancy problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children =
- gpr_realloc(chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
+ child_elem->filter->start_transport_op(child_elem, op);
}
static void remove_waiting_child(channel_data *chand, call_data *calld) {
@@ -186,85 +139,128 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
-static void send_up_cancelled_ops(grpc_call_element *elem) {
- grpc_call_op finish_op;
- /* send up a synthesized status */
- grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
- /* send up a finish */
- finish_op.type = GRPC_RECV_FINISH;
- finish_op.dir = GRPC_CALL_UP;
- finish_op.flags = 0;
- finish_op.done_cb = do_nothing;
- finish_op.user_data = NULL;
- grpc_call_next_op(elem, &finish_op);
+static void handle_op_after_cancellation(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->s.cancelled.status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->s.cancelled.details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
+ calld->s.cancelled.status.next = &calld->s.cancelled.details;
+ calld->s.cancelled.details.prev = &calld->s.cancelled.status;
+ mdb.list.head = &calld->s.cancelled.status;
+ mdb.list.tail = &calld->s.cancelled.details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv(op->recv_user_data, 1);
+ }
}
-static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
+static void cc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
+ grpc_transport_op waiting_op;
+ GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
gpr_mu_lock(&chand->mu);
switch (calld->state) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
- child_elem->filter->call_op(child_elem, elem, op);
- return; /* early out */
- case CALL_WAITING:
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
- return; /* early out */
+ child_elem->filter->start_transport_op(child_elem, op);
+ break;
case CALL_CREATED:
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- send_up_cancelled_ops(elem);
- return; /* early out */
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- return; /* early out */
- }
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
-}
-
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- call_data *calld = elem->call_data;
- GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- if (!calld->got_first_send) {
- /* filter out the start event to find which child to send on */
- calld->got_first_send = 1;
- start_rpc(elem, op);
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
} else {
- grpc_call_next_op(elem, op);
+ calld->state = CALL_WAITING;
+ if (chand->active_child) {
+ /* channel is connected - use the connected stack */
+ if (prepare_activate(elem, chand->active_child)) {
+ gpr_mu_unlock(&chand->mu);
+ /* activate the request (pass it down) outside the lock */
+ complete_activate(elem, op);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ /* check to see if we should initiate a connection (if we're not
+ already),
+ but don't do so until outside the lock to avoid re-entrancy
+ problems if
+ the callback is immediate */
+ int initiate_transport_setup = 0;
+ if (!chand->transport_setup_initiated) {
+ chand->transport_setup_initiated = 1;
+ initiate_transport_setup = 1;
+ }
+ /* add this call to the waiting set to be resumed once we have a child
+ channel stack, growing the waiting set if needed */
+ if (chand->waiting_child_count == chand->waiting_child_capacity) {
+ chand->waiting_child_capacity =
+ GPR_MAX(chand->waiting_child_capacity * 2, 8);
+ chand->waiting_children = gpr_realloc(
+ chand->waiting_children,
+ chand->waiting_child_capacity * sizeof(call_data *));
+ }
+ calld->s.waiting_op = *op;
+ chand->waiting_children[chand->waiting_child_count++] = calld;
+ gpr_mu_unlock(&chand->mu);
+
+ /* finally initiate transport setup if needed */
+ if (initiate_transport_setup) {
+ grpc_transport_setup_initiate(chand->transport_setup);
+ }
+ }
}
break;
- case GRPC_CANCEL_OP:
- cancel_rpc(elem, op);
- break;
- case GRPC_SEND_MESSAGE:
- case GRPC_SEND_FINISH:
- case GRPC_REQUEST_DATA:
- if (calld->state == CALL_ACTIVE) {
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
- child_elem->filter->call_op(child_elem, elem, op);
+ case CALL_WAITING:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op = calld->s.waiting_op;
+ remove_waiting_child(chand, calld);
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, &waiting_op);
+ handle_op_after_cancellation(elem, op);
} else {
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops) {
+ calld->s.waiting_op.send_ops = op->send_ops;
+ calld->s.waiting_op.is_last_send = op->is_last_send;
+ calld->s.waiting_op.on_done_send = op->on_done_send;
+ calld->s.waiting_op.send_user_data = op->send_user_data;
+ }
+ if (op->recv_ops) {
+ calld->s.waiting_op.recv_ops = op->recv_ops;
+ calld->s.waiting_op.recv_state = op->recv_state;
+ calld->s.waiting_op.on_done_recv = op->on_done_recv;
+ calld->s.waiting_op.recv_user_data = op->recv_user_data;
+ }
+ gpr_mu_unlock(&chand->mu);
}
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&chand->mu);
+ handle_op_after_cancellation(elem, op);
break;
}
}
@@ -351,15 +347,18 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
- calld->got_first_send = 0;
}
/* Destructor for call_data */
@@ -372,9 +371,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
- if (calld->state == CALL_WAITING) {
- grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
- }
+ GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@@ -396,6 +393,7 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
+ chand->mdctx = metadata_context;
}
/* Destructor for channel_data */
@@ -417,9 +415,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "client-channel",
+ cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client-channel",
};
grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
@@ -436,7 +434,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
- grpc_call_op *call_ops;
+ grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@@ -472,13 +470,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
- call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
+ call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
- call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 711274bfe1..14dda88698 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -45,25 +45,12 @@
#include <grpc/support/slice_buffer.h>
#define MAX_BUFFER_LENGTH 8192
-/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
typedef struct connected_channel_channel_data {
grpc_transport *transport;
- gpr_uint32 max_message_length;
} channel_data;
-typedef struct connected_channel_call_data {
- grpc_call_element *elem;
- grpc_stream_op_buffer outgoing_sopb;
-
- gpr_uint32 max_message_length;
- gpr_uint32 incoming_message_length;
- gpr_uint8 reading_message;
- gpr_uint8 got_read_close;
- gpr_slice_buffer incoming_message;
- gpr_uint32 outgoing_buffer_length_estimate;
-} call_data;
+typedef struct connected_channel_call_data { void *unused; } call_data;
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -72,91 +59,17 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
-/* Copy the contents of a byte buffer into stream ops */
-static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
- grpc_stream_op_buffer *sopb) {
- size_t i;
-
- switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
- gpr_slice_ref(slice);
- grpc_sopb_add_slice(sopb, slice);
- }
- break;
- }
-}
-
-/* Flush queued stream operations onto the transport */
-static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
- call_data *calld, int is_last) {
- size_t nops;
-
- if (op->flags & GRPC_WRITE_BUFFER_HINT) {
- if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
- op->done_cb(op->user_data, GRPC_OP_OK);
- return;
- }
- }
-
- calld->outgoing_buffer_length_estimate = 0;
- grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
-
- nops = calld->outgoing_sopb.nops;
- calld->outgoing_sopb.nops = 0;
- grpc_transport_send_batch(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- calld->outgoing_sopb.ops, nops, is_last);
-}
-
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void con_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- if (op->bind_pollset) {
- grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
- }
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_MESSAGE:
- grpc_sopb_add_begin_message(&calld->outgoing_sopb,
- grpc_byte_buffer_length(op->data.message),
- op->flags);
- /* fall-through */
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
- calld->outgoing_buffer_length_estimate +=
- (5 + grpc_byte_buffer_length(op->data.message));
- end_bufferable_op(op, chand, calld, 0);
- break;
- case GRPC_SEND_FINISH:
- end_bufferable_op(op, chand, calld, 1);
- break;
- case GRPC_REQUEST_DATA:
- /* re-arm window updates if they were disarmed by finish_message */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
- break;
- case GRPC_CANCEL_OP:
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_CANCELLED);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_UP);
- grpc_call_next_op(elem, op);
- break;
- }
+ grpc_transport_perform_op(chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@@ -182,23 +95,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- calld->elem = elem;
- grpc_sopb_init(&calld->outgoing_sopb);
-
- calld->reading_message = 0;
- calld->got_read_close = 0;
- calld->outgoing_buffer_length_estimate = 0;
- calld->max_message_length = chand->max_message_length;
- gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- server_transport_data);
+ server_transport_data, initial_op);
GPR_ASSERT(r == 0);
}
@@ -207,8 +113,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
- grpc_sopb_destroy(&calld->outgoing_sopb);
- gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@@ -218,28 +122,10 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
- size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
-
- cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
- if (args) {
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else if (args->args[i].value.integer < 0) {
- gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
- GRPC_ARG_MAX_MESSAGE_LENGTH);
- } else {
- cd->max_message_length = args->args[i].value.integer;
- }
- }
- }
- }
}
/* Destructor for channel_data */
@@ -250,15 +136,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
+ con_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "connected",
};
-static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
- grpc_stream *stream, size_t size_hint) {
- return gpr_slice_malloc(size_hint);
-}
-
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@@ -276,168 +158,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
-static void recv_error(channel_data *chand, call_data *calld, int line,
- const char *message) {
- gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
-
- if (chand->transport) {
- grpc_transport_abort_stream(chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld),
- GRPC_STATUS_INVALID_ARGUMENT);
- }
-}
-
-static void do_nothing(void *calldata, grpc_op_error error) {}
-
-static void finish_message(channel_data *chand, call_data *calld) {
- grpc_call_element *elem = calld->elem;
- grpc_call_op call_op;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- /* if we got all the bytes for this message, call up the stack */
- call_op.type = GRPC_RECV_MESSAGE;
- call_op.done_cb = do_nothing;
- /* TODO(ctiller): this could be a lot faster if coded directly */
- call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
- calld->incoming_message.count);
- gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
-
- /* disable window updates until we get a request more from above */
- grpc_transport_set_allow_window_updates(
- chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
-
- GPR_ASSERT(calld->incoming_message.count == 0);
- calld->reading_message = 0;
- grpc_call_next_op(elem, &call_op);
-}
-
-static void got_metadata(grpc_call_element *elem,
- grpc_metadata_batch metadata) {
- grpc_call_op op;
- op.type = GRPC_RECV_METADATA;
- op.dir = GRPC_CALL_UP;
- op.flags = 0;
- op.data.metadata = metadata;
- op.done_cb = do_nothing;
- op.user_data = NULL;
-
- grpc_call_next_op(elem, &op);
-}
-
-/* Handle incoming stream ops from the transport, translating them into
- call_ops to pass up the call stack */
-static void recv_batch(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops,
- size_t ops_count, grpc_stream_state final_state) {
- call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
- grpc_call_element *elem = calld->elem;
- channel_data *chand = elem->channel_data;
- grpc_stream_op *stream_op;
- grpc_call_op call_op;
- size_t i;
- gpr_uint32 length;
-
- GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
-
- for (i = 0; i < ops_count; i++) {
- stream_op = ops + i;
- switch (stream_op->type) {
- case GRPC_OP_FLOW_CTL_CB:
- stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
- break;
- case GRPC_NO_OP:
- break;
- case GRPC_OP_METADATA:
- got_metadata(elem, stream_op->data.metadata);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- /* can't begin a message when we're still reading a message */
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Message terminated early; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- }
- /* stash away parameters, and prepare for incoming slices */
- length = stream_op->data.begin_message.length;
- if (length > calld->max_message_length) {
- char *message = NULL;
- gpr_asprintf(
- &message,
- "Maximum message length of %d exceeded by a message of length %d",
- calld->max_message_length, length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- } else if (length > 0) {
- calld->reading_message = 1;
- calld->incoming_message_length = length;
- } else {
- finish_message(chand, calld);
- }
- break;
- case GRPC_OP_SLICE:
- if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
- gpr_slice_unref(stream_op->data.slice);
- break;
- }
- /* we have to be reading a message to know what to do here */
- if (!calld->reading_message) {
- recv_error(chand, calld, __LINE__,
- "Received payload data while not reading a message");
- return;
- }
- /* append the slice to the incoming buffer */
- gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
- if (calld->incoming_message.length > calld->incoming_message_length) {
- /* if we got too many bytes, complain */
- char *message = NULL;
- gpr_asprintf(&message,
- "Receiving message overflow; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- return;
- } else if (calld->incoming_message.length ==
- calld->incoming_message_length) {
- finish_message(chand, calld);
- }
- }
- }
- /* if the stream closed, then call up the stack to let it know */
- if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
- final_state == GRPC_STREAM_CLOSED)) {
- calld->got_read_close = 1;
- if (calld->reading_message) {
- char *message = NULL;
- gpr_asprintf(&message,
- "Last message truncated; read %d bytes, expected %d",
- (int)calld->incoming_message.length,
- (int)calld->incoming_message_length);
- recv_error(chand, calld, __LINE__, message);
- gpr_free(message);
- }
- call_op.type = GRPC_RECV_HALF_CLOSE;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
- if (final_state == GRPC_STREAM_CLOSED) {
- call_op.type = GRPC_RECV_FINISH;
- call_op.dir = GRPC_CALL_UP;
- call_op.flags = 0;
- call_op.done_cb = do_nothing;
- call_op.user_data = NULL;
- grpc_call_next_op(elem, &call_op);
- }
-}
-
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@@ -470,8 +190,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
- alloc_recv_buffer, accept_stream, recv_batch,
- transport_goaway, transport_closed,
+ accept_stream, transport_goaway, transport_closed,
};
grpc_transport_setup_result grpc_connected_channel_bind_transport(
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 56e12342d7..9805f325a6 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -39,6 +39,12 @@ typedef struct call_data {
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
+ int sent_initial_metadata;
+
+ int got_initial_metadata;
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -64,22 +70,37 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void hc_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
+ grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hc_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
+ size_t i;
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_initial_metadata = 1;
/* Send : prefixed headers, which have to be before any application
- * layer headers. */
+ layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@@ -88,19 +109,27 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_mdelem_ref(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
- grpc_call_next_op(elem, op);
- break;
- case GRPC_RECV_METADATA:
- grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hc_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hc_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hc_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -120,7 +149,13 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {}
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
+ call_data *calld = elem->call_data;
+ calld->sent_initial_metadata = 0;
+ calld->got_initial_metadata = 0;
+ if (initial_op) hc_mutate_op(elem, initial_op);
+}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@@ -181,6 +216,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "http-client"};
+ hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-client"};
diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c
deleted file mode 100644
index 453a0422d8..0000000000
--- a/src/core/channel/http_filter.c
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/channel/http_filter.h"
-#include <grpc/support/log.h>
-
-typedef struct call_data {
- int unused; /* C89 requires at least one struct element */
-} call_data;
-
-typedef struct channel_data {
- int unused; /* C89 requires at least one struct element */
-} channel_data;
-
-/* used to silence 'variable not used' warnings */
-static void ignore_unused(void *ignored) {}
-
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- ignore_unused(calld);
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
-}
-
-/* Called on special channel events, such as disconnection or new incoming
- calls on the server */
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_channel_next_op(elem, op);
- break;
- }
-}
-
-/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- /* initialize members */
- calld->unused = channeld->unused;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(calld);
- ignore_unused(channeld);
-}
-
-/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- /* The first and the last filters tend to be implemented differently to
- handle the case that there's no 'next' filter to call on the up or down
- path */
- GPR_ASSERT(!is_first);
- GPR_ASSERT(!is_last);
-
- /* initialize members */
- channeld->unused = 0;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
- /* grab pointers to our data from the channel element */
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-}
-
-const grpc_channel_filter grpc_http_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "http"};
diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h
deleted file mode 100644
index 1b116ad61f..0000000000
--- a/src/core/channel/http_filter.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
-
-#include "src/core/channel/channel_stack.h"
-
-/* Processes metadata that is common to both client and server for HTTP2
- transports. */
-extern const grpc_channel_filter grpc_http_filter;
-
-#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 0bfe2f2e30..1f64df68e3 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -38,12 +38,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- grpc_mdelem *path;
- grpc_mdelem *content_type;
- grpc_byte_buffer *content;
-} gettable;
-
typedef struct call_data {
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
@@ -52,6 +46,10 @@ typedef struct call_data {
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_linked_mdelem status;
+
+ grpc_stream_op_buffer *recv_ops;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
} call_data;
typedef struct channel_data {
@@ -69,9 +67,6 @@ typedef struct channel_data {
grpc_mdstr *host_key;
grpc_mdctx *mdctx;
-
- size_t gettable_count;
- gettable *gettables;
} channel_data;
/* used to silence 'variable not used' warnings */
@@ -143,68 +138,82 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
- /* grab pointers to our data from the call element */
+static void hs_on_recv(void *user_data, int success) {
+ grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ if (success) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->got_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (!calld->got_initial_metadata) {
- calld->got_initial_metadata = 1;
- /* Have we seen the required http2 transport headers?
- (:method, :scheme, content-type, with :path and :authority covered
- at the channel level right now) */
- if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
- calld->seen_path) {
- grpc_call_next_op(elem, op);
- } else {
- if (!calld->seen_path) {
- gpr_log(GPR_ERROR, "Missing :path header");
- }
- if (!calld->seen_post) {
- gpr_log(GPR_ERROR, "Missing :method header");
- }
- if (!calld->seen_scheme) {
- gpr_log(GPR_ERROR, "Missing :scheme header");
- }
- if (!calld->seen_te_trailers) {
- gpr_log(GPR_ERROR, "Missing te trailers header");
- }
- /* Error this call out */
- grpc_metadata_batch_destroy(&op->data.metadata);
- op->done_cb(op->user_data, GRPC_OP_OK);
- grpc_call_element_send_cancel(elem);
- }
+ /* Have we seen the required http2 transport headers?
+ (:method, :scheme, content-type, with :path and :authority covered
+ at the channel level right now) */
+ if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
+ calld->seen_path) {
+ /* do nothing */
} else {
- grpc_call_next_op(elem, op);
- }
- break;
- case GRPC_SEND_METADATA:
- /* If we haven't sent status 200 yet, we need to so so because it needs to
- come before any non : prefixed metadata. */
- if (!calld->sent_status) {
- calld->sent_status = 1;
- grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
- grpc_mdelem_ref(channeld->status_ok));
+ if (!calld->seen_path) {
+ gpr_log(GPR_ERROR, "Missing :path header");
+ }
+ if (!calld->seen_post) {
+ gpr_log(GPR_ERROR, "Missing :method header");
+ }
+ if (!calld->seen_scheme) {
+ gpr_log(GPR_ERROR, "Missing :scheme header");
+ }
+ if (!calld->seen_te_trailers) {
+ gpr_log(GPR_ERROR, "Missing te trailers header");
+ }
+ /* Error this call out */
+ success = 0;
+ grpc_call_element_send_cancel(elem);
}
- grpc_call_next_op(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
+ }
+ }
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void hs_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ /* grab pointers to our data from the call element */
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+ size_t i;
+
+ if (op->send_ops && !calld->sent_status) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
+ calld->sent_status = 1;
+ grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
+ grpc_mdelem_ref(channeld->status_ok));
break;
+ }
+ }
+
+ if (op->recv_ops && !calld->got_initial_metadata) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = hs_on_recv;
+ op->recv_user_data = elem;
}
}
+static void hs_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ hs_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
+}
+
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
@@ -224,15 +233,13 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- ignore_unused(channeld);
-
/* initialize members */
memset(calld, 0, sizeof(*calld));
+ if (initial_op) hs_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -242,9 +249,6 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- size_t i;
- size_t gettable_capacity = 0;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
@@ -270,46 +274,13 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
channeld->mdctx = mdctx;
-
- /* initialize http download support */
- channeld->gettable_count = 0;
- channeld->gettables = NULL;
- for (i = 0; i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SERVE_OVER_HTTP)) {
- gettable *g;
- gpr_slice slice;
- grpc_http_server_page *p = args->args[i].value.pointer.p;
- if (channeld->gettable_count == gettable_capacity) {
- gettable_capacity =
- GPR_MAX(gettable_capacity * 3 / 2, gettable_capacity + 1);
- channeld->gettables = gpr_realloc(channeld->gettables,
- gettable_capacity * sizeof(gettable));
- }
- g = &channeld->gettables[channeld->gettable_count++];
- g->path = grpc_mdelem_from_strings(mdctx, ":path", p->path);
- g->content_type =
- grpc_mdelem_from_strings(mdctx, "content-type", p->content_type);
- slice = gpr_slice_from_copied_string(p->content);
- g->content = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- }
- }
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
- size_t i;
-
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
- for (i = 0; i < channeld->gettable_count; i++) {
- grpc_mdelem_unref(channeld->gettables[i].path);
- grpc_mdelem_unref(channeld->gettables[i].content_type);
- grpc_byte_buffer_destroy(channeld->gettables[i].content);
- }
- gpr_free(channeld->gettables);
-
grpc_mdelem_unref(channeld->te_trailers);
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
@@ -324,6 +295,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "http-server"};
+ hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "http-server"};
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index d987fa2bc1..1d2be716d7 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -45,13 +45,7 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
-/* Called either:
- - in response to an API call (or similar) from above, to send something
- - a network event (or similar) from below, to receive something
- op contains type and call direction information, in addition to the data
- that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void noop_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -59,12 +53,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
- switch (op->type) {
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
- }
+ /* do nothing */
+}
+
+/* Called either:
+ - in response to an API call (or similar) from above, to send something
+ - a network event (or similar) from below, to receive something
+ op contains type and call direction information, in addition to the data
+ that is being sent or received. */
+static void noop_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ noop_mutate_op(elem, op);
+
+ /* pass control down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -86,13 +88,16 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
+
+ if (initial_op) noop_mutate_op(elem, initial_op);
}
/* Destructor for call_data */
@@ -131,6 +136,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "no-op"};
+ noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "no-op"};
diff --git a/src/core/security/auth.c b/src/core/security/auth.c
index 4af2c67d83..2322c12aa5 100644
--- a/src/core/security/auth.c
+++ b/src/core/security/auth.c
@@ -51,7 +51,9 @@ typedef struct {
grpc_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
- grpc_call_op op;
+ grpc_transport_op op;
+ size_t op_md_idx;
+ int sent_initial_metadata;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
} call_data;
@@ -65,24 +67,23 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
-static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
- grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
- grpc_call_element_send_cancel(elem);
-}
-
static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
- grpc_call_op op = calld->op;
+ grpc_transport_op *op = &calld->op;
+ grpc_metadata_batch *mdb;
size_t i;
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
+ GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx &&
+ op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
+ mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata;
for (i = 0; i < num_md; i++) {
- grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i],
+ grpc_metadata_batch_add_tail(mdb, &calld->md_links[i],
grpc_mdelem_ref(md_elems[i]));
}
- grpc_call_next_op(elem, &op);
+ grpc_call_next_op(elem, op);
}
static char *build_service_url(const char *url_scheme, call_data *calld) {
@@ -105,7 +106,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
return service_url;
}
-static void send_security_metadata(grpc_call_element *elem, grpc_call_op *op) {
+static void send_security_metadata(grpc_call_element *elem,
+ grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@@ -136,6 +138,7 @@ static void send_security_metadata(grpc_call_element *elem, grpc_call_op *op) {
static void on_host_checked(void *user_data, grpc_security_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
if (status == GRPC_SECURITY_OK) {
send_security_metadata(elem, &calld->op);
@@ -143,10 +146,11 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubbleup_error(elem, error_msg);
- grpc_metadata_batch_destroy(&calld->op.data.metadata);
+ grpc_transport_op_add_cancellation(
+ &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+ grpc_mdstr_from_string(chand->md_ctx, error_msg));
gpr_free(error_msg);
- calld->op.done_cb(calld->op.user_data, GRPC_OP_ERROR);
+ grpc_call_next_op(elem, &calld->op);
}
}
@@ -155,16 +159,23 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void auth_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
grpc_linked_mdelem *l;
+ size_t i;
- switch (op->type) {
- case GRPC_SEND_METADATA:
- for (l = op->data.metadata.list.head; l != NULL; l = l->next) {
+ if (op->send_ops && !calld->sent_initial_metadata) {
+ size_t nops = op->send_ops->nops;
+ grpc_stream_op *ops = op->send_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *sop = &ops[i];
+ if (sop->type != GRPC_OP_METADATA) continue;
+ calld->op_md_idx = i;
+ calld->sent_initial_metadata = 1;
+ for (l = sop->data.metadata.list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
*/
@@ -188,21 +199,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
- bubbleup_error(elem, error_msg);
- grpc_metadata_batch_destroy(&calld->op.data.metadata);
+ grpc_transport_op_add_cancellation(
+ &calld->op, GRPC_STATUS_UNAUTHENTICATED,
+ grpc_mdstr_from_string(channeld->md_ctx, error_msg));
gpr_free(error_msg);
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_call_next_op(elem, &calld->op);
}
- break;
+ return; /* early exit */
}
}
send_security_metadata(elem, op);
- break;
- default:
- /* pass control up or down the stack depending on op->dir */
- grpc_call_next_op(elem, op);
- break;
+ return; /* early exit */
+ }
}
+
+ /* pass control up or down the stack */
+ grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@@ -214,13 +226,17 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
/* TODO(jboeuf):
Find a way to pass-in the credentials from the caller here. */
call_data *calld = elem->call_data;
calld->creds = NULL;
calld->host = NULL;
calld->method = NULL;
+ calld->sent_initial_metadata = 0;
+
+ GPR_ASSERT(!initial_op || !initial_op->send_ops);
}
/* Destructor for call_data */
@@ -288,5 +304,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_auth_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "auth"};
+ auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "auth"};
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 5b7d99ce40..db9d545c0e 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,7 +35,6 @@
#include <string.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
@@ -73,8 +72,8 @@ static void state_unref(grpc_server_secure_state *state) {
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 2949805622..7c91ca917c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -81,9 +81,9 @@ typedef struct {
grpc_ioreq_completion_func on_complete;
void *user_data;
/* a bit mask of which request ops are needed (1u << opid) */
- gpr_uint32 need_mask;
+ gpr_uint16 need_mask;
/* a bit mask of which request ops are now completed */
- gpr_uint32 complete_mask;
+ gpr_uint16 complete_mask;
} reqinfo_master;
/* Status data for a request can come from several sources; this
@@ -144,12 +144,17 @@ struct grpc_call {
gpr_uint8 have_alarm;
/* are we currently performing a send operation */
gpr_uint8 sending;
+ /* are we currently performing a recv operation */
+ gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
- /* flag that we need to request more data */
- gpr_uint8 need_more_data;
+ /* are we currently reading a message? */
+ gpr_uint8 reading_message;
+ /* flags with bits corresponding to write states allowing us to determine
+ what was sent */
+ gpr_uint16 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@@ -214,6 +219,13 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
+ grpc_stream_op_buffer send_ops;
+ grpc_stream_op_buffer recv_ops;
+ grpc_stream_state recv_state;
+
+ gpr_slice_buffer incoming_message;
+ gpr_uint32 incoming_message_length;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -234,9 +246,13 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
-static send_action choose_send_action(grpc_call *call);
-static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
+static void call_on_done_recv(void *call, int success);
+static void call_on_done_send(void *call, int success);
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
+static void execute_op(grpc_call *call, grpc_transport_op *op);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
+static void finish_read_ops(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@@ -244,6 +260,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
size_t add_initial_metadata_count,
gpr_timespec send_deadline) {
size_t i;
+ grpc_transport_op initial_op;
+ grpc_transport_op *initial_op_ptr = NULL;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -267,10 +285,25 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
+ grpc_sopb_init(&call->send_ops);
+ grpc_sopb_init(&call->recv_ops);
+ gpr_slice_buffer_init(&call->incoming_message);
/* one ref is dropped in response to destroy, the other in
stream_closed */
gpr_ref_init(&call->internal_refcount, 2);
- grpc_call_stack_init(channel_stack, server_transport_data,
+ /* server hack: start reads immediately so we can get initial metadata.
+ TODO(ctiller): figure out a cleaner solution */
+ if (!call->is_client) {
+ memset(&initial_op, 0, sizeof(initial_op));
+ initial_op.recv_ops = &call->recv_ops;
+ initial_op.recv_state = &call->recv_state;
+ initial_op.on_done_recv = call_on_done_recv;
+ initial_op.recv_user_data = call;
+ call->receiving = 1;
+ grpc_call_internal_ref(call);
+ initial_op_ptr = &initial_op;
+ }
+ grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
set_deadline_alarm(call, send_deadline);
@@ -314,6 +347,9 @@ static void destroy_call(void *call, int ignored_success) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
+ grpc_sopb_destroy(&c->send_ops);
+ grpc_sopb_destroy(&c->recv_ops);
+ gpr_slice_buffer_destroy(&c->incoming_message);
gpr_free(c);
}
@@ -359,20 +395,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
-static void request_more_data(grpc_call *call) {
- grpc_call_op op;
-
- /* call down */
- op.type = GRPC_REQUEST_DATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- grpc_call_execute_op(call, &op);
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -383,17 +405,40 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
+static int need_more_data(grpc_call *call) {
+ return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) ||
+ is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
+ is_op_live(call, GRPC_IOREQ_RECV_CLOSE);
+}
+
static void unlock(grpc_call *call) {
- send_action sa = SEND_NOTHING;
+ grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
- int need_more_data =
- call->need_more_data &&
- (call->write_state >= WRITE_STATE_STARTED || !call->is_client);
+ int start_op = 0;
int i;
- if (need_more_data) {
- call->need_more_data = 0;
+ memset(&op, 0, sizeof(op));
+
+ if (!call->receiving && need_more_data(call)) {
+ op.recv_ops = &call->recv_ops;
+ op.recv_state = &call->recv_state;
+ op.on_done_recv = call_on_done_recv;
+ op.recv_user_data = call;
+ call->receiving = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
+
+ if (!call->sending) {
+ if (fill_send_ops(call, &op)) {
+ call->sending = 1;
+ grpc_call_internal_ref(call);
+ start_op = 1;
+ }
}
if (!call->completing && call->num_completed_requests != 0) {
@@ -405,22 +450,10 @@ static void unlock(grpc_call *call) {
grpc_call_internal_ref(call);
}
- if (!call->sending) {
- sa = choose_send_action(call);
- if (sa != SEND_NOTHING) {
- call->sending = 1;
- grpc_call_internal_ref(call);
- }
- }
-
gpr_mu_unlock(&call->mu);
- if (need_more_data) {
- request_more_data(call);
- }
-
- if (sa != SEND_NOTHING) {
- enact_send_action(call, sa);
+ if (start_op) {
+ execute_op(call, &op);
}
if (completing_requests > 0) {
@@ -495,7 +528,6 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
master->complete_mask |= 1u << op;
if (status != GRPC_OP_OK) {
master->status = status;
- master->complete_mask = master->need_mask;
}
if (master->complete_mask == master->need_mask) {
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
@@ -554,64 +586,149 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
-static void finish_send_op(grpc_call *call, grpc_ioreq_op op, write_state ws,
- grpc_op_error error) {
+static void call_on_done_send(void *pc, int success) {
+ grpc_call *call = pc;
+ grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
lock(call);
- finish_ioreq_op(call, op, error);
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+ }
+ if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+ }
+ call->last_send_contains = 0;
call->sending = 0;
- call->write_state = ws;
unlock(call);
grpc_call_internal_unref(call, 0);
}
-static void finish_write_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, WRITE_STATE_STARTED, error);
+static void finish_message(grpc_call *call) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ gpr_slice_buffer_reset_and_unref(&call->incoming_message);
+
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+
+ GPR_ASSERT(call->incoming_message.count == 0);
+ call->reading_message = 0;
}
-static void finish_finish_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, WRITE_STATE_WRITE_CLOSED, error);
+static int begin_message(grpc_call *call, grpc_begin_message msg) {
+ /* can't begin a message when we're still reading a message */
+ if (call->reading_message) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Message terminated early; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ }
+ /* stash away parameters, and prepare for incoming slices */
+ if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
+ char *message = NULL;
+ gpr_asprintf(
+ &message,
+ "Maximum message length of %d exceeded by a message of length %d",
+ grpc_channel_get_max_message_length(call->channel), msg.length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (msg.length > 0) {
+ call->reading_message = 1;
+ call->incoming_message_length = msg.length;
+ return 1;
+ } else {
+ finish_message(call);
+ return 1;
+ }
}
-static void finish_start_step(void *pc, grpc_op_error error) {
- finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, WRITE_STATE_STARTED,
- error);
+static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
+ if (GPR_SLICE_LENGTH(slice) == 0) {
+ gpr_slice_unref(slice);
+ return 1;
+ }
+ /* we have to be reading a message to know what to do here */
+ if (!call->reading_message) {
+ grpc_call_cancel_with_status(
+ call, GRPC_STATUS_INVALID_ARGUMENT,
+ "Received payload data while not reading a message");
+ return 0;
+ }
+ /* append the slice to the incoming buffer */
+ gpr_slice_buffer_add(&call->incoming_message, slice);
+ if (call->incoming_message.length > call->incoming_message_length) {
+ /* if we got too many bytes, complain */
+ char *message = NULL;
+ gpr_asprintf(
+ &message, "Receiving message overflow; read %d bytes, expected %d",
+ (int)call->incoming_message.length, (int)call->incoming_message_length);
+ grpc_call_cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
+ gpr_free(message);
+ return 0;
+ } else if (call->incoming_message.length == call->incoming_message_length) {
+ finish_message(call);
+ return 1;
+ } else {
+ return 1;
+ }
}
-static send_action choose_send_action(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_INITIAL:
- if (is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE) ||
- is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_INITIAL_METADATA;
- } else {
- return SEND_INITIAL_METADATA;
- }
- }
- return SEND_NOTHING;
- case WRITE_STATE_STARTED:
- if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
- if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- return SEND_BUFFERED_MESSAGE;
- } else {
- return SEND_MESSAGE;
- }
- } else if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
- if (call->is_client) {
- return SEND_FINISH;
- } else {
- return SEND_TRAILING_METADATA_AND_FINISH;
- }
+static void call_on_done_recv(void *pc, int success) {
+ grpc_call *call = pc;
+ size_t i;
+ int unref_due_to_connection_close = 0;
+ lock(call);
+ call->receiving = 0;
+ if (success) {
+ for (i = 0; success && i < call->recv_ops.nops; i++) {
+ grpc_stream_op *op = &call->recv_ops.ops[i];
+ switch (op->type) {
+ case GRPC_NO_OP:
+ break;
+ case GRPC_OP_METADATA:
+ recv_metadata(call, &op->data.metadata);
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ success = begin_message(call, op->data.begin_message);
+ break;
+ case GRPC_OP_SLICE:
+ success = add_slice_to_message(call, op->data.slice);
+ break;
}
- return SEND_NOTHING;
- case WRITE_STATE_WRITE_CLOSED:
- return SEND_NOTHING;
+ }
+ if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
+ call->read_state = READ_STATE_READ_CLOSED;
+ }
+ if (call->recv_state == GRPC_STREAM_CLOSED) {
+ GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
+ call->read_state = READ_STATE_STREAM_CLOSED;
+ unref_due_to_connection_close = 1;
+ }
+ finish_read_ops(call);
+ } else {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
+ }
+ call->recv_ops.nops = 0;
+ unlock(call);
+
+ grpc_call_internal_unref(call, 0);
+ if (unref_due_to_connection_close) {
+ grpc_call_internal_unref(call, 0);
}
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- return SEND_NOTHING;
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
@@ -639,97 +756,102 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
return out;
}
-static void enact_send_action(grpc_call *call, send_action sa) {
+/* Copy the contents of a byte buffer into stream ops */
+static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
+ grpc_stream_op_buffer *sopb) {
+ size_t i;
+
+ switch (byte_buffer->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ gpr_slice_ref(slice);
+ grpc_sopb_add_slice(sopb, slice);
+ }
+ break;
+ }
+}
+
+static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
- grpc_call_op op;
+ grpc_metadata_batch mdb;
size_t i;
- gpr_uint32 flags = 0;
char status_str[GPR_LTOA_MIN_BUFSIZE];
+ GPR_ASSERT(op->send_ops == NULL);
- switch (sa) {
- case SEND_NOTHING:
- abort();
- break;
- case SEND_BUFFERED_INITIAL_METADATA:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_INITIAL_METADATA:
+ switch (call->write_state) {
+ case WRITE_STATE_INITIAL:
+ if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
+ break;
+ }
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
- grpc_metadata_batch_link_head(&op.data.metadata,
- &call->send_initial_metadata[i]);
+ grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
}
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ op->send_ops = &call->send_ops;
+ op->bind_pollset = grpc_cq_pollset(call->cq);
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
+ call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
- op.done_cb = finish_start_step;
- op.user_data = call;
- op.bind_pollset = grpc_cq_pollset(call->cq);
- grpc_call_execute_op(call, &op);
- break;
- case SEND_BUFFERED_MESSAGE:
- flags |= GRPC_WRITE_BUFFER_HINT;
- /* fallthrough */
- case SEND_MESSAGE:
- data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.message = data.send_message;
- op.done_cb = finish_write_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
- break;
- case SEND_TRAILING_METADATA_AND_FINISH:
- /* send trailing metadata */
- data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.data.metadata.list = chain_metadata_from_app(
- call, data.send_metadata.count, data.send_metadata.metadata);
- op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
- op.data.metadata.deadline = call->send_deadline;
- op.bind_pollset = NULL;
- /* send status */
- /* TODO(ctiller): cache common status values */
- data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
- if (data.send_status.details) {
- grpc_metadata_batch_add_tail(
- &op.data.metadata, &call->details_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ /* fall through intended */
+ case WRITE_STATE_STARTED:
+ if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
+ data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ grpc_sopb_add_begin_message(
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
+ }
+ if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
+ op->is_last_send = 1;
+ op->send_ops = &call->send_ops;
+ call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ if (!call->is_client) {
+ /* send trailing metadata */
+ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
+ mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
+ data.send_metadata.metadata);
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future;
+ /* send status */
+ /* TODO(ctiller): cache common status values */
+ data = call->request_data[GRPC_IOREQ_SEND_STATUS];
+ gpr_ltoa(data.send_status.code, status_str);
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->status_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context, status_str)));
+ if (data.send_status.details) {
+ grpc_metadata_batch_add_tail(
+ &mdb, &call->details_link,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
+ }
+ grpc_sopb_add_metadata(&call->send_ops, mdb);
+ }
}
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- /* fallthrough: see choose_send_action for details */
- case SEND_FINISH:
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = finish_finish_step;
- op.user_data = call;
- op.bind_pollset = NULL;
- grpc_call_execute_op(call, &op);
+ break;
+ case WRITE_STATE_WRITE_CLOSED:
break;
}
+ if (op->send_ops) {
+ op->on_done_send = call_on_done_send;
+ op->send_user_data = call;
+ }
+ return op->send_ops != NULL;
}
static grpc_call_error start_ioreq_error(grpc_call *call,
@@ -838,10 +960,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
master->on_complete = completion;
master->user_data = user_data;
- if (have_ops & (1u << GRPC_IOREQ_RECV_MESSAGE)) {
- call->need_more_data = 1;
- }
-
finish_read_ops(call);
early_out_write_ops(call);
@@ -871,41 +989,34 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c, 1);
}
-grpc_call_error grpc_call_cancel(grpc_call *c) {
- grpc_call_element *elem;
- grpc_call_op op;
-
- op.type = GRPC_CANCEL_OP;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- op.bind_pollset = NULL;
-
- elem = CALL_ELEM_FROM_CALL(c, 0);
- elem->filter->call_op(elem, NULL, &op);
-
- return GRPC_CALL_OK;
+grpc_call_error grpc_call_cancel(grpc_call *call) {
+ return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
}
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
const char *description) {
+ grpc_transport_op op;
grpc_mdstr *details =
description ? grpc_mdstr_from_string(c->metadata_context, description)
: NULL;
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = status;
+
lock(c);
set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
unlock(c);
- return grpc_call_cancel(c);
+
+ execute_op(c, &op);
+
+ return GRPC_CALL_OK;
}
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
+static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, op);
+ elem->filter->start_transport_op(elem, op);
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -928,34 +1039,14 @@ static void call_alarm(void *arg, int success) {
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
+ assert(0);
+ return;
}
grpc_call_internal_ref(call);
call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state_locked(grpc_call *call, read_state state) {
- GPR_ASSERT(call->read_state < state);
- call->read_state = state;
- finish_read_ops(call);
-}
-
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
- set_read_state_locked(call, state);
- unlock(call);
-}
-
-void grpc_call_read_closed(grpc_call_element *elem) {
- set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- set_read_state(call, READ_STATE_STREAM_CLOSED);
- grpc_call_internal_unref(call, 0);
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -979,35 +1070,13 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- finish_read_ops(call);
- unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- set_status_code(call, STATUS_FROM_CORE, status);
- set_status_details(call, STATUS_FROM_CORE,
- grpc_mdstr_from_string(call->metadata_context, message));
- unlock(call);
-}
-
-int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
grpc_mdctx *mdctx = call->metadata_context;
- lock(call);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md;
@@ -1043,9 +1112,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
- set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
+ call->read_state = READ_STATE_GOT_INITIAL_METADATA;
}
- unlock(call);
grpc_mdctx_lock(mdctx);
for (l = md->list.head; l; l = l->next) {
@@ -1055,8 +1123,6 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
grpc_mdctx_unlock(mdctx);
-
- return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index f8d0915349..199beb1738 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -96,27 +96,12 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
-/* Helpers for grpc_client, grpc_server filters to publish received data to
- the completion queue/surface layer */
-/* receive metadata - returns 1 if this was initial metadata */
-int grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_metadata_batch *md);
-void grpc_call_recv_message(grpc_call_element *surface_element,
- grpc_byte_buffer *message);
-void grpc_call_read_closed(grpc_call_element *surface_element);
-void grpc_call_stream_closed(grpc_call_element *surface_element);
-
-void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message);
-
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 29b042e7c1..78f9144c19 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,7 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
gpr_refcount refs;
+ gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
@@ -68,9 +69,13 @@ struct grpc_channel {
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
+/* the protobuf library will (by default) start warning at 100megs */
+#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
+ size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
@@ -88,6 +93,24 @@ grpc_channel *grpc_channel_create_from_filters(
CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
+
+ channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
+ if (args) {
+ for (i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
+ if (args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else if (args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
+ GRPC_ARG_MAX_MESSAGE_LENGTH);
+ } else {
+ channel->max_message_length = args->args[i].value.integer;
+ }
+ }
+ }
+ }
+
return channel;
}
@@ -219,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
+
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
+ return channel->max_message_length;
+}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d3e51185ee..388be35711 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -44,10 +44,11 @@ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
void grpc_client_channel_closed(grpc_channel_element *elem);
void grpc_channel_internal_ref(grpc_channel *channel);
void grpc_channel_internal_unref(grpc_channel *channel);
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CHANNEL_H */
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d00d..daa8d3a7c6 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,8 @@ static void done_setup(void *sp) {
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff7d7..8ac4dd1e0e 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,10 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, &op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- grpc_call_read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- grpc_call_stream_closed(elem);
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
- }
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -90,7 +68,8 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data,
+ grpc_transport_op *initial_op) {}
static void destroy_call_elem(grpc_call_element *elem) {}
@@ -104,6 +83,7 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
+ client_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "client",
};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 78170806f1..c93222344d 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -46,22 +46,10 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_batch_destroy(&op->data.metadata);
- grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
- "Rpc sent on a lame channel.");
- grpc_call_stream_closed(elem);
- break;
- default:
- break;
- }
-
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
}
static void channel_op(grpc_channel_element *elem,
@@ -79,7 +67,12 @@ static void channel_op(grpc_channel_element *elem,
}
static void init_call_elem(grpc_call_element *elem,
- const void *transport_server_data) {}
+ const void *transport_server_data,
+ grpc_transport_op *initial_op) {
+ if (initial_op) {
+ grpc_transport_op_finish_with_failure(initial_op);
+ }
+}
static void destroy_call_elem(grpc_call_element *elem) {}
@@ -93,9 +86,9 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- "lame-client",
+ lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index e7d223bfda..3e331293b5 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth.h"
@@ -193,7 +192,7 @@ static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {
- &grpc_client_auth_filter, &grpc_http_client_filter, &grpc_http_filter};
+ &grpc_client_auth_filter, &grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
@@ -211,7 +210,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
- grpc_channel_security_connector* connector;
+ grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929870..28f42f0fc3 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -425,33 +391,75 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN:
break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
+ case GRPC_STREAM_SEND_CLOSED:
break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ server_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -502,7 +510,8 @@ static void shutdown_channel(channel_data *chand) {
}
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -514,6 +523,8 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
+
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void destroy_call_elem(grpc_call_element *elem) {
@@ -592,8 +603,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "server",
};
static void addcq(grpc_server *server, grpc_completion_queue *cq) {
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219f8b..7b5c2f227b 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
#include <grpc/grpc.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,8 @@
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index 5ca31d6bc7..cf1e66bf8b 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -481,7 +481,6 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
break;
case GRPC_OP_METADATA:
grpc_metadata_batch_assert_ok(&op->data.metadata);
- case GRPC_OP_FLOW_CTL_CB:
/* these just get copied as they don't impact the number of flow
controlled bytes */
grpc_sopb_append(outops, op, 1);
@@ -567,10 +566,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
GPR_ERROR,
"These stream ops should be filtered out by grpc_chttp2_preencode");
abort();
- case GRPC_OP_FLOW_CTL_CB:
- op->data.flow_ctl_cb.cb(op->data.flow_ctl_cb.arg, GRPC_OP_OK);
- curop++;
- break;
case GRPC_OP_METADATA:
/* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee284e0..52de8c218f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,10 +91,9 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY,
- /* streams that want to callback the application */
- PENDING_CALLBACKS,
- /* streams that *ARE* calling back to the application */
- EXECUTING_CALLBACKS,
+ /* streams that have finished reading: we wait until unlock to coalesce
+ all changes into one callback */
+ FINISHED_READ_OP,
STREAM_LIST_COUNT /* must be last */
} stream_list_id;
@@ -141,6 +140,12 @@ typedef enum {
DTS_FRAME
} deframe_transport_state;
+typedef enum {
+ WRITE_STATE_OPEN,
+ WRITE_STATE_QUEUED_CLOSE,
+ WRITE_STATE_SENT_CLOSE
+} WRITE_STATE;
+
typedef struct {
stream *head;
stream *tail;
@@ -182,6 +187,18 @@ typedef struct {
gpr_slice debug;
} pending_goaway;
+typedef struct {
+ void (*cb)(void *user_data, int success);
+ void *user_data;
+ int success;
+} op_closure;
+
+typedef struct {
+ op_closure *callbacks;
+ size_t count;
+ size_t capacity;
+} op_closure_array;
+
struct transport {
grpc_transport base; /* must be first */
const grpc_transport_callbacks *cb;
@@ -202,6 +219,10 @@ struct transport {
gpr_uint8 closed;
error_state error_state;
+ /* queued callbacks */
+ op_closure_array pending_callbacks;
+ op_closure_array executing_callbacks;
+
/* stream indexing */
gpr_uint32 next_stream_id;
gpr_uint32 last_incoming_stream_id;
@@ -281,13 +302,13 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- gpr_uint8 queued_write_closed;
- gpr_uint8 sending_write_closed;
- gpr_uint8 sent_write_closed;
+ WRITE_STATE write_state;
+ gpr_uint8 send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
- gpr_uint8 allow_window_updates;
- gpr_uint8 published_close;
+
+ op_closure send_done_closure;
+ op_closure recv_done_closure;
stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT];
@@ -299,7 +320,10 @@ struct stream {
gpr_timespec incoming_deadline;
/* sops from application */
- grpc_stream_op_buffer outgoing_sopb;
+ grpc_stream_op_buffer *outgoing_sopb;
+ grpc_stream_op_buffer *incoming_sopb;
+ grpc_stream_state *publish_state;
+ grpc_stream_state published_state;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb;
@@ -337,7 +361,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
@@ -348,6 +373,14 @@ static void become_skip_parser(transport *t);
static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
grpc_endpoint_cb_status error);
+static void schedule_cb(transport *t, op_closure closure, int success);
+static void maybe_finish_read(transport *t, stream *s);
+static void maybe_join_window_updates(transport *t, stream *s);
+static void finish_reads(transport *t);
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
+static void add_metadata_batch(transport *t, stream *s);
+
/*
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -416,6 +449,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
+ memset(t, 0, sizeof(*t));
+
t->base.vtable = &vtable;
t->ep = ep;
/* one ref is for destroy, the other for when ep becomes NULL */
@@ -427,27 +462,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t->str_grpc_timeout =
grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
t->reading = 1;
- t->writing = 0;
t->error_state = ERROR_STATE_NONE;
t->next_stream_id = is_client ? 1 : 2;
- t->last_incoming_stream_id = 0;
- t->destroying = 0;
- t->closed = 0;
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
- t->expect_continuation_stream_id = 0;
- t->pings = NULL;
- t->ping_count = 0;
- t->ping_capacity = 0;
t->ping_counter = gpr_now().tv_nsec;
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
gpr_slice_buffer_init(&t->outbuf);
gpr_slice_buffer_init(&t->qbuf);
grpc_sopb_init(&t->nuke_later_sopb);
@@ -462,7 +486,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed.
TODO(ctiller): tune this */
grpc_chttp2_stream_map_init(&t->stream_map, 8);
- memset(&t->lists, 0, sizeof(t->lists));
/* copy in initial settings to all setting sets */
for (i = 0; i < NUM_SETTING_SETS; i++) {
@@ -503,7 +526,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +538,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
@@ -573,16 +596,19 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream(grpc_transport *gt, grpc_stream *gs,
- const void *server_data) {
+ const void *server_data, grpc_transport_op *initial_op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
+ memset(s, 0, sizeof(*s));
+
ref_transport(t);
if (!server_data) {
lock(t);
s->id = 0;
} else {
+ /* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data;
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
@@ -592,24 +618,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_window =
t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->queued_write_closed = 0;
- s->sending_write_closed = 0;
- s->sent_write_closed = 0;
- s->read_closed = 0;
- s->cancelled = 0;
- s->allow_window_updates = 0;
- s->published_close = 0;
- s->incoming_metadata_count = 0;
- s->incoming_metadata_capacity = 0;
- s->incoming_metadata = NULL;
s->incoming_deadline = gpr_inf_future;
- memset(&s->links, 0, sizeof(s->links));
- memset(&s->included, 0, sizeof(s->included));
- grpc_sopb_init(&s->outgoing_sopb);
grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
grpc_chttp2_data_parser_init(&s->parser);
+ if (initial_op) perform_op_locked(t, s, initial_op);
+
if (!server_data) {
unlock(t);
}
@@ -642,7 +657,7 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock(&t->mu);
- grpc_sopb_destroy(&s->outgoing_sopb);
+ GPR_ASSERT(s->outgoing_sopb == NULL);
grpc_sopb_destroy(&s->writing_sopb);
grpc_sopb_destroy(&s->callback_sopb);
grpc_chttp2_data_parser_destroy(&s->parser);
@@ -708,8 +723,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join(transport *t, stream *s, stream_list_id id) {
- if (id == PENDING_CALLBACKS)
- GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
if (s->included[id]) {
return;
}
@@ -718,6 +731,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
+ t->is_client ? "CLI" : "SVR", s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@@ -762,6 +777,8 @@ static void unlock(transport *t) {
finalize_cancellations(t);
}
+ finish_reads(t);
+
/* gather any callbacks that need to be made */
if (!t->calling_back && cb) {
perform_callbacks = prepare_callbacks(t);
@@ -865,21 +882,24 @@ static int prepare_write(transport *t) {
while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
s->outgoing_window > 0) {
window_delta = grpc_chttp2_preencode(
- s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
+ s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
t->outgoing_window -= window_delta;
s->outgoing_window -= window_delta;
- s->sending_write_closed =
- s->queued_write_closed && s->outgoing_sopb.nops == 0;
- if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
+ if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
+ s->outgoing_sopb->nops == 0) {
+ s->send_closed = 1;
+ }
+ if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
}
- /* if there are still writes to do and the stream still has window
- available, then schedule a further write */
- if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
- GPR_ASSERT(!t->outgoing_window);
+ /* we should either exhaust window or have no ops left, but not both */
+ if (s->outgoing_sopb->nops == 0) {
+ s->outgoing_sopb = NULL;
+ schedule_cb(t, s->send_done_closure, 1);
+ } else if (s->outgoing_window) {
stream_list_add_tail(t, s, WRITABLE);
}
}
@@ -912,10 +932,9 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->sending_write_closed, s->id, &t->hpack_compressor,
- &t->outbuf);
+ s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->sending_write_closed) {
+ if (s->send_closed) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -929,8 +948,10 @@ static void finish_write_common(transport *t, int success) {
drop_connection(t);
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
- s->sent_write_closed = 1;
- if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ if (!s->cancelled) {
+ maybe_finish_read(t, s);
+ }
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -980,6 +1001,9 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
+ IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
+ t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
+
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
@@ -988,43 +1012,61 @@ static void maybe_start_some_streams(transport *t) {
}
}
-static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
- size_t ops_count, int is_last) {
- transport *t = (transport *)gt;
- stream *s = (stream *)gs;
-
- lock(t);
-
- if (is_last) {
- s->queued_write_closed = 1;
- }
- if (!s->cancelled) {
- grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
- if (s->id == 0) {
- stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
- maybe_start_some_streams(t);
+static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(
+ t, s, op->cancel_with_status,
+ grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
+ op->cancel_message, 1);
+ }
+
+ if (op->send_ops) {
+ GPR_ASSERT(s->outgoing_sopb == NULL);
+ s->send_done_closure.cb = op->on_done_send;
+ s->send_done_closure.user_data = op->send_user_data;
+ if (!s->cancelled) {
+ s->outgoing_sopb = op->send_ops;
+ if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
+ s->write_state = WRITE_STATE_QUEUED_CLOSE;
+ }
+ if (s->id == 0) {
+ IF_TRACING(gpr_log(GPR_DEBUG,
+ "HTTP:%s: New stream %p waiting for concurrency",
+ t->is_client ? "CLI" : "SVR", s));
+ stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
+ maybe_start_some_streams(t);
+ } else if (s->outgoing_window > 0) {
+ stream_list_join(t, s, WRITABLE);
+ }
} else {
- stream_list_join(t, s, WRITABLE);
+ schedule_nuke_sopb(t, op->send_ops);
+ schedule_cb(t, s->send_done_closure, 0);
}
- } else {
- grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
- if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
- !s->published_close) {
- stream_list_join(t, s, PENDING_CALLBACKS);
+
+ if (op->recv_ops) {
+ GPR_ASSERT(s->incoming_sopb == NULL);
+ s->recv_done_closure.cb = op->on_done_recv;
+ s->recv_done_closure.user_data = op->recv_user_data;
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
}
- unlock(t);
+ if (op->bind_pollset) {
+ add_to_pollset_locked(t, op->bind_pollset);
+ }
}
-static void abort_stream(grpc_transport *gt, grpc_stream *gs,
- grpc_status_code status) {
+static void perform_op(grpc_transport *gt, grpc_stream *gs,
+ grpc_transport_op *op) {
transport *t = (transport *)gt;
stream *s = (stream *)gs;
lock(t);
- cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
- 1);
+ perform_op_locked(t, s, op);
unlock(t);
}
@@ -1063,8 +1105,8 @@ static void finalize_cancellations(transport *t) {
while ((s = stream_list_remove_head(t, CANCELLED))) {
s->read_closed = 1;
- s->sent_write_closed = 1;
- stream_list_join(t, s, PENDING_CALLBACKS);
+ s->write_state = WRITE_STATE_SENT_CLOSE;
+ maybe_finish_read(t, s);
}
}
@@ -1082,18 +1124,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
- int send_rst) {
+ grpc_mdstr *optional_message, int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
- had_outgoing = s->outgoing_sopb.nops != 0;
+ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- schedule_nuke_sopb(t, &s->outgoing_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
if (s->cancelled) {
send_rst = 0;
- } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
+ } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
+ had_outgoing) {
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
@@ -1101,17 +1149,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
}
-
- stream_list_join(t, s, PENDING_CALLBACKS);
+ add_metadata_batch(t, s);
+ maybe_finish_read(t, s);
}
}
if (!id) send_rst = 0;
@@ -1119,24 +1176,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
+ if (optional_message) {
+ grpc_mdstr_unref(optional_message);
+ }
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- send_rst);
+ NULL, send_rst);
}
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
+ send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
- GRPC_CHTTP2_INTERNAL_ERROR, 0);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
@@ -1150,8 +1212,14 @@ static void drop_connection(transport *t) {
end_all_the_calls(t);
}
+static void maybe_finish_read(transport *t, stream *s) {
+ if (s->incoming_sopb) {
+ stream_list_join(t, s, FINISHED_READ_OP);
+ }
+}
+
static void maybe_join_window_updates(transport *t, stream *s) {
- if (s->allow_window_updates &&
+ if (s->incoming_sopb != NULL &&
s->incoming_window <
t->settings[LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
@@ -1160,21 +1228,6 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
-static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
- int allow) {
- transport *t = (transport *)tp;
- stream *s = (stream *)sp;
-
- lock(t);
- s->allow_window_updates = allow;
- if (allow) {
- maybe_join_window_updates(t, s);
- } else {
- stream_list_remove(t, s, WINDOW_UPDATE);
- }
- unlock(t);
-}
-
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
if (t->incoming_frame_size > t->incoming_window) {
gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
@@ -1248,7 +1301,7 @@ static int init_data_frame_parser(transport *t) {
case GRPC_CHTTP2_STREAM_ERROR:
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
return init_skip_frame(t, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(t);
@@ -1267,11 +1320,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
- IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
- grpc_mdstr_as_c_string(md->key),
- grpc_mdstr_as_c_string(md->value)));
+ IF_TRACING(gpr_log(
+ GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
- stream_list_join(t, s, PENDING_CALLBACKS);
if (md->key == t->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
if (!cached_timeout) {
@@ -1290,6 +1342,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata(t, s, md);
}
+ maybe_finish_read(t, s);
}
static int init_header_frame_parser(transport *t, int is_continuation) {
@@ -1464,8 +1517,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
size_t i;
@@ -1483,8 +1534,7 @@ static void add_metadata_batch(transport *t, stream *s) {
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
- grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
- s->incoming_metadata);
+ /* TODO(ctiller): don't leak incoming_metadata */
/* reset */
s->incoming_deadline = gpr_inf_future;
@@ -1501,14 +1551,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK:
if (st.end_of_stream) {
t->incoming_stream->read_closed = 1;
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.need_flush_reads) {
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.metadata_boundary) {
add_metadata_batch(t, t->incoming_stream);
- stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
+ maybe_finish_read(t, t->incoming_stream);
}
if (st.ack_settings) {
gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
@@ -1545,11 +1595,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
if (st.initial_window_update) {
for (i = 0; i < t->stream_map.count; i++) {
- stream *s = (stream*)(t->stream_map.values[i]);
+ stream *s = (stream *)(t->stream_map.values[i]);
int was_window_empty = s->outgoing_window <= 0;
s->outgoing_window += st.initial_window_update;
- if (was_window_empty && s->outgoing_window > 0 &&
- s->outgoing_sopb.nops > 0) {
+ if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1563,12 +1613,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
flag that */
- if (was_window_empty && s->outgoing_sopb.nops) {
+ if (was_window_empty && s->outgoing_sopb &&
+ s->outgoing_sopb->nops > 0) {
stream_list_join(t, s, WRITABLE);
}
}
@@ -1830,53 +1881,79 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN;
}
-static int prepare_callbacks(transport *t) {
+static void finish_reads(transport *t) {
stream *s;
- int n = 0;
- while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
- int execute = 1;
-
- s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
- if (s->callback_state == GRPC_STREAM_CLOSED) {
- remove_from_stream_map(t, s);
- if (s->published_close) {
- execute = 0;
- } else if (s->incoming_metadata_count) {
- add_metadata_batch(t, s);
+
+ while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
+ int publish = 0;
+ GPR_ASSERT(s->incoming_sopb);
+ *s->publish_state =
+ compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
+ if (*s->publish_state != s->published_state) {
+ s->published_state = *s->publish_state;
+ publish = 1;
+ if (s->published_state == GRPC_STREAM_CLOSED) {
+ remove_from_stream_map(t, s);
}
- s->published_close = 1;
}
-
- grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
-
- if (execute) {
- stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
- n = 1;
+ if (s->parser.incoming_sopb.nops > 0) {
+ grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
+ publish = 1;
+ }
+ if (publish) {
+ s->incoming_sopb = NULL;
+ schedule_cb(t, s->recv_done_closure, 1);
}
}
- return n;
+}
+
+static void schedule_cb(transport *t, op_closure closure, int success) {
+ if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
+ t->pending_callbacks.capacity =
+ GPR_MAX(t->pending_callbacks.capacity * 2, 8);
+ t->pending_callbacks.callbacks =
+ gpr_realloc(t->pending_callbacks.callbacks,
+ t->pending_callbacks.capacity *
+ sizeof(*t->pending_callbacks.callbacks));
+ }
+ closure.success = success;
+ t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
+}
+
+static int prepare_callbacks(transport *t) {
+ op_closure_array temp = t->pending_callbacks;
+ t->pending_callbacks = t->executing_callbacks;
+ t->executing_callbacks = temp;
+ return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
- stream *s;
- while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
- size_t nops = s->callback_sopb.nops;
- s->callback_sopb.nops = 0;
- cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
- s->callback_sopb.ops, nops, s->callback_state);
+ size_t i;
+ for (i = 0; i < t->executing_callbacks.count; i++) {
+ op_closure c = t->executing_callbacks.callbacks[i];
+ c.cb(c.user_data, c.success);
}
+ t->executing_callbacks.count = 0;
}
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
cb->closed(t->cb_user_data, &t->base);
}
-static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
- transport *t = (transport *)gt;
- lock(t);
+/*
+ * POLLSET STUFF
+ */
+
+static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
if (t->ep) {
grpc_endpoint_add_to_pollset(t->ep, pollset);
}
+}
+
+static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
+ transport *t = (transport *)gt;
+ lock(t);
+ add_to_pollset_locked(t, pollset);
unlock(t);
}
@@ -1885,9 +1962,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
- sizeof(stream), init_stream, send_batch, set_allow_window_updates,
- add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
- send_ping, destroy_transport};
+ sizeof(stream), init_stream, perform_op,
+ add_to_pollset, destroy_stream, goaway,
+ close_transport, send_ping, destroy_transport};
void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
void *arg,
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 882c078d51..ea22b0e1c8 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
case GRPC_OP_METADATA:
grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
- case GRPC_OP_FLOW_CTL_CB:
- ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
- break;
case GRPC_NO_OP:
case GRPC_OP_BEGIN_MESSAGE:
break;
@@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
+ GPR_ASSERT(sopb->nops <= sopb->capacity);
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_FLOW_CTL_CB;
- op->data.flow_ctl_cb.cb = cb;
- op->data.flow_ctl_cb.arg = arg;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
-}
-
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops) {
size_t orig_nops = sopb->nops;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 20146b9af2..95497a3cc8 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -55,9 +55,7 @@ typedef enum grpc_stream_op_code {
GRPC_OP_BEGIN_MESSAGE,
/* Add a slice of data to the current message/metadata element/status.
Must not overflow the forward declared length. */
- GRPC_OP_SLICE,
- /* Call some function once this operation has passed flow control. */
- GRPC_OP_FLOW_CTL_CB
+ GRPC_OP_SLICE
} grpc_stream_op_code;
/* Arguments for GRPC_OP_BEGIN */
@@ -68,12 +66,6 @@ typedef struct grpc_begin_message {
gpr_uint32 flags;
} grpc_begin_message;
-/* Arguments for GRPC_OP_FLOW_CTL_CB */
-typedef struct grpc_flow_ctl_cb {
- void (*cb)(void *arg, grpc_op_error error);
- void *arg;
-} grpc_flow_ctl_cb;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -94,29 +86,31 @@ typedef struct grpc_metadata_batch {
void grpc_metadata_batch_init(grpc_metadata_batch *comd);
void grpc_metadata_batch_destroy(grpc_metadata_batch *comd);
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
- grpc_metadata_batch *add);
+ grpc_metadata_batch *add);
void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage);
+ grpc_linked_mdelem *storage);
void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage);
+ grpc_linked_mdelem *storage);
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage,
- grpc_mdelem *elem_to_add);
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
- grpc_linked_mdelem *storage,
- grpc_mdelem *elem_to_add);
+ grpc_linked_mdelem *storage,
+ grpc_mdelem *elem_to_add);
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
- grpc_mdelem *(*filter)(void *user_data,
- grpc_mdelem *elem),
- void *user_data);
+ grpc_mdelem *(*filter)(void *user_data,
+ grpc_mdelem *elem),
+ void *user_data);
#ifndef NDEBUG
void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
#else
-#define grpc_metadata_batch_assert_ok(comd) do {} while (0)
+#define grpc_metadata_batch_assert_ok(comd) \
+ do { \
+ } while (0)
#endif
/* Represents a single operation performed on a stream/transport */
@@ -129,7 +123,6 @@ typedef struct grpc_stream_op {
grpc_begin_message begin_message;
grpc_metadata_batch metadata;
gpr_slice slice;
- grpc_flow_ctl_cb flow_ctl_cb;
} data;
} grpc_stream_op;
@@ -160,15 +153,14 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
/* Append a GRPC_OP_BEGIN to a buffer */
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
gpr_uint32 flags);
-void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
+void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb,
+ grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
+
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index ef0020dc58..d9a1319c42 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -52,18 +52,15 @@ void grpc_transport_destroy(grpc_transport *transport) {
}
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data) {
- return transport->vtable->init_stream(transport, stream, server_data);
+ const void *server_data,
+ grpc_transport_op *initial_op) {
+ return transport->vtable->init_stream(transport, stream, server_data,
+ initial_op);
}
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t nops, int is_last) {
- transport->vtable->send_batch(transport, stream, ops, nops, is_last);
-}
-
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow) {
- transport->vtable->set_allow_window_updates(transport, stream, allow);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, stream, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
@@ -76,11 +73,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status) {
- transport->vtable->abort_stream(transport, stream, status);
-}
-
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);
@@ -93,3 +85,23 @@ void grpc_transport_setup_cancel(grpc_transport_setup *setup) {
void grpc_transport_setup_initiate(grpc_transport_setup *setup) {
setup->vtable->initiate(setup);
}
+
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
+ if (op->send_ops) {
+ op->on_done_send(op->send_user_data, 0);
+ }
+ if (op->recv_ops) {
+ op->on_done_recv(op->recv_user_data, 0);
+ }
+}
+
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message) {
+ if (op->cancel_with_status == GRPC_STATUS_OK) {
+ op->cancel_with_status = status;
+ op->cancel_message = message;
+ } else if (message) {
+ grpc_mdstr_unref(message);
+ }
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index ce8c17c322..cdea0b9a0b 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -60,26 +60,26 @@ typedef enum grpc_stream_state {
GRPC_STREAM_CLOSED
} grpc_stream_state;
-/* Callbacks made from the transport to the upper layers of grpc. */
-struct grpc_transport_callbacks {
- /* Allocate a buffer to receive data into.
- It's safe to call grpc_slice_new() to do this, but performance minded
- proxies may want to carefully place data into optimal locations for
- transports.
- This function must return a valid, non-empty slice.
+/* Transport op: a set of operations to perform on a transport */
+typedef struct grpc_transport_op {
+ grpc_stream_op_buffer *send_ops;
+ int is_last_send;
+ void (*on_done_send)(void *user_data, int success);
+ void *send_user_data;
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the grpc_stream instance the buffer will be used for, or
- NULL if this is not known
- size_hint - how big of a buffer would the transport optimally like?
- the actual returned buffer can be smaller or larger than
- size_hint as the implementation finds convenient */
- struct gpr_slice (*alloc_recv_buffer)(void *user_data,
- grpc_transport *transport,
- grpc_stream *stream, size_t size_hint);
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+ grpc_pollset *bind_pollset;
+
+ grpc_status_code cancel_with_status;
+ grpc_mdstr *cancel_message;
+} grpc_transport_op;
+
+/* Callbacks made from the transport to the upper layers of grpc. */
+struct grpc_transport_callbacks {
/* Initialize a new stream on behalf of the transport.
Must result in a call to
grpc_transport_init_stream(transport, ..., request) in the same call
@@ -96,28 +96,6 @@ struct grpc_transport_callbacks {
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
- /* Process a set of stream ops that have been received by the transport.
- Called by network threads, so must be careful not to block on network
- activity.
-
- If final_state == GRPC_STREAM_CLOSED, the upper layers should arrange to
- call grpc_transport_destroy_stream.
-
- Ownership of any objects contained in ops is transferred to the callee.
-
- Arguments:
- user_data - the transport user data set at transport creation time
- transport - the grpc_transport instance making this call
- stream - the stream this data was received for
- ops - stream operations that are part of this batch
- ops_count - the number of stream operations in this batch
- final_state - the state of the stream as of the final operation in this
- batch */
- void (*recv_batch)(void *user_data, grpc_transport *transport,
- grpc_stream *stream, grpc_stream_op *ops, size_t ops_count,
- grpc_stream_state final_state);
-
- /* The transport received a goaway */
void (*goaway)(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug);
@@ -139,7 +117,8 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
server_data - either NULL for a client initiated stream, or a pointer
supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
- const void *server_data);
+ const void *server_data,
+ grpc_transport_op *initial_op);
/* Destroy transport data for a stream.
@@ -154,20 +133,17 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_destroy_stream(grpc_transport *transport,
grpc_stream *stream);
-/* Enable/disable incoming data for a stream.
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
- This effectively disables new window becoming available for a given stream,
- but does not prevent existing window from being consumed by a sender: the
- caller must still be prepared to receive some additional data after this
- call.
+void grpc_transport_op_add_cancellation(grpc_transport_op *op,
+ grpc_status_code status,
+ grpc_mdstr *message);
- Arguments:
- transport - the transport on which to create this stream
- stream - the grpc_stream to destroy (memory is still owned by the
- caller, but any child memory must be cleaned up)
- allow - is it allowed that new window be opened up? */
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow);
+/* TODO(ctiller): remove this */
+void grpc_transport_add_to_pollset(grpc_transport *transport,
+ grpc_pollset *pollset);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
@@ -177,13 +153,9 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport,
transport - the transport on which to initiate the stream
stream - the stream on which to send the operations. This must be
non-NULL and previously initialized by the same transport.
- ops - an array of operations to apply to the stream - can be NULL
- if ops_count == 0.
- ops_count - the number of elements in ops
- is_last - is this the last batch of operations to be sent out */
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count,
- int is_last);
+ op - a grpc_transport_op specifying the op to perform */
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op);
/* Send a ping on a transport
@@ -193,19 +165,6 @@ void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
-/* Abort a stream
-
- Terminate reading and writing for a stream. A final recv_batch with no
- operations and final_state == GRPC_STREAM_CLOSED will be received locally,
- and no more data will be presented to the up-layer.
-
- TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
@@ -254,4 +213,4 @@ void grpc_transport_setup_initiate(grpc_transport_setup *setup);
used as a destruction call by setup). */
void grpc_transport_setup_cancel(grpc_transport_setup *setup);
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */
diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h
index ac275c7560..479e15338f 100644
--- a/src/core/transport/transport_impl.h
+++ b/src/core/transport/transport_impl.h
@@ -43,15 +43,11 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_transport *self, grpc_stream *stream,
- const void *server_data);
+ const void *server_data, grpc_transport_op *initial_op);
/* implementation of grpc_transport_send_batch */
- void (*send_batch)(grpc_transport *self, grpc_stream *stream,
- grpc_stream_op *ops, size_t ops_count, int is_last);
-
- /* implementation of grpc_transport_set_allow_window_updates */
- void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream,
- int allow);
+ void (*perform_op)(grpc_transport *self, grpc_stream *stream,
+ grpc_transport_op *op);
/* implementation of grpc_transport_add_to_pollset */
void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset);
@@ -59,10 +55,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_transport *self, grpc_stream *stream);
- /* implementation of grpc_transport_abort_stream */
- void (*abort_stream)(grpc_transport *self, grpc_stream *stream,
- grpc_status_code status);
-
/* implementation of grpc_transport_goaway */
void (*goaway)(grpc_transport *self, grpc_status_code status,
gpr_slice debug_data);
@@ -84,4 +76,4 @@ struct grpc_transport {
const grpc_transport_vtable *vtable;
};
-#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
+#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_IMPL_H */
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000000..a215710969
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,160 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup("key="));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
+}
+
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
+ char *out;
+ char *tmp;
+ size_t i;
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op *op = &sopb->ops[i];
+ if (i > 0) gpr_strvec_add(&b, gpr_strdup(", "));
+ switch (op->type) {
+ case GRPC_NO_OP:
+ gpr_strvec_add(&b, gpr_strdup("NO_OP"));
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_SLICE:
+ gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
+ break;
+ case GRPC_OP_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("METADATA{"));
+ put_metadata_list(&b, op->data.metadata);
+ gpr_strvec_add(&b, gpr_strdup("}"));
+ break;
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+char *grpc_transport_op_string(grpc_transport_op *op) {
+ char *tmp;
+ char *out;
+ int first = 1;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ if (op->send_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("SEND"));
+ if (op->is_last_send) {
+ gpr_strvec_add(&b, gpr_strdup("_LAST"));
+ }
+ gpr_strvec_add(&b, gpr_strdup("["));
+ gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
+ gpr_strvec_add(&b, gpr_strdup("]"));
+ }
+
+ if (op->recv_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("RECV"));
+ }
+
+ if (op->bind_pollset) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("BIND"));
+ }
+
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+ gpr_strvec_add(&b, tmp);
+ if (op->cancel_message) {
+ gpr_asprintf(&tmp, ";msg='%s'",
+ grpc_mdstr_as_c_string(op->cancel_message));
+ gpr_strvec_add(&b, tmp);
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_transport_op *op) {
+ char *str = grpc_transport_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index f4ae6fab84..e76bb930ee 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -118,35 +118,36 @@ static void grpc_rb_call_destroy(void *p) {
}
static size_t md_ary_datasize(const void *p) {
- const grpc_metadata_array* const ary = (grpc_metadata_array*)p;
- size_t i, datasize = sizeof(grpc_metadata_array);
- for (i = 0; i < ary->count; ++i) {
- const grpc_metadata* const md = &ary->metadata[i];
- datasize += strlen(md->key);
- datasize += md->value_length;
- }
- datasize += ary->capacity * sizeof(grpc_metadata);
- return datasize;
+ const grpc_metadata_array *const ary = (grpc_metadata_array *)p;
+ size_t i, datasize = sizeof(grpc_metadata_array);
+ for (i = 0; i < ary->count; ++i) {
+ const grpc_metadata *const md = &ary->metadata[i];
+ datasize += strlen(md->key);
+ datasize += md->value_length;
+ }
+ datasize += ary->capacity * sizeof(grpc_metadata);
+ return datasize;
}
static const rb_data_type_t grpc_rb_md_ary_data_type = {
"grpc_metadata_array",
{GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize},
- NULL, NULL,
- 0
-};
+ NULL,
+ NULL,
+ 0};
/* Describes grpc_call struct for RTypedData */
static const rb_data_type_t grpc_call_data_type = {
"grpc_call",
{GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE},
- NULL, NULL,
- /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because grpc_rb_call_destroy
+ NULL,
+ NULL,
+ /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
+ * grpc_rb_call_destroy
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
- 0
-};
+ 0};
/* Error code details is a hash containing text strings describing errors */
VALUE rb_error_code_details;
@@ -250,7 +251,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
}
md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
md_ary->metadata[md_ary->count].value_length =
- RSTRING_LEN(rb_ary_entry(val, i));
+ RSTRING_LEN(rb_ary_entry(val, i));
md_ary->count += 1;
}
} else {
@@ -290,10 +291,11 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
-static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
+static void grpc_rb_md_ary_convert(VALUE md_ary_hash,
+ grpc_metadata_array *md_ary) {
VALUE md_ary_obj = Qnil;
if (md_ary_hash == Qnil) {
- return; /* Do nothing if the expected has value is nil */
+ return; /* Do nothing if the expected has value is nil */
}
if (TYPE(md_ary_hash) != T_HASH) {
rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
@@ -303,8 +305,8 @@ static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ar
/* Initialize the array, compute it's capacity, then fill it. */
grpc_metadata_array_init(md_ary);
- md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type,
- md_ary);
+ md_ary_obj =
+ TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
@@ -327,16 +329,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
rb_hash_aset(result, key, value);
} else if (TYPE(value) == T_ARRAY) {
/* Add the string to the returned array */
- rb_ary_push(value,
- rb_str_new(md_ary->metadata[i].value,
- md_ary->metadata[i].value_length));
+ rb_ary_push(value, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
} else {
/* Add the current value with this key and the new one to an array */
new_ary = rb_ary_new();
rb_ary_push(new_ary, value);
- rb_ary_push(new_ary,
- rb_str_new(md_ary->metadata[i].value,
- md_ary->metadata[i].value_length));
+ rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value,
+ md_ary->metadata[i].value_length));
rb_hash_aset(result, key, new_ary);
}
}
@@ -355,7 +355,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_obj_classname(key));
return ST_STOP;
}
- switch(NUM2INT(key)) {
+ switch (NUM2INT(key)) {
case GRPC_OP_SEND_INITIAL_METADATA:
case GRPC_OP_SEND_MESSAGE:
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@@ -367,8 +367,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_ary_push(ops_ary, key);
return ST_CONTINUE;
default:
- rb_raise(rb_eTypeError, "invalid operation : bad value %d",
- NUM2INT(key));
+ rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
};
return ST_STOP;
}
@@ -377,8 +376,8 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
struct to the 'send_status_from_server' portion of an op.
*/
static void grpc_rb_op_update_status_from_server(grpc_op *op,
- grpc_metadata_array* md_ary,
- VALUE status) {
+ grpc_metadata_array *md_ary,
+ VALUE status) {
VALUE code = rb_struct_aref(status, sym_code);
VALUE details = rb_struct_aref(status, sym_details);
VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
@@ -405,8 +404,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op,
* grpc_rb_call_run_batch function */
typedef struct run_batch_stack {
/* The batch ops */
- grpc_op ops[8]; /* 8 is the maximum number of operations */
- size_t op_num; /* tracks the last added operation */
+ grpc_op ops[8]; /* 8 is the maximum number of operations */
+ size_t op_num; /* tracks the last added operation */
/* Data being sent */
grpc_metadata_array send_metadata;
@@ -424,7 +423,7 @@ typedef struct run_batch_stack {
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack* st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
@@ -435,7 +434,7 @@ static void grpc_run_batch_stack_init(run_batch_stack* st) {
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
* cleaned up */
-static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
+static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
grpc_metadata_array_destroy(&st->send_metadata);
grpc_metadata_array_destroy(&st->send_trailing_metadata);
grpc_metadata_array_destroy(&st->recv_metadata);
@@ -447,7 +446,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
* ops_hash */
-static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
+static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
VALUE this_op = Qnil;
VALUE this_value = Qnil;
VALUE ops_ary = rb_ary_new();
@@ -460,7 +459,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
- switch(NUM2INT(this_op)) {
+ switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
@@ -471,18 +470,16 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
st->send_metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
- st->ops[st->op_num].data.send_message =
- grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
- RSTRING_LEN(this_value));
+ st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
+ RSTRING_PTR(this_value), RSTRING_LEN(this_value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
- grpc_rb_op_update_status_from_server(&st->ops[st->op_num],
- &st->send_trailing_metadata,
- this_value);
+ grpc_rb_op_update_status_from_server(
+ &st->ops[st->op_num], &st->send_trailing_metadata, this_value);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
@@ -516,12 +513,12 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
after the results have run */
-static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
+static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
size_t i = 0;
VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
Qnil, Qnil, Qnil, Qnil, NULL);
for (i = 0; i < st->op_num; i++) {
- switch(st->ops[i].op) {
+ switch (st->ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
rb_struct_aset(result, sym_send_metadata, Qtrue);
break;
@@ -544,13 +541,11 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
rb_struct_aset(
- result,
- sym_status,
- rb_struct_new(grpc_rb_sStatus,
- UINT2NUM(st->recv_status),
+ result, sym_status,
+ rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
(st->recv_status_details == NULL
- ? Qnil
- : rb_str_new2(st->recv_status_details)),
+ ? Qnil
+ : rb_str_new2(st->recv_status_details)),
grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
NULL));
break;
@@ -682,8 +677,7 @@ static void Init_grpc_error_codes() {
static void Init_grpc_op_codes() {
/* Constants representing operation type codes in grpc.h */
- VALUE grpc_rb_mCallOps =
- rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
+ VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
@@ -709,14 +703,14 @@ void Init_grpc_call() {
grpc_rb_eOutOfTime =
rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
- grpc_rb_cMdAry = rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray",
- rb_cObject);
+ grpc_rb_cMdAry =
+ rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
/* Prevent allocation or inialization of the Call class */
rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
- rb_define_method(grpc_rb_cCall, "initialize_copy",
- grpc_rb_cannot_init_copy, 1);
+ rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
@@ -746,16 +740,8 @@ void Init_grpc_call() {
/* The Struct used to return the run_batch result. */
grpc_rb_sBatchResult = rb_struct_define(
- "BatchResult",
- "send_message",
- "send_metadata",
- "send_close",
- "send_status",
- "message",
- "metadata",
- "status",
- "cancelled",
- NULL);
+ "BatchResult", "send_message", "send_metadata", "send_close",
+ "send_status", "message", "metadata", "status", "cancelled", NULL);
/* The hash for reference counting calls, to ensure they can't be destroyed
* more than once */
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index e92db59249..957dee1aa7 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -55,7 +55,8 @@ static void channel_init_func(grpc_channel_element *elem,
}
static void call_init_func(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
}
@@ -66,8 +67,7 @@ static void call_destroy_func(grpc_call_element *elem) {
++*(int *)(elem->channel_data);
}
-static void call_func(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void call_func(grpc_call_element *elem, grpc_transport_op *op) {
++*(int *)(elem->call_data);
}
@@ -78,9 +78,8 @@ static void channel_func(grpc_channel_element *elem,
static void test_create_channel_stack(void) {
const grpc_channel_filter filter = {
- call_func, channel_func, sizeof(int),
- call_init_func, call_destroy_func, sizeof(int),
- channel_init_func, channel_destroy_func, "some_test_filter"};
+ call_func, channel_func, sizeof(int), call_init_func, call_destroy_func,
+ sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"};
const grpc_channel_filter *filters = &filter;
grpc_channel_stack *channel_stack;
grpc_call_stack *call_stack;
@@ -112,7 +111,7 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
- grpc_call_stack_init(channel_stack, NULL, call_stack);
+ grpc_call_stack_init(channel_stack, NULL, NULL, call_stack);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
GPR_ASSERT(call_elem->filter == channel_elem->filter);
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index ab7c7f4caa..d7de5e5434 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -37,7 +37,6 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
diff --git a/test/core/end2end/fixtures/chttp2_fullstack_uds.c b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
index 27e4baf3c0..53803b0f1d 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack_uds.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack_uds.c
@@ -39,7 +39,6 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/support/string.h"
#include "src/core/surface/channel.h"
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index 1225f7db0c..d19ceb178b 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -37,7 +37,6 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint_pair.h"
@@ -60,8 +59,8 @@
static grpc_transport_setup_result server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
@@ -75,9 +74,9 @@ static grpc_transport_setup_result client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
- const grpc_channel_filter *filters[] = {
- &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter,
- &grpc_connected_channel_filter};
+ const grpc_channel_filter *filters[] = {&grpc_client_surface_filter,
+ &grpc_http_client_filter,
+ &grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
grpc_channel *channel = grpc_channel_create_from_filters(
filters, nfilters, cs->client_args, mdctx, 1);
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index 9f6ad98006..ddde585b83 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -37,7 +37,6 @@
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_client_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/endpoint_pair.h"
@@ -60,8 +59,8 @@
static grpc_transport_setup_result server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {
+ &grpc_http_server_filter};
return grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
@@ -75,9 +74,9 @@ static grpc_transport_setup_result client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
- const grpc_channel_filter *filters[] = {
- &grpc_client_surface_filter, &grpc_http_client_filter, &grpc_http_filter,
- &grpc_connected_channel_filter};
+ const grpc_channel_filter *filters[] = {&grpc_client_surface_filter,
+ &grpc_http_client_filter,
+ &grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
grpc_channel *channel = grpc_channel_create_from_filters(
filters, nfilters, cs->client_args, mdctx, 1);
diff --git a/test/core/transport/chttp2/stream_encoder_test.c b/test/core/transport/chttp2/stream_encoder_test.c
index bda8298eb3..91833440cd 100644
--- a/test/core/transport/chttp2/stream_encoder_test.c
+++ b/test/core/transport/chttp2/stream_encoder_test.c
@@ -102,15 +102,10 @@ static void verify_sopb(size_t window_available, int eof,
gpr_slice_unref(expect);
}
-static void assert_result_ok(void *user_data, grpc_op_error error) {
- GPR_ASSERT(error == GRPC_OP_OK);
-}
-
static void test_small_data_framing(void) {
grpc_sopb_add_no_op(&g_sopb);
verify_sopb(10, 0, 0, "");
- grpc_sopb_add_flow_ctl_cb(&g_sopb, assert_result_ok, NULL);
grpc_sopb_add_slice(&g_sopb, create_test_slice(3));
verify_sopb(10, 0, 3, "000003 0000 deadbeef 000102");
diff --git a/test/core/transport/stream_op_test.c b/test/core/transport/stream_op_test.c
index 5885223894..546080deb9 100644
--- a/test/core/transport/stream_op_test.c
+++ b/test/core/transport/stream_op_test.c
@@ -38,10 +38,6 @@
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
-static void flow_ctl_cb_fails(void *ignored, grpc_op_error error) {
- GPR_ASSERT(error == GRPC_OP_ERROR);
-}
-
static void assert_slices_equal(gpr_slice a, gpr_slice b) {
GPR_ASSERT(a.refcount == b.refcount);
if (a.refcount) {
@@ -60,7 +56,6 @@ int main(int argc, char **argv) {
gpr_slice test_slice_2 = gpr_slice_malloc(2);
gpr_slice test_slice_3 = gpr_slice_malloc(3);
gpr_slice test_slice_4 = gpr_slice_malloc(4);
- char x;
unsigned i;
grpc_stream_op_buffer buf;
@@ -78,11 +73,10 @@ int main(int argc, char **argv) {
grpc_sopb_add_slice(&buf, test_slice_2);
grpc_sopb_add_slice(&buf, test_slice_3);
grpc_sopb_add_slice(&buf, test_slice_4);
- grpc_sopb_add_flow_ctl_cb(&buf, flow_ctl_cb_fails, &x);
grpc_sopb_add_no_op(&buf);
/* verify that the data went in ok */
- GPR_ASSERT(buf.nops == 7);
+ GPR_ASSERT(buf.nops == 6);
GPR_ASSERT(buf.ops[0].type == GRPC_OP_BEGIN_MESSAGE);
GPR_ASSERT(buf.ops[0].data.begin_message.length == 1);
GPR_ASSERT(buf.ops[0].data.begin_message.flags == 2);
@@ -94,10 +88,7 @@ int main(int argc, char **argv) {
assert_slices_equal(buf.ops[3].data.slice, test_slice_3);
GPR_ASSERT(buf.ops[4].type == GRPC_OP_SLICE);
assert_slices_equal(buf.ops[4].data.slice, test_slice_4);
- GPR_ASSERT(buf.ops[5].type == GRPC_OP_FLOW_CTL_CB);
- GPR_ASSERT(buf.ops[5].data.flow_ctl_cb.cb == flow_ctl_cb_fails);
- GPR_ASSERT(buf.ops[5].data.flow_ctl_cb.arg == &x);
- GPR_ASSERT(buf.ops[6].type == GRPC_NO_OP);
+ GPR_ASSERT(buf.ops[5].type == GRPC_NO_OP);
/* initialize the second buffer */
grpc_sopb_init(&buf2);
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index fc7744f2b7..0bc594382e 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -115,7 +115,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
- <ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@@ -237,8 +236,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\call_op_string.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@@ -255,8 +252,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\http_filter.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@@ -431,6 +426,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\gpr\gpr.vcxproj">
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index 1dfca58cb5..edce8d9652 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -61,9 +61,6 @@
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\call_op_string.c">
- <Filter>src\core\channel</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@@ -88,9 +85,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\http_filter.c">
- <Filter>src\core\channel</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@@ -352,6 +346,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+ <Filter>src\core\transport</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\grpc_security.h">
@@ -443,9 +440,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\channel\http_filter.h">
- <Filter>src\core\channel</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index 670b109e6f..fac17e6b9f 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -98,7 +98,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
- <ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@@ -182,8 +181,6 @@
<ItemGroup>
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\call_op_string.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@@ -200,8 +197,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\http_filter.c">
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@@ -376,6 +371,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\gpr\gpr.vcxproj">
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 7c94d4d51e..daca2c0c5a 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -4,9 +4,6 @@
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
<Filter>src\core\surface</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\call_op_string.c">
- <Filter>src\core\channel</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@@ -31,9 +28,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
- <ClCompile Include="..\..\src\core\channel\http_filter.c">
- <Filter>src\core\channel</Filter>
- </ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@@ -295,6 +289,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\core\transport\transport_op_string.c">
+ <Filter>src\core\transport</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\byte_buffer.h">
@@ -338,9 +335,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\channel\http_filter.h">
- <Filter>src\core\channel</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>