aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-22 14:00:47 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-22 14:00:47 -0700
commitbe18b8db301b800edef382389749632e4099afaf (patch)
treeec9210bc75dacabc146298f37eb89e3070170a2f /src/core/surface
parent629b0ed8041f96968e8e61c2b4992d08a38cf28a (diff)
Beginning transport work
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/client.c29
-rw-r--r--src/core/surface/lame_client.c19
-rw-r--r--src/core/surface/server.c124
-rw-r--r--src/core/surface/server_chttp2.c4
5 files changed, 71 insertions, 109 deletions
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3104b1d00d..5f3e9bec0c 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -44,7 +44,6 @@
#include "src/core/channel/client_setup.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_client_filter.h"
-#include "src/core/channel/http_filter.h"
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_client.h"
@@ -176,8 +175,7 @@ static void done_setup(void *sp) {
static grpc_transport_setup_result complete_setup(void *channel_stack,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_client_filter};
return grpc_client_channel_transport_setup_complete(
channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters),
mdctx);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 2f898ff7d7..4669c59ee4 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -43,32 +43,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void client_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, &op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- grpc_call_read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- grpc_call_stream_closed(elem);
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
- }
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -104,6 +81,6 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ client_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 78170806f1..4e9eb808d7 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -46,22 +46,9 @@ typedef struct { void *unused; } call_data;
typedef struct { void *unused; } channel_data;
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
- grpc_call_op *op) {
+static void lame_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
- switch (op->type) {
- case GRPC_SEND_METADATA:
- grpc_metadata_batch_destroy(&op->data.metadata);
- grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
- "Rpc sent on a lame channel.");
- grpc_call_stream_closed(elem);
- break;
- default:
- break;
- }
-
- op->done_cb(op->user_data, GRPC_OP_ERROR);
+ grpc_transport_op_finish_with_failure(op);
}
static void channel_op(grpc_channel_element *elem,
@@ -93,7 +80,7 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ lame_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"lame-client",
};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929870..82d1323a4b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -371,46 +377,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -425,33 +391,69 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
- break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
- break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN: break;
+ case GRPC_STREAM_SEND_CLOSED: break;
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -592,7 +594,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index f3b9219f8b..ebde5095a9 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -33,7 +33,6 @@
#include <grpc/grpc.h>
-#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
@@ -46,8 +45,7 @@
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
- static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter,
- &grpc_http_filter};
+ static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}