aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer.c1
-rw-r--r--src/core/surface/byte_buffer_queue.c7
-rw-r--r--src/core/surface/byte_buffer_queue.h1
-rw-r--r--src/core/surface/call.c231
-rw-r--r--src/core/surface/call.h27
-rw-r--r--src/core/surface/call_details.c13
-rw-r--r--src/core/surface/channel.c16
-rw-r--r--src/core/surface/completion_queue.c21
-rw-r--r--src/core/surface/completion_queue.h10
-rw-r--r--src/core/surface/event_string.c6
-rw-r--r--src/core/surface/metadata_array.c12
-rw-r--r--src/core/surface/server.c98
12 files changed, 352 insertions, 91 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index d1be41074d..09e2aa5b87 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -61,6 +61,7 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
}
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
+ if (!bb) return;
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
gpr_slice_buffer_destroy(&bb->data.slice_buffer);
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 9709a665ba..aab7fd2ffe 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -65,6 +65,13 @@ void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
bba_push(&q->filling, buffer);
}
+void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
+ grpc_byte_buffer *bb;
+ while ((bb = grpc_bbq_pop(q))) {
+ grpc_byte_buffer_destroy(bb);
+ }
+}
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 358a42d5af..3420dc5cab 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -53,6 +53,7 @@ typedef struct {
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
+void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index c68ce5a6a8..b3f272e068 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -223,7 +223,7 @@ static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
-grpc_call *grpc_call_create(grpc_channel *channel,
+grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data) {
size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
@@ -232,6 +232,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
memset(call, 0, sizeof(grpc_call));
gpr_mu_init(&call->mu);
call->channel = channel;
+ call->cq = cq;
call->is_client = server_transport_data == NULL;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
@@ -250,6 +251,11 @@ grpc_call *grpc_call_create(grpc_channel *channel,
return call;
}
+void grpc_call_set_completion_queue(grpc_call *call,
+ grpc_completion_queue *cq) {
+ call->cq = cq;
+}
+
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
static void destroy_call(void *call, int ignored_success) {
@@ -289,8 +295,21 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
+ int flush;
+
call->status[source].is_set = 1;
call->status[source].code = status;
+
+ if (call->is_client) {
+ flush = status == GRPC_STATUS_CANCELLED;
+ } else {
+ flush = status != GRPC_STATUS_OK;
+ }
+
+ if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
+ gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status);
+ grpc_bbq_flush(&call->incoming_queue);
+ }
}
static void set_status_details(grpc_call *call, status_source source,
@@ -374,37 +393,49 @@ static void unlock(grpc_call *call) {
}
}
-static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
+static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
+ int i;
+ for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
+ if (call->status[i].is_set) {
+ out.recv_status.set_value(call->status[i].code,
+ out.recv_status.user_data);
+ return;
+ }
+ }
+ out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
+}
+
+static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) {
- *args.code = call->status[i].code;
- if (!args.details) return;
if (call->status[i].details) {
gpr_slice details = call->status[i].details->slice;
size_t len = GPR_SLICE_LENGTH(details);
- if (len + 1 > *args.details_capacity) {
- *args.details_capacity =
- GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
- *args.details = gpr_realloc(*args.details, *args.details_capacity);
+ if (len + 1 > *out.recv_status_details.details_capacity) {
+ *out.recv_status_details.details_capacity = GPR_MAX(
+ len + 1, *out.recv_status_details.details_capacity * 3 / 2);
+ *out.recv_status_details.details =
+ gpr_realloc(*out.recv_status_details.details,
+ *out.recv_status_details.details_capacity);
}
- memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
- (*args.details)[len] = 0;
+ memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
+ len);
+ (*out.recv_status_details.details)[len] = 0;
} else {
goto no_details;
}
return;
}
}
- *args.code = GRPC_STATUS_UNKNOWN;
- if (!args.details) return;
no_details:
- if (0 == *args.details_capacity) {
- *args.details_capacity = 8;
- *args.details = gpr_malloc(*args.details_capacity);
+ if (0 == *out.recv_status_details.details_capacity) {
+ *out.recv_status_details.details_capacity = 8;
+ *out.recv_status_details.details =
+ gpr_malloc(*out.recv_status_details.details_capacity);
}
- **args.details = 0;
+ **out.recv_status_details.details = 0;
}
static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
@@ -442,8 +473,11 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
case GRPC_IOREQ_SEND_CLOSE:
break;
case GRPC_IOREQ_RECV_STATUS:
- get_final_status(
- call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status);
+ get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
+ break;
+ case GRPC_IOREQ_RECV_STATUS_DETAILS:
+ get_final_details(call,
+ call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
break;
case GRPC_IOREQ_RECV_INITIAL_METADATA:
SWAP(grpc_metadata_array, call->buffered_metadata[0],
@@ -650,6 +684,7 @@ static void finish_read_ops(grpc_call *call) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
}
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK);
finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
/* fallthrough */
case READ_STATE_GOT_INITIAL_METADATA:
@@ -727,20 +762,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
return GRPC_CALL_OK;
}
-static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
- void *user_data) {
- grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
-}
-
-grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
- size_t nreqs, void *tag) {
- grpc_call_error err;
- lock(call);
- err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
- unlock(call);
- return err;
-}
-
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data) {
@@ -900,8 +921,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
- mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
- mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
+ mdusr->key = grpc_mdstr_as_c_string(md->key);
+ mdusr->value = grpc_mdstr_as_c_string(md->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = GPR_MAX(
@@ -925,6 +946,123 @@ void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
}
/*
+ * BATCH API IMPLEMENTATION
+ */
+
+static void set_status_value_directly(grpc_status_code status, void *dest) {
+ *(grpc_status_code *)dest = status;
+}
+
+static void set_cancelled_value(grpc_status_code status, void *dest) {
+ *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
+}
+
+static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
+ grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+}
+
+grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
+ size_t nops, void *tag) {
+ grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
+ size_t in;
+ size_t out;
+ const grpc_op *op;
+ grpc_ioreq *req;
+
+ /* rewrite batch ops into ioreq ops */
+ for (in = 0, out = 0; in < nops; in++) {
+ op = &ops[in];
+ switch (op->op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
+ req->data.send_metadata.count = op->data.send_initial_metadata.count;
+ req->data.send_metadata.metadata =
+ op->data.send_initial_metadata.metadata;
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_MESSAGE;
+ req->data.send_message = op->data.send_message;
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ if (!call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_CLOSE;
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ if (call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_CLIENT;
+ }
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->data.send_metadata.count =
+ op->data.send_status_from_server.trailing_metadata_count;
+ req->data.send_metadata.metadata =
+ op->data.send_status_from_server.trailing_metadata;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_STATUS;
+ req->data.send_status.code = op->data.send_status_from_server.status;
+ req->data.send_status.details =
+ op->data.send_status_from_server.status_details;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_SEND_CLOSE;
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ if (!call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ req->data.recv_metadata = op->data.recv_initial_metadata;
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_MESSAGE;
+ req->data.recv_message = op->data.recv_message;
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ if (!call->is_client) {
+ return GRPC_CALL_ERROR_NOT_ON_SERVER;
+ }
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_STATUS;
+ req->data.recv_status.set_value = set_status_value_directly;
+ req->data.recv_status.user_data = op->data.recv_status_on_client.status;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
+ req->data.recv_status_details.details =
+ op->data.recv_status_on_client.status_details;
+ req->data.recv_status_details.details_capacity =
+ op->data.recv_status_on_client.status_details_capacity;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
+ req->data.recv_metadata =
+ op->data.recv_status_on_client.trailing_metadata;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_CLOSE;
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_STATUS;
+ req->data.recv_status.set_value = set_cancelled_value;
+ req->data.recv_status.user_data =
+ op->data.recv_close_on_server.cancelled;
+ req = &reqs[out++];
+ req->op = GRPC_IOREQ_RECV_CLOSE;
+ break;
+ }
+ }
+
+ grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
+
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ tag);
+}
+
+/*
* LEGACY API IMPLEMENTATION
* All this code will disappear as soon as wrappings are updated
*/
@@ -964,8 +1102,8 @@ static void destroy_legacy_state(legacy_state *ls) {
size_t i, j;
for (i = 0; i < 2; i++) {
for (j = 0; j < ls->md_out_count[i]; j++) {
- gpr_free(ls->md_out[i][j].key);
- gpr_free(ls->md_out[i][j].value);
+ gpr_free((char *)ls->md_out[i][j].key);
+ gpr_free((char *)ls->md_out[i][j].value);
}
gpr_free(ls->md_out[i]);
}
@@ -998,7 +1136,7 @@ grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
mdout->key = gpr_strdup(metadata->key);
mdout->value = gpr_malloc(metadata->value_length);
mdout->value_length = metadata->value_length;
- memcpy(mdout->value, metadata->value, metadata->value_length);
+ memcpy((char *)mdout->value, metadata->value, metadata->value_length);
unlock(call);
@@ -1041,7 +1179,7 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status,
grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag,
void *finished_tag, gpr_uint32 flags) {
- grpc_ioreq reqs[3];
+ grpc_ioreq reqs[4];
legacy_state *ls;
grpc_call_error err;
@@ -1070,11 +1208,13 @@ grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
reqs[0].data.recv_metadata = &ls->trailing_md_in;
reqs[1].op = GRPC_IOREQ_RECV_STATUS;
- reqs[1].data.recv_status.details = &ls->details;
- reqs[1].data.recv_status.details_capacity = &ls->details_capacity;
- reqs[1].data.recv_status.code = &ls->status;
- reqs[2].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 3, finish_status, NULL);
+ reqs[1].data.recv_status.user_data = &ls->status;
+ reqs[1].data.recv_status.set_value = set_status_value_directly;
+ reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS;
+ reqs[2].data.recv_status_details.details = &ls->details;
+ reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity;
+ reqs[3].op = GRPC_IOREQ_RECV_CLOSE;
+ err = start_ioreq(call, reqs, 4, finish_status, NULL);
if (err != GRPC_CALL_OK) goto done;
done:
@@ -1102,9 +1242,8 @@ grpc_call_error grpc_call_server_accept_old(grpc_call *call,
ls->finished_tag = finished_tag;
reqs[0].op = GRPC_IOREQ_RECV_STATUS;
- reqs[0].data.recv_status.details = NULL;
- reqs[0].data.recv_status.details_capacity = 0;
- reqs[0].data.recv_status.code = &ls->status;
+ reqs[0].data.recv_status.user_data = &ls->status;
+ reqs[0].data.recv_status.set_value = set_status_value_directly;
reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
err = start_ioreq(call, reqs, 2, finish_status, NULL);
unlock(call);
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 936fb29f2e..05014c631c 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -44,6 +44,7 @@ typedef enum {
GRPC_IOREQ_RECV_MESSAGE,
GRPC_IOREQ_RECV_TRAILING_METADATA,
GRPC_IOREQ_RECV_STATUS,
+ GRPC_IOREQ_RECV_STATUS_DETAILS,
GRPC_IOREQ_RECV_CLOSE,
GRPC_IOREQ_SEND_INITIAL_METADATA,
GRPC_IOREQ_SEND_MESSAGE,
@@ -53,24 +54,25 @@ typedef enum {
GRPC_IOREQ_OP_COUNT
} grpc_ioreq_op;
-typedef struct {
- grpc_status_code *code;
- char **details;
- size_t *details_capacity;
-} grpc_recv_status_args;
-
typedef union {
grpc_metadata_array *recv_metadata;
grpc_byte_buffer **recv_message;
- grpc_recv_status_args recv_status;
+ struct {
+ void (*set_value)(grpc_status_code status, void *user_data);
+ void *user_data;
+ } recv_status;
+ struct {
+ char **details;
+ size_t *details_capacity;
+ } recv_status_details;
struct {
size_t count;
- grpc_metadata *metadata;
+ const grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
grpc_status_code code;
- char *details;
+ const char *details;
} send_status;
} grpc_ioreq_data;
@@ -83,9 +85,11 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_op_error status,
void *user_data);
-grpc_call *grpc_call_create(grpc_channel *channel,
+grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data);
+void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
+
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
@@ -104,8 +108,7 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_ioreq_completion_func on_complete, void *user_data);
/* Called when it's known that the initial batch of metadata is complete */
-void grpc_call_initial_metadata_complete(
- grpc_call_element *surface_element);
+void grpc_call_initial_metadata_complete(grpc_call_element *surface_element);
void grpc_call_set_deadline(grpc_call_element *surface_element,
gpr_timespec deadline);
diff --git a/src/core/surface/call_details.c b/src/core/surface/call_details.c
new file mode 100644
index 0000000000..51c05da640
--- /dev/null
+++ b/src/core/surface/call_details.c
@@ -0,0 +1,13 @@
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+#include <string.h>
+
+void grpc_call_details_init(grpc_call_details *cd) {
+ memset(cd, 0, sizeof(*cd));
+}
+
+void grpc_call_details_destroy(grpc_call_details *cd) {
+ gpr_free(cd->method);
+ gpr_free(cd->host);
+}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index b33bd7b357..514073ce0b 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -77,9 +77,10 @@ grpc_channel *grpc_channel_create_from_filters(
static void do_nothing(void *ignored, grpc_op_error error) {}
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
+grpc_call *grpc_channel_create_call(grpc_channel *channel,
+ grpc_completion_queue *cq,
+ const char *method, const char *host,
+ gpr_timespec absolute_deadline) {
grpc_call *call;
grpc_mdelem *path_mdelem;
grpc_mdelem *authority_mdelem;
@@ -90,7 +91,7 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
return NULL;
}
- call = grpc_call_create(channel, NULL);
+ call = grpc_call_create(channel, cq, NULL);
/* Add :path and :authority headers. */
/* TODO(klempner): Consider optimizing this by stashing mdelems for common
@@ -126,6 +127,13 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
return call;
}
+grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
+ const char *method, const char *host,
+ gpr_timespec absolute_deadline) {
+ return grpc_channel_create_call(channel, NULL, method, host,
+ absolute_deadline);
+}
+
void grpc_channel_internal_ref(grpc_channel *channel) {
gpr_ref(&channel->refs);
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index ae3b96035c..8b94aa920a 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -185,14 +185,25 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
-void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_op_error error) {
+void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
+ grpc_call *call, grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data);
+ ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
- end_op_locked(cc, GRPC_IOREQ);
+ end_op_locked(cc, GRPC_OP_COMPLETE);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+}
+
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_op_error error) {
+ event *ev;
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
+ ev->base.data.write_accepted = error;
+ end_op_locked(cc, GRPC_OP_COMPLETE);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index fea8336b63..205cb76cee 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -78,6 +78,10 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error);
+/* Queue a GRPC_OP_COMPLETED operation */
+void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
+ grpc_call *call, grpc_event_finish_func on_finish,
+ void *user_data, grpc_op_error error);
/* Queue a GRPC_CLIENT_METADATA_READ operation */
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
grpc_call *call,
@@ -97,9 +101,9 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements);
-void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_op_error error);
+void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
+ grpc_event_finish_func on_finish, void *user_data,
+ grpc_op_error error);
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 7c76bf93d7..ab9435351e 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) {
gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
}
break;
- case GRPC_IOREQ:
- gpr_strvec_add(&buf, gpr_strdup("IOREQ: "));
+ case GRPC_OP_COMPLETE:
+ gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: "));
addhdr(&buf, ev);
- adderr(&buf, ev->data.ioreq);
+ adderr(&buf, ev->data.op_complete);
break;
case GRPC_WRITE_ACCEPTED:
gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));
diff --git a/src/core/surface/metadata_array.c b/src/core/surface/metadata_array.c
new file mode 100644
index 0000000000..7a230037d5
--- /dev/null
+++ b/src/core/surface/metadata_array.c
@@ -0,0 +1,12 @@
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+#include <string.h>
+
+void grpc_metadata_array_init(grpc_metadata_array *array) {
+ memset(array, 0, sizeof(*array));
+}
+
+void grpc_metadata_array_destroy(grpc_metadata_array *array) {
+ gpr_free(array->metadata);
+}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 455bd4337f..ee0f96a580 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -72,12 +72,15 @@ struct channel_data {
};
typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
+ grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *user_data);
typedef struct {
void *user_data;
grpc_completion_queue *cq;
+ grpc_call **call;
+ grpc_call_details *details;
grpc_metadata_array *initial_metadata;
new_call_cb cb;
} requested_call;
@@ -121,7 +124,9 @@ typedef enum {
ZOMBIED
} call_state;
-typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data;
+typedef struct legacy_data {
+ grpc_metadata_array *initial_metadata;
+} legacy_data;
struct call_data {
grpc_call *call;
@@ -132,6 +137,7 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
+ grpc_call_details *details;
gpr_uint8 included[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@@ -240,7 +246,8 @@ static void start_new_rpc(grpc_call_element *elem) {
requested_call rc = server->requested_calls[--server->requested_call_count];
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
- rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data);
+ rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld,
+ rc.user_data);
} else {
calld->state = PENDING;
call_list_join(server, calld, PENDING_START);
@@ -339,21 +346,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
+ grpc_server *server = chand->server;
switch (op->type) {
case GRPC_ACCEPT_CALL:
/* create a call */
- grpc_call_create(chand->channel,
+ grpc_call_create(chand->channel, NULL,
op->data.accept_call.transport_server_data);
break;
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the
channel */
- gpr_mu_lock(&chand->server->mu);
- server_ref(chand->server);
+ gpr_mu_lock(&server->mu);
+ server_ref(server);
destroy_channel(chand);
- gpr_mu_unlock(&chand->server->mu);
- server_unref(chand->server);
+ gpr_mu_unlock(&server->mu);
+ server_unref(server);
break;
case GRPC_TRANSPORT_GOAWAY:
gpr_slice_unref(op->data.goaway.message);
@@ -617,6 +625,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
/* terminate all the requested calls */
for (i = 0; i < requested_call_count; i++) {
requested_calls[i].cb(server, requested_calls[i].cq,
+ requested_calls[i].call, requested_calls[i].details,
requested_calls[i].initial_metadata, NULL,
requested_calls[i].user_data);
}
@@ -667,6 +676,8 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request(grpc_server *server,
grpc_completion_queue *cq,
+ grpc_call **call,
+ grpc_call_details *details,
grpc_metadata_array *initial_metadata,
new_call_cb cb, void *user_data) {
call_data *calld;
@@ -674,7 +685,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
- cb(server, cq, initial_metadata, NULL, user_data);
+ cb(server, cq, call, details, initial_metadata, NULL, user_data);
return GRPC_CALL_OK;
}
calld = call_list_remove_head(server, PENDING_START);
@@ -682,7 +693,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu);
- cb(server, cq, initial_metadata, calld, user_data);
+ cb(server, cq, call, details, initial_metadata, calld, user_data);
return GRPC_CALL_OK;
} else {
if (server->requested_call_count == server->requested_call_capacity) {
@@ -696,6 +707,8 @@ static grpc_call_error queue_call_request(grpc_server *server,
rc = &server->requested_calls[server->requested_call_count++];
rc->cb = cb;
rc->cq = cq;
+ rc->call = call;
+ rc->details = details;
rc->user_data = user_data;
rc->initial_metadata = initial_metadata;
gpr_mu_unlock(&server->mu);
@@ -703,18 +716,64 @@ static grpc_call_error queue_call_request(grpc_server *server,
}
}
+static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
+ gpr_slice slice = value->slice;
+ size_t len = GPR_SLICE_LENGTH(slice);
+
+ if (len + 1 > *capacity) {
+ *capacity = GPR_MAX(len + 1, *capacity * 2);
+ *dest = gpr_realloc(*dest, *capacity);
+ }
+ memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
+}
+
+static void publish_request(grpc_call *call, grpc_op_error status, void *tag) {
+ grpc_call_element *elem =
+ grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_server *server = chand->server;
+
+ if (status == GRPC_OP_OK) {
+ cpstr(&calld->details->host, &calld->details->host_capacity, calld->host);
+ cpstr(&calld->details->method, &calld->details->method_capacity,
+ calld->path);
+ calld->details->deadline = calld->deadline;
+ grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL,
+ GRPC_OP_OK);
+ } else {
+ abort();
+ }
+}
+
static void begin_request(grpc_server *server, grpc_completion_queue *cq,
+ grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
- call_data *call_data, void *tag) {
- abort();
+ call_data *calld, void *tag) {
+ grpc_ioreq req;
+ if (!calld) {
+ *call = NULL;
+ initial_metadata->count = 0;
+ grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
+ return;
+ }
+ calld->details = details;
+ grpc_call_set_completion_queue(calld->call, cq);
+ *call = calld->call;
+ req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
+ req.data.recv_metadata = initial_metadata;
+ grpc_call_internal_ref(calld->call);
+ grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request,
+ tag);
}
-grpc_call_error grpc_server_request_call(
- grpc_server *server, grpc_call_details *details,
- grpc_metadata_array *initial_metadata, grpc_completion_queue *cq,
- void *tag) {
- grpc_cq_begin_op(cq, NULL, GRPC_IOREQ);
- return queue_call_request(server, cq, initial_metadata, begin_request, tag);
+grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
+ grpc_call_details *details,
+ grpc_metadata_array *initial_metadata,
+ grpc_completion_queue *cq, void *tag) {
+ grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE);
+ return queue_call_request(server, cq, call, details, initial_metadata,
+ begin_request, tag);
}
static void publish_legacy_request(grpc_call *call, grpc_op_error status,
@@ -737,9 +796,12 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status,
}
static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
+ grpc_call **call, grpc_call_details *details,
grpc_metadata_array *initial_metadata,
call_data *calld, void *tag) {
grpc_ioreq req;
+ GPR_ASSERT(call == NULL);
+ GPR_ASSERT(details == NULL);
if (!calld) {
gpr_free(initial_metadata);
grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
@@ -762,7 +824,7 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
gpr_malloc(sizeof(grpc_metadata_array));
memset(client_metadata, 0, sizeof(*client_metadata));
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
- return queue_call_request(server, server->cq, client_metadata,
+ return queue_call_request(server, server->cq, NULL, NULL, client_metadata,
begin_legacy_request, tag_new);
}