diff options
32 files changed, 441 insertions, 250 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 6a818fcd47..0ebfad8824 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -240,12 +240,6 @@ typedef struct { } grpc_metadata_array; typedef struct { - size_t count; - size_t capacity; - grpc_byte_buffer **buffers; -} grpc_byte_buffer_array; - -typedef struct { grpc_status_code status; const char *details; } grpc_recv_status; @@ -257,40 +251,43 @@ typedef struct { } grpc_call_details; typedef enum { - GRPC_IOREQ_SEND_INITIAL_METADATA = 0, - GRPC_IOREQ_SEND_TRAILING_METADATA, - GRPC_IOREQ_SEND_MESSAGES, - GRPC_IOREQ_SEND_CLOSE, - GRPC_IOREQ_RECV_INITIAL_METADATA, - GRPC_IOREQ_RECV_TRAILING_METADATA, - GRPC_IOREQ_RECV_MESSAGES, - GRPC_IOREQ_RECV_STATUS, - GRPC_IOREQ_OP_COUNT -} grpc_ioreq_op; - -typedef union { - struct { - size_t count; - const grpc_metadata *metadata; - } send_metadata; - struct { - size_t count; - grpc_byte_buffer **messages; - } send_messages; - struct { - /* fields only make sense on the server */ - grpc_status_code status; - const char *details; - } send_close; - grpc_metadata_array *recv_metadata; - grpc_byte_buffer_array *recv_messages; - grpc_recv_status *recv_status; -} grpc_ioreq_data; - -typedef struct grpc_ioreq { - grpc_ioreq_op op; - grpc_ioreq_data data; -} grpc_ioreq; + GRPC_OP_SEND_INITIAL_METADATA = 0, + GRPC_OP_SEND_MESSAGE, + GRPC_OP_SEND_CLOSE_FROM_CLIENT, + GRPC_OP_SEND_STATUS_FROM_SERVER, + GRPC_OP_RECV_INITIAL_METADATA, + GRPC_OP_RECV_MESSAGES, + GRPC_OP_RECV_STATUS_ON_CLIENT, + GRPC_OP_RECV_CLOSE_ON_SERVER +} grpc_op_type; + +typedef struct grpc_op { + grpc_op_type op; + union { + struct { + size_t count; + const grpc_metadata *metadata; + } send_initial_metadata; + grpc_byte_buffer *send_message; + struct { + size_t trailing_metadata_count; + grpc_metadata *trailing_metadata; + grpc_status_code status; + const char *status_details; + } send_status_from_server; + grpc_metadata_array *recv_initial_metadata; + grpc_byte_buffer **recv_message; + struct { + grpc_metadata_array *trailing_metadata; + grpc_status_code *status; + char **status_details; + size_t *status_details_capacity; + } recv_status_on_client; + struct { + int *cancelled; + } recv_close_on_server; + } data; +} grpc_op; /* Initialize the grpc library */ void grpc_init(void); @@ -335,14 +332,12 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq); is not sent until grpc_call_invoke is called. All completions are sent to 'completion_queue'. */ -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec deadline); - -grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_completion_queue *cq, - const grpc_call_details *details); +grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method, + const char *host, gpr_timespec deadline); -grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, void *tag); +grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, + size_t nops, grpc_completion_queue *cq, + void *tag); /* Create a client channel */ grpc_channel *grpc_channel_create(const char *target, @@ -483,8 +478,8 @@ void grpc_call_destroy(grpc_call *call); grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new); grpc_call_error grpc_server_request_call( - grpc_server *server, grpc_completion_queue *cq, grpc_call_details *details, - grpc_metadata_array *initial_metadata, void *tag); + grpc_server *server, grpc_call_details *details, + grpc_metadata_array *initial_metadata, grpc_completion_queue *cq, void *tag); /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c new file mode 100644 index 0000000000..bd5263b2f6 --- /dev/null +++ b/src/core/surface/byte_buffer_queue.c @@ -0,0 +1,112 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/byte_buffer_queue.h" + +#define INITIAL_PENDING_READ_COUNT 4 + +static void pra_init(pending_read_array *array) { + array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT); + array->count = 0; + array->capacity = INITIAL_PENDING_READ_COUNT; +} + +static void pra_destroy(pending_read_array *array, + size_t finish_starting_from) { + size_t i; + for (i = finish_starting_from; i < array->count; i++) { + array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR); + } + gpr_free(array->data); +} + +/* Append an operation to an array, expanding as needed */ +static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer, + void (*on_finish)(void *user_data, grpc_op_error error), + void *user_data) { + if (a->count == a->capacity) { + a->capacity *= 2; + a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity); + } + a->data[a->count].byte_buffer = buffer; + a->data[a->count].user_data = user_data; + a->data[a->count].on_finish = on_finish; + a->count++; +} + +static void prq_init(pending_read_queue *q) { + q->drain_pos = 0; + pra_init(&q->filling); + pra_init(&q->draining); +} + +static void prq_destroy(pending_read_queue *q) { + pra_destroy(&q->filling, 0); + pra_destroy(&q->draining, q->drain_pos); +} + +static int prq_is_empty(pending_read_queue *q) { + return (q->drain_pos == q->draining.count && q->filling.count == 0); +} + +static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer, + void (*on_finish)(void *user_data, grpc_op_error error), + void *user_data) { + pra_push(&q->filling, buffer, on_finish, user_data); +} + +/* Take the first queue element and move it to the completion queue. Do nothing + if q is empty */ +static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call, + grpc_completion_queue *cq) { + pending_read_array temp_array; + pending_read *pr; + + if (q->drain_pos == q->draining.count) { + if (q->filling.count == 0) { + return 0; + } + q->draining.count = 0; + q->drain_pos = 0; + /* swap arrays */ + temp_array = q->filling; + q->filling = q->draining; + q->draining = temp_array; + } + + pr = q->draining.data + q->drain_pos; + q->drain_pos++; + grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data, + pr->byte_buffer); + return 1; +} diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h new file mode 100644 index 0000000000..ffd2616d9d --- /dev/null +++ b/src/core/surface/byte_buffer_queue.h @@ -0,0 +1,53 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ +#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ + +/* TODO(ctiller): inline an element or two into this struct to avoid per-call + allocations */ +typedef struct { + grpc_byte_buffer **data; + size_t count; + size_t capacity; +} grpc_bbq_array; + +typedef struct { + size_t drain_pos; + grpc_bbq_array filling; + grpc_bbq_array draining; +} grpc_byte_buffer_queue; + +grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q); + +#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5040aeef13..4f8ac6193a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -36,6 +36,7 @@ #include "src/core/channel/metadata_buffer.h" #include "src/core/iomgr/alarm.h" #include "src/core/support/string.h" +#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include <grpc/support/alloc.h> @@ -59,7 +60,7 @@ typedef struct { grpc_metadata_array trailing_md_in; grpc_recv_status status_in; size_t msg_in_read_idx; - grpc_byte_buffer_array msg_in; + grpc_byte_buffer *msg_in; gpr_uint8 got_status; void *finished_tag; @@ -72,6 +73,7 @@ typedef enum { SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_TRAILING_METADATA, + SEND_STATUS, SEND_FINISH } send_action; @@ -81,10 +83,24 @@ typedef struct { grpc_op_error status; } completed_request; +/* See reqinfo.set below for a description */ +#define REQSET_EMPTY 255 +#define REQSET_DONE 254 + +/* The state of an ioreq */ typedef struct reqinfo { - req_state state; + /* User supplied parameters */ grpc_ioreq_data data; - struct reqinfo *master; + /* In which set is this ioreq? + This value could be: + - an element of grpc_ioreq_op enumeration, in which case + it designates the master ioreq in a set of requests + - REQSET_EMPTY, in which case this reqinfo type has no application + request against it + - REQSET_DONE, in which case this reqinfo has been satisfied for + all time for this call, and no further use will be made of it */ + gpr_uint8 set; + grpc_op_error status; grpc_ioreq_completion_func on_complete; void *user_data; gpr_uint32 need_mask; @@ -122,10 +138,9 @@ struct grpc_call { reqinfo requests[GRPC_IOREQ_OP_COUNT]; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; - grpc_byte_buffer_array buffered_messages; + grpc_byte_buffer_queue incoming_queue; grpc_metadata_array buffered_initial_metadata; grpc_metadata_array buffered_trailing_metadata; - size_t write_index; grpc_mdelem **owned_metadata; size_t owned_metadata_count; size_t owned_metadata_capacity; @@ -159,6 +174,7 @@ static void enact_send_action(grpc_call *call, send_action sa); grpc_call *grpc_call_create(grpc_channel *channel, const void *server_transport_data) { + size_t i; grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel); grpc_call *call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); @@ -166,8 +182,11 @@ grpc_call *grpc_call_create(grpc_channel *channel, gpr_mu_init(&call->mu); call->channel = channel; call->is_client = server_transport_data == NULL; + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + call->requests[i].set = REQSET_EMPTY; + } if (call->is_client) { - call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE; + call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE; } grpc_channel_internal_ref(channel); call->metadata_context = grpc_channel_get_metadata_context(channel); @@ -189,15 +208,6 @@ legacy_state *get_legacy_state(grpc_call *call) { void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); } -static void destroy_message_array(grpc_byte_buffer_array *array, - size_t start_idx) { - size_t i; - for (i = start_idx; i < array->count; i++) { - grpc_byte_buffer_destroy(array->buffers[i]); - } - gpr_free(array->buffers); -} - static void destroy_call(void *call, int ignored_success) { size_t i, j; grpc_call *c = call; @@ -213,7 +223,6 @@ static void destroy_call(void *call, int ignored_success) { grpc_mdelem_unref(c->owned_metadata[i]); } gpr_free(c->owned_metadata); - destroy_message_array(&c->buffered_messages, 0); gpr_free(c->buffered_initial_metadata.metadata); gpr_free(c->buffered_trailing_metadata.metadata); if (c->legacy_state) { @@ -226,8 +235,6 @@ static void destroy_call(void *call, int ignored_success) { } gpr_free(c->legacy_state->initial_md_in.metadata); gpr_free(c->legacy_state->trailing_md_in.metadata); - destroy_message_array(&c->legacy_state->msg_in, - c->legacy_state->msg_in_read_idx); gpr_free(c->legacy_state); } gpr_free(c); @@ -284,7 +291,7 @@ static void unlock(grpc_call *call) { int num_completed_requests = call->num_completed_requests; int need_more_data = call->need_more_data && - call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state == REQ_DONE; + call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE; int i; if (need_more_data) { @@ -321,124 +328,131 @@ static void unlock(grpc_call *call) { } } -static void get_final_status(grpc_call *call, grpc_status_code *code, - const char **details) { +static void get_final_status(grpc_call *call, grpc_recv_status_args args) { int i; for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].set) { - *code = call->status[i].code; - *details = call->status[i].details - ? grpc_mdstr_as_c_string(call->status[i].details) - : NULL; + *args.code = call->status[i].code; + if (call->status[i].details) { + gpr_slice details = call->status[i].details->slice; + size_t len = GPR_SLICE_LENGTH(details); + if (len + 1 > *args.details_capacity) { + *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2); + *args.details = gpr_realloc(*args.details, *args.details_capacity); + } + memcpy(*args.details, GPR_SLICE_START_PTR(details), len); + (*args.details)[len] = 0; + } else { + goto no_details; + } return; } } - *code = GRPC_STATUS_UNKNOWN; - *details = NULL; + *args.code = GRPC_STATUS_UNKNOWN; + +no_details: + if (0 == *args.details_capacity) { + *args.details_capacity = 8; + *args.details = gpr_malloc(*args.details_capacity); + } + **args.details = 0; } static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error status) { - reqinfo *master = call->requests[op].master; completed_request *cr; size_t i; - switch (call->requests[op].state) { - case REQ_INITIAL: /* not started yet */ - return; - case REQ_DONE: /* already finished */ - return; - case REQ_READY: - master->complete_mask |= 1 << op; - call->requests[op].state = - (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES) - ? REQ_INITIAL - : REQ_DONE; - if (master->complete_mask == master->need_mask || - status == GRPC_OP_ERROR) { - if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { - get_final_status( - call, - &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status, - &call->requests[GRPC_IOREQ_RECV_STATUS] - .data.recv_status->details); - } - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->requests[i].master == master) { - call->requests[i].master = NULL; + if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) { + reqinfo *master = &call->requests[call->requests[op].set]; + /* ioreq is live: we need to do something */ + master->complete_mask |= 1 << op; + call->requests[op].set = + (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE) + ? REQSET_EMPTY + : REQSET_DONE; + if (master->complete_mask == master->need_mask || + status == GRPC_OP_ERROR) { + if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) { + get_final_status( + call, + call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status); + } + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->requests[i].set == op) { + if (call->requests[i].status != GRPC_OP_OK) { + status = GRPC_OP_ERROR; } + call->requests[i].set = REQSET_EMPTY; } - cr = &call->completed_requests[call->num_completed_requests++]; - cr->status = status; - cr->on_complete = master->on_complete; - cr->user_data = master->user_data; } + cr = &call->completed_requests[call->num_completed_requests++]; + cr->status = status; + cr->on_complete = master->on_complete; + cr->user_data = master->user_data; + } } } -static void finish_write_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; +static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) { lock(call); - if (error == GRPC_OP_OK) { - if (call->write_index == - call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK); - } - } else { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); - } + finish_ioreq_op(call, op, error); call->sending = 0; unlock(call); grpc_call_internal_unref(call, 0); } +static void finish_write_step(void *pc, grpc_op_error error) { + finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error); +} + static void finish_finish_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; - lock(call); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error); - call->sending = 0; - unlock(call); - grpc_call_internal_unref(call, 0); + finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error); } static void finish_start_step(void *pc, grpc_op_error error) { - grpc_call *call = pc; - lock(call); - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); - call->sending = 0; - unlock(call); - grpc_call_internal_unref(call, 0); + finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error); } static send_action choose_send_action(grpc_call *call) { - switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: return SEND_INITIAL_METADATA; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: return SEND_MESSAGE; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) { - case REQ_INITIAL: + switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) { + case REQSET_EMPTY: return SEND_NOTHING; - case REQ_READY: + default: finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); return SEND_TRAILING_METADATA; - case REQ_DONE: + case REQSET_DONE: break; } - switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) { + switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) { + case REQSET_EMPTY: + return SEND_NOTHING; default: + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK); + return SEND_STATUS; + case REQSET_DONE: + break; + } + switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) { + case REQSET_EMPTY: + case REQSET_DONE: return SEND_NOTHING; - case REQ_READY: + default: return SEND_FINISH; } } @@ -458,6 +472,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_ioreq_data data; grpc_call_op op; size_t i; + char status_str[GPR_LTOA_MIN_BUFSIZE]; switch (sa) { case SEND_NOTHING: @@ -481,11 +496,11 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_call_execute_op(call, &op); break; case SEND_MESSAGE: - data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data; + data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data; op.type = GRPC_SEND_MESSAGE; op.dir = GRPC_CALL_DOWN; op.flags = 0; - op.data.message = data.send_messages.messages[call->write_index++]; + op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; grpc_call_execute_op(call, &op); @@ -504,28 +519,27 @@ static void enact_send_action(grpc_call *call, send_action sa) { unlock(call); grpc_call_internal_unref(call, 0); break; - case SEND_FINISH: - if (!call->is_client) { - /* TODO(ctiller): cache common status values */ - char status_str[GPR_LTOA_MIN_BUFSIZE]; - data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; - gpr_ltoa(data.send_close.status, status_str); - send_metadata( - call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); - if (data.send_close.details) { - send_metadata(call, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref( - grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_close.details))); - } + case SEND_STATUS: + /* TODO(ctiller): cache common status values */ + data = call->requests[GRPC_IOREQ_SEND_CLOSE].data; + gpr_ltoa(data.send_status.code, status_str); + send_metadata( + call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, status_str))); + if (data.send_status.details) { + send_metadata(call, + grpc_mdelem_from_metadata_strings( + call->metadata_context, + grpc_mdstr_ref( + grpc_channel_get_message_string(call->channel)), + grpc_mdstr_from_string(call->metadata_context, + data.send_status.details))); } + break; + case SEND_FINISH: op.type = GRPC_SEND_FINISH; op.dir = GRPC_CALL_DOWN; op.flags = 0; @@ -542,7 +556,7 @@ static grpc_call_error start_ioreq_error(grpc_call *call, size_t i; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { if (mutated_ops & (1 << i)) { - call->requests[i].master = NULL; + call->requests[i].set = REQSET_EMPTY; } } return ret; @@ -555,35 +569,32 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t i; gpr_uint32 have_ops = 0; grpc_ioreq_op op; - reqinfo *master = NULL; reqinfo *requests = call->requests; + reqinfo *master; grpc_ioreq_data data; + gpr_uint8 set; + + if (nreqs == 0) { + return GRPC_CALL_OK; + } + + set = reqs[0].op; + master = &requests[set]; for (i = 0; i < nreqs; i++) { op = reqs[i].op; - if (requests[op].master) { + if (requests[op].set < GRPC_IOREQ_OP_COUNT) { return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - } - switch (requests[op].state) { - case REQ_INITIAL: - break; - case REQ_READY: - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); - case REQ_DONE: - return start_ioreq_error(call, have_ops, - GRPC_CALL_ERROR_ALREADY_INVOKED); - } - if (master == NULL) { - master = &requests[op]; + } else if (requests[op].set == REQSET_DONE) { + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_ALREADY_INVOKED); } have_ops |= 1 << op; data = reqs[i].data; - requests[op].state = REQ_READY; requests[op].data = data; - requests[op].master = master; + requests[op].set = set; } GPR_ASSERT(master != NULL); @@ -598,12 +609,10 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, switch (op) { default: break; - case GRPC_IOREQ_RECV_MESSAGES: - data.recv_messages->count = 0; - if (call->buffered_messages.count > 0 || call->read_closed) { - SWAP(grpc_byte_buffer_array, *data.recv_messages, - call->buffered_messages); - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); + case GRPC_IOREQ_RECV_MESSAGE: + *data.recv_message = grpc_bbq_pop(&call->incoming_queue); + if (*data.recv_message) { + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); } else { call->need_more_data = 1; } @@ -612,19 +621,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } break; case GRPC_IOREQ_RECV_STATUS: - if (call->stream_closed && call->buffered_messages.count == 0) { + if (call->stream_closed) { finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); } break; - case GRPC_IOREQ_SEND_MESSAGES: + case GRPC_IOREQ_SEND_MESSAGE: if (call->stream_closed) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); } - call->write_index = 0; break; case GRPC_IOREQ_SEND_CLOSE: - if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) { - requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE; + if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) { + requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE; } break; case GRPC_IOREQ_SEND_INITIAL_METADATA: @@ -1088,8 +1096,13 @@ void grpc_call_recv_message(grpc_call_element *elem, grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_byte_buffer_array *dest; lock(call); - if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) { - dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages; + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) { + if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) { + call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR; + } else { + *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer; + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + } } else { dest = &call->buffered_messages; } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index c130a13b81..45816a312e 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -38,6 +38,47 @@ #include "src/core/channel/metadata_buffer.h" #include <grpc/grpc.h> +/* Primitive operation types - grpc_op's get rewritten into these */ +typedef enum { + GRPC_IOREQ_RECV_INITIAL_METADATA, + GRPC_IOREQ_RECV_MESSAGE, + GRPC_IOREQ_RECV_TRAILING_METADATA, + GRPC_IOREQ_RECV_STATUS, + GPRC_IOREQ_RECV_CLOSE, + GRPC_IOREQ_SEND_INITIAL_METADATA, + GRPC_IOREQ_SEND_MESSAGE, + GRPC_IOREQ_SEND_TRAILING_METADATA, + GRPC_IOREQ_SEND_STATUS, + GRPC_IOREQ_SEND_CLOSE, + GRPC_IOREQ_OP_COUNT +} grpc_ioreq_op; + +typedef struct { + grpc_status_code *code; + char **details; + size_t *details_capacity; +} grpc_recv_status_args; + +typedef union { + grpc_metadata_array *recv_metadata; + grpc_byte_buffer **recv_message; + grpc_recv_status_args recv_status; + struct { + size_t count; + grpc_metadata *metadata; + } send_metadata; + grpc_byte_buffer *send_message; + struct { + grpc_status_code code; + char *details; + } send_status; +} grpc_ioreq_data; + +typedef struct { + grpc_ioreq_op op; + grpc_ioreq_data data; +} grpc_ioreq; + typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status, void *user_data); diff --git a/test/core/echo/client.c b/test/core/echo/client.c index 32e85a1401..7d895c3f19 100644 --- a/test/core/echo/client.c +++ b/test/core/echo/client.c @@ -78,7 +78,7 @@ int main(int argc, char **argv) { GPR_ASSERT(argc == 2); channel = grpc_channel_create(argv[1], NULL); - call = grpc_channel_create_call_old( + call = grpc_channel_create_call( channel, "/foo", "localhost", gpr_time_add(gpr_time_from_seconds(5), gpr_now())); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) == diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 340fa039fd..14db4495f2 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -112,7 +112,7 @@ void test_connect(const char *server_host, const char *client_host, int port, } /* Send a trivial request. */ - c = grpc_channel_create_call_old(client, "/foo", "test.google.com", deadline); + c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, client_cq, tag(2), tag(3), 0)); diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index e4b4e4bb7d..389a6429c4 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -56,7 +56,7 @@ int main(int argc, char **argv) { /* create a call, channel to a non existant server */ chan = grpc_channel_create("nonexistant:54321", NULL); - call = grpc_channel_create_call_old(chan, "/foo", "nonexistant", deadline); + call = grpc_channel_create_call(chan, "/foo", "nonexistant", deadline); GPR_ASSERT(grpc_call_invoke(call, cq, tag(2), tag(3), 0) == GRPC_CALL_OK); /* verify that all tags get completed */ cq_expect_client_metadata_read(cqv, tag(2), NULL); diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 3a71daa9a4..8c00e86f7e 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -113,8 +113,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c index b258dd4251..416b494668 100644 --- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c +++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c @@ -113,8 +113,7 @@ static void test_cancel_after_accept_and_writes_closed( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index 12de361383..5dfb3f756a 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -111,8 +111,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config, gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index 7c706029a1..ac816484fd 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -109,8 +109,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config) { gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_call_cancel(c)); diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c index 6b5194fb07..5257ece297 100644 --- a/test/core/end2end/tests/cancel_in_a_vacuum.c +++ b/test/core/end2end/tests/cancel_in_a_vacuum.c @@ -109,8 +109,7 @@ static void test_cancel_in_a_vacuum(grpc_end2end_test_config config, gpr_timespec deadline = five_seconds_time(); cq_verifier *v_client = cq_verifier_create(f.client_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c)); diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c index 1b442e9e4c..cb37bf956d 100644 --- a/test/core/end2end/tests/census_simple_request.c +++ b/test/core/end2end/tests/census_simple_request.c @@ -106,8 +106,7 @@ static void test_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); tag(1); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index eafda6132b..50de3347b6 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -97,8 +97,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, grpc_call *s; gpr_timespec deadline = five_seconds_time(); - c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c index 2eb56517f7..2f14c1b565 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c @@ -111,8 +111,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index 7ebd8e38b0..b918648cb1 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -110,8 +110,7 @@ static void test_early_server_shutdown_finishes_inflight_calls( cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index ac7071a04b..3ce661893c 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -122,8 +122,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index fc180fd962..a18c0453c6 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -109,8 +109,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -182,11 +181,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) { /* start two requests - ensuring that the second is not accepted until the first completes */ deadline = five_seconds_time(); - c1 = grpc_channel_create_call_old(f.client, "/alpha", "test.google.com", - deadline); + c1 = + grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline); GPR_ASSERT(c1); - c2 = grpc_channel_create_call_old(f.client, "/beta", "test.google.com", - deadline); + c2 = grpc_channel_create_call(f.client, "/beta", "test.google.com", deadline); GPR_ASSERT(c1); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index e8adb82c19..adaab4ecd5 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -118,8 +118,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config, cq_verifier *v_server = cq_verifier_create(f.server_cq); gpr_log(GPR_INFO, "testing with %d message pairs.", messages); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index 49720a7bea..c3cf2d7123 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -137,8 +137,7 @@ static void test_request_response_with_metadata_and_payload( gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index 14e791ae8e..fb7b155367 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -128,8 +128,7 @@ static void test_request_response_with_metadata_and_payload( gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index ee1e52e54c..87ed30c8f8 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -121,8 +121,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index 65f890b4b9..c1f0e499d6 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -130,8 +130,7 @@ static void test_request_response_with_metadata_and_payload( gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); /* add multiple metadata */ diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index 438f2ef1d1..543bb999be 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -121,8 +121,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { meta.value[large_size] = 0; meta.value_length = large_size; - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); /* add the metadata */ diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 31d79df504..a2b5cfa750 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -116,8 +116,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { /* byte buffer holds the slice, we can unref it already */ gpr_slice_unref(payload_slice); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100))); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 1222d167de..7e7ab54523 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -103,8 +103,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, config.init_client(f, client_args); - c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 64a8340d54..cbade1252a 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -110,8 +110,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == @@ -157,8 +156,7 @@ static void simple_request_body2(grpc_end2end_test_fixture f) { cq_verifier *v_client = cq_verifier_create(f.client_cq); cq_verifier *v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/end2end/tests/thread_stress.c b/test/core/end2end/tests/thread_stress.c index 7e235be4f4..9d40ed16a8 100644 --- a/test/core/end2end/tests/thread_stress.c +++ b/test/core/end2end/tests/thread_stress.c @@ -108,7 +108,7 @@ static void drain_cq(int client, grpc_completion_queue *cq) { static void start_request(void) { gpr_slice slice = gpr_slice_malloc(100); grpc_byte_buffer *buf; - grpc_call *call = grpc_channel_create_call_old( + grpc_call *call = grpc_channel_create_call( g_fixture.client, "/Foo", "test.google.com", g_test_end_time); memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice)); diff --git a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c index fb2fbdd092..f3ec4ae4f1 100644 --- a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c +++ b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c @@ -124,8 +124,7 @@ static void test_writes_done_hangs_with_pending_read( gpr_slice_unref(request_payload_slice); gpr_slice_unref(response_payload_slice); - c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com", - deadline); + c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline); GPR_ASSERT(c); GPR_ASSERT(GRPC_CALL_OK == diff --git a/test/core/fling/client.c b/test/core/fling/client.c index fa8f263941..6f197f1cd7 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -53,7 +53,7 @@ static grpc_call *call; static void init_ping_pong_request(void) {} static void step_ping_pong_request(void) { - call = grpc_channel_create_call_old(channel, "/Reflector/reflectUnary", + call = grpc_channel_create_call(channel, "/Reflector/reflectUnary", "localhost", gpr_inf_future); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); @@ -71,7 +71,7 @@ static void step_ping_pong_request(void) { } static void init_ping_pong_stream(void) { - call = grpc_channel_create_call_old(channel, "/Reflector/reflectStream", + call = grpc_channel_create_call(channel, "/Reflector/reflectStream", "localhost", gpr_inf_future); GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) == GRPC_CALL_OK); diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index c43ac7c242..9b9f0202d6 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -51,7 +51,7 @@ int main(int argc, char **argv) { chan = grpc_lame_client_channel_create(); GPR_ASSERT(chan); - call = grpc_channel_create_call_old( + call = grpc_channel_create_call( chan, "/Foo", "anywhere", gpr_time_add(gpr_now(), gpr_time_from_seconds(100))); GPR_ASSERT(call); |