diff options
Diffstat (limited to 'src')
27 files changed, 321 insertions, 401 deletions
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index 27a6c6e33d..d1be41074d 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -49,6 +49,17 @@ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) { return bb; } +grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { + switch (bb->type) { + case GRPC_BB_SLICE_BUFFER: + return grpc_byte_buffer_create(bb->data.slice_buffer.slices, + bb->data.slice_buffer.count); + } + gpr_log(GPR_INFO, "should never get here"); + abort(); + return NULL; +} + void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { switch (bb->type) { case GRPC_BB_SLICE_BUFFER: diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 46502fb6b1..05f6661254 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -173,11 +173,14 @@ struct grpc_call { /* protects variables in this section */ gpr_mu read_mu; + gpr_uint8 received_start; + gpr_uint8 start_ok; gpr_uint8 reads_done; gpr_uint8 received_finish; gpr_uint8 received_metadata; gpr_uint8 have_read; gpr_uint8 have_alarm; + gpr_uint8 pending_writes_done; gpr_uint8 got_status_code; /* The current outstanding read message tag (only valid if have_read == 1) */ void *read_tag; @@ -190,6 +193,8 @@ struct grpc_call { /* The current outstanding send message/context/invoke/end tag (only valid if have_write == 1) */ void *write_tag; + grpc_byte_buffer *pending_write; + gpr_uint32 pending_write_flags; /* The final status of the call */ grpc_status_code status_code; @@ -227,11 +232,15 @@ grpc_call *grpc_call_create(grpc_channel *channel, call->have_alarm = 0; call->received_metadata = 0; call->got_status_code = 0; + call->start_ok = 0; call->status_code = server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; call->status_details = NULL; call->received_finish = 0; call->reads_done = 0; + call->received_start = 0; + call->pending_write = NULL; + call->pending_writes_done = 0; grpc_metadata_buffer_init(&call->incoming_metadata); gpr_ref_init(&call->internal_refcount, 1); grpc_call_stack_init(channel_stack, server_transport_data, @@ -360,16 +369,6 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, return GRPC_CALL_OK; } -static void done_invoke(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_invoke_accepted(call->cq, tag, call, NULL, NULL, error); -} - static void finish_call(grpc_call *call) { size_t count; grpc_metadata *elements; @@ -384,11 +383,81 @@ static void finish_call(grpc_call *call) { elements, count); } -grpc_call_error grpc_call_start_invoke(grpc_call *call, - grpc_completion_queue *cq, - void *invoke_accepted_tag, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { +static void done_write(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void done_writes_done(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + void *tag = call->write_tag; + + GPR_ASSERT(call->have_write); + call->have_write = 0; + call->write_tag = INVALID_TAG; + grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); +} + +static void call_started(void *user_data, grpc_op_error error) { + grpc_call *call = user_data; + grpc_call_element *elem; + grpc_byte_buffer *pending_write = NULL; + gpr_uint32 pending_write_flags = 0; + gpr_uint8 pending_writes_done = 0; + int ok; + grpc_call_op op; + + gpr_mu_lock(&call->read_mu); + GPR_ASSERT(!call->received_start); + call->received_start = 1; + ok = call->start_ok = (error == GRPC_OP_OK); + pending_write = call->pending_write; + pending_write_flags = call->pending_write_flags; + pending_writes_done = call->pending_writes_done; + gpr_mu_unlock(&call->read_mu); + + if (pending_write) { + if (ok) { + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = pending_write_flags; + op.done_cb = done_write; + op.user_data = call; + op.data.message = pending_write; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } else { + done_write(call, error); + } + grpc_byte_buffer_destroy(pending_write); + } + if (pending_writes_done) { + if (ok) { + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } else { + done_writes_done(call, error); + } + } + + grpc_call_internal_unref(call); +} + +grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, + void *metadata_read_tag, void *finished_tag, + gpr_uint32 flags) { grpc_call_element *elem; grpc_call_op op; @@ -420,7 +489,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, /* inform the completion queue of an incoming operation */ grpc_cq_begin_op(cq, call, GRPC_FINISHED); grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - grpc_cq_begin_op(cq, call, GRPC_INVOKE_ACCEPTED); gpr_mu_lock(&call->read_mu); @@ -431,8 +499,6 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, if (call->received_finish) { /* handle early cancellation */ - grpc_cq_end_invoke_accepted(call->cq, invoke_accepted_tag, call, NULL, NULL, - GRPC_OP_ERROR); grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, NULL, NULL, 0, NULL); finish_call(call); @@ -442,20 +508,18 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, return GRPC_CALL_OK; } - call->write_tag = invoke_accepted_tag; call->metadata_tag = metadata_read_tag; - call->have_write = 1; - gpr_mu_unlock(&call->read_mu); /* call down the filter stack */ op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; op.flags = flags; - op.done_cb = done_invoke; + op.done_cb = call_started; op.data.start.pollset = grpc_cq_pollset(cq); op.user_data = call; + grpc_call_internal_ref(call); elem = CALL_ELEM_FROM_CALL(call, 0); elem->filter->call_op(elem, NULL, &op); @@ -486,6 +550,7 @@ grpc_call_error grpc_call_server_accept(grpc_call *call, call->state = CALL_BOUNDCQ; call->cq = cq; call->finished_tag = finished_tag; + call->received_start = 1; if (prq_is_empty(&call->prq) && call->received_finish) { finish_call(call); @@ -535,26 +600,6 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, return GRPC_CALL_OK; } -static void done_writes_done(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_finish_accepted(call->cq, tag, call, NULL, NULL, error); -} - -static void done_write(void *user_data, grpc_op_error error) { - grpc_call *call = user_data; - void *tag = call->write_tag; - - GPR_ASSERT(call->have_write); - call->have_write = 0; - call->write_tag = INVALID_TAG; - grpc_cq_end_write_accepted(call->cq, tag, call, NULL, NULL, error); -} - void grpc_call_client_initial_metadata_complete( grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); @@ -617,7 +662,7 @@ grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { } else { call->read_tag = tag; call->have_read = 1; - request_more = 1; + request_more = call->received_start; } } else if (prq_is_empty(&call->prq) && call->received_finish) { finish_call(call); @@ -654,8 +699,6 @@ grpc_call_error grpc_call_start_write(grpc_call *call, grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - /* for now we do no buffering, so a NULL byte_buffer can have no impact - on our behavior -- succeed immediately */ /* TODO(ctiller): if flags & GRPC_WRITE_BUFFER_HINT == 0, this indicates a flush, and that flush should be propogated down from here */ if (byte_buffer == NULL) { @@ -666,15 +709,25 @@ grpc_call_error grpc_call_start_write(grpc_call *call, call->write_tag = tag; call->have_write = 1; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.done_cb = done_write; - op.user_data = call; - op.data.message = byte_buffer; + gpr_mu_lock(&call->read_mu); + if (!call->received_start) { + call->pending_write = grpc_byte_buffer_copy(byte_buffer); + call->pending_write_flags = flags; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_unlock(&call->read_mu); + } else { + gpr_mu_unlock(&call->read_mu); + + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = flags; + op.done_cb = done_write; + op.user_data = call; + op.data.message = byte_buffer; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } return GRPC_CALL_OK; } @@ -706,14 +759,23 @@ grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { call->write_tag = tag; call->have_write = 1; - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = done_writes_done; - op.user_data = call; + gpr_mu_lock(&call->read_mu); + if (!call->received_start) { + call->pending_writes_done = 1; - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->call_op(elem, NULL, &op); + gpr_mu_unlock(&call->read_mu); + } else { + gpr_mu_unlock(&call->read_mu); + + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = done_writes_done; + op.user_data = call; + + elem = CALL_ELEM_FROM_CALL(call, 0); + elem->filter->call_op(elem, NULL, &op); + } return GRPC_CALL_OK; } @@ -818,6 +880,8 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_mdelem *md = op->data.metadata; grpc_mdstr *key = md->key; + gpr_log(GPR_DEBUG, "call %p got metadata %s %s", call, + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); if (key == grpc_channel_get_status_string(call->channel)) { maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index a8919a10d9..c8b2bb2cf6 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -104,7 +104,6 @@ Status Channel::StartBlockingRpc(const RpcMethod &method, context->set_call(call); grpc_event *ev; void *finished_tag = reinterpret_cast<char *>(call); - void *invoke_tag = reinterpret_cast<char *>(call) + 1; void *metadata_read_tag = reinterpret_cast<char *>(call) + 2; void *write_tag = reinterpret_cast<char *>(call) + 3; void *halfclose_tag = reinterpret_cast<char *>(call) + 4; @@ -115,19 +114,11 @@ Status Channel::StartBlockingRpc(const RpcMethod &method, // add_metadata from context // // invoke - GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag, - finished_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future); - bool success = ev->data.invoke_accepted == GRPC_OP_OK; - grpc_event_finish(ev); - if (!success) { - GetFinalStatus(cq, finished_tag, &status); - return status; - } + GPR_ASSERT(grpc_call_invoke(call, cq, metadata_read_tag, finished_tag, + GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); // write request grpc_byte_buffer *write_buffer = nullptr; - success = SerializeProto(request, &write_buffer); + bool success = SerializeProto(request, &write_buffer); if (!success) { grpc_call_cancel(call); status = diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc index e64010be64..edb2fc5ad9 100644 --- a/src/cpp/stream/stream_context.cc +++ b/src/cpp/stream/stream_context.cc @@ -80,17 +80,9 @@ void StreamContext::Start(bool buffered) { if (is_client_) { // TODO(yangg) handle metadata send path int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; - grpc_call_error error = grpc_call_start_invoke(call(), cq(), invoke_tag(), - client_metadata_read_tag(), - finished_tag(), flag); + grpc_call_error error = grpc_call_invoke( + call(), cq(), client_metadata_read_tag(), finished_tag(), flag); GPR_ASSERT(GRPC_CALL_OK == error); - grpc_event *invoke_ev = - grpc_completion_queue_pluck(cq(), invoke_tag(), gpr_inf_future); - if (invoke_ev->data.invoke_accepted != GRPC_OP_OK) { - peer_halfclosed_ = true; - self_halfclosed_ = true; - } - grpc_event_finish(invoke_ev); } else { // TODO(yangg) metadata needs to be added before accept // TODO(yangg) correctly set flag to accept diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h index 8697d86e83..8def589841 100644 --- a/src/cpp/stream/stream_context.h +++ b/src/cpp/stream/stream_context.h @@ -76,7 +76,6 @@ class StreamContext final : public StreamContextInterface { void *read_tag() { return reinterpret_cast<char *>(this) + 1; } void *write_tag() { return reinterpret_cast<char *>(this) + 2; } void *halfclose_tag() { return reinterpret_cast<char *>(this) + 3; } - void *invoke_tag() { return reinterpret_cast<char *>(this) + 4; } void *client_metadata_read_tag() { return reinterpret_cast<char *>(this) + 5; } diff --git a/src/node/call.cc b/src/node/call.cc index b8ee1786a6..6434c2f0d5 100644 --- a/src/node/call.cc +++ b/src/node/call.cc @@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) { tpl->InstanceTemplate()->SetInternalFieldCount(1); NanSetPrototypeTemplate(tpl, "addMetadata", FunctionTemplate::New(AddMetadata)->GetFunction()); - NanSetPrototypeTemplate(tpl, "startInvoke", - FunctionTemplate::New(StartInvoke)->GetFunction()); + NanSetPrototypeTemplate(tpl, "invoke", + FunctionTemplate::New(Invoke)->GetFunction()); NanSetPrototypeTemplate(tpl, "serverAccept", FunctionTemplate::New(ServerAccept)->GetFunction()); NanSetPrototypeTemplate( @@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) { NanReturnUndefined(); } -NAN_METHOD(Call::StartInvoke) { +NAN_METHOD(Call::Invoke) { NanScope(); if (!HasInstance(args.This())) { - return NanThrowTypeError("startInvoke can only be called on Call objects"); + return NanThrowTypeError("invoke can only be called on Call objects"); } if (!args[0]->IsFunction()) { - return NanThrowTypeError("StartInvoke's first argument must be a function"); + return NanThrowTypeError("invoke's first argument must be a function"); } if (!args[1]->IsFunction()) { - return NanThrowTypeError( - "StartInvoke's second argument must be a function"); - } - if (!args[2]->IsFunction()) { - return NanThrowTypeError("StartInvoke's third argument must be a function"); + return NanThrowTypeError("invoke's second argument must be a function"); } - if (!args[3]->IsUint32()) { - return NanThrowTypeError( - "StartInvoke's fourth argument must be integer flags"); + if (!args[2]->IsUint32()) { + return NanThrowTypeError("invoke's third argument must be integer flags"); } Call *call = ObjectWrap::Unwrap<Call>(args.This()); unsigned int flags = args[3]->Uint32Value(); - grpc_call_error error = grpc_call_start_invoke( + grpc_call_error error = grpc_call_invoke( call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(), - CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), - CreateTag(args[2], args.This()), flags); + CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags); if (error == GRPC_CALL_OK) { CompletionQueueAsyncWorker::Next(); CompletionQueueAsyncWorker::Next(); - CompletionQueueAsyncWorker::Next(); } else { - return NanThrowError("startInvoke failed", error); + return NanThrowError("invoke failed", error); } NanReturnUndefined(); } @@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) { NAN_METHOD(Call::Cancel) { NanScope(); if (!HasInstance(args.This())) { - return NanThrowTypeError("startInvoke can only be called on Call objects"); + return NanThrowTypeError("cancel can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap<Call>(args.This()); grpc_call_error error = grpc_call_cancel(call->wrapped_call); diff --git a/src/node/call.h b/src/node/call.h index 55a6fc65b8..1924a1bf42 100644 --- a/src/node/call.h +++ b/src/node/call.h @@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(AddMetadata); - static NAN_METHOD(StartInvoke); + static NAN_METHOD(Invoke); static NAN_METHOD(ServerAccept); static NAN_METHOD(ServerEndInitialMetadata); static NAN_METHOD(Cancel); diff --git a/src/node/client.js b/src/node/client.js index f913b06f29..2fefd14bbc 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -62,12 +62,9 @@ function GrpcClientStream(call, serialize, deserialize) { }; } var self = this; - // Indicates that we can start reading and have not received a null read - var can_read = false; + var finished = false; // Indicates that a read is currently pending var reading = false; - // Indicates that we can call startWrite - var can_write = false; // Indicates that a write is currently pending var writing = false; this._call = call; @@ -98,91 +95,46 @@ function GrpcClientStream(call, serialize, deserialize) { return deserialize(buffer); }; /** - * Callback to handle receiving a READ event. Pushes the data from that event - * onto the read queue and starts reading again if applicable. - * @param {grpc.Event} event The READ event object + * Callback to be called when a READ event is received. Pushes the data onto + * the read queue and starts reading again if applicable + * @param {grpc.Event} event READ event object */ function readCallback(event) { + if (finished) { + self.push(null); + return; + } var data = event.data; - if (self.push(self.deserialize(data))) { - if (data == null) { - // Disable starting to read after null read was received - can_read = false; - reading = false; - } else { - call.startRead(readCallback); - } + if (self.push(data) && data != null) { + self._call.startRead(readCallback); } else { - // Indicate that reading can be resumed by calling startReading reading = false; } - }; - /** - * Initiate a read, which continues until self.push returns false (indicating - * that reading should be paused) or data is null (indicating that there is no - * more data to read). - */ - function startReading() { - call.startRead(readCallback); - } - // TODO(mlumish): possibly change queue implementation due to shift slowness - var write_queue = []; - /** - * Write the next chunk of data in the write queue if there is one. Otherwise - * indicate that there is no pending write. When the write succeeds, this - * function is called again. - */ - function writeNext() { - if (write_queue.length > 0) { - writing = true; - var next = write_queue.shift(); - var writeCallback = function(event) { - next.callback(); - writeNext(); - }; - call.startWrite(self.serialize(next.chunk), writeCallback, 0); - } else { - writing = false; - } } - call.startInvoke(function(event) { - can_read = true; - can_write = true; - startReading(); - writeNext(); - }, function(event) { + call.invoke(function(event) { self.emit('metadata', event.data); }, function(event) { + finished = true; self.emit('status', event.data); }, 0); this.on('finish', function() { call.writesDone(function() {}); }); /** - * Indicate that reads should start, and start them if the INVOKE_ACCEPTED - * event has been received. + * Start reading if there is not already a pending read. Reading will + * continue until self.push returns false (indicating reads should slow + * down) or the read data is null (indicating that there is no more data). */ - this._enableRead = function() { - if (!reading) { - reading = true; - if (can_read) { - startReading(); + this.startReading = function() { + if (finished) { + self.push(null); + } else { + if (!reading) { + reading = true; + self._call.startRead(readCallback); } } }; - /** - * Push the chunk onto the write queue, and write from the write queue if - * there is not a pending write - * @param {Buffer} chunk The chunk of data to write - * @param {function(Error=)} callback The callback to call when the write - * completes - */ - this._tryWrite = function(chunk, callback) { - write_queue.push({chunk: chunk, callback: callback}); - if (can_write && !writing) { - writeNext(); - } - }; } /** @@ -191,7 +143,7 @@ function GrpcClientStream(call, serialize, deserialize) { * @param {number} size Ignored */ GrpcClientStream.prototype._read = function(size) { - this._enableRead(); + this.startReading(); }; /** @@ -202,7 +154,10 @@ GrpcClientStream.prototype._read = function(size) { * @param {function(Error=)} callback Ignored */ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { - this._tryWrite(chunk, callback); + var self = this; + self._call.startWrite(chunk, function(event) { + callback(); + }, 0); }; /** diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc index acee0386d2..bc1dfaf899 100644 --- a/src/node/node_grpc.cc +++ b/src/node/node_grpc.cc @@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) { completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN); Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ)); completion_type->Set(NanNew("READ"), READ); - Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED)); - completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED); Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED)); completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED); Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED)); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index e6dc9664f1..6e52ec89bd 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -118,12 +118,11 @@ describe('call', function() { call.addMetadata(5); }, TypeError); }); - it('should fail if startInvoke was already called', function(done) { + it('should fail if invoke was already called', function(done) { var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.startInvoke(function() {}, - function() {}, - function() {done();}, - 0); + call.invoke(function() {}, + function() {done();}, + 0); assert.throws(function() { call.addMetadata({'key' : 'key', 'value' : new Buffer('value') }); }, function(err) { @@ -133,32 +132,26 @@ describe('call', function() { call.cancel(); }); }); - describe('startInvoke', function() { - it('should fail with fewer than 4 arguments', function() { + describe('invoke', function() { + it('should fail with fewer than 3 arguments', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.startInvoke(); + call.invoke(); }, TypeError); assert.throws(function() { - call.startInvoke(function() {}); + call.invoke(function() {}); }, TypeError); assert.throws(function() { - call.startInvoke(function() {}, - function() {}); - }, TypeError); - assert.throws(function() { - call.startInvoke(function() {}, - function() {}, - function() {}); + call.invoke(function() {}, + function() {}); }, TypeError); }); - it('should work with 3 args and an int', function(done) { + it('should work with 2 args and an int', function(done) { assert.doesNotThrow(function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); - call.startInvoke(function() {}, - function() {}, - function() {done();}, - 0); + call.invoke(function() {}, + function() {done();}, + 0); // Cancel to speed up the test call.cancel(); }); @@ -166,12 +159,11 @@ describe('call', function() { it('should reject incorrectly typed arguments', function() { var call = new grpc.Call(channel, 'method', getDeadline(1)); assert.throws(function() { - call.startInvoke(0, 0, 0, 0); + call.invoke(0, 0, 0); }, TypeError); assert.throws(function() { - call.startInvoke(function() {}, - function() {}, - function() {}, 'test'); + call.invoke(function() {}, + function() {}, 'test'); }); }); }); diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index f65eea3cff..0138a55226 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -94,7 +94,6 @@ var opErrorNames = [ var completionTypeNames = [ 'QUEUE_SHUTDOWN', 'READ', - 'INVOKE_ACCEPTED', 'WRITE_ACCEPTED', 'FINISH_ACCEPTED', 'CLIENT_METADATA_READ', diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index db3834dbba..f7ccbcf5f2 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -70,16 +70,7 @@ describe('end-to-end', function() { var call = new grpc.Call(channel, 'dummy_method', deadline); - call.startInvoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.INVOKE_ACCEPTED); - - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - }); - },function(event) { + call.invoke(function(event) { assert.strictEqual(event.type, grpc.completionType.CLIENT_METADATA_READ); },function(event) { @@ -109,8 +100,12 @@ describe('end-to-end', function() { done(); }); }); + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + }); }); - it('should send and receive data without error', function(complete) { var req_text = 'client_request'; var reply_text = 'server_response'; @@ -127,28 +122,7 @@ describe('end-to-end', function() { var call = new grpc.Call(channel, 'dummy_method', deadline); - call.startInvoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.INVOKE_ACCEPTED); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), reply_text); - done(); - }); - },function(event) { + call.invoke(function(event) { assert.strictEqual(event.type, grpc.completionType.CLIENT_METADATA_READ); done(); @@ -159,6 +133,24 @@ describe('end-to-end', function() { assert.strictEqual(status.details, status_text); done(); }, 0); + call.startWrite( + new Buffer(req_text), + function(event) { + assert.strictEqual(event.type, + grpc.completionType.WRITE_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }, 0); + call.startRead(function(event) { + assert.strictEqual(event.type, grpc.completionType.READ); + assert.strictEqual(event.data.toString(), reply_text); + done(); + }); server.start(); server.requestCall(function(event) { diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js index 61aef4677e..457d13d2f5 100644 --- a/src/node/test/server_test.js +++ b/src/node/test/server_test.js @@ -81,28 +81,7 @@ describe('echo server', function() { var call = new grpc.Call(channel, 'echo', deadline); - call.startInvoke(function(event) { - assert.strictEqual(event.type, - grpc.completionType.INVOKE_ACCEPTED); - call.startWrite( - new Buffer(req_text), - function(event) { - assert.strictEqual(event.type, - grpc.completionType.WRITE_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - call.writesDone(function(event) { - assert.strictEqual(event.type, - grpc.completionType.FINISH_ACCEPTED); - assert.strictEqual(event.data, grpc.opError.OK); - done(); - }); - }, 0); - call.startRead(function(event) { - assert.strictEqual(event.type, grpc.completionType.READ); - assert.strictEqual(event.data.toString(), req_text); - done(); - }); - },function(event) { + call.invoke(function(event) { assert.strictEqual(event.type, grpc.completionType.CLIENT_METADATA_READ); done(); @@ -114,5 +93,23 @@ describe('echo server', function() { server.shutdown(); done(); }, 0); + call.startWrite( + new Buffer(req_text), + function(event) { + assert.strictEqual(event.type, + grpc.completionType.WRITE_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + call.writesDone(function(event) { + assert.strictEqual(event.type, + grpc.completionType.FINISH_ACCEPTED); + assert.strictEqual(event.data, grpc.opError.OK); + done(); + }); + }, 0); + call.startRead(function(event) { + assert.strictEqual(event.type, grpc.completionType.READ); + assert.strictEqual(event.data.toString(), req_text); + done(); + }); }); }); diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 410efbce68..b171c9c176 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -224,27 +224,25 @@ PHP_METHOD(Call, add_metadata) { /** * Invoke the RPC. Starts sending metadata and request headers over the wire * @param CompletionQueue $queue The completion queue to use with this call - * @param long $invoke_accepted_tag The tag to associate with this invocation * @param long $metadata_tag The tag to associate with returned metadata * @param long $finished_tag The tag to associate with the finished event * @param long $flags A bitwise combination of the Grpc\WRITE_* constants * (optional) * @return Void */ -PHP_METHOD(Call, start_invoke) { +PHP_METHOD(Call, invoke) { grpc_call_error error_code; long tag1; long tag2; - long tag3; zval *queue_obj; long flags = 0; - /* "Olll|l" == 1 Object, 3 mandatory longs, 1 optional long */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Olll|l", &queue_obj, - grpc_ce_completion_queue, &tag1, &tag2, &tag3, + /* "Oll|l" == 1 Object, 3 mandatory longs, 1 optional long */ + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oll|l", &queue_obj, + grpc_ce_completion_queue, &tag1, &tag2, &flags) == FAILURE) { zend_throw_exception( spl_ce_InvalidArgumentException, - "start_invoke needs a CompletionQueue, 3 longs, and an optional long", + "invoke needs a CompletionQueue, 2 longs, and an optional long", 1 TSRMLS_CC); return; } @@ -254,10 +252,9 @@ PHP_METHOD(Call, start_invoke) { wrapped_grpc_completion_queue *queue = (wrapped_grpc_completion_queue *)zend_object_store_get_object( queue_obj TSRMLS_CC); - error_code = - grpc_call_start_invoke(call->wrapped, queue->wrapped, (void *)tag1, - (void *)tag2, (void *)tag3, (gpr_uint32)flags); - MAYBE_THROW_CALL_ERROR(start_invoke, error_code); + error_code = grpc_call_invoke(call->wrapped, queue->wrapped, (void *)tag1, + (void *)tag2, (gpr_uint32)flags); + MAYBE_THROW_CALL_ERROR(invoke, error_code); } /** @@ -427,7 +424,7 @@ static zend_function_entry call_methods[] = { PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Call, start_invoke, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, invoke, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, start_read, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC) PHP_ME(Call, start_write_status, NULL, ZEND_ACC_PUBLIC) diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c index e8b4643a58..492ac06739 100644 --- a/src/php/ext/grpc/php_grpc.c +++ b/src/php/ext/grpc/php_grpc.c @@ -107,11 +107,9 @@ PHP_MINIT_FUNCTION(grpc) { /* Register completion type constants */ REGISTER_LONG_CONSTANT("Grpc\\QUEUE_SHUTDOWN", GRPC_QUEUE_SHUTDOWN, CONST_CS); REGISTER_LONG_CONSTANT("Grpc\\READ", GRPC_READ, CONST_CS); - REGISTER_LONG_CONSTANT("Grpc\\INVOKE_ACCEPTED", GRPC_INVOKE_ACCEPTED, - CONST_CS); - REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS); REGISTER_LONG_CONSTANT("Grpc\\FINISH_ACCEPTED", GRPC_FINISH_ACCEPTED, CONST_CS); + REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS); REGISTER_LONG_CONSTANT("Grpc\\CLIENT_METADATA_READ", GRPC_CLIENT_METADATA_READ, CONST_CS); REGISTER_LONG_CONSTANT("Grpc\\FINISHED", GRPC_FINISHED, CONST_CS); diff --git a/src/php/lib/Grpc/ActiveCall.php b/src/php/lib/Grpc/ActiveCall.php index aa66dbb848..836a4b09e3 100755 --- a/src/php/lib/Grpc/ActiveCall.php +++ b/src/php/lib/Grpc/ActiveCall.php @@ -29,11 +29,8 @@ class ActiveCall { // Invoke the call. $this->call->start_invoke($this->completion_queue, - INVOKE_ACCEPTED, CLIENT_METADATA_READ, FINISHED, 0); - $this->completion_queue->pluck(INVOKE_ACCEPTED, - Timeval::inf_future()); $metadata_event = $this->completion_queue->pluck(CLIENT_METADATA_READ, Timeval::inf_future()); $this->metadata = $metadata_event->data; diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php index 253052a038..795831cb65 100755 --- a/src/php/tests/unit_tests/CallTest.php +++ b/src/php/tests/unit_tests/CallTest.php @@ -19,10 +19,10 @@ class CallTest extends PHPUnit_Framework_TestCase{ /** * @expectedException LogicException * @expectedExceptionCode Grpc\CALL_ERROR_INVALID_FLAGS - * @expectedExceptionMessage start_invoke + * @expectedExceptionMessage invoke */ - public function testStartInvokeRejectsBadFlags() { - $this->call->start_invoke($this->cq, 0, 0, 0, 0xDEADBEEF); + public function testInvokeRejectsBadFlags() { + $this->call->invoke($this->cq, 0, 0, 0xDEADBEEF); } /** diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 3818f9531c..78c5e9f93b 100755 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -25,18 +25,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $deadline); $tag = 1; $this->assertEquals(Grpc\CALL_OK, - $call->start_invoke($this->client_queue, - $tag, - $tag, - $tag)); + $call->invoke($this->client_queue, + $tag, + $tag)); $server_tag = 2; - // the client invocation was accepted - $event = $this->client_queue->next($deadline); - $this->assertNotNull($event); - $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type); - $call->writes_done($tag); $event = $this->client_queue->next($deadline); $this->assertNotNull($event); @@ -103,18 +97,12 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{ $deadline); $tag = 1; $this->assertEquals(Grpc\CALL_OK, - $call->start_invoke($this->client_queue, - $tag, - $tag, - $tag)); + $call->invoke($this->client_queue, + $tag, + $tag)); $server_tag = 2; - // the client invocation was accepted - $event = $this->client_queue->next($deadline); - $this->assertNotNull($event); - $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type); - // the client writes $call->start_write($req_text, $tag); $event = $this->client_queue->next($deadline); diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index c562a821a4..7c3ad8a07c 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -37,17 +37,11 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $deadline); $tag = 1; $this->assertEquals(Grpc\CALL_OK, - $call->start_invoke($this->client_queue, - $tag, - $tag, - $tag)); + $call->invoke($this->client_queue, + $tag, + $tag)); $server_tag = 2; - // the client invocation was accepted - $event = $this->client_queue->next($deadline); - $this->assertNotNull($event); - $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type); - $call->writes_done($tag); $event = $this->client_queue->next($deadline); $this->assertNotNull($event); @@ -113,18 +107,12 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $deadline); $tag = 1; $this->assertEquals(Grpc\CALL_OK, - $call->start_invoke($this->client_queue, - $tag, - $tag, - $tag)); + $call->invoke($this->client_queue, + $tag, + $tag)); $server_tag = 2; - // the client invocation was accepted - $event = $this->client_queue->next($deadline); - $this->assertNotNull($event); - $this->assertEquals(Grpc\INVOKE_ACCEPTED, $event->type); - // the client writes $call->start_write($req_text, $tag); $event = $this->client_queue->next($deadline); diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 76b80bcaa1..1b6565f729 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -153,7 +153,7 @@ int grpc_rb_call_add_metadata_hash_cb(VALUE key, VALUE val, VALUE call_obj) { Add metadata elements to the call from a ruby hash, to be sent upon invocation. flags is a bit-field combination of the write flags defined - above. REQUIRES: grpc_call_start_invoke/grpc_call_accept have not been + above. REQUIRES: grpc_call_invoke/grpc_call_accept have not been called on this call. Produces no events. */ static VALUE grpc_rb_call_add_metadata(int argc, VALUE *argv, VALUE self) { @@ -196,16 +196,15 @@ static VALUE grpc_rb_call_cancel(VALUE self) { /* call-seq: - call.start_invoke(completion_queue, tag, flags=nil) + call.invoke(completion_queue, tag, flags=nil) Invoke the RPC. Starts sending metadata and request headers on the wire. flags is a bit-field combination of the write flags defined above. REQUIRES: Can be called at most once per call. Can only be called on the client. Produces a GRPC_INVOKE_ACCEPTED event on completion. */ -static VALUE grpc_rb_call_start_invoke(int argc, VALUE *argv, VALUE self) { +static VALUE grpc_rb_call_invoke(int argc, VALUE *argv, VALUE self) { VALUE cqueue = Qnil; - VALUE invoke_accepted_tag = Qnil; VALUE metadata_read_tag = Qnil; VALUE finished_tag = Qnil; VALUE flags = Qnil; @@ -213,17 +212,16 @@ static VALUE grpc_rb_call_start_invoke(int argc, VALUE *argv, VALUE self) { grpc_completion_queue *cq = NULL; grpc_call_error err; - /* "41" == 4 mandatory args, 1 (flags) is optional */ - rb_scan_args(argc, argv, "41", &cqueue, &invoke_accepted_tag, - &metadata_read_tag, &finished_tag, &flags); + /* "31" == 3 mandatory args, 1 (flags) is optional */ + rb_scan_args(argc, argv, "31", &cqueue, &metadata_read_tag, &finished_tag, + &flags); if (NIL_P(flags)) { flags = UINT2NUM(0); /* Default to no flags */ } cq = grpc_rb_get_wrapped_completion_queue(cqueue); Data_Get_Struct(self, grpc_call, call); - err = grpc_call_start_invoke(call, cq, ROBJECT(invoke_accepted_tag), - ROBJECT(metadata_read_tag), - ROBJECT(finished_tag), NUM2UINT(flags)); + err = grpc_call_invoke(call, cq, ROBJECT(metadata_read_tag), + ROBJECT(finished_tag), NUM2UINT(flags)); if (err != GRPC_CALL_OK) { rb_raise(rb_eCallError, "invoke failed: %s (code=%d)", grpc_call_error_detail_of(err), err); @@ -519,7 +517,7 @@ void Init_google_rpc_call() { grpc_rb_call_server_end_initial_metadata, -1); rb_define_method(rb_cCall, "add_metadata", grpc_rb_call_add_metadata, -1); rb_define_method(rb_cCall, "cancel", grpc_rb_call_cancel, 0); - rb_define_method(rb_cCall, "start_invoke", grpc_rb_call_start_invoke, -1); + rb_define_method(rb_cCall, "invoke", grpc_rb_call_invoke, -1); rb_define_method(rb_cCall, "start_read", grpc_rb_call_start_read, 1); rb_define_method(rb_cCall, "start_write", grpc_rb_call_start_write, -1); rb_define_method(rb_cCall, "start_write_status", diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c index 0fae9502c3..a1ab6251c8 100644 --- a/src/ruby/ext/grpc/rb_event.c +++ b/src/ruby/ext/grpc/rb_event.c @@ -105,10 +105,6 @@ static VALUE grpc_rb_event_type(VALUE self) { case GRPC_READ: return rb_const_get(rb_mCompletionType, rb_intern("READ")); - case GRPC_INVOKE_ACCEPTED: - grpc_rb_event_result(self); /* validates the result */ - return rb_const_get(rb_mCompletionType, rb_intern("INVOKE_ACCEPTED")); - case GRPC_WRITE_ACCEPTED: grpc_rb_event_result(self); /* validates the result */ return rb_const_get(rb_mCompletionType, rb_intern("WRITE_ACCEPTED")); @@ -359,6 +355,8 @@ void Init_google_rpc_event() { rb_define_const(rb_mCompletionType, "FINISHED", INT2NUM(GRPC_FINISHED)); rb_define_const(rb_mCompletionType, "SERVER_RPC_NEW", INT2NUM(GRPC_SERVER_RPC_NEW)); + rb_define_const(rb_mCompletionType, "SERVER_SHUTDOWN", + INT2NUM(GRPC_SERVER_SHUTDOWN)); rb_define_const(rb_mCompletionType, "RESERVED", INT2NUM(GRPC_COMPLETION_DO_NOT_USE)); } diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index bd684a8d07..1cdc168bfe 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -47,7 +47,7 @@ module Google include Core::TimeConsts attr_reader(:deadline) - # client_start_invoke begins a client invocation. + # client_invoke begins a client invocation. # # Flow Control note: this blocks until flow control accepts that client # request can go ahead. @@ -59,9 +59,9 @@ module Google # if a keyword value is a list, multiple metadata for it's key are sent # # @param call [Call] a call on which to start and invocation - # @param q [CompletionQueue] used to wait for INVOKE_ACCEPTED - # @param deadline [Fixnum,TimeSpec] the deadline for INVOKE_ACCEPTED - def self.client_start_invoke(call, q, _deadline, **kw) + # @param q [CompletionQueue] the completion queue + # @param deadline [Fixnum,TimeSpec] the deadline + def self.client_invoke(call, q, _deadline, **kw) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') @@ -69,24 +69,16 @@ module Google call.add_metadata(kw) if kw.length > 0 invoke_accepted, client_metadata_read = Object.new, Object.new finished_tag = Object.new - call.start_invoke(q, invoke_accepted, client_metadata_read, - finished_tag) - - # wait for the invocation to be accepted - ev = q.pluck(invoke_accepted, INFINITE_FUTURE) - fail OutOfTime if ev.nil? - ev.close - + call.invoke(q, client_metadata_read, finished_tag) [finished_tag, client_metadata_read] end # Creates an ActiveCall. # - # ActiveCall should only be created after a call is accepted. That means - # different things on a client and a server. On the client, the call is - # accepted after call.start_invoke followed by receipt of the - # corresponding INVOKE_ACCEPTED. on the server, this is after - # call.accept. + # ActiveCall should only be created after a call is accepted. That + # means different things on a client and a server. On the client, the + # call is accepted after calling call.invoke. On the server, this is + # after call.accept. # # #initialize cannot determine if the call is accepted or not; so if a # call that's not accepted is used here, the error won't be visible until @@ -495,7 +487,7 @@ module Google private def start_call(**kw) - tags = ActiveCall.client_start_invoke(@call, @cq, @deadline, **kw) + tags = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) @finished_tag, @read_metadata_tag = tags @started = true end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 14ef6c531f..7653192ad6 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -50,9 +50,7 @@ module Google # # BidiCall should only be created after a call is accepted. That means # different things on a client and a server. On the client, the call is - # accepted after call.start_invoke followed by receipt of the - # corresponding INVOKE_ACCEPTED. On the server, this is after - # call.accept. + # accepted after call.invoke. On the server, this is after call.accept. # # #initialize cannot determine if the call is accepted or not; so if a # call that's not accepted is used here, the error won't be visible until diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index b8ecd64f39..9a510df1f3 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -122,24 +122,10 @@ describe GRPC::Core::Call do end end - describe '#start_invoke' do - it 'should cause the INVOKE_ACCEPTED event' do - call = make_test_call - expect(call.start_invoke(@client_queue, @tag, @tag, @tag)).to be_nil - ev = @client_queue.next(deadline) - expect(ev.call).to be_a(GRPC::Core::Call) - expect(ev.tag).to be(@tag) - expect(ev.type).to be(GRPC::Core::CompletionType::INVOKE_ACCEPTED) - expect(ev.call).to_not be(call) - end - end - describe '#start_write' do it 'should cause the WRITE_ACCEPTED event' do call = make_test_call - call.start_invoke(@client_queue, @tag, @tag, @tag) - ev = @client_queue.next(deadline) - expect(ev.type).to be(GRPC::Core::CompletionType::INVOKE_ACCEPTED) + call.invoke(@client_queue, @tag, @tag) expect(call.start_write(GRPC::Core::ByteBuffer.new('test_start_write'), @tag)).to be_nil ev = @client_queue.next(deadline) diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 1bcbc66446..59b4bbd9d8 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -83,10 +83,7 @@ shared_context 'setup: tags' do def client_sends(call, sent = 'a message') req = ByteBuffer.new(sent) - call.start_invoke(@client_queue, @tag, @tag, @client_finished_tag) - ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE) - expect(ev).not_to be_nil - expect(ev.type).to be(INVOKE_ACCEPTED) + call.invoke(@client_queue, @tag, @client_finished_tag) call.start_write(req, @tag) ev = @client_queue.pluck(@tag, TimeConsts::INFINITE_FUTURE) expect(ev).not_to be_nil @@ -233,8 +230,7 @@ shared_examples 'GRPC metadata delivery works OK' do call.add_metadata(md) # Client begins a call OK - call.start_invoke(@client_queue, @tag, @tag, @client_finished_tag) - expect_next_event_on(@client_queue, INVOKE_ACCEPTED, @tag) + call.invoke(@client_queue, @tag, @client_finished_tag) # ... server has all metadata available even though the client did not # send a write diff --git a/src/ruby/spec/event_spec.rb b/src/ruby/spec/event_spec.rb index 5dec07e1ed..7ef08d021b 100644 --- a/src/ruby/spec/event_spec.rb +++ b/src/ruby/spec/event_spec.rb @@ -40,7 +40,8 @@ describe GRPC::Core::CompletionType do CLIENT_METADATA_READ: 5, FINISHED: 6, SERVER_RPC_NEW: 7, - RESERVED: 8 + SERVER_SHUTDOWN: 8, + RESERVED: 9 } end diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 898022f185..443ba3d192 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -60,8 +60,8 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -92,8 +92,8 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -118,8 +118,8 @@ describe GRPC::ActiveCall do it 'marshals the payload using the marshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) marshal = proc { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, @pass_through, deadline, @@ -139,11 +139,11 @@ describe GRPC::ActiveCall do end end - describe '#client_start_invoke' do + describe '#client_invoke' do it 'sends keywords as metadata to the server when the are present' do call = make_test_call - ActiveCall.client_start_invoke(call, @client_queue, deadline, - k1: 'v1', k2: 'v2') + ActiveCall.client_invoke(call, @client_queue, deadline, + k1: 'v1', k2: 'v2') @server.request_call(@server_tag) ev = @server_queue.next(deadline) expect(ev).to_not be_nil @@ -155,8 +155,8 @@ describe GRPC::ActiveCall do describe '#remote_read' do it 'reads the response sent by a server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -170,8 +170,8 @@ describe GRPC::ActiveCall do it 'saves metadata { status=200 } when the server adds no metadata' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -187,8 +187,8 @@ describe GRPC::ActiveCall do it 'saves metadata add by the server' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -205,7 +205,7 @@ describe GRPC::ActiveCall do it 'get a nil msg before a status when an OK status is sent' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, @@ -224,8 +224,8 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) unmarshal = proc { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, @@ -251,8 +251,8 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -271,8 +271,8 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, read_metadata_tag: meta_tag, @@ -296,8 +296,8 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, finished_tag: done_tag, @@ -315,8 +315,8 @@ describe GRPC::ActiveCall do it 'finishes ok if the server sends an early status response' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, read_metadata_tag: meta_tag, @@ -334,8 +334,8 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - done_tag, meta_tag = ActiveCall.client_start_invoke(call, @client_queue, - deadline) + done_tag, meta_tag = ActiveCall.client_invoke(call, @client_queue, + deadline) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, read_metadata_tag: meta_tag, |