aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-04-16 08:01:49 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-16 14:22:28 -0700
commit6902ad2e9c1e409b37b9b21a95e69a89b2be402f (patch)
tree29da7123f1db30b0b10cf4921a1e26be114f7c27 /src/core/surface
parentb12dc6b5bc448d58bc726c7f36ae9eecc8e4741b (diff)
Switching to batch oriented metadata passing
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c183
-rw-r--r--src/core/surface/call.h17
-rw-r--r--src/core/surface/channel.c52
-rw-r--r--src/core/surface/client.c12
-rw-r--r--src/core/surface/lame_client.c21
-rw-r--r--src/core/surface/server.c39
6 files changed, 157 insertions, 167 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dba63058b8..9c0c65e21c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -33,7 +33,6 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
@@ -41,6 +40,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
@@ -68,8 +68,10 @@ typedef struct {
} completed_request;
/* See request_set in grpc_call below for a description */
-#define REQSET_EMPTY 255
-#define REQSET_DONE 254
+#define REQSET_EMPTY 'X'
+#define REQSET_DONE 'Y'
+
+#define MAX_SEND_INITIAL_METADATA_COUNT 3
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
@@ -204,6 +206,12 @@ struct grpc_call {
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount;
+ grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
+ grpc_linked_mdelem status_link;
+ grpc_linked_mdelem details_link;
+ size_t send_initial_metadata_count;
+ gpr_timespec send_deadline;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@@ -226,9 +234,11 @@ struct grpc_call {
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data) {
+ const void *server_transport_data, grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count, gpr_timespec send_deadline) {
size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
@@ -245,6 +255,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
}
+ GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
+ for (i = 0; i < add_initial_metadata_count; i++) {
+ call->send_initial_metadata[i].md = add_initial_metadata[i];
+ }
+ call->send_initial_metadata_count = add_initial_metadata_count;
+ call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
/* one ref is dropped in response to destroy, the other in
@@ -252,6 +268,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_ref_init(&call->internal_refcount, 2);
grpc_call_stack_init(channel_stack, server_transport_data,
CALL_STACK_FROM_CALL(call));
+ if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, send_deadline);
+ }
return call;
}
@@ -587,15 +606,28 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
-static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
- grpc_call_op op;
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = GRPC_WRITE_BUFFER_HINT;
- op.data.metadata = elem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, grpc_metadata *metadata) {
+ size_t i;
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
+ for (i = 0; i < count; i++) {
+ grpc_metadata *md = &metadata[i];
+ grpc_metadata *next_md = (i == count-1) ? NULL : &metadata[i+1];
+ grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i-1];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem*)&md->internal_data;
+ assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
+ l->md = grpc_mdelem_from_string_and_buffer(
+ call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value, md->value_length);
+ l->next = next_md ? (grpc_linked_mdelem*)&next_md->internal_data : NULL;
+ l->prev = prev_md ? (grpc_linked_mdelem*)&prev_md->internal_data : NULL;
+ }
+ out.head = (grpc_linked_mdelem*)&(metadata[0].internal_data);
+ out.tail = (grpc_linked_mdelem*)&(metadata[count-1].internal_data);
+ return out;
}
static void enact_send_action(grpc_call *call, send_action sa) {
@@ -614,13 +646,18 @@ static void enact_send_action(grpc_call *call, send_action sa) {
/* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
+ for (i = 0; i < call->send_initial_metadata_count; i++) {
+ grpc_call_op_metadata_link_head(&op.data.metadata, &call->send_initial_metadata[i]);
}
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
@@ -645,32 +682,32 @@ static void enact_send_action(grpc_call *call, send_action sa) {
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
- }
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
- send_metadata(
- call,
- grpc_mdelem_from_metadata_strings(
+ grpc_call_op_metadata_add_tail(&op.data.metadata, &call->status_link,
+ grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
- send_metadata(
- call,
+ grpc_call_op_metadata_add_tail(&op.data.metadata, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context,
data.send_status.details)));
}
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
/* fallthrough: see choose_send_action for details */
case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
@@ -875,9 +912,7 @@ static void call_alarm(void *arg, int success) {
grpc_call_internal_unref(call, 1);
}
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
@@ -886,11 +921,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
+static void set_read_state_locked(grpc_call *call, read_state state) {
GPR_ASSERT(call->read_state < state);
call->read_state = state;
finish_read_ops(call);
+}
+
+static void set_read_state(grpc_call *call, read_state state) {
+ lock(call);
+ set_read_state_locked(call, state);
unlock(call);
}
@@ -936,52 +975,60 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
+int grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op_metadata *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_mdstr *key = md->key;
+ grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
+ int is_trailing;
lock(call);
- if (key == grpc_channel_get_status_string(call->channel)) {
- set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
- } else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
- } else {
- dest = &call->buffered_metadata[call->read_state >=
- READ_STATE_GOT_INITIAL_METADATA];
- if (dest->count == dest->capacity) {
- dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
- dest->metadata =
- gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
- }
- mdusr = &dest->metadata[dest->count++];
- mdusr->key = grpc_mdstr_as_c_string(md->key);
- mdusr->value = grpc_mdstr_as_c_string(md->value);
- mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
- if (call->owned_metadata_count == call->owned_metadata_capacity) {
- call->owned_metadata_capacity = GPR_MAX(
- call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
- call->owned_metadata =
- gpr_realloc(call->owned_metadata,
- sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
+ for (l = md->list.head; l; l = l->next) {
+ grpc_mdelem *md = l->md;
+ grpc_mdstr *key = md->key;
+ if (key == grpc_channel_get_status_string(call->channel)) {
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
+ grpc_mdelem_unref(md);
+ } else if (key == grpc_channel_get_message_string(call->channel)) {
+ set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ grpc_mdelem_unref(md);
+ } else {
+ dest = &call->buffered_metadata[is_trailing];
+ if (dest->count == dest->capacity) {
+ dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+ dest->metadata =
+ gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ }
+ mdusr = &dest->metadata[dest->count++];
+ mdusr->key = grpc_mdstr_as_c_string(md->key);
+ mdusr->value = grpc_mdstr_as_c_string(md->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ if (call->owned_metadata_count == call->owned_metadata_capacity) {
+ call->owned_metadata_capacity = GPR_MAX(
+ call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
+ call->owned_metadata =
+ gpr_realloc(call->owned_metadata,
+ sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ }
+ call->owned_metadata[call->owned_metadata_count++] = md;
}
- call->owned_metadata[call->owned_metadata_count++] = md;
+ }
+ if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, md->deadline);
+ }
+ if (!is_trailing) {
+ set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
}
unlock(call);
+
+ return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
- grpc_call *call = grpc_call_from_top_element(surface_element);
- set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
-}
-
/*
* BATCH API IMPLEMENTATION
*/
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 06434f87ac..25b0fd3cc0 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_CALL_H
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
/* Primitive operation types - grpc_op's get rewritten into these */
@@ -67,7 +66,7 @@ typedef union {
} recv_status_details;
struct {
size_t count;
- const grpc_metadata *metadata;
+ grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
@@ -86,7 +85,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data);
+ const void *server_transport_data, grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count, gpr_timespec send_deadline);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
@@ -96,8 +96,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
-void grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_mdelem *md);
+/* receive metadata - returns 1 if this was initial metadata */
+int grpc_call_recv_metadata(grpc_call_element *surface_element,
+ grpc_call_op_metadata *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
@@ -108,12 +109,6 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
-/* Called when it's known that the initial batch of metadata is complete */
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element);
-
-void grpc_call_set_deadline(grpc_call_element *surface_element,
- gpr_timespec deadline);
-
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3962a00c4..29b042e7c1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -62,7 +62,7 @@ struct grpc_channel {
registered_call *registered_calls;
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
(((grpc_channel *)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
@@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
- grpc_call *call;
- grpc_call_op op;
+ grpc_mdelem *send_metadata[2];
- if (!channel->is_client) {
- gpr_log(GPR_ERROR, "Cannot create a call on the server.");
- return NULL;
- }
+ GPR_ASSERT(channel->is_client);
- call = grpc_call_create(channel, cq, NULL);
+ send_metadata[0] = path_mdelem;
+ send_metadata[1] = authority_mdelem;
- /* Add :path and :authority headers. */
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.metadata = path_mdelem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
-
- op.data.metadata = authority_mdelem;
- grpc_call_execute_op(call, &op);
-
- if (0 != gpr_time_cmp(deadline, gpr_inf_future)) {
- op.type = GRPC_SEND_DEADLINE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.deadline = deadline;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- }
+ return grpc_call_create(channel, cq, NULL, send_metadata,
+ GPR_ARRAY_SIZE(send_metadata), deadline);
+}
- return call;
+grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
+ const char *method, const char *host,
+ gpr_timespec absolute_deadline) {
+ return grpc_channel_create_call(channel, NULL, method, host,
+ absolute_deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
deadline);
}
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
- return grpc_channel_create_call(channel, NULL, method, host,
- absolute_deadline);
-}
-
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 4d54865d16..71ca26e65d 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -52,15 +52,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- grpc_call_next_op(elem, op);
- break;
case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, op->data.metadata);
- break;
- case GRPC_RECV_DEADLINE:
- gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
+ grpc_call_recv_metadata(elem, &op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
@@ -72,9 +65,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_RECV_FINISH:
grpc_call_stream_closed(elem);
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- grpc_call_initial_metadata_complete(elem);
- break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index b40c48381f..f2497e28e6 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -47,23 +47,20 @@ typedef struct {
} call_data;
typedef struct {
- grpc_mdelem *status;
- grpc_mdelem *message;
+ void *unused;
} channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
- channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_START:
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status));
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
+ grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN, "Rpc sent on a lame channel.");
grpc_call_stream_closed(elem);
break;
case GRPC_SEND_METADATA:
- grpc_mdelem_unref(op->data.metadata);
+ abort();
break;
default:
break;
@@ -94,23 +91,11 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- channel_data *channeld = elem->channel_data;
- char status[12];
-
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
-
- channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message",
- "Rpc sent on a lame channel.");
- gpr_ltoa(GRPC_STATUS_UNKNOWN, status);
- channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
- channel_data *channeld = elem->channel_data;
-
- grpc_mdelem_unref(channeld->message);
- grpc_mdelem_unref(channeld->status);
}
static const grpc_channel_filter lame_filter = {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 17cba9a505..e0d7950dea 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) {
gpr_mu_unlock(&chand->server->mu);
}
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (md->key == chand->path_key) {
+ calld->path = grpc_mdstr_ref(md->value);
+ return NULL;
+ } else if (md->key == chand->authority_key) {
+ calld->host = grpc_mdstr_ref(md->value);
+ return NULL;
+ }
+ return md;
+}
+
static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
grpc_call_op *op) {
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
- md = op->data.metadata;
- if (md->key == chand->path_key) {
- calld->path = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else if (md->key == chand->authority_key) {
- calld->host = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else {
- grpc_call_recv_metadata(elem, md);
+ grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem);
+ if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ calld->deadline = op->data.metadata.deadline;
+ start_new_rpc(elem);
}
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- start_new_rpc(elem);
- grpc_call_initial_metadata_complete(elem);
- break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
@@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
case GRPC_RECV_FINISH:
stream_closed(elem);
break;
- case GRPC_RECV_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- ((call_data *)elem->call_data)->deadline = op->data.deadline;
- break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
@@ -464,7 +463,7 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_ACCEPT_CALL:
/* create a call */
grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data);
+ op->data.accept_call.transport_server_data, NULL, 0, gpr_inf_future);
break;
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the