aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-01-16 15:24:17 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-01-16 15:24:17 -0800
commit5188535a265824704d6b632122322689925f778e (patch)
treed809816b57844d9c7445bab8a15e2857d1d490c3
parentbeddbdaef22989309e4adf996843efc25d1891bd (diff)
parent662820ee015078341456cac1f878296ce296695d (diff)
Merge branch 'new_invoke_api' of github.com:google/grpc into update-api
-rw-r--r--include/grpc/grpc.h11
-rw-r--r--src/core/surface/byte_buffer.c11
-rw-r--r--src/core/surface/call.c184
-rw-r--r--src/cpp/client/channel.cc15
-rw-r--r--src/cpp/stream/stream_context.cc12
-rw-r--r--src/cpp/stream/stream_context.h1
-rw-r--r--src/node/call.cc31
-rw-r--r--src/node/call.h2
-rw-r--r--src/node/client.js99
-rw-r--r--src/node/node_grpc.cc2
-rw-r--r--src/node/test/call_test.js42
-rw-r--r--src/node/test/constant_test.js1
-rw-r--r--src/node/test/end_to_end_test.js57
-rw-r--r--src/node/test/server_test.js41
-rw-r--r--src/php/ext/grpc/call.c38
-rw-r--r--src/php/ext/grpc/channel.c2
-rw-r--r--src/php/ext/grpc/completion_queue.c16
-rw-r--r--src/php/ext/grpc/credentials.c13
-rw-r--r--src/php/ext/grpc/php_grpc.c7
-rw-r--r--src/php/ext/grpc/server.c8
-rw-r--r--src/php/ext/grpc/server_credentials.c4
-rw-r--r--src/php/ext/grpc/timeval.c24
-rwxr-xr-xsrc/php/lib/Grpc/ActiveCall.php3
-rwxr-xr-xsrc/php/tests/unit_tests/CallTest.php6
-rwxr-xr-xsrc/php/tests/unit_tests/EndToEndTest.php24
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php24
-rw-r--r--test/core/echo/client.c7
-rw-r--r--test/core/end2end/cq_verifier.c11
-rw-r--r--test/core/end2end/cq_verifier.h1
-rw-r--r--test/core/end2end/dualstack_socket_test.c10
-rw-r--r--test/core/end2end/dualstack_socket_test.c.orig213
-rw-r--r--test/core/end2end/no_server_test.c4
-rw-r--r--test/core/end2end/tests/cancel_after_accept.c4
-rw-r--r--test/core/end2end/tests/cancel_after_accept_and_writes_closed.c4
-rw-r--r--test/core/end2end/tests/cancel_after_invoke.c4
-rw-r--r--test/core/end2end/tests/cancel_before_invoke.c3
-rw-r--r--test/core/end2end/tests/census_simple_request.c4
-rw-r--r--test/core/end2end/tests/disappearing_server.c7
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c4
-rw-r--r--test/core/end2end/tests/invoke_large_request.c4
-rw-r--r--test/core/end2end/tests/max_concurrent_streams.c35
-rw-r--r--test/core/end2end/tests/ping_pong_streaming.c3
-rw-r--r--test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_with_large_metadata.c4
-rw-r--r--test/core/end2end/tests/request_with_payload.c4
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c6
-rw-r--r--test/core/end2end/tests/simple_request.c8
-rw-r--r--test/core/end2end/tests/thread_stress.c40
-rw-r--r--test/core/end2end/tests/writes_done_hangs_with_pending_read.c4
-rw-r--r--test/core/fling/client.c11
-rw-r--r--test/core/surface/lame_client_test.c4
m---------third_party/libevent0
55 files changed, 588 insertions, 505 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 3c5b0de195..56904baa32 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -158,6 +158,7 @@ typedef struct grpc_byte_buffer grpc_byte_buffer;
/* Sample helpers to obtain byte buffers (these will certainly move place */
grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices);
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
@@ -313,18 +314,14 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the client.
- Produces a GRPC_INVOKE_ACCEPTED event with invoke_accepted_tag when the
- call has been invoked (meaning bytes can start flowing to the wire).
Produces a GRPC_CLIENT_METADATA_READ event with metadata_read_tag when
the servers initial metadata has been read.
Produces a GRPC_FINISHED event with finished_tag when the call has been
completed (there may be other events for the call pending at this
time) */
-grpc_call_error grpc_call_start_invoke(grpc_call *call,
- grpc_completion_queue *cq,
- void *invoke_accepted_tag,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags);
+grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
+ void *metadata_read_tag, void *finished_tag,
+ gpr_uint32 flags);
/* Accept an incoming RPC, binding a completion queue to it.
To be called before sending or receiving messages.
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 27a6c6e33d..d1be41074d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -49,6 +49,17 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
return bb;
}
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
+ bb->data.slice_buffer.count);
+ }
+ gpr_log(GPR_INFO, "should never get here");
+ abort();
+ return NULL;
+}
+
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 26bfa02ad1..63cf7bb6b0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -173,11 +173,14 @@ struct grpc_call {
/* protects variables in this section */
gpr_mu read_mu;
+ gpr_uint8 received_start;
+ gpr_uint8 start_ok;
gpr_uint8 reads_done;
gpr_uint8 received_finish;
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 pending_writes_done;
gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
@@ -190,6 +193,8 @@ struct grpc_call {
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
void *write_tag;
+ grpc_byte_buffer *pending_write;
+ gpr_uint32 pending_write_flags;
/* The final status of the call */
grpc_status_code status_code;
@@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_alarm = 0;
call->received_metadata = 0;
call->got_status_code = 0;
+ call->start_ok = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
call->received_finish = 0;
call->reads_done = 0;
+ call->received_start = 0;
+ call->pending_write = NULL;
+ call->pending_writes_done = 0;
grpc_metadata_buffer_init(&call->incoming_metadata);
gpr_ref_init(&call->internal_refcount, 1);
grpc_call_stack_init(channel_stack, server_transport_data,
@@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
-static void done_invoke(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
static void finish_call(grpc_call *call) {
size_t count;
grpc_metadata *elements;
@@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) {
elements, count);
}
-grpc_call_error grpc_call_start_invoke(grpc_call *call,
- grpc_completion_queue *cq,
- void *invoke_accepted_tag,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
+static void done_write(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void done_writes_done(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void call_started(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ grpc_call_element *elem;
+ grpc_byte_buffer *pending_write = NULL;
+ gpr_uint32 pending_write_flags = 0;
+ gpr_uint8 pending_writes_done = 0;
+ int ok;
+ grpc_call_op op;
+
+ gpr_mu_lock(&call->read_mu);
+ GPR_ASSERT(!call->received_start);
+ call->received_start = 1;
+ ok = call->start_ok = (error == GRPC_OP_OK);
+ pending_write = call->pending_write;
+ pending_write_flags = call->pending_write_flags;
+ pending_writes_done = call->pending_writes_done;
+ gpr_mu_unlock(&call->read_mu);
+
+ if (pending_write) {
+ if (ok) {
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = pending_write_flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = pending_write;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_write(call, error);
+ }
+ grpc_byte_buffer_destroy(pending_write);
+ }
+ if (pending_writes_done) {
+ if (ok) {
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_writes_done(call, error);
+ }
+ }
+
+ grpc_call_internal_unref(call);
+}
+
+grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
+ void *metadata_read_tag, void *finished_tag,
+ gpr_uint32 flags) {
grpc_call_element *elem;
grpc_call_op op;
@@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
/* inform the completion queue of an incoming operation */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
gpr_mu_lock(&call->read_mu);
@@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
if (call->received_finish) {
/* handle early cancellation */
- grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
- GRPC_OP_ERROR);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
NULL, 0, NULL);
finish_call(call);
@@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
return GRPC_CALL_OK;
}
- call->write_tag = invoke_accepted_tag;
call->metadata_tag = metadata_read_tag;
- call->have_write = 1;
-
gpr_mu_unlock(&call->read_mu);
/* call down the filter stack */
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.done_cb = done_invoke;
+ op.done_cb = call_started;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
+ grpc_call_internal_ref(call);
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call,
call->state = CALL_BOUNDCQ;
call->cq = cq;
call->finished_tag = finished_tag;
+ call->received_start = 1;
if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -535,26 +600,6 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
return GRPC_CALL_OK;
}
-static void done_writes_done(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void done_write(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
void grpc_call_client_initial_metadata_complete(
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
@@ -617,7 +662,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
} else {
call->read_tag = tag;
call->have_read = 1;
- request_more = 1;
+ request_more = call->received_start;
}
} else if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -654,8 +699,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
- /* for now we do no buffering, so a NULL byte_buffer can have no impact
- on our behavior -- succeed immediately */
/* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
flush, and that flush should be propogated down from here */
if (byte_buffer == NULL) {
@@ -666,15 +709,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = done_write;
- op.user_data = call;
- op.data.message = byte_buffer;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_write = grpc_byte_buffer_copy(byte_buffer);
+ call->pending_write_flags = flags;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = byte_buffer;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -706,14 +759,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = done_writes_done;
- op.user_data = call;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_writes_done = 1;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -818,6 +880,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
+ gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index ddda8c22d6..f476f77a49 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -104,7 +104,6 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
context->set_call(call);
grpc_event* ev;
void* finished_tag = reinterpret_cast<char*>(call);
- void* invoke_tag = reinterpret_cast<char*>(call) + 1;
void* metadata_read_tag = reinterpret_cast<char*>(call) + 2;
void* write_tag = reinterpret_cast<char*>(call) + 3;
void* halfclose_tag = reinterpret_cast<char*>(call) + 4;
@@ -115,19 +114,11 @@ Status Channel::StartBlockingRpc(const RpcMethod& method,
// add_metadata from context
//
// invoke
- GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag,
- finished_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future);
- bool success = ev->data.invoke_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- if (!success) {
- GetFinalStatus(cq, finished_tag, &status);
- return status;
- }
+ GPR_ASSERT(grpc_call_invoke(call, cq, metadata_read_tag, finished_tag,
+ GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer* write_buffer = nullptr;
- success = SerializeProto(request, &write_buffer);
+ bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index 5ccf8c9682..ebe71594c0 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -80,17 +80,9 @@ void StreamContext::Start(bool buffered) {
if (is_client_) {
// TODO(yangg) handle metadata send path
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(),
- client_metadata_read_tag(),
- finished_tag(), flag);
+ grpc_call_error error = grpc_call_invoke(
+ call(), cq(), client_metadata_read_tag(), finished_tag(), flag);
GPR_ASSERT(GRPC_CALL_OK == error);
- grpc_event* invoke_ev =
- grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future);
- if (invoke_ev->data.invoke_accepted != GRPC_OP_OK) {
- peer_halfclosed_ = true;
- self_halfclosed_ = true;
- }
- grpc_event_finish(invoke_ev);
} else {
// TODO(yangg) metadata needs to be added before accept
// TODO(yangg) correctly set flag to accept
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
index 4781f27a77..d6bd7a2370 100644
--- a/src/cpp/stream/stream_context.h
+++ b/src/cpp/stream/stream_context.h
@@ -76,7 +76,6 @@ class StreamContext final : public StreamContextInterface {
void* read_tag() { return reinterpret_cast<char*>(this) + 1; }
void* write_tag() { return reinterpret_cast<char*>(this) + 2; }
void* halfclose_tag() { return reinterpret_cast<char*>(this) + 3; }
- void* invoke_tag() { return reinterpret_cast<char*>(this) + 4; }
void* client_metadata_read_tag() { return reinterpret_cast<char*>(this) + 5; }
grpc_call* call() { return call_; }
grpc_completion_queue* cq() { return cq_; }
diff --git a/src/node/call.cc b/src/node/call.cc
index b8ee1786a6..6434c2f0d5 100644
--- a/src/node/call.cc
+++ b/src/node/call.cc
@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startInvoke",
- FunctionTemplate::New(StartInvoke)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "invoke",
+ FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartInvoke) {
+NAN_METHOD(Call::Invoke) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("invoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's first argument must be a function");
+ return NanThrowTypeError("invoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError(
- "StartInvoke's second argument must be a function");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's third argument must be a function");
+ return NanThrowTypeError("invoke's second argument must be a function");
}
- if (!args[3]->IsUint32()) {
- return NanThrowTypeError(
- "StartInvoke's fourth argument must be integer flags");
+ if (!args[2]->IsUint32()) {
+ return NanThrowTypeError("invoke's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_start_invoke(
+ grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
- CreateTag(args[2], args.This()), flags);
+ CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
} else {
- return NanThrowError("startInvoke failed", error);
+ return NanThrowError("invoke failed", error);
}
NanReturnUndefined();
}
@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
diff --git a/src/node/call.h b/src/node/call.h
index 55a6fc65b8..1924a1bf42 100644
--- a/src/node/call.h
+++ b/src/node/call.h
@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
- static NAN_METHOD(StartInvoke);
+ static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);
diff --git a/src/node/client.js b/src/node/client.js
index edaa115d0f..24f69d8bfd 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -50,101 +50,53 @@ util.inherits(GrpcClientStream, Duplex);
function GrpcClientStream(call, options) {
Duplex.call(this, options);
var self = this;
- // Indicates that we can start reading and have not received a null read
- var can_read = false;
+ var finished = false;
// Indicates that a read is currently pending
var reading = false;
- // Indicates that we can call startWrite
- var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
/**
- * Callback to handle receiving a READ event. Pushes the data from that event
- * onto the read queue and starts reading again if applicable.
- * @param {grpc.Event} event The READ event object
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
*/
function readCallback(event) {
+ if (finished) {
+ self.push(null);
+ return;
+ }
var data = event.data;
- if (self.push(data)) {
- if (data == null) {
- // Disable starting to read after null read was received
- can_read = false;
- reading = false;
- } else {
- call.startRead(readCallback);
- }
+ if (self.push(data) && data != null) {
+ self._call.startRead(readCallback);
} else {
- // Indicate that reading can be resumed by calling startReading
reading = false;
}
- };
- /**
- * Initiate a read, which continues until self.push returns false (indicating
- * that reading should be paused) or data is null (indicating that there is no
- * more data to read).
- */
- function startReading() {
- call.startRead(readCallback);
- }
- // TODO(mlumish): possibly change queue implementation due to shift slowness
- var write_queue = [];
- /**
- * Write the next chunk of data in the write queue if there is one. Otherwise
- * indicate that there is no pending write. When the write succeeds, this
- * function is called again.
- */
- function writeNext() {
- if (write_queue.length > 0) {
- writing = true;
- var next = write_queue.shift();
- var writeCallback = function(event) {
- next.callback();
- writeNext();
- };
- call.startWrite(next.chunk, writeCallback, 0);
- } else {
- writing = false;
- }
}
- call.startInvoke(function(event) {
- can_read = true;
- can_write = true;
- startReading();
- writeNext();
- }, function(event) {
+ call.invoke(function(event) {
self.emit('metadata', event.data);
}, function(event) {
+ finished = true;
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
- * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
- * event has been received.
+ * Start reading if there is not already a pending read. Reading will
+ * continue until self.push returns false (indicating reads should slow
+ * down) or the read data is null (indicating that there is no more data).
*/
- this._enableRead = function() {
- if (!reading) {
- reading = true;
- if (can_read) {
- startReading();
+ this.startReading = function() {
+ if (finished) {
+ self.push(null);
+ } else {
+ if (!reading) {
+ reading = true;
+ self._call.startRead(readCallback);
}
}
};
- /**
- * Push the chunk onto the write queue, and write from the write queue if
- * there is not a pending write
- * @param {Buffer} chunk The chunk of data to write
- * @param {function(Error=)} callback The callback to call when the write
- * completes
- */
- this._tryWrite = function(chunk, callback) {
- write_queue.push({chunk: chunk, callback: callback});
- if (can_write && !writing) {
- writeNext();
- }
- };
}
/**
@@ -153,7 +105,7 @@ function GrpcClientStream(call, options) {
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
- this._enableRead();
+ this.startReading();
};
/**
@@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- this._tryWrite(chunk, callback);
+ var self = this;
+ self._call.startWrite(chunk, function(event) {
+ callback();
+ }, 0);
};
/**
diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc
index acee0386d2..bc1dfaf899 100644
--- a/src/node/node_grpc.cc
+++ b/src/node/node_grpc.cc
@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
- Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
- completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index e6dc9664f1..6e52ec89bd 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -118,12 +118,11 @@ describe('call', function() {
call.addMetadata(5);
}, TypeError);
});
- it('should fail if startInvoke was already called', function(done) {
+ it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
@@ -133,32 +132,26 @@ describe('call', function() {
call.cancel();
});
});
- describe('startInvoke', function() {
- it('should fail with fewer than 4 arguments', function() {
+ describe('invoke', function() {
+ it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke();
+ call.invoke();
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {});
+ call.invoke(function() {});
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {});
- }, TypeError);
- assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {});
+ call.invoke(function() {},
+ function() {});
}, TypeError);
});
- it('should work with 3 args and an int', function(done) {
+ it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
// Cancel to speed up the test
call.cancel();
});
@@ -166,12 +159,11 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke(0, 0, 0, 0);
+ call.invoke(0, 0, 0);
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {}, 'test');
+ call.invoke(function() {},
+ function() {}, 'test');
});
});
});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index f65eea3cff..0138a55226 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
- 'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 40bb5f3bbd..b9e7bb5691 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -72,16 +72,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
-
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
@@ -91,6 +82,11 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ });
server.start();
server.requestCall(function(event) {
@@ -131,28 +127,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -163,6 +138,24 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), reply_text);
+ done();
+ });
server.start();
server.requestCall(function(event) {
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 79f7b32948..9978853bc2 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -83,28 +83,7 @@ describe('echo server', function() {
var call = new grpc.Call(channel,
'echo',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -116,6 +95,24 @@ describe('echo server', function() {
server.shutdown();
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ done();
+ });
});
});
});
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index c01af34e95..b171c9c176 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -224,27 +224,25 @@ PHP_METHOD(Call, add_metadata) {
/**
* Invoke the RPC. Starts sending metadata and request headers over the wire
* @param CompletionQueue $queue The completion queue to use with this call
- * @param long $invoke_accepted_tag The tag to associate with this invocation
* @param long $metadata_tag The tag to associate with returned metadata
* @param long $finished_tag The tag to associate with the finished event
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
*/
-PHP_METHOD(Call, start_invoke) {
+PHP_METHOD(Call, invoke) {
grpc_call_error error_code;
long tag1;
long tag2;
- long tag3;
zval *queue_obj;
long flags = 0;
- /* "Olll|l" == 1 Object, 3 mandatory longs, 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Olll|l", &queue_obj,
- grpc_ce_completion_queue, &tag1, &tag2, &tag3,
+ /* "Oll|l" == 1 Object, 3 mandatory longs, 1 optional long */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oll|l", &queue_obj,
+ grpc_ce_completion_queue, &tag1, &tag2,
&flags) == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
- "start_invoke needs a CompletionQueue, 3 longs, and an optional long",
+ "invoke needs a CompletionQueue, 2 longs, and an optional long",
1 TSRMLS_CC);
return;
}
@@ -254,10 +252,9 @@ PHP_METHOD(Call, start_invoke) {
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(
queue_obj TSRMLS_CC);
- error_code =
- grpc_call_start_invoke(call->wrapped, queue->wrapped, (void *)tag1,
- (void *)tag2, (void *)tag3, (gpr_uint32)flags);
- MAYBE_THROW_CALL_ERROR(start_invoke, error_code);
+ error_code = grpc_call_invoke(call->wrapped, queue->wrapped, (void *)tag1,
+ (void *)tag2, (gpr_uint32)flags);
+ MAYBE_THROW_CALL_ERROR(invoke, error_code);
}
/**
@@ -423,16 +420,15 @@ PHP_METHOD(Call, start_read) {
static zend_function_entry call_methods[] = {
PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(Call, server_accept, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC) PHP_ME(
- Call, cancel, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_invoke, NULL, ZEND_ACC_PUBLIC) PHP_ME(
- Call, start_read, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC) PHP_ME(
- Call, start_write_status, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, writes_done, NULL, ZEND_ACC_PUBLIC)
- PHP_FE_END};
+ PHP_ME(Call, server_accept, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, invoke, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, start_read, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, start_write_status, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, writes_done, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_call(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index f0e4153b22..2ab229f5e6 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -155,7 +155,7 @@ PHP_METHOD(Channel, close) {
static zend_function_entry channel_methods[] = {
PHP_ME(Channel, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(Channel, close, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
+ PHP_ME(Channel, close, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_channel(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/completion_queue.c b/src/php/ext/grpc/completion_queue.c
index 9785eab8cc..3a93bfcff7 100644
--- a/src/php/ext/grpc/completion_queue.c
+++ b/src/php/ext/grpc/completion_queue.c
@@ -63,8 +63,8 @@ zend_object_value create_wrapped_grpc_completion_queue(
*/
PHP_METHOD(CompletionQueue, __construct) {
wrapped_grpc_completion_queue *queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- getThis() TSRMLS_CC);
+ (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
+ TSRMLS_CC);
queue->wrapped = grpc_completion_queue_create();
}
@@ -86,8 +86,8 @@ PHP_METHOD(CompletionQueue, next) {
return;
}
wrapped_grpc_completion_queue *completion_queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- getThis() TSRMLS_CC);
+ (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
+ TSRMLS_CC);
wrapped_grpc_timeval *wrapped_timeout =
(wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
grpc_event *event = grpc_completion_queue_next(completion_queue->wrapped,
@@ -109,8 +109,8 @@ PHP_METHOD(CompletionQueue, pluck) {
"pluck needs a long and a Timeval", 1 TSRMLS_CC);
}
wrapped_grpc_completion_queue *completion_queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- getThis() TSRMLS_CC);
+ (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
+ TSRMLS_CC);
wrapped_grpc_timeval *wrapped_timeout =
(wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
grpc_event *event = grpc_completion_queue_pluck(
@@ -124,8 +124,8 @@ PHP_METHOD(CompletionQueue, pluck) {
static zend_function_entry completion_queue_methods[] = {
PHP_ME(CompletionQueue, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(CompletionQueue, next, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(CompletionQueue, pluck, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
+ PHP_ME(CompletionQueue, next, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(CompletionQueue, pluck, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_completion_queue(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/credentials.c b/src/php/ext/grpc/credentials.c
index f486272531..2a83d1cbc1 100644
--- a/src/php/ext/grpc/credentials.c
+++ b/src/php/ext/grpc/credentials.c
@@ -151,13 +151,12 @@ PHP_METHOD(Credentials, createFake) {
static zend_function_entry credentials_methods[] = {
PHP_ME(Credentials, createDefault, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Credentials, createSsl, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Credentials, createComposite, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Credentials, createGce, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Credentials, createFake, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END};
+ PHP_ME(Credentials, createSsl, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Credentials, createComposite, NULL,
+ ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Credentials, createGce, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Credentials, createFake, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_FE_END};
void grpc_init_credentials(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index c1042293aa..492ac06739 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -33,7 +33,8 @@ zend_module_entry grpc_module_entry = {
#if ZEND_MODULE_API_NO >= 20010901
STANDARD_MODULE_HEADER,
#endif
- "grpc", grpc_functions, PHP_MINIT(grpc), PHP_MSHUTDOWN(grpc), NULL, NULL,
+ "grpc", grpc_functions, PHP_MINIT(grpc),
+ PHP_MSHUTDOWN(grpc), NULL, NULL,
PHP_MINFO(grpc),
#if ZEND_MODULE_API_NO >= 20010901
PHP_GRPC_VERSION,
@@ -106,11 +107,9 @@ PHP_MINIT_FUNCTION(grpc) {
/* Register completion type constants */
REGISTER_LONG_CONSTANT("Grpc\\QUEUE_SHUTDOWN", GRPC_QUEUE_SHUTDOWN, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\READ", GRPC_READ, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\INVOKE_ACCEPTED", GRPC_INVOKE_ACCEPTED,
- CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISH_ACCEPTED", GRPC_FINISH_ACCEPTED,
CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\CLIENT_METADATA_READ",
GRPC_CLIENT_METADATA_READ, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISHED", GRPC_FINISHED, CONST_CS);
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index f484375712..38777f3d54 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -176,10 +176,10 @@ PHP_METHOD(Server, start) {
static zend_function_entry server_methods[] = {
PHP_ME(Server, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(Server, request_call, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Server, add_http2_port, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Server, add_secure_http2_port, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Server, start, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
+ PHP_ME(Server, request_call, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Server, add_http2_port, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Server, add_secure_http2_port, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Server, start, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_server(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c
index 5b9ab3390d..1f8e58aa4d 100644
--- a/src/php/ext/grpc/server_credentials.c
+++ b/src/php/ext/grpc/server_credentials.c
@@ -102,8 +102,8 @@ PHP_METHOD(ServerCredentials, createFake) {
static zend_function_entry server_credentials_methods[] = {
PHP_ME(ServerCredentials, createSsl, NULL,
ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(ServerCredentials, createFake, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END};
+ PHP_ME(ServerCredentials, createFake, NULL,
+ ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END};
void grpc_init_server_credentials(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c
index a5508115e4..cbbbf37915 100644
--- a/src/php/ext/grpc/timeval.c
+++ b/src/php/ext/grpc/timeval.c
@@ -217,20 +217,16 @@ PHP_METHOD(Timeval, sleep_until) {
}
static zend_function_entry timeval_methods[] = {
- PHP_ME(Timeval, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) PHP_ME(
- Timeval, add, NULL,
- ZEND_ACC_PUBLIC) PHP_ME(Timeval, compare, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Timeval, inf_future, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Timeval, inf_past, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Timeval, now, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Timeval, similar, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_ME(Timeval, sleep_until, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Timeval, subtract, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Timeval, zero, NULL,
- ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
- PHP_FE_END};
+ PHP_ME(Timeval, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
+ PHP_ME(Timeval, add, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Timeval, compare, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Timeval, inf_future, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Timeval, inf_past, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Timeval, now, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Timeval, similar, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
+ PHP_ME(Timeval, sleep_until, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Timeval, subtract, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Timeval, zero, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END};
void grpc_init_timeval(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/lib/Grpc/ActiveCall.php b/src/php/lib/Grpc/ActiveCall.php
index aa66dbb848..836a4b09e3 100755
--- a/src/php/lib/Grpc/ActiveCall.php
+++ b/src/php/lib/Grpc/ActiveCall.php
@@ -29,11 +29,8 @@ class ActiveCall {
// Invoke the call.
$this->call->start_invoke($this->completion_queue,
- INVOKE_ACCEPTED,
CLIENT_METADATA_READ,
FINISHED, 0);
- $this->completion_queue->pluck(INVOKE_ACCEPTED,
- Timeval::inf_future());
$metadata_event = $this->completion_queue->pluck(CLIENT_METADATA_READ,
Timeval::inf_future());
$this->metadata = $metadata_event->data;
diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php
index 253052a038..795831cb65 100755
--- a/src/php/tests/unit_tests/CallTest.php
+++ b/src/php/tests/unit_tests/CallTest.php
@@ -19,10 +19,10 @@ class CallTest extends PHPUnit_Framework_TestCase{
/**
* @expectedException LogicException
* @expectedExceptionCode Grpc\CALL_ERROR_INVALID_FLAGS
- * @expectedExceptionMessage start_invoke
+ * @expectedExceptionMessage invoke
*/
- public function testStartInvokeRejectsBadFlags() {
- $this->call->start_invoke($this->cq, 0, 0, 0, 0xDEADBEEF);
+ public function testInvokeRejectsBadFlags() {
+ $this->call->invoke($this->cq, 0, 0, 0xDEADBEEF);
}
/**
diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php
index 3818f9531c..78c5e9f93b 100755
--- a/src/php/tests/unit_tests/EndToEndTest.php
+++ b/src/php/tests/unit_tests/EndToEndTest.php
@@ -25,18 +25,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
@@ -103,18 +97,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index c562a821a4..7c3ad8a07c 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -37,17 +37,11 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
@@ -113,18 +107,12 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
diff --git a/test/core/echo/client.c b/test/core/echo/client.c
index 1905863e11..2ad29df53c 100644
--- a/test/core/echo/client.c
+++ b/test/core/echo/client.c
@@ -79,11 +79,8 @@ int main(int argc, char **argv) {
GPR_ASSERT(argc == 2);
channel = grpc_channel_create(argv[1], NULL);
call = grpc_channel_create_call(channel, "/foo", "localhost", gpr_inf_future);
- GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
- 0) == GRPC_CALL_OK);
- ev = grpc_completion_queue_next(cq, gpr_inf_future);
- GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
- grpc_event_finish(ev);
+ GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
+ GRPC_CALL_OK);
start_write_next_slice(call, bytes_written, WRITE_SLICE_LENGTH);
bytes_written += WRITE_SLICE_LENGTH;
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index 60b0bf3303..c6f77ae07c 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -70,7 +70,6 @@ typedef struct expectation {
union {
grpc_op_error finish_accepted;
grpc_op_error write_accepted;
- grpc_op_error invoke_accepted;
struct {
const char *method;
const char *host;
@@ -182,7 +181,7 @@ static void verify_matches(expectation *e, grpc_event *ev) {
GPR_ASSERT(e->data.write_accepted == ev->data.write_accepted);
break;
case GRPC_INVOKE_ACCEPTED:
- GPR_ASSERT(e->data.invoke_accepted == ev->data.invoke_accepted);
+ abort();
break;
case GRPC_SERVER_RPC_NEW:
GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method,
@@ -270,8 +269,7 @@ static size_t expectation_to_string(char *out, expectation *e) {
return sprintf(out, "GRPC_WRITE_ACCEPTED result=%d",
e->data.write_accepted);
case GRPC_INVOKE_ACCEPTED:
- return sprintf(out, "GRPC_INVOKE_ACCEPTED result=%d",
- e->data.invoke_accepted);
+ return sprintf(out, "GRPC_INVOKE_ACCEPTED");
case GRPC_SERVER_RPC_NEW:
timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now());
return sprintf(out, "GRPC_SERVER_RPC_NEW method=%s host=%s timeout=%fsec",
@@ -418,11 +416,6 @@ static metadata *metadata_from_args(va_list args) {
}
}
-void cq_expect_invoke_accepted(cq_verifier *v, void *tag,
- grpc_op_error result) {
- add(v, GRPC_INVOKE_ACCEPTED, tag)->data.invoke_accepted = result;
-}
-
void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result) {
add(v, GRPC_WRITE_ACCEPTED, tag)->data.write_accepted = result;
}
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index a1966c14c5..6e031d8152 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -56,7 +56,6 @@ void cq_verify_empty(cq_verifier *v);
Any functions taking ... expect a NULL terminated list of key/value pairs
(each pair using two parameter slots) of metadata that MUST be present in
the event. */
-void cq_expect_invoke_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_write_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_finish_accepted(cq_verifier *v, void *tag, grpc_op_error result);
void cq_expect_read(cq_verifier *v, void *tag, gpr_slice bytes);
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 4327b91298..6219f57500 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -115,14 +115,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, client_cq, tag(1), tag(2), tag(3), 0));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(c, client_cq, tag(2), tag(3), 0));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
if (expect_ok) {
/* Check for a successful request. */
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
-
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
cq_verify(v_client);
@@ -152,11 +148,11 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_call_destroy(s);
} else {
/* Check for a failed connection. */
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3),
GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", NULL);
+ cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_ERROR);
cq_verify(v_client);
grpc_call_destroy(c);
diff --git a/test/core/end2end/dualstack_socket_test.c.orig b/test/core/end2end/dualstack_socket_test.c.orig
new file mode 100644
index 0000000000..b443caa2a6
--- /dev/null
+++ b/test/core/end2end/dualstack_socket_test.c.orig
@@ -0,0 +1,213 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/socket_utils_posix.h"
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include "test/core/end2end/cq_verifier.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+/* This test exercises IPv4, IPv6, and dualstack sockets in various ways. */
+
+static void *tag(gpr_intptr i) { return (void *)i; }
+
+static gpr_timespec ms_from_now(int ms) {
+ return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_MS * ms));
+}
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event *ev;
+ grpc_completion_type type;
+ do {
+ ev = grpc_completion_queue_next(cq, ms_from_now(5000));
+ GPR_ASSERT(ev);
+ type = ev->type;
+ grpc_event_finish(ev);
+ gpr_log(GPR_INFO, "Drained event type %d", type);
+ } while (type != GRPC_QUEUE_SHUTDOWN);
+}
+
+void test_connect(const char *server_host, const char *client_host, int port,
+ int expect_ok) {
+ char *client_hostport;
+ char *server_hostport;
+ grpc_channel *client;
+ grpc_server *server;
+ grpc_completion_queue *client_cq;
+ grpc_completion_queue *server_cq;
+ grpc_call *c;
+ grpc_call *s;
+ cq_verifier *v_client;
+ cq_verifier *v_server;
+ gpr_timespec deadline;
+
+ gpr_join_host_port(&client_hostport, client_host, port);
+ gpr_join_host_port(&server_hostport, server_host, port);
+ gpr_log(GPR_INFO, "Testing with server=%s client=%s (expecting %s)",
+ server_hostport, client_hostport, expect_ok ? "success" : "failure");
+
+ /* Create server. */
+ server_cq = grpc_completion_queue_create();
+ server = grpc_server_create(server_cq, NULL);
+ GPR_ASSERT(grpc_server_add_http2_port(server, server_hostport));
+ grpc_server_start(server);
+ gpr_free(server_hostport);
+ v_server = cq_verifier_create(server_cq);
+
+ /* Create client. */
+ client_cq = grpc_completion_queue_create();
+ client = grpc_channel_create(client_hostport, NULL);
+ gpr_free(client_hostport);
+ v_client = cq_verifier_create(client_cq);
+
+ if (expect_ok) {
+ /* Normal deadline, shouldn't be reached. */
+ deadline = ms_from_now(60000);
+ } else {
+ /* Give up faster when failure is expected.
+ BUG: Setting this to 1000 reveals a memory leak (b/18608927). */
+ deadline = ms_from_now(1500);
+ }
+
+ /* Send a trivial request. */
+ c = grpc_channel_create_call(client, "/foo", "test.google.com", deadline);
+ GPR_ASSERT(c);
+
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_invoke(c, client_cq, tag(1), tag(2), tag(3), 0));
+ if (expect_ok) {
+ /* Check for a successful request. */
+ cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
+ cq_verify(v_client);
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
+ cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
+ cq_verify(v_client);
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(server, tag(100)));
+ cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
+ deadline, NULL);
+ cq_verify(v_server);
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s, server_cq, tag(102), 0));
+ cq_expect_client_metadata_read(v_client, tag(2), NULL);
+ cq_verify(v_client);
+
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_write_status(s, GRPC_STATUS_UNIMPLEMENTED, "xyz",
+ tag(5)));
+ cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_UNIMPLEMENTED,
+ "xyz", NULL);
+ cq_verify(v_client);
+
+ cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
+ cq_verify(v_server);
+ cq_expect_finished(v_server, tag(102), NULL);
+ cq_verify(v_server);
+
+ grpc_call_destroy(c);
+ grpc_call_destroy(s);
+ } else {
+ /* Check for a failed connection. */
+ cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
+ cq_expect_client_metadata_read(v_client, tag(2), NULL);
+ cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED,
+ NULL, NULL);
+ cq_verify(v_client);
+
+ grpc_call_destroy(c);
+ }
+
+ cq_verifier_destroy(v_client);
+ cq_verifier_destroy(v_server);
+
+ /* Destroy client. */
+ grpc_channel_destroy(client);
+ grpc_completion_queue_shutdown(client_cq);
+ drain_cq(client_cq);
+ grpc_completion_queue_destroy(client_cq);
+
+ /* Destroy server. */
+ grpc_server_shutdown(server);
+ grpc_server_destroy(server);
+ grpc_completion_queue_shutdown(server_cq);
+ drain_cq(server_cq);
+ grpc_completion_queue_destroy(server_cq);
+}
+
+int main(int argc, char **argv) {
+ int do_ipv6 = 1;
+ int i;
+ int port = grpc_pick_unused_port_or_die();
+
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ if (!grpc_ipv6_loopback_available()) {
+ gpr_log(GPR_INFO, "Can't bind to ::1. Skipping IPv6 tests.");
+ do_ipv6 = 0;
+ }
+
+ for (i = 0; i <= 1; i++) {
+ /* For coverage, test with and without dualstack sockets. */
+ grpc_forbid_dualstack_sockets_for_testing = i;
+
+ /* :: and 0.0.0.0 are handled identically. */
+ test_connect("::", "127.0.0.1", port, 1);
+ test_connect("::", "::ffff:127.0.0.1", port, 1);
+ test_connect("::", "localhost", port, 1);
+ test_connect("0.0.0.0", "127.0.0.1", port, 1);
+ test_connect("0.0.0.0", "::ffff:127.0.0.1", port, 1);
+ test_connect("0.0.0.0", "localhost", port, 1);
+ if (do_ipv6) {
+ test_connect("::", "::1", port, 1);
+ test_connect("0.0.0.0", "::1", port, 1);
+ }
+
+ /* These only work when the families agree. */
+ test_connect("127.0.0.1", "127.0.0.1", port, 1);
+ if (do_ipv6) {
+ test_connect("::1", "::1", port, 1);
+ test_connect("::1", "127.0.0.1", port, 0);
+ test_connect("127.0.0.1", "::1", port, 0);
+ }
+
+ }
+
+ grpc_shutdown();
+
+ return 0;
+}
diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c
index b9660f14b3..389a6429c4 100644
--- a/test/core/end2end/no_server_test.c
+++ b/test/core/end2end/no_server_test.c
@@ -57,10 +57,8 @@ int main(int argc, char **argv) {
/* create a call, channel to a non existant server */
chan = grpc_channel_create("nonexistant:54321", NULL);
call = grpc_channel_create_call(chan, "/foo", "nonexistant", deadline);
- GPR_ASSERT(grpc_call_start_invoke(call, cq, tag(1), tag(2), tag(3), 0) ==
- GRPC_CALL_OK);
+ GPR_ASSERT(grpc_call_invoke(call, cq, tag(2), tag(3), 0) == GRPC_CALL_OK);
/* verify that all tags get completed */
- cq_expect_invoke_accepted(cqv, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(cqv, tag(2), NULL);
cq_expect_finished_with_status(cqv, tag(3), GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded", NULL);
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c
index cfbb4796aa..33aed98c38 100644
--- a/test/core/end2end/tests/cancel_after_accept.c
+++ b/test/core/end2end/tests/cancel_after_accept.c
@@ -117,9 +117,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
index 74670bdc91..f348488b18 100644
--- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
+++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
@@ -117,9 +117,7 @@ static void test_cancel_after_accept_and_writes_closed(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c
index d4cb5e4f13..3bb86723e6 100644
--- a/test/core/end2end/tests/cancel_after_invoke.c
+++ b/test/core/end2end/tests/cancel_after_invoke.c
@@ -115,9 +115,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == mode.initiate_cancel(c));
diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c
index f799cba71d..ac816484fd 100644
--- a/test/core/end2end/tests/cancel_before_invoke.c
+++ b/test/core/end2end/tests/cancel_before_invoke.c
@@ -115,8 +115,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_cancel(c));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_ERROR);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_CANCELLED, NULL,
NULL);
diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c
index baeed5cb46..719f0fe662 100644
--- a/test/core/end2end/tests/census_simple_request.c
+++ b/test/core/end2end/tests/census_simple_request.c
@@ -109,9 +109,7 @@ static void test_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
tag(1);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c
index b27a356eaa..036fdc2501 100644
--- a/test/core/end2end/tests/disappearing_server.c
+++ b/test/core/end2end/tests/disappearing_server.c
@@ -100,11 +100,8 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c, f->client_cq, tag(1),
- tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
-
- cq_verify(v_client);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_invoke(c, f->client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
index 6ed0e4e106..66e3c44f4b 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
@@ -115,9 +115,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c
index fc461250d1..f187eceadb 100644
--- a/test/core/end2end/tests/invoke_large_request.c
+++ b/test/core/end2end/tests/invoke_large_request.c
@@ -126,9 +126,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index e88f418cb0..a177a7b2f2 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -113,9 +113,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
@@ -158,7 +156,6 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
grpc_call *s1;
grpc_call *s2;
int live_call;
- grpc_call *live_call_obj;
gpr_timespec deadline;
cq_verifier *v_client;
cq_verifier *v_server;
@@ -192,26 +189,24 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c1, f.client_cq, tag(300),
- tag(301), tag(302), 0));
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c2, f.client_cq, tag(400),
- tag(401), tag(402), 0));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_invoke(c1, f.client_cq, tag(301), tag(302), 0));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_invoke(c2, f.client_cq, tag(401), tag(402), 0));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c1, tag(303)));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c2, tag(303)));
+
ev = grpc_completion_queue_next(
f.client_cq, gpr_time_add(gpr_now(), gpr_time_from_seconds(10)));
GPR_ASSERT(ev);
- GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED);
+ GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED);
GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
/* The /alpha or /beta calls started above could be invoked (but NOT both);
* check this here */
- live_call = (int)(gpr_intptr)ev->tag;
- live_call_obj = live_call == 300 ? c1 : c2;
+ /* We'll get tag 303 or 403, we want 300, 400 */
+ live_call = ((int)(gpr_intptr)ev->tag) - 3;
grpc_event_finish(ev);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
- cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
- cq_verify(v_client);
-
cq_expect_server_rpc_new(v_server, &s1, tag(100),
live_call == 300 ? "/alpha" : "/beta",
"test.google.com", deadline, NULL);
@@ -233,14 +228,8 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
/* first request is finished, we should be able to start the second */
cq_expect_finished_with_status(v_client, tag(live_call + 2),
GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
- live_call = (live_call == 300) ? 400 : 300;
- live_call_obj = live_call == 300 ? c1 : c2;
- cq_expect_invoke_accepted(v_client, tag(live_call), GRPC_OP_OK);
- cq_verify(v_client);
-
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_writes_done(live_call_obj, tag(live_call + 3)));
cq_expect_finish_accepted(v_client, tag(live_call + 3), GRPC_OP_OK);
+ live_call = (live_call == 300) ? 400 : 300;
cq_verify(v_client);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(200)));
diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c
index 03d549a7b4..6768bd8aa9 100644
--- a/test/core/end2end/tests/ping_pong_streaming.c
+++ b/test/core/end2end/tests/ping_pong_streaming.c
@@ -122,8 +122,7 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
index f58bf77dfd..1dd798dc8d 100644
--- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
@@ -145,9 +145,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
index 09923b2fc5..cfc9b61f56 100644
--- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
@@ -136,9 +136,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c
index be65bf1567..32bf5129ff 100644
--- a/test/core/end2end/tests/request_response_with_payload.c
+++ b/test/core/end2end/tests/request_response_with_payload.c
@@ -125,9 +125,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
index d99141e024..4f1de8b466 100644
--- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
@@ -138,9 +138,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta2, 0));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c
index e2f554b322..83628449a2 100644
--- a/test/core/end2end/tests/request_with_large_metadata.c
+++ b/test/core/end2end/tests/request_with_large_metadata.c
@@ -128,9 +128,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(c, &meta, 0));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com",
deadline, "key", meta.value, NULL);
diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c
index 09b3c864fd..a352783965 100644
--- a/test/core/end2end/tests/request_with_payload.c
+++ b/test/core/end2end/tests/request_with_payload.c
@@ -122,9 +122,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, tag(100)));
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write(c, payload, tag(4), 0));
/* destroy byte buffer early to ensure async code keeps track of its contents
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index 90ed227749..1e15eaa9cc 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -106,10 +106,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline);
GPR_ASSERT(c);
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c, f->client_cq, tag(1),
- tag(2), tag(3), 0));
- gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_micros(delay_us)));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_invoke(c, f->client_cq, tag(2), tag(3), 0));
config.init_server(f, server_args);
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 93dfa1fb0a..23fc201d84 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -113,9 +113,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
@@ -161,9 +159,7 @@ static void simple_request_body2(grpc_end2end_test_fixture f) {
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4)));
cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK);
diff --git a/test/core/end2end/tests/thread_stress.c b/test/core/end2end/tests/thread_stress.c
index 5410258201..8fdb765951 100644
--- a/test/core/end2end/tests/thread_stress.c
+++ b/test/core/end2end/tests/thread_stress.c
@@ -106,25 +106,30 @@ static void drain_cq(int client, grpc_completion_queue *cq) {
/* Kick off a new request - assumes g_mu taken */
static void start_request(void) {
+ gpr_slice slice = gpr_slice_malloc(100);
+ grpc_byte_buffer *buf;
grpc_call *call = grpc_channel_create_call(
g_fixture.client, "/Foo", "test.google.com", g_test_end_time);
+
+ memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice));
+ buf = grpc_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+
g_active_requests++;
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(call, g_fixture.client_cq,
- NULL, NULL, NULL, 0));
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_invoke(call, g_fixture.client_cq, NULL, NULL, 0));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(call, NULL));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write(call, buf, NULL, 0));
+
+ grpc_byte_buffer_destroy(buf);
}
/* Async client: handle sending requests, reading responses, and starting
new requests when old ones finish */
static void client_thread(void *p) {
- int id = (gpr_intptr)p;
+ gpr_intptr id = (gpr_intptr)p;
grpc_event *ev;
- gpr_slice slice = gpr_slice_malloc(100);
- grpc_byte_buffer *buf;
char *estr;
- memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice));
-
- buf = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
for (;;) {
ev = grpc_completion_queue_next(g_fixture.client_cq, n_seconds_time(1));
@@ -135,14 +140,6 @@ static void client_thread(void *p) {
gpr_log(GPR_ERROR, "unexpected event: %s", estr);
gpr_free(estr);
break;
- case GRPC_INVOKE_ACCEPTED:
- /* better not keep going if the invoke failed */
- if (ev->data.invoke_accepted == GRPC_OP_OK) {
- GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, NULL));
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_write(ev->call, buf, NULL, 0));
- }
- break;
case GRPC_READ:
break;
case GRPC_WRITE_ACCEPTED:
@@ -173,7 +170,6 @@ static void client_thread(void *p) {
gpr_mu_unlock(&g_mu);
}
- grpc_byte_buffer_destroy(buf);
gpr_event_set(&g_client_done[id], (void *)1);
}
@@ -196,17 +192,17 @@ static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) {
static void server_thread(void *p) {
int id = (gpr_intptr)p;
- grpc_event *ev;
gpr_slice slice = gpr_slice_malloc(100);
grpc_byte_buffer *buf;
+ grpc_event *ev;
char *estr;
- memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice));
-
- request_server_call();
+ memset(GPR_SLICE_START_PTR(slice), 1, GPR_SLICE_LENGTH(slice));
buf = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
+ request_server_call();
+
for (;;) {
ev = grpc_completion_queue_next(g_fixture.server_cq, n_seconds_time(1));
if (ev) {
diff --git a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
index 9878b4ce9a..eea459459a 100644
--- a/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
+++ b/test/core/end2end/tests/writes_done_hangs_with_pending_read.c
@@ -128,9 +128,7 @@ static void test_writes_done_hangs_with_pending_read(
GPR_ASSERT(c);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(c, f.client_cq, tag(1), tag(2), tag(3), 0));
- cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK);
- cq_verify(v_client);
+ grpc_call_invoke(c, f.client_cq, tag(2), tag(3), 0));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_write(c, request_payload, tag(4), 0));
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index 7e93860dc3..7eb195811b 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -55,9 +55,8 @@ static void init_ping_pong_request(void) {}
static void step_ping_pong_request(void) {
call = grpc_channel_create_call(channel, "/Reflector/reflectUnary",
"localhost", gpr_inf_future);
- GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+ GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1,
+ GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
GPR_ASSERT(grpc_call_start_write(call, the_buffer, (void *)1,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
@@ -66,7 +65,6 @@ static void step_ping_pong_request(void) {
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
grpc_call_destroy(call);
call = NULL;
}
@@ -74,9 +72,8 @@ static void step_ping_pong_request(void) {
static void init_ping_pong_stream(void) {
call = grpc_channel_create_call(channel, "/Reflector/reflectStream",
"localhost", gpr_inf_future);
- GPR_ASSERT(grpc_call_start_invoke(call, cq, (void *)1, (void *)1, (void *)1,
- 0) == GRPC_CALL_OK);
- grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
+ GPR_ASSERT(grpc_call_invoke(call, cq, (void *)1, (void *)1, 0) ==
+ GRPC_CALL_OK);
grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future));
}
diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c
index 0520a39ea2..9b9f0202d6 100644
--- a/test/core/surface/lame_client_test.c
+++ b/test/core/surface/lame_client_test.c
@@ -62,11 +62,9 @@ int main(int argc, char **argv) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_add_metadata(call, &md, 0));
/* and invoke the call */
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_invoke(call, cq, tag(1), tag(2), tag(3), 0));
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_invoke(call, cq, tag(2), tag(3), 0));
/* the call should immediately fail */
- cq_expect_invoke_accepted(cqv, tag(1), GRPC_OP_ERROR);
cq_expect_client_metadata_read(cqv, tag(2), NULL);
cq_expect_finished(cqv, tag(3), NULL);
cq_verify(cqv);
diff --git a/third_party/libevent b/third_party/libevent
new file mode 160000
+Subproject f7d92c63928a1460f3d99b9bc418bd3b686a0dc