diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-03 12:07:07 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-03 12:07:07 -0800 |
commit | fb189f826e2e5edc9128155ef0847c447a47c6ce (patch) | |
tree | 91324ea580d473ca31670d024fccb2429d40462d /src/core/surface | |
parent | 034929cc31fe46d4943913939a4de678edf3507a (diff) |
Flesh out batch API
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 201 | ||||
-rw-r--r-- | src/core/surface/call.h | 25 | ||||
-rw-r--r-- | src/core/surface/channel.c | 16 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 10 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 16 |
7 files changed, 205 insertions, 75 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 382909c865..1aecd3400a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -173,7 +173,7 @@ static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static send_action choose_send_action(grpc_call *call); static void enact_send_action(grpc_call *call, send_action sa); -grpc_call *grpc_call_create(grpc_channel *channel, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data) { size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); @@ -182,6 +182,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); call->channel = channel; + call->cq = cq; call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; @@ -321,37 +322,49 @@ static void unlock(grpc_call *call) { } } -static void get_final_status(grpc_call *call, grpc_recv_status_args args) { +static void get_final_status(grpc_call *call, grpc_ioreq_data out) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + out.recv_status.set_value(call->status[i].code, + out.recv_status.user_data); + return; + } + } + out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data); +} + +static void get_final_details(grpc_call *call, grpc_ioreq_data out) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].is_set) { - *args.code = call->status[i].code; - if (!args.details) return; if (call->status[i].details) { gpr_slice details = call->status[i].details->slice; size_t len = GPR_SLICE_LENGTH(details); - if (len + 1 > *args.details_capacity) { - *args.details_capacity = - GPR_MAX(len + 1, *args.details_capacity * 3 / 2); - *args.details = gpr_realloc(*args.details, *args.details_capacity); + if (len + 1 > *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = GPR_MAX( + len + 1, *out.recv_status_details.details_capacity * 3 / 2); + *out.recv_status_details.details = + gpr_realloc(*out.recv_status_details.details, + *out.recv_status_details.details_capacity); } - memcpy(*args.details, GPR_SLICE_START_PTR(details), len); - (*args.details)[len] = 0; + memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details), + len); + (*out.recv_status_details.details)[len] = 0; } else { goto no_details; } return; } } - *args.code = GRPC_STATUS_UNKNOWN; - if (!args.details) return; no_details: - if (0 == *args.details_capacity) { - *args.details_capacity = 8; - *args.details = gpr_malloc(*args.details_capacity); + if (0 == *out.recv_status_details.details_capacity) { + *out.recv_status_details.details_capacity = 8; + *out.recv_status_details.details = + gpr_malloc(*out.recv_status_details.details_capacity); } - **args.details = 0; + **out.recv_status_details.details = 0; } static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, @@ -389,8 +402,11 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: - get_final_status( - call, call->request_data[GRPC_IOREQ_RECV_STATUS].recv_status); + get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]); + break; + case GRPC_IOREQ_RECV_STATUS_DETAILS: + get_final_details(call, + call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]); break; case GRPC_IOREQ_RECV_INITIAL_METADATA: SWAP(grpc_metadata_array, call->buffered_metadata[0], @@ -598,6 +614,7 @@ static void finish_read_ops(grpc_call *call) { finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); } finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); /* fallthrough */ case READ_STATE_GOT_INITIAL_METADATA: @@ -675,20 +692,6 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, return GRPC_CALL_OK; } -static void call_start_ioreq_done(grpc_call *call, grpc_op_error status, - void *user_data) { - grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, void *tag) { - grpc_call_error err; - lock(call); - err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag); - unlock(call); - return err; -} - grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) { @@ -873,6 +876,121 @@ void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) { } /* + * BATCH API IMPLEMENTATION + */ + +static void set_status_value_directly(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = status; +} + +static void set_cancelled_value(grpc_status_code status, void *dest) { + *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); +} + +static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {} + +grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, + size_t nops, void *tag) { + grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; + size_t in; + size_t out; + const grpc_op *op; + grpc_ioreq *req; + + /* rewrite batch ops into ioreq ops */ + for (in = 0, out = 0; in < nops; in++) { + op = &ops[in]; + switch (op->op) { + case GRPC_OP_SEND_INITIAL_METADATA: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; + req->data.send_metadata.count = op->data.send_initial_metadata.count; + req->data.send_metadata.metadata = + op->data.send_initial_metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_MESSAGE; + req->data.send_message = op->data.send_message; + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + if (call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_CLIENT; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; + req->data.send_metadata.count = + op->data.send_status_from_server.trailing_metadata_count; + req->data.send_metadata.metadata = + op->data.send_status_from_server.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_STATUS; + req->data.send_status.code = op->data.send_status_from_server.status; + req->data.send_status.details = + op->data.send_status_from_server.status_details; + req = &reqs[out++]; + req->op = GRPC_IOREQ_SEND_CLOSE; + break; + case GRPC_OP_RECV_INITIAL_METADATA: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; + req->data.recv_metadata = op->data.recv_initial_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_MESSAGE; + req->data.recv_message = op->data.recv_message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + if (!call->is_client) { + return GRPC_CALL_ERROR_NOT_ON_SERVER; + } + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_status_value_directly; + req->data.recv_status.user_data = op->data.recv_status_on_client.status; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; + req->data.recv_status_details.details = + op->data.recv_status_on_client.status_details; + req->data.recv_status_details.details_capacity = + op->data.recv_status_on_client.status_details_capacity; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; + req->data.recv_metadata = + op->data.recv_status_on_client.trailing_metadata; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_STATUS; + req->data.recv_status.set_value = set_cancelled_value; + req->data.recv_status.user_data = + op->data.recv_close_on_server.cancelled; + req = &reqs[out++]; + req->op = GRPC_IOREQ_RECV_CLOSE; + break; + } + } + + grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); + + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + tag); +} + +/* * LEGACY API IMPLEMENTATION * All this code will disappear as soon as wrappings are updated */ @@ -985,7 +1103,7 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status, grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, void *metadata_read_tag, void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[3]; + grpc_ioreq reqs[4]; legacy_state *ls; grpc_call_error err; @@ -1014,11 +1132,13 @@ grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; reqs[0].data.recv_metadata = &ls->trailing_md_in; reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status.details = &ls->details; - reqs[1].data.recv_status.details_capacity = &ls->details_capacity; - reqs[1].data.recv_status.code = &ls->status; - reqs[2].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 3, finish_status, NULL); + reqs[1].data.recv_status.user_data = &ls->status; + reqs[1].data.recv_status.set_value = set_status_value_directly; + reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; + reqs[2].data.recv_status_details.details = &ls->details; + reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; + reqs[3].op = GRPC_IOREQ_RECV_CLOSE; + err = start_ioreq(call, reqs, 4, finish_status, NULL); if (err != GRPC_CALL_OK) goto done; done: @@ -1046,9 +1166,8 @@ grpc_call_error grpc_call_server_accept_old(grpc_call *call, ls->finished_tag = finished_tag; reqs[0].op = GRPC_IOREQ_RECV_STATUS; - reqs[0].data.recv_status.details = NULL; - reqs[0].data.recv_status.details_capacity = 0; - reqs[0].data.recv_status.code = &ls->status; + reqs[0].data.recv_status.user_data = &ls->status; + reqs[0].data.recv_status.set_value = set_status_value_directly; reqs[1].op = GRPC_IOREQ_RECV_CLOSE; err = start_ioreq(call, reqs, 2, finish_status, NULL); unlock(call); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 936fb29f2e..723f132015 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -44,6 +44,7 @@ typedef enum { GRPC_IOREQ_RECV_MESSAGE, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_IOREQ_RECV_STATUS, + GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_IOREQ_RECV_CLOSE, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_IOREQ_SEND_MESSAGE, @@ -53,24 +54,25 @@ typedef enum { GRPC_IOREQ_OP_COUNT } grpc_ioreq_op; -typedef struct { - grpc_status_code *code; - char **details; - size_t *details_capacity; -} grpc_recv_status_args; - typedef union { grpc_metadata_array *recv_metadata; grpc_byte_buffer **recv_message; - grpc_recv_status_args recv_status; + struct { + void (*set_value)(grpc_status_code status, void *user_data); + void *user_data; + } recv_status; + struct { + char **details; + size_t *details_capacity; + } recv_status_details; struct { size_t count; - grpc_metadata *metadata; + const grpc_metadata *metadata; } send_metadata; grpc_byte_buffer *send_message; struct { grpc_status_code code; - char *details; + const char *details; } send_status; } grpc_ioreq_data; @@ -83,7 +85,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data); void grpc_call_internal_ref(grpc_call *call); @@ -104,8 +106,7 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_ioreq_completion_func on_complete, void *user_data); /* Called when it's known that the initial batch of metadata is complete */ -void grpc_call_initial_metadata_complete( - grpc_call_element *surface_element); +void grpc_call_initial_metadata_complete(grpc_call_element *surface_element); void grpc_call_set_deadline(grpc_call_element *surface_element, gpr_timespec deadline); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index c33ea923e8..6d47787b7c 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -74,9 +74,10 @@ grpc_channel *grpc_channel_create_from_filters( static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec absolute_deadline) { grpc_call *call; grpc_mdelem *path_mdelem; grpc_mdelem *authority_mdelem; @@ -87,7 +88,7 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, return NULL; } - call = grpc_call_create(channel, NULL); + call = grpc_call_create(channel, cq, NULL); /* Add :path and :authority headers. */ /* TODO(klempner): Consider optimizing this by stashing mdelems for common @@ -123,6 +124,13 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, return call; } +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { + return grpc_channel_create_call(channel, NULL, method, host, + absolute_deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index ae3b96035c..b87117bf72 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -185,14 +185,14 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error) { +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error) { event *ev; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data); + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_IOREQ); + end_op_locked(cc, GRPC_OP_COMPLETE); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index fea8336b63..cbf1c4c7f3 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -97,9 +97,9 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements); -void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error); +void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, + grpc_event_finish_func on_finish, void *user_data, + grpc_op_error error); void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag); diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 7c76bf93d7..ab9435351e 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) { gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); } break; - case GRPC_IOREQ: - gpr_strvec_add(&buf, gpr_strdup("IOREQ: ")); + case GRPC_OP_COMPLETE: + gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); - adderr(&buf, ev->data.ioreq); + adderr(&buf, ev->data.op_complete); break; case GRPC_WRITE_ACCEPTED: gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a057694f13..4cba41d1f8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -121,7 +121,9 @@ typedef enum { ZOMBIED } call_state; -typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data; +typedef struct legacy_data { + grpc_metadata_array *initial_metadata; +} legacy_data; struct call_data { grpc_call *call; @@ -343,7 +345,7 @@ static void channel_op(grpc_channel_element *elem, switch (op->type) { case GRPC_ACCEPT_CALL: /* create a call */ - grpc_call_create(chand->channel, + grpc_call_create(chand->channel, NULL, op->data.accept_call.transport_server_data); break; case GRPC_TRANSPORT_CLOSED: @@ -709,11 +711,11 @@ static void begin_request(grpc_server *server, grpc_completion_queue *cq, abort(); } -grpc_call_error grpc_server_request_call( - grpc_server *server, grpc_call_details *details, - grpc_metadata_array *initial_metadata, grpc_completion_queue *cq, - void *tag) { - grpc_cq_begin_op(cq, NULL, GRPC_IOREQ); +grpc_call_error grpc_server_request_call(grpc_server *server, + grpc_call_details *details, + grpc_metadata_array *initial_metadata, + grpc_completion_queue *cq, void *tag) { + grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); return queue_call_request(server, cq, initial_metadata, begin_request, tag); } |