aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-23 14:57:29 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-23 14:57:29 -0800
commit5d0d33bbdeeb9c84b5bb15c5fad7ab50bd5da7e3 (patch)
treea1e1b12efec98ee0901f2f78053c8e6f646dc4cd /src
parentefcb4da8e005a5c0f24a64f4943095fd4e60fdc0 (diff)
parentc0748898293935517ca390977a55db6ea1424f50 (diff)
Merge branch 'kill-more-printf' of github.com:ctiller/grpc into kill-more-printf
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/byte_buffer.c11
-rw-r--r--src/core/surface/call.c184
-rw-r--r--src/cpp/client/channel.cc15
-rw-r--r--src/cpp/stream/stream_context.cc12
-rw-r--r--src/cpp/stream/stream_context.h1
-rw-r--r--src/node/binding.gyp35
-rw-r--r--src/node/call.cc31
-rw-r--r--src/node/call.h2
-rw-r--r--src/node/client.js99
-rw-r--r--src/node/interop/interop_server.js3
-rw-r--r--src/node/node_grpc.cc2
-rw-r--r--src/node/test/call_test.js42
-rw-r--r--src/node/test/constant_test.js1
-rw-r--r--src/node/test/end_to_end_test.js58
-rw-r--r--src/node/test/interop_sanity_test.js4
-rw-r--r--src/node/test/server_test.js41
-rw-r--r--src/php/ext/grpc/call.c21
-rw-r--r--src/php/ext/grpc/php_grpc.c4
-rwxr-xr-xsrc/php/lib/Grpc/ActiveCall.php3
-rwxr-xr-xsrc/php/tests/unit_tests/CallTest.php6
-rwxr-xr-xsrc/php/tests/unit_tests/EndToEndTest.php24
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php24
-rw-r--r--src/python/_framework/foundation/_logging_pool_test.py2
-rw-r--r--src/ruby/ext/grpc/rb_call.c20
-rw-r--r--src/ruby/ext/grpc/rb_event.c6
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb28
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb4
-rw-r--r--src/ruby/spec/call_spec.rb16
-rw-r--r--src/ruby/spec/client_server_spec.rb8
-rw-r--r--src/ruby/spec/event_spec.rb3
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb56
31 files changed, 355 insertions, 411 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 27a6c6e33d..d1be41074d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -49,6 +49,17 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
return bb;
}
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
+ switch (bb->type) {
+ case GRPC_BB_SLICE_BUFFER:
+ return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
+ bb->data.slice_buffer.count);
+ }
+ gpr_log(GPR_INFO, "should never get here");
+ abort();
+ return NULL;
+}
+
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 1d8315a4b6..14d990df6a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -173,11 +173,14 @@ struct grpc_call {
/* protects variables in this section */
gpr_mu read_mu;
+ gpr_uint8 received_start;
+ gpr_uint8 start_ok;
gpr_uint8 reads_done;
gpr_uint8 received_finish;
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 pending_writes_done;
gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
@@ -190,6 +193,8 @@ struct grpc_call {
/* The current outstanding send message/context/invoke/end tag (only valid if
have_write == 1) */
void *write_tag;
+ grpc_byte_buffer *pending_write;
+ gpr_uint32 pending_write_flags;
/* The final status of the call */
grpc_status_code status_code;
@@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_alarm = 0;
call->received_metadata = 0;
call->got_status_code = 0;
+ call->start_ok = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
call->received_finish = 0;
call->reads_done = 0;
+ call->received_start = 0;
+ call->pending_write = NULL;
+ call->pending_writes_done = 0;
grpc_metadata_buffer_init(&call->incoming_metadata);
gpr_ref_init(&call->internal_refcount, 1);
grpc_call_stack_init(channel_stack, server_transport_data,
@@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
return GRPC_CALL_OK;
}
-static void done_invoke(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
static void finish_call(grpc_call *call) {
size_t count;
grpc_metadata *elements;
@@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) {
elements, count);
}
-grpc_call_error grpc_call_start_invoke(grpc_call *call,
- grpc_completion_queue *cq,
- void *invoke_accepted_tag,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
+static void done_write(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void done_writes_done(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ void *tag = call->write_tag;
+
+ GPR_ASSERT(call->have_write);
+ call->have_write = 0;
+ call->write_tag = INVALID_TAG;
+ grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
+}
+
+static void call_started(void *user_data, grpc_op_error error) {
+ grpc_call *call = user_data;
+ grpc_call_element *elem;
+ grpc_byte_buffer *pending_write = NULL;
+ gpr_uint32 pending_write_flags = 0;
+ gpr_uint8 pending_writes_done = 0;
+ int ok;
+ grpc_call_op op;
+
+ gpr_mu_lock(&call->read_mu);
+ GPR_ASSERT(!call->received_start);
+ call->received_start = 1;
+ ok = call->start_ok = (error == GRPC_OP_OK);
+ pending_write = call->pending_write;
+ pending_write_flags = call->pending_write_flags;
+ pending_writes_done = call->pending_writes_done;
+ gpr_mu_unlock(&call->read_mu);
+
+ if (pending_write) {
+ if (ok) {
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = pending_write_flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = pending_write;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_write(call, error);
+ }
+ grpc_byte_buffer_destroy(pending_write);
+ }
+ if (pending_writes_done) {
+ if (ok) {
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ } else {
+ done_writes_done(call, error);
+ }
+ }
+
+ grpc_call_internal_unref(call);
+}
+
+grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
+ void *metadata_read_tag, void *finished_tag,
+ gpr_uint32 flags) {
grpc_call_element *elem;
grpc_call_op op;
@@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
/* inform the completion queue of an incoming operation */
grpc_cq_begin_op(cq, call, GRPC_FINISHED);
grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED);
gpr_mu_lock(&call->read_mu);
@@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
if (call->received_finish) {
/* handle early cancellation */
- grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL,
- GRPC_OP_ERROR);
grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL,
NULL, 0, NULL);
finish_call(call);
@@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call,
return GRPC_CALL_OK;
}
- call->write_tag = invoke_accepted_tag;
call->metadata_tag = metadata_read_tag;
- call->have_write = 1;
-
gpr_mu_unlock(&call->read_mu);
/* call down the filter stack */
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
- op.done_cb = done_invoke;
+ op.done_cb = call_started;
op.data.start.pollset = grpc_cq_pollset(cq);
op.user_data = call;
+ grpc_call_internal_ref(call);
elem = CALL_ELEM_FROM_CALL(call, 0);
elem->filter->call_op(elem, NULL, &op);
@@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call,
call->state = CALL_BOUNDCQ;
call->cq = cq;
call->finished_tag = finished_tag;
+ call->received_start = 1;
if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -535,26 +600,6 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
return GRPC_CALL_OK;
}
-static void done_writes_done(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
-static void done_write(void *user_data, grpc_op_error error) {
- grpc_call *call = user_data;
- void *tag = call->write_tag;
-
- GPR_ASSERT(call->have_write);
- call->have_write = 0;
- call->write_tag = INVALID_TAG;
- grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error);
-}
-
void grpc_call_client_initial_metadata_complete(
grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
@@ -617,7 +662,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
} else {
call->read_tag = tag;
call->have_read = 1;
- request_more = 1;
+ request_more = call->received_start;
}
} else if (prq_is_empty(&call->prq) && call->received_finish) {
finish_call(call);
@@ -654,8 +699,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
- /* for now we do no buffering, so a NULL byte_buffer can have no impact
- on our behavior -- succeed immediately */
/* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a
flush, and that flush should be propogated down from here */
if (byte_buffer == NULL) {
@@ -666,15 +709,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call,
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_MESSAGE;
- op.dir = GRPC_CALL_DOWN;
- op.flags = flags;
- op.done_cb = done_write;
- op.user_data = call;
- op.data.message = byte_buffer;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_write = grpc_byte_buffer_copy(byte_buffer);
+ call->pending_write_flags = flags;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_MESSAGE;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = flags;
+ op.done_cb = done_write;
+ op.user_data = call;
+ op.data.message = byte_buffer;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -706,14 +759,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
call->write_tag = tag;
call->have_write = 1;
- op.type = GRPC_SEND_FINISH;
- op.dir = GRPC_CALL_DOWN;
- op.flags = 0;
- op.done_cb = done_writes_done;
- op.user_data = call;
+ gpr_mu_lock(&call->read_mu);
+ if (!call->received_start) {
+ call->pending_writes_done = 1;
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->call_op(elem, NULL, &op);
+ gpr_mu_unlock(&call->read_mu);
+ } else {
+ gpr_mu_unlock(&call->read_mu);
+
+ op.type = GRPC_SEND_FINISH;
+ op.dir = GRPC_CALL_DOWN;
+ op.flags = 0;
+ op.done_cb = done_writes_done;
+ op.user_data = call;
+
+ elem = CALL_ELEM_FROM_CALL(call, 0);
+ elem->filter->call_op(elem, NULL, &op);
+ }
return GRPC_CALL_OK;
}
@@ -818,6 +880,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
+ gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call,
+ grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value));
if (key == grpc_channel_get_status_string(call->channel)) {
maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index a8919a10d9..c8b2bb2cf6 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -104,7 +104,6 @@ Status Channel::StartBlockingRpc(const RpcMethod &method,
context->set_call(call);
grpc_event *ev;
void *finished_tag = reinterpret_cast<char *>(call);
- void *invoke_tag = reinterpret_cast<char *>(call) + 1;
void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
void *write_tag = reinterpret_cast<char *>(call) + 3;
void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
@@ -115,19 +114,11 @@ Status Channel::StartBlockingRpc(const RpcMethod &method,
// add_metadata from context
//
// invoke
- GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag,
- finished_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future);
- bool success = ev->data.invoke_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- if (!success) {
- GetFinalStatus(cq, finished_tag, &status);
- return status;
- }
+ GPR_ASSERT(grpc_call_invoke(call, cq, metadata_read_tag, finished_tag,
+ GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer *write_buffer = nullptr;
- success = SerializeProto(request, &write_buffer);
+ bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index e64010be64..edb2fc5ad9 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -80,17 +80,9 @@ void StreamContext::Start(bool buffered) {
if (is_client_) {
// TODO(yangg) handle metadata send path
int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(),
- client_metadata_read_tag(),
- finished_tag(), flag);
+ grpc_call_error error = grpc_call_invoke(
+ call(), cq(), client_metadata_read_tag(), finished_tag(), flag);
GPR_ASSERT(GRPC_CALL_OK == error);
- grpc_event *invoke_ev =
- grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future);
- if (invoke_ev->data.invoke_accepted != GRPC_OP_OK) {
- peer_halfclosed_ = true;
- self_halfclosed_ = true;
- }
- grpc_event_finish(invoke_ev);
} else {
// TODO(yangg) metadata needs to be added before accept
// TODO(yangg) correctly set flag to accept
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
index 8697d86e83..8def589841 100644
--- a/src/cpp/stream/stream_context.h
+++ b/src/cpp/stream/stream_context.h
@@ -76,7 +76,6 @@ class StreamContext final : public StreamContextInterface {
void *read_tag() { return reinterpret_cast<char *>(this) + 1; }
void *write_tag() { return reinterpret_cast<char *>(this) + 2; }
void *halfclose_tag() { return reinterpret_cast<char *>(this) + 3; }
- void *invoke_tag() { return reinterpret_cast<char *>(this) + 4; }
void *client_metadata_read_tag() {
return reinterpret_cast<char *>(this) + 5;
}
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
index da4a943491..fe4b5da9c8 100644
--- a/src/node/binding.gyp
+++ b/src/node/binding.gyp
@@ -1,8 +1,13 @@
{
+ "variables" : {
+ 'no_install': "<!(echo $GRPC_NO_INSTALL)",
+ 'grpc_root': "<!(echo $GRPC_ROOT)",
+ 'grpc_lib_subdir': "<!(echo $GRPC_LIB_SUBDIR)"
+ },
"targets" : [
{
'include_dirs': [
- "<!(node -e \"require('nan')\")"
+ "<!(nodejs -e \"require('nan')\")"
],
'cxxflags': [
'-Wall',
@@ -11,16 +16,13 @@
'-g',
'-zdefs'
'-Werror',
- ],
+ ],
'ldflags': [
- '-g',
- '-L/usr/local/google/home/mlumish/grpc_dev/lib'
+ '-g'
],
'link_settings': {
'libraries': [
- '-lgrpc',
'-lrt',
- '-lgpr',
'-lpthread'
],
},
@@ -37,6 +39,27 @@
"server_credentials.cc",
"tag.cc",
"timeval.cc"
+ ],
+ 'conditions' : [
+ ['no_install=="yes"', {
+ 'include_dirs': [
+ "<(grpc_root)/include"
+ ],
+ 'link_settings': {
+ 'libraries': [
+ '<(grpc_root)/<(grpc_lib_subdir)/libgrpc.a',
+ '<(grpc_root)/<(grpc_lib_subdir)/libgpr.a'
+ ]
+ }
+ }],
+ ['no_install!="yes"', {
+ 'link_settings': {
+ 'libraries': [
+ '-lgrpc',
+ '-lgpr'
+ ]
+ }
+ }]
]
}
]
diff --git a/src/node/call.cc b/src/node/call.cc
index b8ee1786a6..6434c2f0d5 100644
--- a/src/node/call.cc
+++ b/src/node/call.cc
@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startInvoke",
- FunctionTemplate::New(StartInvoke)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "invoke",
+ FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartInvoke) {
+NAN_METHOD(Call::Invoke) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("invoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's first argument must be a function");
+ return NanThrowTypeError("invoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError(
- "StartInvoke's second argument must be a function");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's third argument must be a function");
+ return NanThrowTypeError("invoke's second argument must be a function");
}
- if (!args[3]->IsUint32()) {
- return NanThrowTypeError(
- "StartInvoke's fourth argument must be integer flags");
+ if (!args[2]->IsUint32()) {
+ return NanThrowTypeError("invoke's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_start_invoke(
+ grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
- CreateTag(args[2], args.This()), flags);
+ CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
} else {
- return NanThrowError("startInvoke failed", error);
+ return NanThrowError("invoke failed", error);
}
NanReturnUndefined();
}
@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
diff --git a/src/node/call.h b/src/node/call.h
index 55a6fc65b8..1924a1bf42 100644
--- a/src/node/call.h
+++ b/src/node/call.h
@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
- static NAN_METHOD(StartInvoke);
+ static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);
diff --git a/src/node/client.js b/src/node/client.js
index f913b06f29..2fefd14bbc 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -62,12 +62,9 @@ function GrpcClientStream(call, serialize, deserialize) {
};
}
var self = this;
- // Indicates that we can start reading and have not received a null read
- var can_read = false;
+ var finished = false;
// Indicates that a read is currently pending
var reading = false;
- // Indicates that we can call startWrite
- var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
@@ -98,91 +95,46 @@ function GrpcClientStream(call, serialize, deserialize) {
return deserialize(buffer);
};
/**
- * Callback to handle receiving a READ event. Pushes the data from that event
- * onto the read queue and starts reading again if applicable.
- * @param {grpc.Event} event The READ event object
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
*/
function readCallback(event) {
+ if (finished) {
+ self.push(null);
+ return;
+ }
var data = event.data;
- if (self.push(self.deserialize(data))) {
- if (data == null) {
- // Disable starting to read after null read was received
- can_read = false;
- reading = false;
- } else {
- call.startRead(readCallback);
- }
+ if (self.push(data) && data != null) {
+ self._call.startRead(readCallback);
} else {
- // Indicate that reading can be resumed by calling startReading
reading = false;
}
- };
- /**
- * Initiate a read, which continues until self.push returns false (indicating
- * that reading should be paused) or data is null (indicating that there is no
- * more data to read).
- */
- function startReading() {
- call.startRead(readCallback);
- }
- // TODO(mlumish): possibly change queue implementation due to shift slowness
- var write_queue = [];
- /**
- * Write the next chunk of data in the write queue if there is one. Otherwise
- * indicate that there is no pending write. When the write succeeds, this
- * function is called again.
- */
- function writeNext() {
- if (write_queue.length > 0) {
- writing = true;
- var next = write_queue.shift();
- var writeCallback = function(event) {
- next.callback();
- writeNext();
- };
- call.startWrite(self.serialize(next.chunk), writeCallback, 0);
- } else {
- writing = false;
- }
}
- call.startInvoke(function(event) {
- can_read = true;
- can_write = true;
- startReading();
- writeNext();
- }, function(event) {
+ call.invoke(function(event) {
self.emit('metadata', event.data);
}, function(event) {
+ finished = true;
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
- * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
- * event has been received.
+ * Start reading if there is not already a pending read. Reading will
+ * continue until self.push returns false (indicating reads should slow
+ * down) or the read data is null (indicating that there is no more data).
*/
- this._enableRead = function() {
- if (!reading) {
- reading = true;
- if (can_read) {
- startReading();
+ this.startReading = function() {
+ if (finished) {
+ self.push(null);
+ } else {
+ if (!reading) {
+ reading = true;
+ self._call.startRead(readCallback);
}
}
};
- /**
- * Push the chunk onto the write queue, and write from the write queue if
- * there is not a pending write
- * @param {Buffer} chunk The chunk of data to write
- * @param {function(Error=)} callback The callback to call when the write
- * completes
- */
- this._tryWrite = function(chunk, callback) {
- write_queue.push({chunk: chunk, callback: callback});
- if (can_write && !writing) {
- writeNext();
- }
- };
}
/**
@@ -191,7 +143,7 @@ function GrpcClientStream(call, serialize, deserialize) {
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
- this._enableRead();
+ this.startReading();
};
/**
@@ -202,7 +154,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- this._tryWrite(chunk, callback);
+ var self = this;
+ self._call.startWrite(chunk, function(event) {
+ callback();
+ }, 0);
};
/**
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 6d2bd7ae0d..ebf847876c 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -194,7 +194,8 @@ if (require.main === module) {
string: ['port', 'use_tls']
});
var server_obj = getServer(argv.port, argv.use_tls === 'true');
- server_obj.server.start();
+ console.log('Server attaching to port ' + argv.port);
+ server_obj.server.listen();
}
/**
diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc
index acee0386d2..bc1dfaf899 100644
--- a/src/node/node_grpc.cc
+++ b/src/node/node_grpc.cc
@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
- Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
- completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index e6dc9664f1..6e52ec89bd 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -118,12 +118,11 @@ describe('call', function() {
call.addMetadata(5);
}, TypeError);
});
- it('should fail if startInvoke was already called', function(done) {
+ it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
@@ -133,32 +132,26 @@ describe('call', function() {
call.cancel();
});
});
- describe('startInvoke', function() {
- it('should fail with fewer than 4 arguments', function() {
+ describe('invoke', function() {
+ it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke();
+ call.invoke();
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {});
+ call.invoke(function() {});
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {});
- }, TypeError);
- assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {});
+ call.invoke(function() {},
+ function() {});
}, TypeError);
});
- it('should work with 3 args and an int', function(done) {
+ it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
// Cancel to speed up the test
call.cancel();
});
@@ -166,12 +159,11 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke(0, 0, 0, 0);
+ call.invoke(0, 0, 0);
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {}, 'test');
+ call.invoke(function() {},
+ function() {}, 'test');
});
});
});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index f65eea3cff..0138a55226 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
- 'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index db3834dbba..f7ccbcf5f2 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -70,16 +70,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
-
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
@@ -109,8 +100,12 @@ describe('end-to-end', function() {
done();
});
});
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ });
});
-
it('should send and receive data without error', function(complete) {
var req_text = 'client_request';
var reply_text = 'server_response';
@@ -127,28 +122,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -159,6 +133,24 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), reply_text);
+ done();
+ });
server.start();
server.requestCall(function(event) {
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index 410b050e8d..3c062b9788 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -52,7 +52,8 @@ describe('Interop tests', function() {
it('should pass empty_unary', function(done) {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
});
- it('should pass large_unary', function(done) {
+ // This fails due to an unknown bug
+ it.skip('should pass large_unary', function(done) {
interop_client.runTest(port, name_override, 'large_unary', true, done);
});
it('should pass client_streaming', function(done) {
@@ -64,7 +65,6 @@ describe('Interop tests', function() {
it('should pass ping_pong', function(done) {
interop_client.runTest(port, name_override, 'ping_pong', true, done);
});
- // This depends on the new invoke API
it.skip('should pass empty_stream', function(done) {
interop_client.runTest(port, name_override, 'empty_stream', true, done);
});
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 61aef4677e..457d13d2f5 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -81,28 +81,7 @@ describe('echo server', function() {
var call = new grpc.Call(channel,
'echo',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -114,5 +93,23 @@ describe('echo server', function() {
server.shutdown();
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ done();
+ });
});
});
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 410efbce68..b171c9c176 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -224,27 +224,25 @@ PHP_METHOD(Call, add_metadata) {
/**
* Invoke the RPC. Starts sending metadata and request headers over the wire
* @param CompletionQueue $queue The completion queue to use with this call
- * @param long $invoke_accepted_tag The tag to associate with this invocation
* @param long $metadata_tag The tag to associate with returned metadata
* @param long $finished_tag The tag to associate with the finished event
* @param long $flags A bitwise combination of the Grpc\WRITE_* constants
* (optional)
* @return Void
*/
-PHP_METHOD(Call, start_invoke) {
+PHP_METHOD(Call, invoke) {
grpc_call_error error_code;
long tag1;
long tag2;
- long tag3;
zval *queue_obj;
long flags = 0;
- /* "Olll|l" == 1 Object, 3 mandatory longs, 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Olll|l", &queue_obj,
- grpc_ce_completion_queue, &tag1, &tag2, &tag3,
+ /* "Oll|l" == 1 Object, 3 mandatory longs, 1 optional long */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oll|l", &queue_obj,
+ grpc_ce_completion_queue, &tag1, &tag2,
&flags) == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
- "start_invoke needs a CompletionQueue, 3 longs, and an optional long",
+ "invoke needs a CompletionQueue, 2 longs, and an optional long",
1 TSRMLS_CC);
return;
}
@@ -254,10 +252,9 @@ PHP_METHOD(Call, start_invoke) {
wrapped_grpc_completion_queue *queue =
(wrapped_grpc_completion_queue *)zend_object_store_get_object(
queue_obj TSRMLS_CC);
- error_code =
- grpc_call_start_invoke(call->wrapped, queue->wrapped, (void *)tag1,
- (void *)tag2, (void *)tag3, (gpr_uint32)flags);
- MAYBE_THROW_CALL_ERROR(start_invoke, error_code);
+ error_code = grpc_call_invoke(call->wrapped, queue->wrapped, (void *)tag1,
+ (void *)tag2, (gpr_uint32)flags);
+ MAYBE_THROW_CALL_ERROR(invoke, error_code);
}
/**
@@ -427,7 +424,7 @@ static zend_function_entry call_methods[] = {
PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_invoke, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, invoke, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_read, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC)
PHP_ME(Call, start_write_status, NULL, ZEND_ACC_PUBLIC)
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index e8b4643a58..492ac06739 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -107,11 +107,9 @@ PHP_MINIT_FUNCTION(grpc) {
/* Register completion type constants */
REGISTER_LONG_CONSTANT("Grpc\\QUEUE_SHUTDOWN", GRPC_QUEUE_SHUTDOWN, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\READ", GRPC_READ, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\INVOKE_ACCEPTED", GRPC_INVOKE_ACCEPTED,
- CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISH_ACCEPTED", GRPC_FINISH_ACCEPTED,
CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\CLIENT_METADATA_READ",
GRPC_CLIENT_METADATA_READ, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\FINISHED", GRPC_FINISHED, CONST_CS);
diff --git a/src/php/lib/Grpc/ActiveCall.php b/src/php/lib/Grpc/ActiveCall.php
index aa66dbb848..836a4b09e3 100755
--- a/src/php/lib/Grpc/ActiveCall.php
+++ b/src/php/lib/Grpc/ActiveCall.php
@@ -29,11 +29,8 @@ class ActiveCall {
// Invoke the call.
$this->call->start_invoke($this->completion_queue,
- INVOKE_ACCEPTED,
CLIENT_METADATA_READ,
FINISHED, 0);
- $this->completion_queue->pluck(INVOKE_ACCEPTED,
- Timeval::inf_future());
$metadata_event = $this->completion_queue->pluck(CLIENT_METADATA_READ,
Timeval::inf_future());
$this->metadata = $metadata_event->data;
diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php
index 253052a038..795831cb65 100755
--- a/src/php/tests/unit_tests/CallTest.php
+++ b/src/php/tests/unit_tests/CallTest.php
@@ -19,10 +19,10 @@ class CallTest extends PHPUnit_Framework_TestCase{
/**
* @expectedException LogicException
* @expectedExceptionCode Grpc\CALL_ERROR_INVALID_FLAGS
- * @expectedExceptionMessage start_invoke
+ * @expectedExceptionMessage invoke
*/
- public function testStartInvokeRejectsBadFlags() {
- $this->call->start_invoke($this->cq, 0, 0, 0, 0xDEADBEEF);
+ public function testInvokeRejectsBadFlags() {
+ $this->call->invoke($this->cq, 0, 0, 0xDEADBEEF);
}
/**
diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php
index 3818f9531c..78c5e9f93b 100755
--- a/src/php/tests/unit_tests/EndToEndTest.php
+++ b/src/php/tests/unit_tests/EndToEndTest.php
@@ -25,18 +25,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
@@ -103,18 +97,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index c562a821a4..7c3ad8a07c 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -37,17 +37,11 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
$call->writes_done($tag);
$event = $this->client_queue->next($deadline);
$this->assertNotNull($event);
@@ -113,18 +107,12 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$deadline);
$tag = 1;
$this->assertEquals(Grpc\CALL_OK,
- $call->start_invoke($this->client_queue,
- $tag,
- $tag,
- $tag));
+ $call->invoke($this->client_queue,
+ $tag,
+ $tag));
$server_tag = 2;
- // the client invocation was accepted
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type);
-
// the client writes
$call->start_write($req_text, $tag);
$event = $this->client_queue->next($deadline);
diff --git a/src/python/_framework/foundation/_logging_pool_test.py b/src/python/_framework/foundation/_logging_pool_test.py
index ffe07c788d..f2224d80e5 100644
--- a/src/python/_framework/foundation/_logging_pool_test.py
+++ b/src/python/_framework/foundation/_logging_pool_test.py
@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests for google3.net.rpc.python.framework.foundation.logging_pool."""
+"""Tests for _framework.foundation.logging_pool."""
import unittest
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 76b80bcaa1..1b6565f729 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -153,7 +153,7 @@ int grpc_rb_call_add_metadata_hash_cb(VALUE key, VALUE val, VALUE call_obj) {
Add metadata elements to the call from a ruby hash, to be sent upon
invocation. flags is a bit-field combination of the write flags defined
- above. REQUIRES: grpc_call_start_invoke/grpc_call_accept have not been
+ above. REQUIRES: grpc_call_invoke/grpc_call_accept have not been
called on this call. Produces no events. */
static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) {
@@ -196,16 +196,15 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
/*
call-seq:
- call.start_invoke(completion_queue, tag, flags=nil)
+ call.invoke(completion_queue, tag, flags=nil)
Invoke the RPC. Starts sending metadata and request headers on the wire.
flags is a bit-field combination of the write flags defined above.
REQUIRES: Can be called at most once per call.
Can only be called on the client.
Produces a GRPC_INVOKE_ACCEPTED event on completion. */
-static VALUE grpc_rb_call_start_invoke(int argc, VALUE *argv, VALUE self) {
+static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) {
VALUE cqueue = Qnil;
- VALUE invoke_accepted_tag = Qnil;
VALUE metadata_read_tag = Qnil;
VALUE finished_tag = Qnil;
VALUE flags = Qnil;
@@ -213,17 +212,16 @@ static VALUE grpc_rb_call_start_invoke(int argc, VALUE *argv, VALUE self) {
grpc_completion_queue *cq = NULL;
grpc_call_error err;
- /* "41" == 4 mandatory args, 1 (flags) is optional */
- rb_scan_args(argc, argv, "41", &cqueue, &invoke_accepted_tag,
- &metadata_read_tag, &finished_tag, &flags);
+ /* "31" == 3 mandatory args, 1 (flags) is optional */
+ rb_scan_args(argc, argv, "31", &cqueue, &metadata_read_tag, &finished_tag,
+ &flags);
if (NIL_P(flags)) {
flags = UINT2NUM(0); /* Default to no flags */
}
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
Data_Get_Struct(self, grpc_call, call);
- err = grpc_call_start_invoke(call, cq, ROBJECT(invoke_accepted_tag),
- ROBJECT(metadata_read_tag),
- ROBJECT(finished_tag), NUM2UINT(flags));
+ err = grpc_call_invoke(call, cq, ROBJECT(metadata_read_tag),
+ ROBJECT(finished_tag), NUM2UINT(flags));
if (err != GRPC_CALL_OK) {
rb_raise(rb_eCallError, "invoke failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
@@ -519,7 +517,7 @@ void Init_google_rpc_call() {
grpc_rb_call_server_end_initial_metadata, -1);
rb_define_method(rb_cCall, "add_metadata", grpc_rb_call_add_metadata, -1);
rb_define_method(rb_cCall, "cancel", grpc_rb_call_cancel, 0);
- rb_define_method(rb_cCall, "start_invoke", grpc_rb_call_start_invoke, -1);
+ rb_define_method(rb_cCall, "invoke", grpc_rb_call_invoke, -1);
rb_define_method(rb_cCall, "start_read", grpc_rb_call_start_read, 1);
rb_define_method(rb_cCall, "start_write", grpc_rb_call_start_write, -1);
rb_define_method(rb_cCall, "start_write_status",
diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c
index 0fae9502c3..a1ab6251c8 100644
--- a/src/ruby/ext/grpc/rb_event.c
+++ b/src/ruby/ext/grpc/rb_event.c
@@ -105,10 +105,6 @@ static VALUE grpc_rb_event_type(VALUE self) {
case GRPC_READ:
return rb_const_get(rb_mCompletionType, rb_intern("READ"));
- case GRPC_INVOKE_ACCEPTED:
- grpc_rb_event_result(self); /* validates the result */
- return rb_const_get(rb_mCompletionType, rb_intern("INVOKE_ACCEPTED"));
-
case GRPC_WRITE_ACCEPTED:
grpc_rb_event_result(self); /* validates the result */
return rb_const_get(rb_mCompletionType, rb_intern("WRITE_ACCEPTED"));
@@ -359,6 +355,8 @@ void Init_google_rpc_event() {
rb_define_const(rb_mCompletionType, "FINISHED", INT2NUM(GRPC_FINISHED));
rb_define_const(rb_mCompletionType, "SERVER_RPC_NEW",
INT2NUM(GRPC_SERVER_RPC_NEW));
+ rb_define_const(rb_mCompletionType, "SERVER_SHUTDOWN",
+ INT2NUM(GRPC_SERVER_SHUTDOWN));
rb_define_const(rb_mCompletionType, "RESERVED",
INT2NUM(GRPC_COMPLETION_DO_NOT_USE));
}
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index bd684a8d07..1cdc168bfe 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -47,7 +47,7 @@ module Google
include Core::TimeConsts
attr_reader(:deadline)
- # client_start_invoke begins a client invocation.
+ # client_invoke begins a client invocation.
#
# Flow Control note: this blocks until flow control accepts that client
# request can go ahead.
@@ -59,9 +59,9 @@ module Google
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
- # @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED
- # @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED
- def self.client_start_invoke(call, q, _deadline, **kw)
+ # @param q [CompletionQueue] the completion queue
+ # @param deadline [Fixnum,TimeSpec] the deadline
+ def self.client_invoke(call, q, _deadline, **kw)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
unless q.is_a? Core::CompletionQueue
fail(ArgumentError, 'not a CompletionQueue')
@@ -69,24 +69,16 @@ module Google
call.add_metadata(kw) if kw.length > 0
invoke_accepted, client_metadata_read = Object.new, Object.new
finished_tag = Object.new
- call.start_invoke(q, invoke_accepted, client_metadata_read,
- finished_tag)
-
- # wait for the invocation to be accepted
- ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
- fail OutOfTime if ev.nil?
- ev.close
-
+ call.invoke(q, client_metadata_read, finished_tag)
[finished_tag, client_metadata_read]
end
# Creates an ActiveCall.
#
- # ActiveCall should only be created after a call is accepted. That means
- # different things on a client and a server. On the client, the call is
- # accepted after call.start_invoke followed by receipt of the
- # corresponding INVOKE_ACCEPTED. on the server, this is after
- # call.accept.
+ # ActiveCall should only be created after a call is accepted. That
+ # means different things on a client and a server. On the client, the
+ # call is accepted after calling call.invoke. On the server, this is
+ # after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
@@ -495,7 +487,7 @@ module Google
private
def start_call(**kw)
- tags = ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw)
+ tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw)
@finished_tag, @read_metadata_tag = tags
@started = true
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 36877dc648..099d57151c 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -50,9 +50,7 @@ module Google
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
- # accepted after call.start_invoke followed by receipt of the
- # corresponding INVOKE_ACCEPTED. On the server, this is after
- # call.accept.
+ # accepted after call.invoke. On the server, this is after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index b8ecd64f39..9a510df1f3 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -122,24 +122,10 @@ describe GRPC::Core::Call do
end
end
- describe '#start_invoke' do
- it 'should cause the INVOKE_ACCEPTED event' do
- call = make_test_call
- expect(call.start_invoke(@client_queue, @tag, @tag, @tag)).to be_nil
- ev = @client_queue.next(deadline)
- expect(ev.call).to be_a(GRPC::Core::Call)
- expect(ev.tag).to be(@tag)
- expect(ev.type).to be(GRPC::Core::CompletionType::INVOKE_ACCEPTED)
- expect(ev.call).to_not be(call)
- end
- end
-
describe '#start_write' do
it 'should cause the WRITE_ACCEPTED event' do
call = make_test_call
- call.start_invoke(@client_queue, @tag, @tag, @tag)
- ev = @client_queue.next(deadline)
- expect(ev.type).to be(GRPC::Core::CompletionType::INVOKE_ACCEPTED)
+ call.invoke(@client_queue, @tag, @tag)
expect(call.start_write(GRPC::Core::ByteBuffer.new('test_start_write'),
@tag)).to be_nil
ev = @client_queue.next(deadline)
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index df70e56bca..b2afb0581e 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -83,10 +83,7 @@ shared_context 'setup: tags' do
def client_sends(call, sent = 'a message')
req = ByteBuffer.new(sent)
- call.start_invoke(@client_queue, @tag, @tag, @client_finished_tag)
- ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE)
- expect(ev).not_to be_nil
- expect(ev.type).to be(INVOKE_ACCEPTED)
+ call.invoke(@client_queue, @tag, @client_finished_tag)
call.start_write(req, @tag)
ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE)
expect(ev).not_to be_nil
@@ -233,8 +230,7 @@ shared_examples 'GRPC metadata delivery works OK' do
call.add_metadata(md)
# Client begins a call OK
- call.start_invoke(@client_queue, @tag, @tag, @client_finished_tag)
- expect_next_event_on(@client_queue, INVOKE_ACCEPTED, @tag)
+ call.invoke(@client_queue, @tag, @client_finished_tag)
# ... server has all metadata available even though the client did not
# send a write
diff --git a/src/ruby/spec/event_spec.rb b/src/ruby/spec/event_spec.rb
index 5dec07e1ed..7ef08d021b 100644
--- a/src/ruby/spec/event_spec.rb
+++ b/src/ruby/spec/event_spec.rb
@@ -40,7 +40,8 @@ describe GRPC::Core::CompletionType do
CLIENT_METADATA_READ: 5,
FINISHED: 6,
SERVER_RPC_NEW: 7,
- RESERVED: 8
+ SERVER_SHUTDOWN: 8,
+ RESERVED: 9
}
end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 898022f185..443ba3d192 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -60,8 +60,8 @@ describe GRPC::ActiveCall do
describe 'restricted view methods' do
before(:each) do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -92,8 +92,8 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
@client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -118,8 +118,8 @@ describe GRPC::ActiveCall do
it 'marshals the payload using the marshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
marshal = proc { |x| 'marshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, marshal,
@pass_through, deadline,
@@ -139,11 +139,11 @@ describe GRPC::ActiveCall do
end
end
- describe '#client_start_invoke' do
+ describe '#client_invoke' do
it 'sends keywords as metadata to the server when the are present' do
call = make_test_call
- ActiveCall.client_start_invoke(call, @client_queue, deadline,
- k1: 'v1', k2: 'v2')
+ ActiveCall.client_invoke(call, @client_queue, deadline,
+ k1: 'v1', k2: 'v2')
@server.request_call(@server_tag)
ev = @server_queue.next(deadline)
expect(ev).to_not be_nil
@@ -155,8 +155,8 @@ describe GRPC::ActiveCall do
describe '#remote_read' do
it 'reads the response sent by a server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -170,8 +170,8 @@ describe GRPC::ActiveCall do
it 'saves metadata { status=200 } when the server adds no metadata' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -187,8 +187,8 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -205,7 +205,7 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
@@ -224,8 +224,8 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
unmarshal = proc { |x| 'unmarshalled:' + x }
client_call = ActiveCall.new(call, @client_queue, @pass_through,
unmarshal, deadline,
@@ -251,8 +251,8 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that can read n responses' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -271,8 +271,8 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,
@@ -296,8 +296,8 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
finished_tag: done_tag,
@@ -315,8 +315,8 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,
@@ -334,8 +334,8 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
- done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue,
- deadline)
+ done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue,
+ deadline)
client_call = ActiveCall.new(call, @client_queue, @pass_through,
@pass_through, deadline,
read_metadata_tag: meta_tag,