aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c225
-rw-r--r--src/core/surface/call.h25
-rw-r--r--src/core/surface/channel.c52
-rw-r--r--src/core/surface/client.c28
-rw-r--r--src/core/surface/lame_client.c42
-rw-r--r--src/core/surface/server.c40
6 files changed, 219 insertions, 193 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index dba63058b8..2949805622 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -33,7 +33,6 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
@@ -41,6 +40,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
@@ -68,8 +68,10 @@ typedef struct {
} completed_request;
/* See request_set in grpc_call below for a description */
-#define REQSET_EMPTY 255
-#define REQSET_DONE 254
+#define REQSET_EMPTY 'X'
+#define REQSET_DONE 'Y'
+
+#define MAX_SEND_INITIAL_METADATA_COUNT 3
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
@@ -92,6 +94,8 @@ typedef enum {
/* Status came from the application layer overriding whatever
the wire says */
STATUS_FROM_API_OVERRIDE = 0,
+ /* Status was created by some internal channel stack operation */
+ STATUS_FROM_CORE,
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
@@ -204,12 +208,18 @@ struct grpc_call {
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount;
+ grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
+ grpc_linked_mdelem status_link;
+ grpc_linked_mdelem details_link;
+ size_t send_initial_metadata_count;
+ gpr_timespec send_deadline;
+
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
};
-#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call)+1))
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
#define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
@@ -226,9 +236,13 @@ struct grpc_call {
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count,
+ gpr_timespec send_deadline) {
size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
@@ -245,6 +259,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
}
+ GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
+ for (i = 0; i < add_initial_metadata_count; i++) {
+ call->send_initial_metadata[i].md = add_initial_metadata[i];
+ }
+ call->send_initial_metadata_count = add_initial_metadata_count;
+ call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
/* one ref is dropped in response to destroy, the other in
@@ -252,6 +272,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_ref_init(&call->internal_refcount, 2);
grpc_call_stack_init(channel_stack, server_transport_data,
CALL_STACK_FROM_CALL(call));
+ if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, send_deadline);
+ }
return call;
}
@@ -284,6 +307,9 @@ static void destroy_call(void *call, int ignored_success) {
for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
gpr_free(c->buffered_metadata[i].metadata);
}
+ for (i = 0; i < c->send_initial_metadata_count; i++) {
+ grpc_mdelem_unref(c->send_initial_metadata[i].md);
+ }
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
@@ -342,6 +368,7 @@ static void request_more_data(grpc_call *call) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
}
@@ -587,15 +614,29 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
-static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
- grpc_call_op op;
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = GRPC_WRITE_BUFFER_HINT;
- op.data.metadata = elem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
+ size_t i;
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
+ for (i = 0; i < count; i++) {
+ grpc_metadata *md = &metadata[i];
+ grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
+ grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
+ l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
+ (const gpr_uint8 *)md->value,
+ md->value_length);
+ l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
+ l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
+ }
+ out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
+ out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
+ return out;
}
static void enact_send_action(grpc_call *call, send_action sa) {
@@ -614,19 +655,21 @@ static void enact_send_action(grpc_call *call, send_action sa) {
/* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
- }
- op.type = GRPC_SEND_START;
+ op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.data.start.pollset = grpc_cq_pollset(call->cq);
+ op.data.metadata.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
+ for (i = 0; i < call->send_initial_metadata_count; i++) {
+ grpc_metadata_batch_link_head(&op.data.metadata,
+ &call->send_initial_metadata[i]);
+ }
+ call->send_initial_metadata_count = 0;
op.done_cb = finish_start_step;
op.user_data = call;
+ op.bind_pollset = grpc_cq_pollset(call->cq);
grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
@@ -640,37 +683,42 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
- for (i = 0; i < data.send_metadata.count; i++) {
- const grpc_metadata *md = &data.send_metadata.metadata[i];
- send_metadata(call,
- grpc_mdelem_from_string_and_buffer(
- call->metadata_context, md->key,
- (const gpr_uint8 *)md->value, md->value_length));
- }
+ op.type = GRPC_SEND_METADATA;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.data.metadata.list = chain_metadata_from_app(
+ call, data.send_metadata.count, data.send_metadata.metadata);
+ op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
+ op.data.metadata.deadline = call->send_deadline;
+ op.bind_pollset = NULL;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
- send_metadata(
- call,
+ grpc_metadata_batch_add_tail(
+ &op.data.metadata, &call->status_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
- send_metadata(
- call,
+ grpc_metadata_batch_add_tail(
+ &op.data.metadata, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context,
data.send_status.details)));
}
+ op.done_cb = do_nothing;
+ op.user_data = NULL;
+ grpc_call_execute_op(call, &op);
/* fallthrough: see choose_send_action for details */
case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
@@ -678,6 +726,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.flags = 0;
op.done_cb = finish_finish_step;
op.user_data = call;
+ op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
}
@@ -831,6 +880,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
+ op.bind_pollset = NULL;
elem = CALL_ELEM_FROM_CALL(c, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -875,9 +925,7 @@ static void call_alarm(void *arg, int success) {
grpc_call_internal_unref(call, 1);
}
-void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-
+static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
@@ -886,11 +934,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
-static void set_read_state(grpc_call *call, read_state state) {
- lock(call);
+static void set_read_state_locked(grpc_call *call, read_state state) {
GPR_ASSERT(call->read_state < state);
call->read_state = state;
finish_read_ops(call);
+}
+
+static void set_read_state(grpc_call *call, read_state state) {
+ lock(call);
+ set_read_state_locked(call, state);
unlock(call);
}
@@ -914,7 +966,7 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
gpr_uint32 status;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
- status = ((gpr_uint32)(gpr_intptr) user_data) - STATUS_OFFSET;
+ status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
@@ -936,52 +988,81 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
-void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+ grpc_status_code status,
+ const char *message) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- grpc_mdstr *key = md->key;
+ lock(call);
+ set_status_code(call, STATUS_FROM_CORE, status);
+ set_status_details(call, STATUS_FROM_CORE,
+ grpc_mdstr_from_string(call->metadata_context, message));
+ unlock(call);
+}
+
+int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) {
+ grpc_call *call = CALL_FROM_TOP_ELEM(elem);
+ grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
+ int is_trailing;
+ grpc_mdctx *mdctx = call->metadata_context;
lock(call);
- if (key == grpc_channel_get_status_string(call->channel)) {
- set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
- grpc_mdelem_unref(md);
- } else if (key == grpc_channel_get_message_string(call->channel)) {
- set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
- grpc_mdelem_unref(md);
- } else {
- dest = &call->buffered_metadata[call->read_state >=
- READ_STATE_GOT_INITIAL_METADATA];
- if (dest->count == dest->capacity) {
- dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
- dest->metadata =
- gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
- }
- mdusr = &dest->metadata[dest->count++];
- mdusr->key = grpc_mdstr_as_c_string(md->key);
- mdusr->value = grpc_mdstr_as_c_string(md->value);
- mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
- if (call->owned_metadata_count == call->owned_metadata_capacity) {
- call->owned_metadata_capacity = GPR_MAX(
- call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
- call->owned_metadata =
- gpr_realloc(call->owned_metadata,
- sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
+ for (l = md->list.head; l != NULL; l = l->next) {
+ grpc_mdelem *md = l->md;
+ grpc_mdstr *key = md->key;
+ if (key == grpc_channel_get_status_string(call->channel)) {
+ set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
+ } else if (key == grpc_channel_get_message_string(call->channel)) {
+ set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ } else {
+ dest = &call->buffered_metadata[is_trailing];
+ if (dest->count == dest->capacity) {
+ dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
+ dest->metadata =
+ gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
+ }
+ mdusr = &dest->metadata[dest->count++];
+ mdusr->key = grpc_mdstr_as_c_string(md->key);
+ mdusr->value = grpc_mdstr_as_c_string(md->value);
+ mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
+ if (call->owned_metadata_count == call->owned_metadata_capacity) {
+ call->owned_metadata_capacity =
+ GPR_MAX(call->owned_metadata_capacity + 8,
+ call->owned_metadata_capacity * 2);
+ call->owned_metadata =
+ gpr_realloc(call->owned_metadata,
+ sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
+ }
+ call->owned_metadata[call->owned_metadata_count++] = md;
+ l->md = 0;
}
- call->owned_metadata[call->owned_metadata_count++] = md;
+ }
+ if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
+ set_deadline_alarm(call, md->deadline);
+ }
+ if (!is_trailing) {
+ set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
}
unlock(call);
+
+ grpc_mdctx_lock(mdctx);
+ for (l = md->list.head; l; l = l->next) {
+ if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ for (l = md->garbage.head; l; l = l->next) {
+ grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
+ }
+ grpc_mdctx_unlock(mdctx);
+
+ return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
- grpc_call *call = grpc_call_from_top_element(surface_element);
- set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
-}
-
/*
* BATCH API IMPLEMENTATION
*/
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 06434f87ac..f8d0915349 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_CALL_H
#include "src/core/channel/channel_stack.h"
-#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
/* Primitive operation types - grpc_op's get rewritten into these */
@@ -67,7 +66,7 @@ typedef union {
} recv_status_details;
struct {
size_t count;
- const grpc_metadata *metadata;
+ grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
@@ -86,7 +85,10 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
- const void *server_transport_data);
+ const void *server_transport_data,
+ grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count,
+ gpr_timespec send_deadline);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
@@ -96,8 +98,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
-void grpc_call_recv_metadata(grpc_call_element *surface_element,
- grpc_mdelem *md);
+/* receive metadata - returns 1 if this was initial metadata */
+int grpc_call_recv_metadata(grpc_call_element *surface_element,
+ grpc_metadata_batch *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
@@ -108,14 +111,12 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
-/* Called when it's known that the initial batch of metadata is complete */
-void grpc_call_initial_metadata_complete(grpc_call_element *surface_element);
-
-void grpc_call_set_deadline(grpc_call_element *surface_element,
- gpr_timespec deadline);
-
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
+void grpc_call_recv_synthetic_status(grpc_call_element *elem,
+ grpc_status_code status,
+ const char *message);
+
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
@@ -128,4 +129,4 @@ void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag)
-#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
+#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3962a00c4..29b042e7c1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -62,7 +62,7 @@ struct grpc_channel {
registered_call *registered_calls;
};
-#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
+#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
(((grpc_channel *)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
@@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
-static void do_nothing(void *ignored, grpc_op_error error) {}
-
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
- grpc_call *call;
- grpc_call_op op;
+ grpc_mdelem *send_metadata[2];
- if (!channel->is_client) {
- gpr_log(GPR_ERROR, "Cannot create a call on the server.");
- return NULL;
- }
+ GPR_ASSERT(channel->is_client);
- call = grpc_call_create(channel, cq, NULL);
+ send_metadata[0] = path_mdelem;
+ send_metadata[1] = authority_mdelem;
- /* Add :path and :authority headers. */
- op.type = GRPC_SEND_METADATA;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.metadata = path_mdelem;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
-
- op.data.metadata = authority_mdelem;
- grpc_call_execute_op(call, &op);
-
- if (0 != gpr_time_cmp(deadline, gpr_inf_future)) {
- op.type = GRPC_SEND_DEADLINE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.data.deadline = deadline;
- op.done_cb = do_nothing;
- op.user_data = NULL;
- grpc_call_execute_op(call, &op);
- }
+ return grpc_call_create(channel, cq, NULL, send_metadata,
+ GPR_ARRAY_SIZE(send_metadata), deadline);
+}
- return call;
+grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
+ const char *method, const char *host,
+ gpr_timespec absolute_deadline) {
+ return grpc_channel_create_call(channel, NULL, method, host,
+ absolute_deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
deadline);
}
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
- return grpc_channel_create_call(channel, NULL, method, host,
- absolute_deadline);
-}
-
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index 4d54865d16..2f898ff7d7 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -39,28 +39,17 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- void *unused;
-} call_data;
+typedef struct { void *unused; } call_data;
-typedef struct {
- void *unused;
-} channel_data;
+typedef struct { void *unused; } channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- grpc_call_next_op(elem, op);
- break;
case GRPC_RECV_METADATA:
- grpc_call_recv_metadata(elem, op->data.metadata);
- break;
- case GRPC_RECV_DEADLINE:
- gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
+ grpc_call_recv_metadata(elem, &op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
@@ -72,8 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_RECV_FINISH:
grpc_call_stream_closed(elem);
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- grpc_call_initial_metadata_complete(elem);
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
@@ -114,6 +104,6 @@ static void init_channel_elem(grpc_channel_element *elem,
static void destroy_channel_elem(grpc_channel_element *elem) {}
const grpc_channel_filter grpc_client_surface_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "client", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client",
+};
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index b40c48381f..78170806f1 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -42,28 +42,20 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-typedef struct {
- void *unused;
-} call_data;
+typedef struct { void *unused; } call_data;
-typedef struct {
- grpc_mdelem *status;
- grpc_mdelem *message;
-} channel_data;
+typedef struct { void *unused; } channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
- channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
- case GRPC_SEND_START:
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status));
- grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
- grpc_call_stream_closed(elem);
- break;
case GRPC_SEND_METADATA:
- grpc_mdelem_unref(op->data.metadata);
+ grpc_metadata_batch_destroy(&op->data.metadata);
+ grpc_call_recv_synthetic_status(elem, GRPC_STATUS_UNKNOWN,
+ "Rpc sent on a lame channel.");
+ grpc_call_stream_closed(elem);
break;
default:
break;
@@ -94,29 +86,17 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
- channel_data *channeld = elem->channel_data;
- char status[12];
-
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
-
- channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message",
- "Rpc sent on a lame channel.");
- gpr_ltoa(GRPC_STATUS_UNKNOWN, status);
- channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status);
}
-static void destroy_channel_elem(grpc_channel_element *elem) {
- channel_data *channeld = elem->channel_data;
-
- grpc_mdelem_unref(channeld->message);
- grpc_mdelem_unref(channeld->status);
-}
+static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter lame_filter = {
- call_op, channel_op, sizeof(call_data),
- init_call_elem, destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem, "lame-client", };
+ call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
+ sizeof(channel_data), init_channel_elem, destroy_channel_elem,
+ "lame-client",
+};
grpc_channel *grpc_lame_client_channel_create(void) {
static const grpc_channel_filter *filters[] = {&lame_filter};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 17cba9a505..e771929870 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) {
gpr_mu_unlock(&chand->server->mu);
}
+static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (md->key == chand->path_key) {
+ calld->path = grpc_mdstr_ref(md->value);
+ return NULL;
+ } else if (md->key == chand->authority_key) {
+ calld->host = grpc_mdstr_ref(md->value);
+ return NULL;
+ }
+ return md;
+}
+
static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
grpc_call_op *op) {
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
- md = op->data.metadata;
- if (md->key == chand->path_key) {
- calld->path = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else if (md->key == chand->authority_key) {
- calld->host = grpc_mdstr_ref(md->value);
- grpc_mdelem_unref(md);
- } else {
- grpc_call_recv_metadata(elem, md);
+ grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
+ if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ calld->deadline = op->data.metadata.deadline;
+ start_new_rpc(elem);
}
break;
- case GRPC_RECV_END_OF_INITIAL_METADATA:
- start_new_rpc(elem);
- grpc_call_initial_metadata_complete(elem);
- break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
@@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
case GRPC_RECV_FINISH:
stream_closed(elem);
break;
- case GRPC_RECV_DEADLINE:
- grpc_call_set_deadline(elem, op->data.deadline);
- ((call_data *)elem->call_data)->deadline = op->data.deadline;
- break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
@@ -464,7 +463,8 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_ACCEPT_CALL:
/* create a call */
grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data);
+ op->data.accept_call.transport_server_data, NULL, 0,
+ gpr_inf_future);
break;
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the