diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer.c | 1 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 7 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_queue.h | 1 | ||||
-rw-r--r-- | src/core/surface/call.c | 231 | ||||
-rw-r--r-- | src/core/surface/call.h | 27 | ||||
-rw-r--r-- | src/core/surface/call_details.c | 13 | ||||
-rw-r--r-- | src/core/surface/channel.c | 16 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 21 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 10 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 6 | ||||
-rw-r--r-- | src/core/surface/metadata_array.c | 12 | ||||
-rw-r--r-- | src/core/surface/server.c | 98 |
12 files changed, 352 insertions, 91 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index d1be41074d..09e2aa5b87 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -61,6 +61,7 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { } void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { + if (!bb) return; switch (bb->type) { case GRPC_BB_SLICE_BUFFER: gpr_slice_buffer_destroy(&bb->data.slice_buffer); diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index 9709a665ba..aab7fd2ffe 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -65,6 +65,13 @@ void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) { bba_push(&q->filling, buffer); } +void grpc_bbq_flush(grpc_byte_buffer_queue *q) { + grpc_byte_buffer *bb; + while ((bb = grpc_bbq_pop(q))) { + grpc_byte_buffer_destroy(bb); + } +} + grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) { grpc_bbq_array temp_array; diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h index 358a42d5af..3420dc5cab 100644 --- a/src/core/surface/byte_buffer_queue.h +++ b/src/core/surface/byte_buffer_queue.h @@ -53,6 +53,7 @@ typedef struct { void grpc_bbq_destroy(grpc_byte_buffer_queue *q); grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); +void grpc_bbq_flush(grpc_byte_buffer_queue *q); int grpc_bbq_empty(grpc_byte_buffer_queue *q); void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index c68ce5a6a8..b3f272e068 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -223,7 +223,7 @@ static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); -grpc_call *grpc_call_create(grpc_channel *channel, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); @@ -232,6 +232,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); call->channel = channel; + call->cq = cq; call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; @@ -250,6 +251,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, return call; } +void grpc_call_set_completion_queue(grpc_call *call, + grpc_completion_queue *cq) { + call->cq = cq; +} + void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } static void destroy_call(void *call, int ignored_success) { @@ -289,8 +295,21 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) { static void set_status_code(grpc_call *call, status_source source, gpr_uint32 status) { + int flush; + call->status[source].is_set = 1; call->status[source].code = status; + + if (call->is_client) { + flush = status == GRPC_STATUS_CANCELLED; + } else { + flush = status != GRPC_STATUS_OK; + } + + if (flush && !grpc_bbq_empty(&call->incoming_queue)) { + gpr_log(GPR_ERROR, "Flushing unread messages due to error status %d", status); + grpc_bbq_flush(&call->incoming_queue); + } } static void set_status_details(grpc_call *call, status_source source, @@ -374,37 +393,49 @@ static void unlock(grpc_call *call) { } } -static void get_final_status(grpc_call *call, grpc_recv_status_args args) { +static void get_final_status(grpc_call *call, grpc_ioreq_data out) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + out.recv_status.set_value(call->status[i].code, + out.recv_status.user_data); + return; + } + } + out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data); +} + +static void get_final_details(grpc_call *call, grpc_ioreq_data out) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].is_set) { - *args.code = call->status[i].code; - if (!args.details) return; if (call->status[i].details) { gpr_slice details = call->status[i].details->slice; size_t len = GPR_SLICE_LENGTH(details); - if (len + 1 > *args.details_capacity) { - *args.details_capacity = - GPR_MAX(len + 1, *args.details_capacity * 3 / 2); - *args.details = gpr_realloc(*args.details, *args.details_capacity); + if (len + 1 > *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = GPR_MAX( + len + 1, *out.recv_status_details.details_capacity * 3 / 2); + *out.recv_status_details.details = + gpr_realloc(*out.recv_status_details.details, + *out.recv_status_details.details_capacity); } - memcpy(*args.details, GPR_SLICE_START_PTR(details), len); - (*args.details)[len] = 0; + memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details), + len); + (*out.recv_status_details.details)[len] = 0; } else { goto no_details; } return; } } - *args.code = GRPC_STATUS_UNKNOWN; - if (!args.details) return; no_details: - if (0 == *args.details_capacity) { - *args.details_capacity = 8; - *args.details = gpr_malloc(*args.details_capacity); + if (0 == *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = 8; + *out.recv_status_details.details = + gpr_malloc(*out.recv_status_details.details_capacity); } - **args.details = 0; + **out.recv_status_details.details = 0; } static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, @@ -442,8 +473,11 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: - get_final_status( - call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status); + get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]); + break; + case GRPC_IOREQ_RECV_STATUS_DETAILS: + get_final_details(call, + call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]); break; case GRPC_IOREQ_RECV_INITIAL_METADATA: SWAP(grpc_metadata_array, call->buffered_metadata[0], @@ -650,6 +684,7 @@ static void finish_read_ops(grpc_call *call) { finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); } finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); /* fallthrough */ case READ_STATE_GOT_INITIAL_METADATA: @@ -727,20 +762,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, return GRPC_CALL_OK; } -static void call_start_ioreq_done(grpc_call *call, grpc_op_error status, - void *user_data) { - grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, void *tag) { - grpc_call_error err; - lock(call); - err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag); - unlock(call); - return err; -} - grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) { @@ -900,8 +921,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); } mdusr = &dest->metadata[dest->count++]; - mdusr->key = (char *)grpc_mdstr_as_c_string(md->key); - mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); + mdusr->key = grpc_mdstr_as_c_string(md->key); + mdusr->value = grpc_mdstr_as_c_string(md->value); mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); if (call->owned_metadata_count == call->owned_metadata_capacity) { call->owned_metadata_capacity = GPR_MAX( @@ -925,6 +946,123 @@ void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { } /* + * BATCH API IMPLEMENTATION + */ + +static void set_status_value_directly(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = status; +} + +static void set_cancelled_value(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); +} + +static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { + grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); +} + +grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, + size_t nops, void *tag) { + grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; + size_t in; + size_t out; + const grpc_op *op; + grpc_ioreq *req; + + /* rewrite batch ops into ioreq ops */ + for (in = 0, out = 0; in < nops; in++) { + op = &ops[in]; + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; + req->data.send_metadata.count = op->data.send_initial_metadata.count; + req->data.send_metadata.metadata = + op->data.send_initial_metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_MESSAGE; + req->data.send_message = op->data.send_message; + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + if (call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_CLIENT; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->data.send_metadata.count = + op->data.send_status_from_server.trailing_metadata_count; + req->data.send_metadata.metadata = + op->data.send_status_from_server.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_STATUS; + req->data.send_status.code = op->data.send_status_from_server.status; + req->data.send_status.details = + op->data.send_status_from_server.status_details; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_RECV_INITIAL_METADATA: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req->data.recv_metadata = op->data.recv_initial_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_MESSAGE; + req->data.recv_message = op->data.recv_message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_status_value_directly; + req->data.recv_status.user_data = op->data.recv_status_on_client.status; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; + req->data.recv_status_details.details = + op->data.recv_status_on_client.status_details; + req->data.recv_status_details.details_capacity = + op->data.recv_status_on_client.status_details_capacity; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; + req->data.recv_metadata = + op->data.recv_status_on_client.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_cancelled_value; + req->data.recv_status.user_data = + op->data.recv_close_on_server.cancelled; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + } + } + + grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); + + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + tag); +} + +/* * LEGACY API IMPLEMENTATION * All this code will disappear as soon as wrappings are updated */ @@ -964,8 +1102,8 @@ static void destroy_legacy_state(legacy_state *ls) { size_t i, j; for (i = 0; i < 2; i++) { for (j = 0; j < ls->md_out_count[i]; j++) { - gpr_free(ls->md_out[i][j].key); - gpr_free(ls->md_out[i][j].value); + gpr_free((char *)ls->md_out[i][j].key); + gpr_free((char *)ls->md_out[i][j].value); } gpr_free(ls->md_out[i]); } @@ -998,7 +1136,7 @@ grpc_call_error grpc_call_add_metadata_old(grpc_call *call, mdout->key = gpr_strdup(metadata->key); mdout->value = gpr_malloc(metadata->value_length); mdout->value_length = metadata->value_length; - memcpy(mdout->value, metadata->value, metadata->value_length); + memcpy((char *)mdout->value, metadata->value, metadata->value_length); unlock(call); @@ -1041,7 +1179,7 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status, grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, void *metadata_read_tag, void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[3]; + grpc_ioreq reqs[4]; legacy_state *ls; grpc_call_error err; @@ -1070,11 +1208,13 @@ grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; reqs[0].data.recv_metadata = &ls->trailing_md_in; reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status.details = &ls->details; - reqs[1].data.recv_status.details_capacity = &ls->details_capacity; - reqs[1].data.recv_status.code = &ls->status; - reqs[2].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 3, finish_status, NULL); + reqs[1].data.recv_status.user_data = &ls->status; + reqs[1].data.recv_status.set_value = set_status_value_directly; + reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; + reqs[2].data.recv_status_details.details = &ls->details; + reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; + reqs[3].op = GRPC_IOREQ_RECV_CLOSE; + err = start_ioreq(call, reqs, 4, finish_status, NULL); if (err != GRPC_CALL_OK) goto done; done: @@ -1102,9 +1242,8 @@ grpc_call_error grpc_call_server_accept_old(grpc_call *call, ls->finished_tag = finished_tag; reqs[0].op = GRPC_IOREQ_RECV_STATUS; - reqs[0].data.recv_status.details = NULL; - reqs[0].data.recv_status.details_capacity = 0; - reqs[0].data.recv_status.code = &ls->status; + reqs[0].data.recv_status.user_data = &ls->status; + reqs[0].data.recv_status.set_value = set_status_value_directly; reqs[1].op = GRPC_IOREQ_RECV_CLOSE; err = start_ioreq(call, reqs, 2, finish_status, NULL); unlock(call); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 936fb29f2e..05014c631c 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -44,6 +44,7 @@ typedef enum { GRPC_IOREQ_RECV_MESSAGE, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_IOREQ_RECV_STATUS, + GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_IOREQ_RECV_CLOSE, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_IOREQ_SEND_MESSAGE, @@ -53,24 +54,25 @@ typedef enum { GRPC_IOREQ_OP_COUNT } grpc_ioreq_op; -typedef struct { - grpc_status_code *code; - char **details; - size_t *details_capacity; -} grpc_recv_status_args; - typedef union { grpc_metadata_array *recv_metadata; grpc_byte_buffer **recv_message; - grpc_recv_status_args recv_status; + struct { + void (*set_value)(grpc_status_code status, void *user_data); + void *user_data; + } recv_status; + struct { + char **details; + size_t *details_capacity; + } recv_status_details; struct { size_t count; - grpc_metadata *metadata; + const grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { grpc_status_code code; - char *details; + const char *details; } send_status; } grpc_ioreq_data; @@ -83,9 +85,11 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); +void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq); + void grpc_call_internal_ref(grpc_call *call); void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion); @@ -104,8 +108,7 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_ioreq_completion_func on_complete, void *user_data); /* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete( - grpc_call_element *surface_element); +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); void grpc_call_set_deadline(grpc_call_element *surface_element, gpr_timespec deadline); diff --git a/src/core/surface/call_details.c b/src/core/surface/call_details.c new file mode 100644 index 0000000000..51c05da640 --- /dev/null +++ b/src/core/surface/call_details.c @@ -0,0 +1,13 @@ +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +#include <string.h> + +void grpc_call_details_init(grpc_call_details *cd) { + memset(cd, 0, sizeof(*cd)); +} + +void grpc_call_details_destroy(grpc_call_details *cd) { + gpr_free(cd->method); + gpr_free(cd->host); +} diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index b33bd7b357..514073ce0b 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -77,9 +77,10 @@ grpc_channel *grpc_channel_create_from_filters( static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec absolute_deadline) { grpc_call *call; grpc_mdelem *path_mdelem; grpc_mdelem *authority_mdelem; @@ -90,7 +91,7 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, return NULL; } - call = grpc_call_create(channel, NULL); + call = grpc_call_create(channel, cq, NULL); /* Add :path and :authority headers. */ /* TODO(klempner): Consider optimizing this by stashing mdelems for common @@ -126,6 +127,13 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, return call; } +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index ae3b96035c..8b94aa920a 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -185,14 +185,25 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error) { +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error) { event *ev; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_IOREQ); + end_op_locked(cc, GRPC_OP_COMPLETE); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +} + +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error) { + event *ev; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); + ev->base.data.write_accepted = error; + end_op_locked(cc, GRPC_OP_COMPLETE); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index fea8336b63..205cb76cee 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -78,6 +78,10 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error); +/* Queue a GRPC_OP_COMPLETED operation */ +void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, + grpc_call *call, grpc_event_finish_func on_finish, + void *user_data, grpc_op_error error); /* Queue a GRPC_CLIENT_METADATA_READ operation */ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -97,9 +101,9 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements); -void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error); +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error); void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag); diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 7c76bf93d7..ab9435351e 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) { gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); } break; - case GRPC_IOREQ: - gpr_strvec_add(&buf, gpr_strdup("IOREQ: ")); + case GRPC_OP_COMPLETE: + gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); - adderr(&buf, ev->data.ioreq); + adderr(&buf, ev->data.op_complete); break; case GRPC_WRITE_ACCEPTED: gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); diff --git a/src/core/surface/metadata_array.c b/src/core/surface/metadata_array.c new file mode 100644 index 0000000000..7a230037d5 --- /dev/null +++ b/src/core/surface/metadata_array.c @@ -0,0 +1,12 @@ +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +#include <string.h> + +void grpc_metadata_array_init(grpc_metadata_array *array) { + memset(array, 0, sizeof(*array)); +} + +void grpc_metadata_array_destroy(grpc_metadata_array *array) { + gpr_free(array->metadata); +} diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 455bd4337f..ee0f96a580 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -72,12 +72,15 @@ struct channel_data { }; typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, call_data *calld, void *user_data); typedef struct { void *user_data; grpc_completion_queue *cq; + grpc_call **call; + grpc_call_details *details; grpc_metadata_array *initial_metadata; new_call_cb cb; } requested_call; @@ -121,7 +124,9 @@ typedef enum { ZOMBIED } call_state; -typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data; +typedef struct legacy_data { + grpc_metadata_array *initial_metadata; +} legacy_data; struct call_data { grpc_call *call; @@ -132,6 +137,7 @@ struct call_data { grpc_mdstr *host; legacy_data *legacy; + grpc_call_details *details; gpr_uint8 included[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; @@ -240,7 +246,8 @@ static void start_new_rpc(grpc_call_element *elem) { requested_call rc = server->requested_calls[--server->requested_call_count]; calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data); + rc.cb(server, rc.cq, rc.call, rc.details, rc.initial_metadata, calld, + rc.user_data); } else { calld->state = PENDING; call_list_join(server, calld, PENDING_START); @@ -339,21 +346,22 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, static void channel_op(grpc_channel_element *elem, grpc_channel_element *from_elem, grpc_channel_op *op) { channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; switch (op->type) { case GRPC_ACCEPT_CALL: /* create a call */ - grpc_call_create(chand->channel, + grpc_call_create(chand->channel, NULL, op->data.accept_call.transport_server_data); break; case GRPC_TRANSPORT_CLOSED: /* if the transport is closed for a server channel, we destroy the channel */ - gpr_mu_lock(&chand->server->mu); - server_ref(chand->server); + gpr_mu_lock(&server->mu); + server_ref(server); destroy_channel(chand); - gpr_mu_unlock(&chand->server->mu); - server_unref(chand->server); + gpr_mu_unlock(&server->mu); + server_unref(server); break; case GRPC_TRANSPORT_GOAWAY: gpr_slice_unref(op->data.goaway.message); @@ -617,6 +625,7 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, /* terminate all the requested calls */ for (i = 0; i < requested_call_count; i++) { requested_calls[i].cb(server, requested_calls[i].cq, + requested_calls[i].call, requested_calls[i].details, requested_calls[i].initial_metadata, NULL, requested_calls[i].user_data); } @@ -667,6 +676,8 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, + grpc_call_details *details, grpc_metadata_array *initial_metadata, new_call_cb cb, void *user_data) { call_data *calld; @@ -674,7 +685,7 @@ static grpc_call_error queue_call_request(grpc_server *server, gpr_mu_lock(&server->mu); if (server->shutdown) { gpr_mu_unlock(&server->mu); - cb(server, cq, initial_metadata, NULL, user_data); + cb(server, cq, call, details, initial_metadata, NULL, user_data); return GRPC_CALL_OK; } calld = call_list_remove_head(server, PENDING_START); @@ -682,7 +693,7 @@ static grpc_call_error queue_call_request(grpc_server *server, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&server->mu); - cb(server, cq, initial_metadata, calld, user_data); + cb(server, cq, call, details, initial_metadata, calld, user_data); return GRPC_CALL_OK; } else { if (server->requested_call_count == server->requested_call_capacity) { @@ -696,6 +707,8 @@ static grpc_call_error queue_call_request(grpc_server *server, rc = &server->requested_calls[server->requested_call_count++]; rc->cb = cb; rc->cq = cq; + rc->call = call; + rc->details = details; rc->user_data = user_data; rc->initial_metadata = initial_metadata; gpr_mu_unlock(&server->mu); @@ -703,18 +716,64 @@ static grpc_call_error queue_call_request(grpc_server *server, } } +static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { + gpr_slice slice = value->slice; + size_t len = GPR_SLICE_LENGTH(slice); + + if (len + 1 > *capacity) { + *capacity = GPR_MAX(len + 1, *capacity * 2); + *dest = gpr_realloc(*dest, *capacity); + } + memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1); +} + +static void publish_request(grpc_call *call, grpc_op_error status, void *tag) { + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + + if (status == GRPC_OP_OK) { + cpstr(&calld->details->host, &calld->details->host_capacity, calld->host); + cpstr(&calld->details->method, &calld->details->method_capacity, + calld->path); + calld->details->deadline = calld->deadline; + grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, + GRPC_OP_OK); + } else { + abort(); + } +} + static void begin_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - call_data *call_data, void *tag) { - abort(); + call_data *calld, void *tag) { + grpc_ioreq req; + if (!calld) { + *call = NULL; + initial_metadata->count = 0; + grpc_cq_end_op_complete(cq, tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); + return; + } + calld->details = details; + grpc_call_set_completion_queue(calld->call, cq); + *call = calld->call; + req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req.data.recv_metadata = initial_metadata; + grpc_call_internal_ref(calld->call); + grpc_call_start_ioreq_and_call_back(calld->call, &req, 1, publish_request, + tag); } -grpc_call_error grpc_server_request_call( - grpc_server *server, grpc_call_details *details, - grpc_metadata_array *initial_metadata, grpc_completion_queue *cq, - void *tag) { - grpc_cq_begin_op(cq, NULL, GRPC_IOREQ); - return queue_call_request(server, cq, initial_metadata, begin_request, tag); +grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq, void *tag) { + grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + return queue_call_request(server, cq, call, details, initial_metadata, + begin_request, tag); } static void publish_legacy_request(grpc_call *call, grpc_op_error status, @@ -737,9 +796,12 @@ static void publish_legacy_request(grpc_call *call, grpc_op_error status, } static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq, + grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, call_data *calld, void *tag) { grpc_ioreq req; + GPR_ASSERT(call == NULL); + GPR_ASSERT(details == NULL); if (!calld) { gpr_free(initial_metadata); grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL, @@ -762,7 +824,7 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, gpr_malloc(sizeof(grpc_metadata_array)); memset(client_metadata, 0, sizeof(*client_metadata)); grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); - return queue_call_request(server, server->cq, client_metadata, + return queue_call_request(server, server->cq, NULL, NULL, client_metadata, begin_legacy_request, tag_new); } |