diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 225 | ||||
-rw-r--r-- | src/core/surface/call.h | 25 | ||||
-rw-r--r-- | src/core/surface/channel.c | 52 | ||||
-rw-r--r-- | src/core/surface/client.c | 28 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 42 | ||||
-rw-r--r-- | src/core/surface/server.c | 40 |
6 files changed, 219 insertions, 193 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index dba63058b8..2949805622 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -33,7 +33,6 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" @@ -41,6 +40,7 @@ #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <assert.h> #include <stdio.h> #include <stdlib.h> @@ -68,8 +68,10 @@ typedef struct { } completed_request; /* See request_set in grpc_call below for a description */ -#define REQSET_EMPTY 255 -#define REQSET_DONE 254 +#define REQSET_EMPTY 'X' +#define REQSET_DONE 'Y' + +#define MAX_SEND_INITIAL_METADATA_COUNT 3 typedef struct { /* Overall status of the operation: starts OK, may degrade to @@ -92,6 +94,8 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, @@ -204,12 +208,18 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; + grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT]; + grpc_linked_mdelem status_link; + grpc_linked_mdelem details_link; + size_t send_initial_metadata_count; + gpr_timespec send_deadline; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; -#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1)) +#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) #define CALL_ELEM_FROM_CALL(call, idx) \ grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) @@ -226,9 +236,13 @@ struct grpc_call { static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data) { + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = @@ -245,6 +259,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; } + GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); + for (i = 0; i < add_initial_metadata_count; i++) { + call->send_initial_metadata[i].md = add_initial_metadata[i]; + } + call->send_initial_metadata_count = add_initial_metadata_count; + call->send_deadline = send_deadline; grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); /* one ref is dropped in response to destroy, the other in @@ -252,6 +272,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_ref_init(&call->internal_refcount, 2); grpc_call_stack_init(channel_stack, server_transport_data, CALL_STACK_FROM_CALL(call)); + if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, send_deadline); + } return call; } @@ -284,6 +307,9 @@ static void destroy_call(void *call, int ignored_success) { for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } + for (i = 0; i < c->send_initial_metadata_count; i++) { + grpc_mdelem_unref(c->send_initial_metadata[i].md); + } if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } @@ -342,6 +368,7 @@ static void request_more_data(grpc_call *call) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); } @@ -587,15 +614,29 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static void send_metadata(grpc_call *call, grpc_mdelem *elem) { - grpc_call_op op; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = GRPC_WRITE_BUFFER_HINT; - op.data.metadata = elem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; } static void enact_send_action(grpc_call *call, send_action sa) { @@ -614,19 +655,21 @@ static void enact_send_action(grpc_call *call, send_action sa) { /* fallthrough */ case SEND_INITIAL_METADATA: data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } - op.type = GRPC_SEND_START; + op.type = GRPC_SEND_METADATA; op.dir = GRPC_CALL_DOWN; op.flags = flags; - op.data.start.pollset = grpc_cq_pollset(call->cq); + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&op.data.metadata, + &call->send_initial_metadata[i]); + } + call->send_initial_metadata_count = 0; op.done_cb = finish_start_step; op.user_data = call; + op.bind_pollset = grpc_cq_pollset(call->cq); grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: @@ -640,37 +683,42 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - send_metadata(call, - grpc_mdelem_from_string_and_buffer( - call->metadata_context, md->key, - (const gpr_uint8 *)md->value, md->value_length)); - } + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.data.metadata.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; + op.data.metadata.deadline = call->send_deadline; + op.bind_pollset = NULL; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; gpr_ltoa(data.send_status.code, status_str); - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->status_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, status_str))); if (data.send_status.details) { - send_metadata( - call, + grpc_metadata_batch_add_tail( + &op.data.metadata, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)), grpc_mdstr_from_string(call->metadata_context, data.send_status.details))); } + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); /* fallthrough: see choose_send_action for details */ case SEND_FINISH: op.type = GRPC_SEND_FINISH; @@ -678,6 +726,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.flags = 0; op.done_cb = finish_finish_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; } @@ -831,6 +880,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; elem = CALL_ELEM_FROM_CALL(c, 0); elem->filter->call_op(elem, NULL, &op); @@ -875,9 +925,7 @@ static void call_alarm(void *arg, int success) { grpc_call_internal_unref(call, 1); } -void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { - grpc_call *call = CALL_FROM_TOP_ELEM(elem); - +static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { if (call->have_alarm) { gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); } @@ -886,11 +934,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); } -static void set_read_state(grpc_call *call, read_state state) { - lock(call); +static void set_read_state_locked(grpc_call *call, read_state state) { GPR_ASSERT(call->read_state < state); call->read_state = state; finish_read_ops(call); +} + +static void set_read_state(grpc_call *call, read_state state) { + lock(call); + set_read_state_locked(call, state); unlock(call); } @@ -914,7 +966,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { gpr_uint32 status; void *user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data) { - status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET; + status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_SLICE_LENGTH(md->value->slice), @@ -936,52 +988,81 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } -void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - grpc_mdstr *key = md->key; + lock(call); + set_status_code(call, STATUS_FROM_CORE, status); + set_status_details(call, STATUS_FROM_CORE, + grpc_mdstr_from_string(call->metadata_context, message)); + unlock(call); +} + +int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + grpc_linked_mdelem *l; grpc_metadata_array *dest; grpc_metadata *mdusr; + int is_trailing; + grpc_mdctx *mdctx = call->metadata_context; lock(call); - if (key == grpc_channel_get_status_string(call->channel)) { - set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); - } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); - } else { - dest = &call->buffered_metadata[call->read_state >= - READ_STATE_GOT_INITIAL_METADATA]; - if (dest->count == dest->capacity) { - dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); - dest->metadata = - gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); - } - mdusr = &dest->metadata[dest->count++]; - mdusr->key = grpc_mdstr_as_c_string(md->key); - mdusr->value = grpc_mdstr_as_c_string(md->value); - mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); - if (call->owned_metadata_count == call->owned_metadata_capacity) { - call->owned_metadata_capacity = GPR_MAX( - call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2); - call->owned_metadata = - gpr_realloc(call->owned_metadata, - sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; + for (l = md->list.head; l != NULL; l = l->next) { + grpc_mdelem *md = l->md; + grpc_mdstr *key = md->key; + if (key == grpc_channel_get_status_string(call->channel)) { + set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); + } else if (key == grpc_channel_get_message_string(call->channel)) { + set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + } else { + dest = &call->buffered_metadata[is_trailing]; + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); + dest->metadata = + gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); + } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); + mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); + if (call->owned_metadata_count == call->owned_metadata_capacity) { + call->owned_metadata_capacity = + GPR_MAX(call->owned_metadata_capacity + 8, + call->owned_metadata_capacity * 2); + call->owned_metadata = + gpr_realloc(call->owned_metadata, + sizeof(grpc_mdelem *) * call->owned_metadata_capacity); + } + call->owned_metadata[call->owned_metadata_count++] = md; + l->md = 0; } - call->owned_metadata[call->owned_metadata_count++] = md; + } + if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { + set_deadline_alarm(call, md->deadline); + } + if (!is_trailing) { + set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA); } unlock(call); + + grpc_mdctx_lock(mdctx); + for (l = md->list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = md->garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + grpc_mdctx_unlock(mdctx); + + return !is_trailing; } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { - grpc_call *call = grpc_call_from_top_element(surface_element); - set_read_state(call, READ_STATE_GOT_INITIAL_METADATA); -} - /* * BATCH API IMPLEMENTATION */ diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 06434f87ac..f8d0915349 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_CALL_H #include "src/core/channel/channel_stack.h" -#include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> /* Primitive operation types - grpc_op's get rewritten into these */ @@ -67,7 +66,7 @@ typedef union { } recv_status_details; struct { size_t count; - const grpc_metadata *metadata; + grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { @@ -86,7 +85,10 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, - const void *server_transport_data); + const void *server_transport_data, + grpc_mdelem **add_initial_metadata, + size_t add_initial_metadata_count, + gpr_timespec send_deadline); void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); @@ -96,8 +98,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); /* Helpers for grpc_client, grpc_server filters to publish received data to the completion queue/surface layer */ -void grpc_call_recv_metadata(grpc_call_element *surface_element, - grpc_mdelem *md); +/* receive metadata - returns 1 if this was initial metadata */ +int grpc_call_recv_metadata(grpc_call_element *surface_element, + grpc_metadata_batch *md); void grpc_call_recv_message(grpc_call_element *surface_element, grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); @@ -108,14 +111,12 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); -/* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); - -void grpc_call_set_deadline(grpc_call_element *surface_element, - gpr_timespec deadline); - grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); +void grpc_call_recv_synthetic_status(grpc_call_element *elem, + grpc_status_code status, + const char *message); + /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); @@ -128,4 +129,4 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity, #define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \ if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag) -#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ +#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d3962a00c4..29b042e7c1 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -62,7 +62,7 @@ struct grpc_channel { registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) #define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ @@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} - static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { - grpc_call *call; - grpc_call_op op; + grpc_mdelem *send_metadata[2]; - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } + GPR_ASSERT(channel->is_client); - call = grpc_call_create(channel, cq, NULL); + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - /* Add :path and :authority headers. */ - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); +} - return call; +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, @@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, deadline); } -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - return grpc_channel_create_call(channel, NULL, method, host, - absolute_deadline); -} - void *grpc_channel_register_call(grpc_channel *channel, const char *method, const char *host) { registered_call *rc = gpr_malloc(sizeof(registered_call)); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index 4d54865d16..2f898ff7d7 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -39,28 +39,17 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct { - void *unused; -} call_data; +typedef struct { void *unused; } call_data; -typedef struct { - void *unused; -} channel_data; +typedef struct { void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - grpc_call_next_op(elem, op); - break; case GRPC_RECV_METADATA: - grpc_call_recv_metadata(elem, op->data.metadata); - break; - case GRPC_RECV_DEADLINE: - gpr_log(GPR_ERROR, "Deadline received by client (ignored)"); + grpc_call_recv_metadata(elem, &op->data.metadata); break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); @@ -72,8 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_RECV_FINISH: grpc_call_stream_closed(elem); break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - grpc_call_initial_metadata_complete(elem); + case GRPC_RECV_SYNTHETIC_STATUS: + grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, + op->data.synthetic_status.message); break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); @@ -114,6 +104,6 @@ static void init_channel_elem(grpc_channel_element *elem, static void destroy_channel_elem(grpc_channel_element *elem) {} const grpc_channel_filter grpc_client_surface_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client", +}; diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index b40c48381f..78170806f1 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -42,28 +42,20 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -typedef struct { - void *unused; -} call_data; +typedef struct { void *unused; } call_data; -typedef struct { - grpc_mdelem *status; - grpc_mdelem *message; -} channel_data; +typedef struct { void *unused; } channel_data; static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { - channel_data *channeld = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_START: - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status)); - grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message)); - grpc_call_stream_closed(elem); - break; case GRPC_SEND_METADATA: - grpc_mdelem_unref(op->data.metadata); + grpc_metadata_batch_destroy(&op->data.metadata); + grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN, + "Rpc sent on a lame channel."); + grpc_call_stream_closed(elem); break; default: break; @@ -94,29 +86,17 @@ static void destroy_call_elem(grpc_call_element *elem) {} static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { - channel_data *channeld = elem->channel_data; - char status[12]; - GPR_ASSERT(is_first); GPR_ASSERT(is_last); - - channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message", - "Rpc sent on a lame channel."); - gpr_ltoa(GRPC_STATUS_UNKNOWN, status); - channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status); } -static void destroy_channel_elem(grpc_channel_element *elem) { - channel_data *channeld = elem->channel_data; - - grpc_mdelem_unref(channeld->message); - grpc_mdelem_unref(channeld->status); -} +static void destroy_channel_elem(grpc_channel_element *elem) {} static const grpc_channel_filter lame_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "lame-client", }; + call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + "lame-client", +}; grpc_channel *grpc_lame_client_channel_create(void) { static const grpc_channel_filter *filters[] = {&lame_filter}; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 17cba9a505..e771929870 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) { gpr_mu_unlock(&chand->server->mu); } +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (md->key == chand->path_key) { + calld->path = grpc_mdstr_ref(md->value); + return NULL; + } else if (md->key == chand->authority_key) { + calld->host = grpc_mdstr_ref(md->value); + return NULL; + } + return md; +} + static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, grpc_call_op *op) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_mdelem *md; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_RECV_METADATA: - md = op->data.metadata; - if (md->key == chand->path_key) { - calld->path = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else if (md->key == chand->authority_key) { - calld->host = grpc_mdstr_ref(md->value); - grpc_mdelem_unref(md); - } else { - grpc_call_recv_metadata(elem, md); + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + calld->deadline = op->data.metadata.deadline; + start_new_rpc(elem); } break; - case GRPC_RECV_END_OF_INITIAL_METADATA: - start_new_rpc(elem); - grpc_call_initial_metadata_complete(elem); - break; case GRPC_RECV_MESSAGE: grpc_call_recv_message(elem, op->data.message); op->done_cb(op->user_data, GRPC_OP_OK); @@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, case GRPC_RECV_FINISH: stream_closed(elem); break; - case GRPC_RECV_DEADLINE: - grpc_call_set_deadline(elem, op->data.deadline); - ((call_data *)elem->call_data)->deadline = op->data.deadline; - break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); @@ -464,7 +463,8 @@ static void channel_op(grpc_channel_element *elem, case GRPC_ACCEPT_CALL: /* create a call */ grpc_call_create(chand->channel, NULL, - op->data.accept_call.transport_server_data); + op->data.accept_call.transport_server_data, NULL, 0, + gpr_inf_future); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the |