aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc/grpc.h93
-rw-r--r--src/core/surface/byte_buffer_queue.c112
-rw-r--r--src/core/surface/byte_buffer_queue.h53
-rw-r--r--src/core/surface/call.c305
-rw-r--r--src/core/surface/call.h41
-rw-r--r--test/core/echo/client.c2
-rw-r--r--test/core/end2end/dualstack_socket_test.c2
-rw-r--r--test/core/end2end/no_server_test.c2
-rw-r--r--test/core/end2end/tests/cancel_after_accept.c3
-rw-r--r--test/core/end2end/tests/cancel_after_accept_and_writes_closed.c3
-rw-r--r--test/core/end2end/tests/cancel_after_invoke.c3
-rw-r--r--test/core/end2end/tests/cancel_before_invoke.c3
-rw-r--r--test/core/end2end/tests/cancel_in_a_vacuum.c3
-rw-r--r--test/core/end2end/tests/census_simple_request.c3
-rw-r--r--test/core/end2end/tests/disappearing_server.c3
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c3
-rw-r--r--test/core/end2end/tests/graceful_server_shutdown.c3
-rw-r--r--test/core/end2end/tests/invoke_large_request.c3
-rw-r--r--test/core/end2end/tests/max_concurrent_streams.c10
-rw-r--r--test/core/end2end/tests/ping_pong_streaming.c3
-rw-r--r--test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_with_large_metadata.c3
-rw-r--r--test/core/end2end/tests/request_with_payload.c3
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c3
-rw-r--r--test/core/end2end/tests/simple_request.c6
-rw-r--r--test/core/end2end/tests/thread_stress.c2
-rw-r--r--test/core/end2end/tests/writes_done_hangs_with_pending_read.c3
-rw-r--r--test/core/fling/client.c4
-rw-r--r--test/core/surface/lame_client_test.c2
32 files changed, 441 insertions, 250 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 6a818fcd47..0ebfad8824 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -240,12 +240,6 @@ typedef struct {
} grpc_metadata_array;
typedef struct {
- size_t count;
- size_t capacity;
- grpc_byte_buffer **buffers;
-} grpc_byte_buffer_array;
-
-typedef struct {
grpc_status_code status;
const char *details;
} grpc_recv_status;
@@ -257,40 +251,43 @@ typedef struct {
} grpc_call_details;
typedef enum {
- GRPC_IOREQ_SEND_INITIAL_METADATA = 0,
- GRPC_IOREQ_SEND_TRAILING_METADATA,
- GRPC_IOREQ_SEND_MESSAGES,
- GRPC_IOREQ_SEND_CLOSE,
- GRPC_IOREQ_RECV_INITIAL_METADATA,
- GRPC_IOREQ_RECV_TRAILING_METADATA,
- GRPC_IOREQ_RECV_MESSAGES,
- GRPC_IOREQ_RECV_STATUS,
- GRPC_IOREQ_OP_COUNT
-} grpc_ioreq_op;
-
-typedef union {
- struct {
- size_t count;
- const grpc_metadata *metadata;
- } send_metadata;
- struct {
- size_t count;
- grpc_byte_buffer **messages;
- } send_messages;
- struct {
- /* fields only make sense on the server */
- grpc_status_code status;
- const char *details;
- } send_close;
- grpc_metadata_array *recv_metadata;
- grpc_byte_buffer_array *recv_messages;
- grpc_recv_status *recv_status;
-} grpc_ioreq_data;
-
-typedef struct grpc_ioreq {
- grpc_ioreq_op op;
- grpc_ioreq_data data;
-} grpc_ioreq;
+ GRPC_OP_SEND_INITIAL_METADATA = 0,
+ GRPC_OP_SEND_MESSAGE,
+ GRPC_OP_SEND_CLOSE_FROM_CLIENT,
+ GRPC_OP_SEND_STATUS_FROM_SERVER,
+ GRPC_OP_RECV_INITIAL_METADATA,
+ GRPC_OP_RECV_MESSAGES,
+ GRPC_OP_RECV_STATUS_ON_CLIENT,
+ GRPC_OP_RECV_CLOSE_ON_SERVER
+} grpc_op_type;
+
+typedef struct grpc_op {
+ grpc_op_type op;
+ union {
+ struct {
+ size_t count;
+ const grpc_metadata *metadata;
+ } send_initial_metadata;
+ grpc_byte_buffer *send_message;
+ struct {
+ size_t trailing_metadata_count;
+ grpc_metadata *trailing_metadata;
+ grpc_status_code status;
+ const char *status_details;
+ } send_status_from_server;
+ grpc_metadata_array *recv_initial_metadata;
+ grpc_byte_buffer **recv_message;
+ struct {
+ grpc_metadata_array *trailing_metadata;
+ grpc_status_code *status;
+ char **status_details;
+ size_t *status_details_capacity;
+ } recv_status_on_client;
+ struct {
+ int *cancelled;
+ } recv_close_on_server;
+ } data;
+} grpc_op;
/* Initialize the grpc library */
void grpc_init(void);
@@ -335,14 +332,12 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cq);
is not sent until grpc_call_invoke is called. All completions are sent to
'completion_queue'. */
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec deadline);
-
-grpc_call *grpc_channel_create_call(grpc_channel *channel,
- grpc_completion_queue *cq,
- const grpc_call_details *details);
+grpc_call *grpc_channel_create_call(grpc_channel *channel, const char *method,
+ const char *host, gpr_timespec deadline);
-grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
- size_t nreqs, void *tag);
+grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
+ size_t nops, grpc_completion_queue *cq,
+ void *tag);
/* Create a client channel */
grpc_channel *grpc_channel_create(const char *target,
@@ -483,8 +478,8 @@ void grpc_call_destroy(grpc_call *call);
grpc_call_error grpc_server_request_call_old(grpc_server *server, void *tag_new);
grpc_call_error grpc_server_request_call(
- grpc_server *server, grpc_completion_queue *cq, grpc_call_details *details,
- grpc_metadata_array *initial_metadata, void *tag);
+ grpc_server *server, grpc_call_details *details,
+ grpc_metadata_array *initial_metadata, grpc_completion_queue *cq, void *tag);
/* Create a server */
grpc_server *grpc_server_create(grpc_completion_queue *cq,
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
new file mode 100644
index 0000000000..bd5263b2f6
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.c
@@ -0,0 +1,112 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/byte_buffer_queue.h"
+
+#define INITIAL_PENDING_READ_COUNT 4
+
+static void pra_init(pending_read_array *array) {
+ array->data = gpr_malloc(sizeof(pending_read) * INITIAL_PENDING_READ_COUNT);
+ array->count = 0;
+ array->capacity = INITIAL_PENDING_READ_COUNT;
+}
+
+static void pra_destroy(pending_read_array *array,
+ size_t finish_starting_from) {
+ size_t i;
+ for (i = finish_starting_from; i < array->count; i++) {
+ array->data[i].on_finish(array->data[i].user_data, GRPC_OP_ERROR);
+ }
+ gpr_free(array->data);
+}
+
+/* Append an operation to an array, expanding as needed */
+static void pra_push(pending_read_array *a, grpc_byte_buffer *buffer,
+ void (*on_finish)(void *user_data, grpc_op_error error),
+ void *user_data) {
+ if (a->count == a->capacity) {
+ a->capacity *= 2;
+ a->data = gpr_realloc(a->data, sizeof(pending_read) * a->capacity);
+ }
+ a->data[a->count].byte_buffer = buffer;
+ a->data[a->count].user_data = user_data;
+ a->data[a->count].on_finish = on_finish;
+ a->count++;
+}
+
+static void prq_init(pending_read_queue *q) {
+ q->drain_pos = 0;
+ pra_init(&q->filling);
+ pra_init(&q->draining);
+}
+
+static void prq_destroy(pending_read_queue *q) {
+ pra_destroy(&q->filling, 0);
+ pra_destroy(&q->draining, q->drain_pos);
+}
+
+static int prq_is_empty(pending_read_queue *q) {
+ return (q->drain_pos == q->draining.count && q->filling.count == 0);
+}
+
+static void prq_push(pending_read_queue *q, grpc_byte_buffer *buffer,
+ void (*on_finish)(void *user_data, grpc_op_error error),
+ void *user_data) {
+ pra_push(&q->filling, buffer, on_finish, user_data);
+}
+
+/* Take the first queue element and move it to the completion queue. Do nothing
+ if q is empty */
+static int prq_pop_to_cq(pending_read_queue *q, void *tag, grpc_call *call,
+ grpc_completion_queue *cq) {
+ pending_read_array temp_array;
+ pending_read *pr;
+
+ if (q->drain_pos == q->draining.count) {
+ if (q->filling.count == 0) {
+ return 0;
+ }
+ q->draining.count = 0;
+ q->drain_pos = 0;
+ /* swap arrays */
+ temp_array = q->filling;
+ q->filling = q->draining;
+ q->draining = temp_array;
+ }
+
+ pr = q->draining.data + q->drain_pos;
+ q->drain_pos++;
+ grpc_cq_end_read(cq, tag, call, pr->on_finish, pr->user_data,
+ pr->byte_buffer);
+ return 1;
+}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
new file mode 100644
index 0000000000..ffd2616d9d
--- /dev/null
+++ b/src/core/surface/byte_buffer_queue.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
+
+/* TODO(ctiller): inline an element or two into this struct to avoid per-call
+ allocations */
+typedef struct {
+ grpc_byte_buffer **data;
+ size_t count;
+ size_t capacity;
+} grpc_bbq_array;
+
+typedef struct {
+ size_t drain_pos;
+ grpc_bbq_array filling;
+ grpc_bbq_array draining;
+} grpc_byte_buffer_queue;
+
+grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
+
+#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5040aeef13..4f8ac6193a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -36,6 +36,7 @@
#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
+#include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
@@ -59,7 +60,7 @@ typedef struct {
grpc_metadata_array trailing_md_in;
grpc_recv_status status_in;
size_t msg_in_read_idx;
- grpc_byte_buffer_array msg_in;
+ grpc_byte_buffer *msg_in;
gpr_uint8 got_status;
void *finished_tag;
@@ -72,6 +73,7 @@ typedef enum {
SEND_INITIAL_METADATA,
SEND_MESSAGE,
SEND_TRAILING_METADATA,
+ SEND_STATUS,
SEND_FINISH
} send_action;
@@ -81,10 +83,24 @@ typedef struct {
grpc_op_error status;
} completed_request;
+/* See reqinfo.set below for a description */
+#define REQSET_EMPTY 255
+#define REQSET_DONE 254
+
+/* The state of an ioreq */
typedef struct reqinfo {
- req_state state;
+ /* User supplied parameters */
grpc_ioreq_data data;
- struct reqinfo *master;
+ /* In which set is this ioreq?
+ This value could be:
+ - an element of grpc_ioreq_op enumeration, in which case
+ it designates the master ioreq in a set of requests
+ - REQSET_EMPTY, in which case this reqinfo type has no application
+ request against it
+ - REQSET_DONE, in which case this reqinfo has been satisfied for
+ all time for this call, and no further use will be made of it */
+ gpr_uint8 set;
+ grpc_op_error status;
grpc_ioreq_completion_func on_complete;
void *user_data;
gpr_uint32 need_mask;
@@ -122,10 +138,9 @@ struct grpc_call {
reqinfo requests[GRPC_IOREQ_OP_COUNT];
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
- grpc_byte_buffer_array buffered_messages;
+ grpc_byte_buffer_queue incoming_queue;
grpc_metadata_array buffered_initial_metadata;
grpc_metadata_array buffered_trailing_metadata;
- size_t write_index;
grpc_mdelem **owned_metadata;
size_t owned_metadata_count;
size_t owned_metadata_capacity;
@@ -159,6 +174,7 @@ static void enact_send_action(grpc_call *call, send_action sa);
grpc_call *grpc_call_create(grpc_channel *channel,
const void *server_transport_data) {
+ size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
@@ -166,8 +182,11 @@ grpc_call *grpc_call_create(grpc_channel *channel,
gpr_mu_init(&call->mu);
call->channel = channel;
call->is_client = server_transport_data == NULL;
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ call->requests[i].set = REQSET_EMPTY;
+ }
if (call->is_client) {
- call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state = REQ_DONE;
+ call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
}
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
@@ -189,15 +208,6 @@ legacy_state *get_legacy_state(grpc_call *call) {
void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
-static void destroy_message_array(grpc_byte_buffer_array *array,
- size_t start_idx) {
- size_t i;
- for (i = start_idx; i < array->count; i++) {
- grpc_byte_buffer_destroy(array->buffers[i]);
- }
- gpr_free(array->buffers);
-}
-
static void destroy_call(void *call, int ignored_success) {
size_t i, j;
grpc_call *c = call;
@@ -213,7 +223,6 @@ static void destroy_call(void *call, int ignored_success) {
grpc_mdelem_unref(c->owned_metadata[i]);
}
gpr_free(c->owned_metadata);
- destroy_message_array(&c->buffered_messages, 0);
gpr_free(c->buffered_initial_metadata.metadata);
gpr_free(c->buffered_trailing_metadata.metadata);
if (c->legacy_state) {
@@ -226,8 +235,6 @@ static void destroy_call(void *call, int ignored_success) {
}
gpr_free(c->legacy_state->initial_md_in.metadata);
gpr_free(c->legacy_state->trailing_md_in.metadata);
- destroy_message_array(&c->legacy_state->msg_in,
- c->legacy_state->msg_in_read_idx);
gpr_free(c->legacy_state);
}
gpr_free(c);
@@ -284,7 +291,7 @@ static void unlock(grpc_call *call) {
int num_completed_requests = call->num_completed_requests;
int need_more_data =
call->need_more_data &&
- call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state == REQ_DONE;
+ call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE;
int i;
if (need_more_data) {
@@ -321,124 +328,131 @@ static void unlock(grpc_call *call) {
}
}
-static void get_final_status(grpc_call *call, grpc_status_code *code,
- const char **details) {
+static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
int i;
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].set) {
- *code = call->status[i].code;
- *details = call->status[i].details
- ? grpc_mdstr_as_c_string(call->status[i].details)
- : NULL;
+ *args.code = call->status[i].code;
+ if (call->status[i].details) {
+ gpr_slice details = call->status[i].details->slice;
+ size_t len = GPR_SLICE_LENGTH(details);
+ if (len + 1 > *args.details_capacity) {
+ *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
+ *args.details = gpr_realloc(*args.details, *args.details_capacity);
+ }
+ memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
+ (*args.details)[len] = 0;
+ } else {
+ goto no_details;
+ }
return;
}
}
- *code = GRPC_STATUS_UNKNOWN;
- *details = NULL;
+ *args.code = GRPC_STATUS_UNKNOWN;
+
+no_details:
+ if (0 == *args.details_capacity) {
+ *args.details_capacity = 8;
+ *args.details = gpr_malloc(*args.details_capacity);
+ }
+ **args.details = 0;
}
static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
grpc_op_error status) {
- reqinfo *master = call->requests[op].master;
completed_request *cr;
size_t i;
- switch (call->requests[op].state) {
- case REQ_INITIAL: /* not started yet */
- return;
- case REQ_DONE: /* already finished */
- return;
- case REQ_READY:
- master->complete_mask |= 1 << op;
- call->requests[op].state =
- (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES)
- ? REQ_INITIAL
- : REQ_DONE;
- if (master->complete_mask == master->need_mask ||
- status == GRPC_OP_ERROR) {
- if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
- get_final_status(
- call,
- &call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status->status,
- &call->requests[GRPC_IOREQ_RECV_STATUS]
- .data.recv_status->details);
- }
- for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
- if (call->requests[i].master == master) {
- call->requests[i].master = NULL;
+ if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) {
+ reqinfo *master = &call->requests[call->requests[op].set];
+ /* ioreq is live: we need to do something */
+ master->complete_mask |= 1 << op;
+ call->requests[op].set =
+ (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
+ ? REQSET_EMPTY
+ : REQSET_DONE;
+ if (master->complete_mask == master->need_mask ||
+ status == GRPC_OP_ERROR) {
+ if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
+ get_final_status(
+ call,
+ call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
+ }
+ for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
+ if (call->requests[i].set == op) {
+ if (call->requests[i].status != GRPC_OP_OK) {
+ status = GRPC_OP_ERROR;
}
+ call->requests[i].set = REQSET_EMPTY;
}
- cr = &call->completed_requests[call->num_completed_requests++];
- cr->status = status;
- cr->on_complete = master->on_complete;
- cr->user_data = master->user_data;
}
+ cr = &call->completed_requests[call->num_completed_requests++];
+ cr->status = status;
+ cr->on_complete = master->on_complete;
+ cr->user_data = master->user_data;
+ }
}
}
-static void finish_write_step(void *pc, grpc_op_error error) {
- grpc_call *call = pc;
+static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) {
lock(call);
- if (error == GRPC_OP_OK) {
- if (call->write_index ==
- call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK);
- }
- } else {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
- }
+ finish_ioreq_op(call, op, error);
call->sending = 0;
unlock(call);
grpc_call_internal_unref(call, 0);
}
+static void finish_write_step(void *pc, grpc_op_error error) {
+ finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error);
+}
+
static void finish_finish_step(void *pc, grpc_op_error error) {
- grpc_call *call = pc;
- lock(call);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, error);
- call->sending = 0;
- unlock(call);
- grpc_call_internal_unref(call, 0);
+ finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error);
}
static void finish_start_step(void *pc, grpc_op_error error) {
- grpc_call *call = pc;
- lock(call);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
- call->sending = 0;
- unlock(call);
- grpc_call_internal_unref(call, 0);
+ finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
}
static send_action choose_send_action(grpc_call *call) {
- switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) {
- case REQ_INITIAL:
+ switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) {
+ case REQSET_EMPTY:
return SEND_NOTHING;
- case REQ_READY:
+ default:
return SEND_INITIAL_METADATA;
- case REQ_DONE:
+ case REQSET_DONE:
break;
}
- switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) {
- case REQ_INITIAL:
+ switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) {
+ case REQSET_EMPTY:
return SEND_NOTHING;
- case REQ_READY:
+ default:
return SEND_MESSAGE;
- case REQ_DONE:
+ case REQSET_DONE:
break;
}
- switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) {
- case REQ_INITIAL:
+ switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) {
+ case REQSET_EMPTY:
return SEND_NOTHING;
- case REQ_READY:
+ default:
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
return SEND_TRAILING_METADATA;
- case REQ_DONE:
+ case REQSET_DONE:
break;
}
- switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) {
+ switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) {
+ case REQSET_EMPTY:
+ return SEND_NOTHING;
default:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
+ return SEND_STATUS;
+ case REQSET_DONE:
+ break;
+ }
+ switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
+ case REQSET_EMPTY:
+ case REQSET_DONE:
return SEND_NOTHING;
- case REQ_READY:
+ default:
return SEND_FINISH;
}
}
@@ -458,6 +472,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_ioreq_data data;
grpc_call_op op;
size_t i;
+ char status_str[GPR_LTOA_MIN_BUFSIZE];
switch (sa) {
case SEND_NOTHING:
@@ -481,11 +496,11 @@ static void enact_send_action(grpc_call *call, send_action sa) {
grpc_call_execute_op(call, &op);
break;
case SEND_MESSAGE:
- data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data;
+ data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data;
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
- op.data.message = data.send_messages.messages[call->write_index++];
+ op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
grpc_call_execute_op(call, &op);
@@ -504,28 +519,27 @@ static void enact_send_action(grpc_call *call, send_action sa) {
unlock(call);
grpc_call_internal_unref(call, 0);
break;
- case SEND_FINISH:
- if (!call->is_client) {
- /* TODO(ctiller): cache common status values */
- char status_str[GPR_LTOA_MIN_BUFSIZE];
- data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
- gpr_ltoa(data.send_close.status, status_str);
- send_metadata(
- call,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
- if (data.send_close.details) {
- send_metadata(call,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(
- grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_close.details)));
- }
+ case SEND_STATUS:
+ /* TODO(ctiller): cache common status values */
+ data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
+ gpr_ltoa(data.send_status.code, status_str);
+ send_metadata(
+ call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context, status_str)));
+ if (data.send_status.details) {
+ send_metadata(call,
+ grpc_mdelem_from_metadata_strings(
+ call->metadata_context,
+ grpc_mdstr_ref(
+ grpc_channel_get_message_string(call->channel)),
+ grpc_mdstr_from_string(call->metadata_context,
+ data.send_status.details)));
}
+ break;
+ case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
@@ -542,7 +556,7 @@ static grpc_call_error start_ioreq_error(grpc_call *call,
size_t i;
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
if (mutated_ops & (1 << i)) {
- call->requests[i].master = NULL;
+ call->requests[i].set = REQSET_EMPTY;
}
}
return ret;
@@ -555,35 +569,32 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t i;
gpr_uint32 have_ops = 0;
grpc_ioreq_op op;
- reqinfo *master = NULL;
reqinfo *requests = call->requests;
+ reqinfo *master;
grpc_ioreq_data data;
+ gpr_uint8 set;
+
+ if (nreqs == 0) {
+ return GRPC_CALL_OK;
+ }
+
+ set = reqs[0].op;
+ master = &requests[set];
for (i = 0; i < nreqs; i++) {
op = reqs[i].op;
- if (requests[op].master) {
+ if (requests[op].set < GRPC_IOREQ_OP_COUNT) {
return start_ioreq_error(call, have_ops,
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
- }
- switch (requests[op].state) {
- case REQ_INITIAL:
- break;
- case REQ_READY:
- return start_ioreq_error(call, have_ops,
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
- case REQ_DONE:
- return start_ioreq_error(call, have_ops,
- GRPC_CALL_ERROR_ALREADY_INVOKED);
- }
- if (master == NULL) {
- master = &requests[op];
+ } else if (requests[op].set == REQSET_DONE) {
+ return start_ioreq_error(call, have_ops,
+ GRPC_CALL_ERROR_ALREADY_INVOKED);
}
have_ops |= 1 << op;
data = reqs[i].data;
- requests[op].state = REQ_READY;
requests[op].data = data;
- requests[op].master = master;
+ requests[op].set = set;
}
GPR_ASSERT(master != NULL);
@@ -598,12 +609,10 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
switch (op) {
default:
break;
- case GRPC_IOREQ_RECV_MESSAGES:
- data.recv_messages->count = 0;
- if (call->buffered_messages.count > 0 || call->read_closed) {
- SWAP(grpc_byte_buffer_array, *data.recv_messages,
- call->buffered_messages);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
+ case GRPC_IOREQ_RECV_MESSAGE:
+ *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
+ if (*data.recv_message) {
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
} else {
call->need_more_data = 1;
}
@@ -612,19 +621,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
break;
case GRPC_IOREQ_RECV_STATUS:
- if (call->stream_closed && call->buffered_messages.count == 0) {
+ if (call->stream_closed) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
}
break;
- case GRPC_IOREQ_SEND_MESSAGES:
+ case GRPC_IOREQ_SEND_MESSAGE:
if (call->stream_closed) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
}
- call->write_index = 0;
break;
case GRPC_IOREQ_SEND_CLOSE:
- if (requests[GRPC_IOREQ_SEND_MESSAGES].state == REQ_INITIAL) {
- requests[GRPC_IOREQ_SEND_MESSAGES].state = REQ_DONE;
+ if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
+ requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
}
break;
case GRPC_IOREQ_SEND_INITIAL_METADATA:
@@ -1088,8 +1096,13 @@ void grpc_call_recv_message(grpc_call_element *elem,
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_byte_buffer_array *dest;
lock(call);
- if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) {
- dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages;
+ if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) {
+ if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) {
+ call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR;
+ } else {
+ *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ }
} else {
dest = &call->buffered_messages;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index c130a13b81..45816a312e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -38,6 +38,47 @@
#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
+/* Primitive operation types - grpc_op's get rewritten into these */
+typedef enum {
+ GRPC_IOREQ_RECV_INITIAL_METADATA,
+ GRPC_IOREQ_RECV_MESSAGE,
+ GRPC_IOREQ_RECV_TRAILING_METADATA,
+ GRPC_IOREQ_RECV_STATUS,
+ GPRC_IOREQ_RECV_CLOSE,
+ GRPC_IOREQ_SEND_INITIAL_METADATA,
+ GRPC_IOREQ_SEND_MESSAGE,
+ GRPC_IOREQ_SEND_TRAILING_METADATA,
+ GRPC_IOREQ_SEND_STATUS,
+ GRPC_IOREQ_SEND_CLOSE,
+ GRPC_IOREQ_OP_COUNT
+} grpc_ioreq_op;
+
+typedef struct {
+ grpc_status_code *code;
+ char **details;
+ size_t *details_capacity;
+} grpc_recv_status_args;
+
+typedef union {
+ grpc_metadata_array *recv_metadata;
+ grpc_byte_buffer **recv_message;
+ grpc_recv_status_args recv_status;
+ struct {
+ size_t count;
+ grpc_metadata *metadata;
+ } send_metadata;
+ grpc_byte_buffer *send_message;
+ struct {
+ grpc_status_code code;
+ char *details;
+ } send_status;
+} grpc_ioreq_data;
+
+typedef struct {
+ grpc_ioreq_op op;
+ grpc_ioreq_data data;
+} grpc_ioreq;
+
typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_op_error status,
void *user_data);
diff --git a/test/core/echo/client.c b/test/core/echo/client.c
index 32e85a1401..7d895c3f19 100644
--- a/test/core/echo/client.c
+++ b/test/core/echo/client.c
@@ -78,7 +78,7 @@ int main(int argc, char **argv) {
GPR_ASSERT(argc == 2);
channel = grpc_channel_create(argv[1], NULL);
- call = grpc_channel_create_call_old(
+ call = grpc_channel_create_call(
channel, "/foo", "localhost",
gpr_time_add(gpr_time_from_seconds(5), gpr_now()));
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 340fa039fd..14db4495f2 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -112,7 +112,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
}
/* Send a trivial request. */
- c = grpc_channel_create_call_old(client, "/foo", "test.google.com", deadline);
+ c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, client_cq, tag(2), tag(3), 0));
diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c
index e4b4e4bb7d..389a6429c4 100644
--- a/test/core/end2end/no_server_test.c
+++ b/test/core/end2end/no_server_test.c
@@ -56,7 +56,7 @@ int main(int argc, char **argv) {
/* create a call, channel to a non existant server */
chan = grpc_channel_create("nonexistant:54321", NULL);
- call = grpc_channel_create_call_old(chan, "/foo", "nonexistant", deadline);
+ call = grpc_channel_create_call(chan, "/foo", "nonexistant", deadline);
GPR_ASSERT(grpc_call_invoke(call, cq, tag(2), tag(3), 0) == GRPC_CALL_OK);
/* verify that all tags get completed */
cq_expect_client_metadata_read(cqv, tag(2), NULL);
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c
index 3a71daa9a4..8c00e86f7e 100644
--- a/test/core/end2end/tests/cancel_after_accept.c
+++ b/test/core/end2end/tests/cancel_after_accept.c
@@ -113,8 +113,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
index b258dd4251..416b494668 100644
--- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
+++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
@@ -113,8 +113,7 @@ static void test_cancel_after_accept_and_writes_closed(
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c
index 12de361383..5dfb3f756a 100644
--- a/test/core/end2end/tests/cancel_after_invoke.c
+++ b/test/core/end2end/tests/cancel_after_invoke.c
@@ -111,8 +111,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c
index 7c706029a1..ac816484fd 100644
--- a/test/core/end2end/tests/cancel_before_invoke.c
+++ b/test/core/end2end/tests/cancel_before_invoke.c
@@ -109,8 +109,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config) {
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_cancel(c));
diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c
index 6b5194fb07..5257ece297 100644
--- a/test/core/end2end/tests/cancel_in_a_vacuum.c
+++ b/test/core/end2end/tests/cancel_in_a_vacuum.c
@@ -109,8 +109,7 @@ static void test_cancel_in_a_vacuum(grpc_end2end_test_config config,
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c
index 1b442e9e4c..cb37bf956d 100644
--- a/test/core/end2end/tests/census_simple_request.c
+++ b/test/core/end2end/tests/census_simple_request.c
@@ -106,8 +106,7 @@ static void test_body(grpc_end2end_test_fixture f) {
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
tag(1);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c
index eafda6132b..50de3347b6 100644
--- a/test/core/end2end/tests/disappearing_server.c
+++ b/test/core/end2end/tests/disappearing_server.c
@@ -97,8 +97,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
grpc_call *s;
gpr_timespec deadline = five_seconds_time();
- c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
index 2eb56517f7..2f14c1b565 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
@@ -111,8 +111,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c
index 7ebd8e38b0..b918648cb1 100644
--- a/test/core/end2end/tests/graceful_server_shutdown.c
+++ b/test/core/end2end/tests/graceful_server_shutdown.c
@@ -110,8 +110,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c
index ac7071a04b..3ce661893c 100644
--- a/test/core/end2end/tests/invoke_large_request.c
+++ b/test/core/end2end/tests/invoke_large_request.c
@@ -122,8 +122,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100)));
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index fc180fd962..a18c0453c6 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -109,8 +109,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
@@ -182,11 +181,10 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
/* start two requests - ensuring that the second is not accepted until
the first completes */
deadline = five_seconds_time();
- c1 = grpc_channel_create_call_old(f.client, "/alpha", "test.google.com",
- deadline);
+ c1 =
+ grpc_channel_create_call(f.client, "/alpha", "test.google.com", deadline);
GPR_ASSERT(c1);
- c2 = grpc_channel_create_call_old(f.client, "/beta", "test.google.com",
- deadline);
+ c2 = grpc_channel_create_call(f.client, "/beta", "test.google.com", deadline);
GPR_ASSERT(c1);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100)));
diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c
index e8adb82c19..adaab4ecd5 100644
--- a/test/core/end2end/tests/ping_pong_streaming.c
+++ b/test/core/end2end/tests/ping_pong_streaming.c
@@ -118,8 +118,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
cq_verifier *v_server = cq_verifier_create(f.server_cq);
gpr_log(GPR_INFO, "testing with %d message pairs.", messages);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
index 49720a7bea..c3cf2d7123 100644
--- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
@@ -137,8 +137,7 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice_unref(request_payload_slice);
gpr_slice_unref(response_payload_slice);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
/* add multiple metadata */
diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
index 14e791ae8e..fb7b155367 100644
--- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
@@ -128,8 +128,7 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice_unref(request_payload_slice);
gpr_slice_unref(response_payload_slice);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
/* add multiple metadata */
diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c
index ee1e52e54c..87ed30c8f8 100644
--- a/test/core/end2end/tests/request_response_with_payload.c
+++ b/test/core/end2end/tests/request_response_with_payload.c
@@ -121,8 +121,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100)));
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
index 65f890b4b9..c1f0e499d6 100644
--- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
@@ -130,8 +130,7 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice_unref(request_payload_slice);
gpr_slice_unref(response_payload_slice);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
/* add multiple metadata */
diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c
index 438f2ef1d1..543bb999be 100644
--- a/test/core/end2end/tests/request_with_large_metadata.c
+++ b/test/core/end2end/tests/request_with_large_metadata.c
@@ -121,8 +121,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
meta.value[large_size] = 0;
meta.value_length = large_size;
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
/* add the metadata */
diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c
index 31d79df504..a2b5cfa750 100644
--- a/test/core/end2end/tests/request_with_payload.c
+++ b/test/core/end2end/tests/request_with_payload.c
@@ -116,8 +116,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
/* byte buffer holds the slice, we can unref it already */
gpr_slice_unref(payload_slice);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call_old(f.server, tag(100)));
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index 1222d167de..7e7ab54523 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -103,8 +103,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
config.init_client(f, client_args);
- c = grpc_channel_create_call_old(f->client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 64a8340d54..cbade1252a 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -110,8 +110,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
@@ -157,8 +156,7 @@ static void simple_request_body2(grpc_end2end_test_fixture f) {
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/end2end/tests/thread_stress.c b/test/core/end2end/tests/thread_stress.c
index 7e235be4f4..9d40ed16a8 100644
--- a/test/core/end2end/tests/thread_stress.c
+++ b/test/core/end2end/tests/thread_stress.c
@@ -108,7 +108,7 @@ static void drain_cq(int client, grpc_completion_queue *cq) {
static void start_request(void) {
gpr_slice slice = gpr_slice_malloc(100);
grpc_byte_buffer *buf;
- grpc_call *call = grpc_channel_create_call_old(
+ grpc_call *call = grpc_channel_create_call(
g_fixture.client, "/Foo", "test.google.com", g_test_end_time);
memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice));
diff --git a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
index fb2fbdd092..f3ec4ae4f1 100644
--- a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
+++ b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
@@ -124,8 +124,7 @@ static void test_writes_done_hangs_with_pending_read(
gpr_slice_unref(request_payload_slice);
gpr_slice_unref(response_payload_slice);
- c = grpc_channel_create_call_old(f.client, "/foo", "test.google.com",
- deadline);
+ c = grpc_channel_create_call(f.client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index fa8f263941..6f197f1cd7 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -53,7 +53,7 @@ static grpc_call *call;
static void init_ping_pong_request(void) {}
static void step_ping_pong_request(void) {
- call = grpc_channel_create_call_old(channel, "/Reflector/reflectUnary",
+ call = grpc_channel_create_call(channel, "/Reflector/reflectUnary",
"localhost", gpr_inf_future);
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
@@ -71,7 +71,7 @@ static void step_ping_pong_request(void) {
}
static void init_ping_pong_stream(void) {
- call = grpc_channel_create_call_old(channel, "/Reflector/reflectStream",
+ call = grpc_channel_create_call(channel, "/Reflector/reflectStream",
"localhost", gpr_inf_future);
GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
GRPC_CALL_OK);
diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c
index c43ac7c242..9b9f0202d6 100644
--- a/test/core/surface/lame_client_test.c
+++ b/test/core/surface/lame_client_test.c
@@ -51,7 +51,7 @@ int main(int argc, char **argv) {
chan = grpc_lame_client_channel_create();
GPR_ASSERT(chan);
- call = grpc_channel_create_call_old(
+ call = grpc_channel_create_call(
chan, "/Foo", "anywhere",
gpr_time_add(gpr_now(), gpr_time_from_seconds(100)));
GPR_ASSERT(call);