aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/node/call.cc31
-rw-r--r--src/node/call.h2
-rw-r--r--src/node/client.js99
-rw-r--r--src/node/node_grpc.cc2
-rw-r--r--src/node/test/call_test.js42
-rw-r--r--src/node/test/constant_test.js1
-rw-r--r--src/node/test/end_to_end_test.js57
-rw-r--r--src/node/test/server_test.js41
8 files changed, 101 insertions, 174 deletions
diff --git a/src/node/call.cc b/src/node/call.cc
index b8ee1786a6..6434c2f0d5 100644
--- a/src/node/call.cc
+++ b/src/node/call.cc
@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startInvoke",
- FunctionTemplate::New(StartInvoke)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "invoke",
+ FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartInvoke) {
+NAN_METHOD(Call::Invoke) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("invoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's first argument must be a function");
+ return NanThrowTypeError("invoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError(
- "StartInvoke's second argument must be a function");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's third argument must be a function");
+ return NanThrowTypeError("invoke's second argument must be a function");
}
- if (!args[3]->IsUint32()) {
- return NanThrowTypeError(
- "StartInvoke's fourth argument must be integer flags");
+ if (!args[2]->IsUint32()) {
+ return NanThrowTypeError("invoke's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_start_invoke(
+ grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
- CreateTag(args[2], args.This()), flags);
+ CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
} else {
- return NanThrowError("startInvoke failed", error);
+ return NanThrowError("invoke failed", error);
}
NanReturnUndefined();
}
@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
diff --git a/src/node/call.h b/src/node/call.h
index 55a6fc65b8..1924a1bf42 100644
--- a/src/node/call.h
+++ b/src/node/call.h
@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
- static NAN_METHOD(StartInvoke);
+ static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);
diff --git a/src/node/client.js b/src/node/client.js
index edaa115d0f..24f69d8bfd 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -50,101 +50,53 @@ util.inherits(GrpcClientStream, Duplex);
function GrpcClientStream(call, options) {
Duplex.call(this, options);
var self = this;
- // Indicates that we can start reading and have not received a null read
- var can_read = false;
+ var finished = false;
// Indicates that a read is currently pending
var reading = false;
- // Indicates that we can call startWrite
- var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
/**
- * Callback to handle receiving a READ event. Pushes the data from that event
- * onto the read queue and starts reading again if applicable.
- * @param {grpc.Event} event The READ event object
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
*/
function readCallback(event) {
+ if (finished) {
+ self.push(null);
+ return;
+ }
var data = event.data;
- if (self.push(data)) {
- if (data == null) {
- // Disable starting to read after null read was received
- can_read = false;
- reading = false;
- } else {
- call.startRead(readCallback);
- }
+ if (self.push(data) && data != null) {
+ self._call.startRead(readCallback);
} else {
- // Indicate that reading can be resumed by calling startReading
reading = false;
}
- };
- /**
- * Initiate a read, which continues until self.push returns false (indicating
- * that reading should be paused) or data is null (indicating that there is no
- * more data to read).
- */
- function startReading() {
- call.startRead(readCallback);
- }
- // TODO(mlumish): possibly change queue implementation due to shift slowness
- var write_queue = [];
- /**
- * Write the next chunk of data in the write queue if there is one. Otherwise
- * indicate that there is no pending write. When the write succeeds, this
- * function is called again.
- */
- function writeNext() {
- if (write_queue.length > 0) {
- writing = true;
- var next = write_queue.shift();
- var writeCallback = function(event) {
- next.callback();
- writeNext();
- };
- call.startWrite(next.chunk, writeCallback, 0);
- } else {
- writing = false;
- }
}
- call.startInvoke(function(event) {
- can_read = true;
- can_write = true;
- startReading();
- writeNext();
- }, function(event) {
+ call.invoke(function(event) {
self.emit('metadata', event.data);
}, function(event) {
+ finished = true;
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
- * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
- * event has been received.
+ * Start reading if there is not already a pending read. Reading will
+ * continue until self.push returns false (indicating reads should slow
+ * down) or the read data is null (indicating that there is no more data).
*/
- this._enableRead = function() {
- if (!reading) {
- reading = true;
- if (can_read) {
- startReading();
+ this.startReading = function() {
+ if (finished) {
+ self.push(null);
+ } else {
+ if (!reading) {
+ reading = true;
+ self._call.startRead(readCallback);
}
}
};
- /**
- * Push the chunk onto the write queue, and write from the write queue if
- * there is not a pending write
- * @param {Buffer} chunk The chunk of data to write
- * @param {function(Error=)} callback The callback to call when the write
- * completes
- */
- this._tryWrite = function(chunk, callback) {
- write_queue.push({chunk: chunk, callback: callback});
- if (can_write && !writing) {
- writeNext();
- }
- };
}
/**
@@ -153,7 +105,7 @@ function GrpcClientStream(call, options) {
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
- this._enableRead();
+ this.startReading();
};
/**
@@ -164,7 +116,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- this._tryWrite(chunk, callback);
+ var self = this;
+ self._call.startWrite(chunk, function(event) {
+ callback();
+ }, 0);
};
/**
diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc
index acee0386d2..bc1dfaf899 100644
--- a/src/node/node_grpc.cc
+++ b/src/node/node_grpc.cc
@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
- Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
- completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index e6dc9664f1..6e52ec89bd 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -118,12 +118,11 @@ describe('call', function() {
call.addMetadata(5);
}, TypeError);
});
- it('should fail if startInvoke was already called', function(done) {
+ it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
@@ -133,32 +132,26 @@ describe('call', function() {
call.cancel();
});
});
- describe('startInvoke', function() {
- it('should fail with fewer than 4 arguments', function() {
+ describe('invoke', function() {
+ it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke();
+ call.invoke();
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {});
+ call.invoke(function() {});
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {});
- }, TypeError);
- assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {});
+ call.invoke(function() {},
+ function() {});
}, TypeError);
});
- it('should work with 3 args and an int', function(done) {
+ it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
// Cancel to speed up the test
call.cancel();
});
@@ -166,12 +159,11 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke(0, 0, 0, 0);
+ call.invoke(0, 0, 0);
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {}, 'test');
+ call.invoke(function() {},
+ function() {}, 'test');
});
});
});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index f65eea3cff..0138a55226 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
- 'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 40bb5f3bbd..b9e7bb5691 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -72,16 +72,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
-
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
@@ -91,6 +82,11 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ });
server.start();
server.requestCall(function(event) {
@@ -131,28 +127,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -163,6 +138,24 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), reply_text);
+ done();
+ });
server.start();
server.requestCall(function(event) {
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 79f7b32948..9978853bc2 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -83,28 +83,7 @@ describe('echo server', function() {
var call = new grpc.Call(channel,
'echo',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -116,6 +95,24 @@ describe('echo server', function() {
server.shutdown();
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ done();
+ });
});
});
});