diff options
author | 2015-04-16 08:01:49 -0700 | |
---|---|---|
committer | 2015-04-16 14:22:28 -0700 | |
commit | 6902ad2e9c1e409b37b9b21a95e69a89b2be402f (patch) | |
tree | 29da7123f1db30b0b10cf4921a1e26be114f7c27 /src/core/surface | |
parent | b12dc6b5bc448d58bc726c7f36ae9eecc8e4741b (diff) |
Switching to batch oriented metadata passing
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 183 | ||||
-rw-r--r-- | src/core/surface/call.h | 17 | ||||
-rw-r--r-- | src/core/surface/channel.c | 52 | ||||
-rw-r--r-- | src/core/surface/client.c | 12 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 21 | ||||
-rw-r--r-- | src/core/surface/server.c | 39 |
6 files changed, 157 insertions, 167 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dba63058b8..9c0c65e21c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -204,6 +206,12 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -226,9 +234,11 @@ struct grpc_call { static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, gpr_timespec send_deadline) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = @@ -245,6 +255,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); /* one ref is dropped in response to destroy, the other in @@ -252,6 +268,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -587,15 +606,28 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count-1) ? NULL : &metadata[i+1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i-1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem*)&md->internal_data; + assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer( + call->metadata_context, md->key, + (const gpr_uint8 *)md->value, md->value_length); + l->next = next_md ? (grpc_linked_mdelem*)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem*)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem*)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem*)&(metadata[count-1].internal_data); + return out; } static void enact_send_action(grpc_call *call, send_action sa) { @@ -614,13 +646,18 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* fallthrough */ case SEND_INITIAL_METADATA: data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_call_op_metadata_link_head(&op.data.metadata, &call->send_initial_metadata[i]); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; op.flags = flags; @@ -645,32 +682,32 @@ static void enact_send_action(grpc_call *call, send_action sa) { case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, - grpc_mdelem_from_metadata_strings( + grpc_call_op_metadata_add_tail(&op.data.metadata, &call->status_link, + grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata( - call, + grpc_call_op_metadata_add_tail(&op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; @@ -875,9 +912,7 @@ static void call_alarm(void *arg, int success) { grpc_call_internal_unref(call, 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } @@ -886,11 +921,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); +static void set_read_state_locked(grpc_call *call, read_state state) { GPR_ASSERT(call->read_state < state); call->read_state = state; finish_read_ops(call); +} + +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + set_read_state_locked(call, state); unlock(call); } @@ -936,52 +975,60 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { +int grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op_metadata *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; + int is_trailing; lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + grpc_mdelem_unref(md); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + grpc_mdelem_unref(md); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = GPR_MAX( + call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; } - call->owned_metadata[call->owned_metadata_count++] = md; + } + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); } unlock(call); + + return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 06434f87ac..25b0fd3cc0 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_CALL_H #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> /* Primitive operation types - grpc_op's get rewritten into these */ @@ -67,7 +66,7 @@ typedef union { } recv_status_details; struct { size_t count; - const grpc_metadata *metadata; + grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { @@ -86,7 +85,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data); + const void *server_transport_data, grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, gpr_timespec send_deadline); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); @@ -96,8 +96,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ -void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_mdelem *md); +/* receive metadata - returns 1 if this was initial metadata */ +int grpc_call_recv_metadata(grpc_call_element *surface_element, + grpc_call_op_metadata *md); void grpc_call_recv_message(grpc_call_element *surface_element, grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); @@ -108,12 +109,6 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); - -void grpc_call_set_deadline(grpc_call_element *surface_element, - gpr_timespec deadline); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d3962a00c4..29b042e7c1 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -62,7 +62,7 @@ struct grpc_channel { registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) #define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ @@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} - static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { - grpc_call *call; - grpc_call_op op; + grpc_mdelem *send_metadata[2]; - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } + GPR_ASSERT(channel->is_client); - call = grpc_call_create(channel, cq, NULL); + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - /* Add :path and :authority headers. */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); +} - return call; +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, @@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, deadline); } -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - return grpc_channel_create_call(channel, NULL, method, host, - absolute_deadline); -} - void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host) { registered_call *rc = gpr_malloc(sizeof(registered_call)); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4d54865d16..71ca26e65d 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -52,15 +52,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - grpc_call_next_op(elem, op); - break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); + grpc_call_recv_metadata(elem, &op->data.metadata); break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); @@ -72,9 +65,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_RECV_FINISH: grpc_call_stream_closed(elem); break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_initial_metadata_complete(elem); - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b40c48381f..f2497e28e6 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,23 +47,20 @@ typedef struct { } call_data; typedef struct { - grpc_mdelem *status; - grpc_mdelem *message; + void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { - channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_START: - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); + grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN, "Rpc sent on a lame channel."); grpc_call_stream_closed(elem); break; case GRPC_SEND_METADATA: - grpc_mdelem_unref(op->data.metadata); + abort(); break; default: break; @@ -94,23 +91,11 @@ static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - channel_data *channeld = elem->channel_data; - char status[12]; - GPR_ASSERT(is_first); GPR_ASSERT(is_last); - - channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", - "Rpc sent on a lame channel."); - gpr_ltoa(GRPC_STATUS_UNKNOWN, status); - channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } static void destroy_channel_elem(grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - - grpc_mdelem_unref(channeld->message); - grpc_mdelem_unref(channeld->status); } static const grpc_channel_filter lame_filter = { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 17cba9a505..e0d7950dea 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) { gpr_mu_unlock(&chand->server->mu); } +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + return NULL; + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + return NULL; + } + return md; +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - md = op->data.metadata; - if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else { - grpc_call_recv_metadata(elem, md); + grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem); + if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + calld->deadline = op->data.metadata.deadline; + start_new_rpc(elem); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - start_new_rpc(elem); - grpc_call_initial_metadata_complete(elem); - break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); op->done_cb(op->user_data, GRPC_OP_OK); @@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, case GRPC_RECV_FINISH: stream_closed(elem); break; - case GRPC_RECV_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - ((call_data *)elem->call_data)->deadline = op->data.deadline; - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); @@ -464,7 +463,7 @@ static void channel_op(grpc_channel_element *elem, case GRPC_ACCEPT_CALL: /* create a call */ grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data); + op->data.accept_call.transport_server_data, NULL, 0, gpr_inf_future); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the |