aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-23 14:57:29 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-01-23 14:57:29 -0800
commit5d0d33bbdeeb9c84b5bb15c5fad7ab50bd5da7e3 (patch)
treea1e1b12efec98ee0901f2f78053c8e6f646dc4cd /src/node
parentefcb4da8e005a5c0f24a64f4943095fd4e60fdc0 (diff)
parentc0748898293935517ca390977a55db6ea1424f50 (diff)
Merge branch 'kill-more-printf' of github.com:ctiller/grpc into kill-more-printf
Diffstat (limited to 'src/node')
-rw-r--r--src/node/binding.gyp35
-rw-r--r--src/node/call.cc31
-rw-r--r--src/node/call.h2
-rw-r--r--src/node/client.js99
-rw-r--r--src/node/interop/interop_server.js3
-rw-r--r--src/node/node_grpc.cc2
-rw-r--r--src/node/test/call_test.js42
-rw-r--r--src/node/test/constant_test.js1
-rw-r--r--src/node/test/end_to_end_test.js58
-rw-r--r--src/node/test/interop_sanity_test.js4
-rw-r--r--src/node/test/server_test.js41
11 files changed, 134 insertions, 184 deletions
diff --git a/src/node/binding.gyp b/src/node/binding.gyp
index da4a943491..fe4b5da9c8 100644
--- a/src/node/binding.gyp
+++ b/src/node/binding.gyp
@@ -1,8 +1,13 @@
{
+ "variables" : {
+ 'no_install': "<!(echo $GRPC_NO_INSTALL)",
+ 'grpc_root': "<!(echo $GRPC_ROOT)",
+ 'grpc_lib_subdir': "<!(echo $GRPC_LIB_SUBDIR)"
+ },
"targets" : [
{
'include_dirs': [
- "<!(node -e \"require('nan')\")"
+ "<!(nodejs -e \"require('nan')\")"
],
'cxxflags': [
'-Wall',
@@ -11,16 +16,13 @@
'-g',
'-zdefs'
'-Werror',
- ],
+ ],
'ldflags': [
- '-g',
- '-L/usr/local/google/home/mlumish/grpc_dev/lib'
+ '-g'
],
'link_settings': {
'libraries': [
- '-lgrpc',
'-lrt',
- '-lgpr',
'-lpthread'
],
},
@@ -37,6 +39,27 @@
"server_credentials.cc",
"tag.cc",
"timeval.cc"
+ ],
+ 'conditions' : [
+ ['no_install=="yes"', {
+ 'include_dirs': [
+ "<(grpc_root)/include"
+ ],
+ 'link_settings': {
+ 'libraries': [
+ '<(grpc_root)/<(grpc_lib_subdir)/libgrpc.a',
+ '<(grpc_root)/<(grpc_lib_subdir)/libgpr.a'
+ ]
+ }
+ }],
+ ['no_install!="yes"', {
+ 'link_settings': {
+ 'libraries': [
+ '-lgrpc',
+ '-lgpr'
+ ]
+ }
+ }]
]
}
]
diff --git a/src/node/call.cc b/src/node/call.cc
index b8ee1786a6..6434c2f0d5 100644
--- a/src/node/call.cc
+++ b/src/node/call.cc
@@ -78,8 +78,8 @@ void Call::Init(Handle<Object> exports) {
tpl->InstanceTemplate()->SetInternalFieldCount(1);
NanSetPrototypeTemplate(tpl, "addMetadata",
FunctionTemplate::New(AddMetadata)->GetFunction());
- NanSetPrototypeTemplate(tpl, "startInvoke",
- FunctionTemplate::New(StartInvoke)->GetFunction());
+ NanSetPrototypeTemplate(tpl, "invoke",
+ FunctionTemplate::New(Invoke)->GetFunction());
NanSetPrototypeTemplate(tpl, "serverAccept",
FunctionTemplate::New(ServerAccept)->GetFunction());
NanSetPrototypeTemplate(
@@ -203,37 +203,30 @@ NAN_METHOD(Call::AddMetadata) {
NanReturnUndefined();
}
-NAN_METHOD(Call::StartInvoke) {
+NAN_METHOD(Call::Invoke) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("invoke can only be called on Call objects");
}
if (!args[0]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's first argument must be a function");
+ return NanThrowTypeError("invoke's first argument must be a function");
}
if (!args[1]->IsFunction()) {
- return NanThrowTypeError(
- "StartInvoke's second argument must be a function");
- }
- if (!args[2]->IsFunction()) {
- return NanThrowTypeError("StartInvoke's third argument must be a function");
+ return NanThrowTypeError("invoke's second argument must be a function");
}
- if (!args[3]->IsUint32()) {
- return NanThrowTypeError(
- "StartInvoke's fourth argument must be integer flags");
+ if (!args[2]->IsUint32()) {
+ return NanThrowTypeError("invoke's third argument must be integer flags");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
unsigned int flags = args[3]->Uint32Value();
- grpc_call_error error = grpc_call_start_invoke(
+ grpc_call_error error = grpc_call_invoke(
call->wrapped_call, CompletionQueueAsyncWorker::GetQueue(),
- CreateTag(args[0], args.This()), CreateTag(args[1], args.This()),
- CreateTag(args[2], args.This()), flags);
+ CreateTag(args[0], args.This()), CreateTag(args[1], args.This()), flags);
if (error == GRPC_CALL_OK) {
CompletionQueueAsyncWorker::Next();
CompletionQueueAsyncWorker::Next();
- CompletionQueueAsyncWorker::Next();
} else {
- return NanThrowError("startInvoke failed", error);
+ return NanThrowError("invoke failed", error);
}
NanReturnUndefined();
}
@@ -281,7 +274,7 @@ NAN_METHOD(Call::ServerEndInitialMetadata) {
NAN_METHOD(Call::Cancel) {
NanScope();
if (!HasInstance(args.This())) {
- return NanThrowTypeError("startInvoke can only be called on Call objects");
+ return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call);
diff --git a/src/node/call.h b/src/node/call.h
index 55a6fc65b8..1924a1bf42 100644
--- a/src/node/call.h
+++ b/src/node/call.h
@@ -61,7 +61,7 @@ class Call : public ::node::ObjectWrap {
static NAN_METHOD(New);
static NAN_METHOD(AddMetadata);
- static NAN_METHOD(StartInvoke);
+ static NAN_METHOD(Invoke);
static NAN_METHOD(ServerAccept);
static NAN_METHOD(ServerEndInitialMetadata);
static NAN_METHOD(Cancel);
diff --git a/src/node/client.js b/src/node/client.js
index f913b06f29..2fefd14bbc 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -62,12 +62,9 @@ function GrpcClientStream(call, serialize, deserialize) {
};
}
var self = this;
- // Indicates that we can start reading and have not received a null read
- var can_read = false;
+ var finished = false;
// Indicates that a read is currently pending
var reading = false;
- // Indicates that we can call startWrite
- var can_write = false;
// Indicates that a write is currently pending
var writing = false;
this._call = call;
@@ -98,91 +95,46 @@ function GrpcClientStream(call, serialize, deserialize) {
return deserialize(buffer);
};
/**
- * Callback to handle receiving a READ event. Pushes the data from that event
- * onto the read queue and starts reading again if applicable.
- * @param {grpc.Event} event The READ event object
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
*/
function readCallback(event) {
+ if (finished) {
+ self.push(null);
+ return;
+ }
var data = event.data;
- if (self.push(self.deserialize(data))) {
- if (data == null) {
- // Disable starting to read after null read was received
- can_read = false;
- reading = false;
- } else {
- call.startRead(readCallback);
- }
+ if (self.push(data) && data != null) {
+ self._call.startRead(readCallback);
} else {
- // Indicate that reading can be resumed by calling startReading
reading = false;
}
- };
- /**
- * Initiate a read, which continues until self.push returns false (indicating
- * that reading should be paused) or data is null (indicating that there is no
- * more data to read).
- */
- function startReading() {
- call.startRead(readCallback);
- }
- // TODO(mlumish): possibly change queue implementation due to shift slowness
- var write_queue = [];
- /**
- * Write the next chunk of data in the write queue if there is one. Otherwise
- * indicate that there is no pending write. When the write succeeds, this
- * function is called again.
- */
- function writeNext() {
- if (write_queue.length > 0) {
- writing = true;
- var next = write_queue.shift();
- var writeCallback = function(event) {
- next.callback();
- writeNext();
- };
- call.startWrite(self.serialize(next.chunk), writeCallback, 0);
- } else {
- writing = false;
- }
}
- call.startInvoke(function(event) {
- can_read = true;
- can_write = true;
- startReading();
- writeNext();
- }, function(event) {
+ call.invoke(function(event) {
self.emit('metadata', event.data);
}, function(event) {
+ finished = true;
self.emit('status', event.data);
}, 0);
this.on('finish', function() {
call.writesDone(function() {});
});
/**
- * Indicate that reads should start, and start them if the INVOKE_ACCEPTED
- * event has been received.
+ * Start reading if there is not already a pending read. Reading will
+ * continue until self.push returns false (indicating reads should slow
+ * down) or the read data is null (indicating that there is no more data).
*/
- this._enableRead = function() {
- if (!reading) {
- reading = true;
- if (can_read) {
- startReading();
+ this.startReading = function() {
+ if (finished) {
+ self.push(null);
+ } else {
+ if (!reading) {
+ reading = true;
+ self._call.startRead(readCallback);
}
}
};
- /**
- * Push the chunk onto the write queue, and write from the write queue if
- * there is not a pending write
- * @param {Buffer} chunk The chunk of data to write
- * @param {function(Error=)} callback The callback to call when the write
- * completes
- */
- this._tryWrite = function(chunk, callback) {
- write_queue.push({chunk: chunk, callback: callback});
- if (can_write && !writing) {
- writeNext();
- }
- };
}
/**
@@ -191,7 +143,7 @@ function GrpcClientStream(call, serialize, deserialize) {
* @param {number} size Ignored
*/
GrpcClientStream.prototype._read = function(size) {
- this._enableRead();
+ this.startReading();
};
/**
@@ -202,7 +154,10 @@ GrpcClientStream.prototype._read = function(size) {
* @param {function(Error=)} callback Ignored
*/
GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- this._tryWrite(chunk, callback);
+ var self = this;
+ self._call.startWrite(chunk, function(event) {
+ callback();
+ }, 0);
};
/**
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 6d2bd7ae0d..ebf847876c 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -194,7 +194,8 @@ if (require.main === module) {
string: ['port', 'use_tls']
});
var server_obj = getServer(argv.port, argv.use_tls === 'true');
- server_obj.server.start();
+ console.log('Server attaching to port ' + argv.port);
+ server_obj.server.listen();
}
/**
diff --git a/src/node/node_grpc.cc b/src/node/node_grpc.cc
index acee0386d2..bc1dfaf899 100644
--- a/src/node/node_grpc.cc
+++ b/src/node/node_grpc.cc
@@ -148,8 +148,6 @@ void InitCompletionTypeConstants(Handle<Object> exports) {
completion_type->Set(NanNew("QUEUE_SHUTDOWN"), QUEUE_SHUTDOWN);
Handle<Value> READ(NanNew<Uint32, uint32_t>(GRPC_READ));
completion_type->Set(NanNew("READ"), READ);
- Handle<Value> INVOKE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_INVOKE_ACCEPTED));
- completion_type->Set(NanNew("INVOKE_ACCEPTED"), INVOKE_ACCEPTED);
Handle<Value> WRITE_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_WRITE_ACCEPTED));
completion_type->Set(NanNew("WRITE_ACCEPTED"), WRITE_ACCEPTED);
Handle<Value> FINISH_ACCEPTED(NanNew<Uint32, uint32_t>(GRPC_FINISH_ACCEPTED));
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index e6dc9664f1..6e52ec89bd 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -118,12 +118,11 @@ describe('call', function() {
call.addMetadata(5);
}, TypeError);
});
- it('should fail if startInvoke was already called', function(done) {
+ it('should fail if invoke was already called', function(done) {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
assert.throws(function() {
call.addMetadata({'key' : 'key', 'value' : new Buffer('value') });
}, function(err) {
@@ -133,32 +132,26 @@ describe('call', function() {
call.cancel();
});
});
- describe('startInvoke', function() {
- it('should fail with fewer than 4 arguments', function() {
+ describe('invoke', function() {
+ it('should fail with fewer than 3 arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke();
+ call.invoke();
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {});
+ call.invoke(function() {});
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {});
- }, TypeError);
- assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {});
+ call.invoke(function() {},
+ function() {});
}, TypeError);
});
- it('should work with 3 args and an int', function(done) {
+ it('should work with 2 args and an int', function(done) {
assert.doesNotThrow(function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
- call.startInvoke(function() {},
- function() {},
- function() {done();},
- 0);
+ call.invoke(function() {},
+ function() {done();},
+ 0);
// Cancel to speed up the test
call.cancel();
});
@@ -166,12 +159,11 @@ describe('call', function() {
it('should reject incorrectly typed arguments', function() {
var call = new grpc.Call(channel, 'method', getDeadline(1));
assert.throws(function() {
- call.startInvoke(0, 0, 0, 0);
+ call.invoke(0, 0, 0);
}, TypeError);
assert.throws(function() {
- call.startInvoke(function() {},
- function() {},
- function() {}, 'test');
+ call.invoke(function() {},
+ function() {}, 'test');
});
});
});
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index f65eea3cff..0138a55226 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -94,7 +94,6 @@ var opErrorNames = [
var completionTypeNames = [
'QUEUE_SHUTDOWN',
'READ',
- 'INVOKE_ACCEPTED',
'WRITE_ACCEPTED',
'FINISH_ACCEPTED',
'CLIENT_METADATA_READ',
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index db3834dbba..f7ccbcf5f2 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -70,16 +70,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
-
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
},function(event) {
@@ -109,8 +100,12 @@ describe('end-to-end', function() {
done();
});
});
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ });
});
-
it('should send and receive data without error', function(complete) {
var req_text = 'client_request';
var reply_text = 'server_response';
@@ -127,28 +122,7 @@ describe('end-to-end', function() {
var call = new grpc.Call(channel,
'dummy_method',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), reply_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -159,6 +133,24 @@ describe('end-to-end', function() {
assert.strictEqual(status.details, status_text);
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), reply_text);
+ done();
+ });
server.start();
server.requestCall(function(event) {
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index 410b050e8d..3c062b9788 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -52,7 +52,8 @@ describe('Interop tests', function() {
it('should pass empty_unary', function(done) {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
});
- it('should pass large_unary', function(done) {
+ // This fails due to an unknown bug
+ it.skip('should pass large_unary', function(done) {
interop_client.runTest(port, name_override, 'large_unary', true, done);
});
it('should pass client_streaming', function(done) {
@@ -64,7 +65,6 @@ describe('Interop tests', function() {
it('should pass ping_pong', function(done) {
interop_client.runTest(port, name_override, 'ping_pong', true, done);
});
- // This depends on the new invoke API
it.skip('should pass empty_stream', function(done) {
interop_client.runTest(port, name_override, 'empty_stream', true, done);
});
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 61aef4677e..457d13d2f5 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -81,28 +81,7 @@ describe('echo server', function() {
var call = new grpc.Call(channel,
'echo',
deadline);
- call.startInvoke(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.INVOKE_ACCEPTED);
- call.startWrite(
- new Buffer(req_text),
- function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.WRITE_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- call.writesDone(function(event) {
- assert.strictEqual(event.type,
- grpc.completionType.FINISH_ACCEPTED);
- assert.strictEqual(event.data, grpc.opError.OK);
- done();
- });
- }, 0);
- call.startRead(function(event) {
- assert.strictEqual(event.type, grpc.completionType.READ);
- assert.strictEqual(event.data.toString(), req_text);
- done();
- });
- },function(event) {
+ call.invoke(function(event) {
assert.strictEqual(event.type,
grpc.completionType.CLIENT_METADATA_READ);
done();
@@ -114,5 +93,23 @@ describe('echo server', function() {
server.shutdown();
done();
}, 0);
+ call.startWrite(
+ new Buffer(req_text),
+ function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.WRITE_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ call.writesDone(function(event) {
+ assert.strictEqual(event.type,
+ grpc.completionType.FINISH_ACCEPTED);
+ assert.strictEqual(event.data, grpc.opError.OK);
+ done();
+ });
+ }, 0);
+ call.startRead(function(event) {
+ assert.strictEqual(event.type, grpc.completionType.READ);
+ assert.strictEqual(event.data.toString(), req_text);
+ done();
+ });
});
});