diff options
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/common.js | 82 | ||||
-rw-r--r-- | src/node/examples/math.proto | 41 | ||||
-rw-r--r-- | src/node/examples/math_server.js | 83 | ||||
-rw-r--r-- | src/node/main.js | 98 | ||||
-rw-r--r-- | src/node/package.json | 9 | ||||
-rw-r--r-- | src/node/server.js | 6 | ||||
-rw-r--r-- | src/node/surface_client.js | 34 | ||||
-rw-r--r-- | src/node/surface_server.js | 65 | ||||
-rw-r--r-- | src/node/test/math_client_test.js | 86 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 75 |
10 files changed, 356 insertions, 223 deletions
diff --git a/src/node/common.js b/src/node/common.js index c2dc276608..656a4aca95 100644 --- a/src/node/common.js +++ b/src/node/common.js @@ -31,32 +31,68 @@ * */ -var _ = require('highland'); +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @return {function(Buffer):cls} The deserialization function + */ +function deserializeCls(cls) { + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + return cls.decode(arg_buf); + }; +} + +/** + * Get a function that serializes objects to a buffer by protobuf class. + * @param {function()} Cls The constructor of the message type to serialize + * @return {function(Cls):Buffer} The serialization function + */ +function serializeCls(Cls) { + /** + * Serialize an object to a Buffer + * @param {Object} arg The object to serialize + * @return {Buffer} The serialized object + */ + return function serialize(arg) { + return new Buffer(new Cls(arg).encode().toBuffer()); + }; +} /** - * When the given stream finishes without error, call the callback once. This - * will not be called until something begins to consume the stream. - * @param {function} callback The callback to call at stream end - * @param {stream} source The stream to watch - * @return {stream} The stream with the callback attached + * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. + * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of + * @return {string} The fully qualified name of the value */ -function onSuccessfulStreamEnd(callback, source) { - var error = false; - return source.consume(function(err, x, push, next) { - if (x === _.nil) { - if (!error) { - callback(); - } - push(null, x); - } else if (err) { - error = true; - push(err); - next(); - } else { - push(err, x); - next(); +function fullyQualifiedName(value) { + if (value === null || value === undefined) { + return ''; + } + var name = value.name; + if (value.hasOwnProperty('parent')) { + var parent_name = fullyQualifiedName(value.parent); + if (parent_name !== '') { + name = parent_name + '.' + name; } - }); + } + return name; } -exports.onSuccessfulStreamEnd = onSuccessfulStreamEnd; +/** + * See docs for deserializeCls + */ +exports.deserializeCls = deserializeCls; + +/** + * See docs for serializeCls + */ +exports.serializeCls = serializeCls; + +/** + * See docs for fullyQualifiedName + */ +exports.fullyQualifiedName = fullyQualifiedName; diff --git a/src/node/examples/math.proto b/src/node/examples/math.proto index 14eff5daaf..c49787ad54 100644 --- a/src/node/examples/math.proto +++ b/src/node/examples/math.proto @@ -1,15 +1,15 @@ -syntax = "proto2"; +syntax = "proto3"; package math; message DivArgs { - required int64 dividend = 1; - required int64 divisor = 2; + optional int64 dividend = 1; + optional int64 divisor = 2; } message DivReply { - required int64 quotient = 1; - required int64 remainder = 2; + optional int64 quotient = 1; + optional int64 remainder = 2; } message FibArgs { @@ -17,9 +17,34 @@ message FibArgs { } message Num { - required int64 num = 1; + optional int64 num = 1; } message FibReply { - required int64 count = 1; -}
\ No newline at end of file + optional int64 count = 1; +} + +service Math { + // Div divides args.dividend by args.divisor and returns the quotient and + // remainder. + rpc Div (DivArgs) returns (DivReply) { + } + + // DivMany accepts an arbitrary number of division args from the client stream + // and sends back the results in the reply stream. The stream continues until + // the client closes its end; the server does the same after sending all the + // replies. The stream ends immediately if either end aborts. + rpc DivMany (stream DivArgs) returns (stream DivReply) { + } + + // Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib + // generates up to limit numbers; otherwise it continues until the call is + // canceled. Unlike Fib above, Fib has no final FibReply. + rpc Fib (FibArgs) returns (stream Num) { + } + + // Sum sums a stream of numbers, returning the final result once the stream + // is closed. + rpc Sum (stream Num) returns (Num) { + } +} diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index 87336b61e5..366513dc17 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -38,77 +38,10 @@ var util = require('util'); var Transform = require('stream').Transform; -var builder = ProtoBuf.loadProtoFile(__dirname + '/math.proto'); -var math = builder.build('math'); +var grpc = require('..'); +var math = grpc.load(__dirname + '/math.proto').math; -var makeConstructor = require('../surface_server.js').makeServerConstructor; - -/** - * Get a function that deserializes a specific type of protobuf. - * @param {function()} cls The constructor of the message type to deserialize - * @return {function(Buffer):cls} The deserialization function - */ -function deserializeCls(cls) { - /** - * Deserialize a buffer to a message object - * @param {Buffer} arg_buf The buffer to deserialize - * @return {cls} The resulting object - */ - return function deserialize(arg_buf) { - return cls.decode(arg_buf); - }; -} - -/** - * Get a function that serializes objects to a buffer by protobuf class. - * @param {function()} Cls The constructor of the message type to serialize - * @return {function(Cls):Buffer} The serialization function - */ -function serializeCls(Cls) { - /** - * Serialize an object to a Buffer - * @param {Object} arg The object to serialize - * @return {Buffer} The serialized object - */ - return function serialize(arg) { - return new Buffer(new Cls(arg).encode().toBuffer()); - }; -} - -/* This function call creates a server constructor for servers that that expose - * the four specified methods. This specifies how to serialize messages that the - * server sends and deserialize messages that the client sends, and whether the - * client or the server will send a stream of messages, for each method. This - * also specifies a prefix that will be added to method names when sending them - * on the wire. This function call and all of the preceding code in this file - * are intended to approximate what the generated code will look like for the - * math service */ -var Server = makeConstructor({ - Div: { - serialize: serializeCls(math.DivReply), - deserialize: deserializeCls(math.DivArgs), - client_stream: false, - server_stream: false - }, - Fib: { - serialize: serializeCls(math.Num), - deserialize: deserializeCls(math.FibArgs), - client_stream: false, - server_stream: true - }, - Sum: { - serialize: serializeCls(math.Num), - deserialize: deserializeCls(math.Num), - client_stream: true, - server_stream: false - }, - DivMany: { - serialize: serializeCls(math.DivReply), - deserialize: deserializeCls(math.DivArgs), - client_stream: true, - server_stream: true - } -}, '/Math/'); +var Server = grpc.buildServer([math.Math.service]); /** * Server function for division. Provides the /Math/DivMany and /Math/Div @@ -185,10 +118,12 @@ function mathDivMany(stream) { } var server = new Server({ - Div: mathDiv, - Fib: mathFib, - Sum: mathSum, - DivMany: mathDivMany + 'math.Math' : { + Div: mathDiv, + Fib: mathFib, + Sum: mathSum, + DivMany: mathDivMany + } }); if (require.main === module) { diff --git a/src/node/main.js b/src/node/main.js new file mode 100644 index 0000000000..a8dfa20024 --- /dev/null +++ b/src/node/main.js @@ -0,0 +1,98 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var _ = require('underscore'); + +var ProtoBuf = require('protobufjs'); + +var surface_client = require('./surface_client.js'); + +var surface_server = require('./surface_server.js'); + +var grpc = require('bindings')('grpc'); + +/** + * Load a gRPC object from an existing ProtoBuf.Reflect object. + * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load. + * @return {Object<string, *>} The resulting gRPC object + */ +function loadObject(value) { + var result = {}; + if (value.className === 'Namespace') { + _.each(value.children, function(child) { + result[child.name] = loadObject(child); + }); + return result; + } else if (value.className === 'Service') { + return surface_client.makeClientConstructor(value); + } else if (value.className === 'Service.Message') { + return value.build(); + } else { + return value; + } +} + +/** + * Load a gRPC object from a .proto file. + * @param {string} filename The file to load + * @return {Object<string, *>} The resulting gRPC object + */ +function load(filename) { + var builder = ProtoBuf.loadProtoFile(filename); + + return loadObject(builder.ns); +} + +/** + * See docs for loadObject + */ +exports.loadObject = loadObject; + +/** + * See docs for load + */ +exports.load = load; + +/** + * See docs for surface_server.makeServerConstructor + */ +exports.buildServer = surface_server.makeServerConstructor; + +/** + * Status name to code number mapping + */ +exports.status = grpc.status; +/** + * Call error name to code number mapping + */ +exports.callError = grpc.callError; diff --git a/src/node/package.json b/src/node/package.json index a2940b29bb..ed93c4ff41 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -8,11 +8,12 @@ "dependencies": { "bindings": "^1.2.1", "nan": "~1.3.0", - "underscore": "^1.7.0" + "underscore": "^1.7.0", + "protobufjs": "murgatroid99/ProtoBuf.js" }, "devDependencies": { "mocha": "~1.21.0", - "highland": "~2.0.0", - "protobufjs": "~3.8.0" - } + "highland": "~2.0.0" + }, + "main": "main.js" } diff --git a/src/node/server.js b/src/node/server.js index 2704c68f17..e947032b29 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -31,6 +31,8 @@ * */ +var _ = require('underscore'); + var grpc = require('bindings')('grpc.node'); var common = require('./common'); @@ -176,6 +178,10 @@ function Server(options) { * @this Server */ this.start = function() { + console.log('Server starting'); + _.each(handlers, function(handler, handler_name) { + console.log('Serving', handler_name); + }); if (this.started) { throw 'Server is already running'; } diff --git a/src/node/surface_client.js b/src/node/surface_client.js index acd22089ce..77dab5ca6f 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -35,6 +35,8 @@ var _ = require('underscore'); var client = require('./client.js'); +var common = require('./common.js'); + var EventEmitter = require('events').EventEmitter; var stream = require('stream'); @@ -44,6 +46,7 @@ var Writable = stream.Writable; var Duplex = stream.Duplex; var util = require('util'); + function forwardEvent(fromEmitter, toEmitter, event) { fromEmitter.on(event, function forward() { _.partial(toEmitter.emit, event).apply(toEmitter, arguments); @@ -317,16 +320,13 @@ var requester_makers = { } /** - * Creates a constructor for clients with a service defined by the methods - * object. The methods object has string keys and values of this form: - * {serialize: function, deserialize: function, client_stream: bool, - * server_stream: bool} - * @param {!Object<string, Object>} methods Method descriptor for each method - * the client should expose - * @param {string} prefix The prefix to prepend to each method name + * Creates a constructor for clients for the given service + * @param {ProtoBuf.Reflect.Service} service The service to generate a client + * for * @return {function(string, Object)} New client constructor */ -function makeClientConstructor(methods, prefix) { +function makeClientConstructor(service) { + var prefix = '/' + common.fullyQualifiedName(service) + '/'; /** * Create a client with the given methods * @constructor @@ -337,27 +337,29 @@ function makeClientConstructor(methods, prefix) { this.channel = new client.Channel(address, options); } - _.each(methods, function(method, name) { + _.each(service.children, function(method) { var method_type; - if (method.client_stream) { - if (method.server_stream) { + if (method.requestStream) { + if (method.responseStream) { method_type = 'bidi'; } else { method_type = 'client_stream'; } } else { - if (method.server_stream) { + if (method.responseStream) { method_type = 'server_stream'; } else { method_type = 'unary'; } } - SurfaceClient.prototype[name] = requester_makers[method_type]( - prefix + name, - method.serialize, - method.deserialize); + SurfaceClient.prototype[method.name] = requester_makers[method_type]( + prefix + method.name, + common.serializeCls(method.resolvedRequestType.build()), + common.deserializeCls(method.resolvedResponseType.build())); }); + SurfaceClient.service = service; + return SurfaceClient; } diff --git a/src/node/surface_server.js b/src/node/surface_server.js index 295c1ccaff..b6e0c37b4c 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -42,6 +42,8 @@ var Writable = stream.Writable; var Duplex = stream.Duplex; var util = require('util'); +var common = require('./common.js'); + util.inherits(ServerReadableObjectStream, Readable); /** @@ -287,36 +289,59 @@ var handler_makers = { * @param {string} prefix The prefex to prepend to each method name * @return {function(Object, Object)} New server constructor */ -function makeServerConstructor(methods, prefix) { +function makeServerConstructor(services) { + var qual_names = []; + _.each(services, function(service) { + _.each(service.children, function(method) { + var name = common.fullyQualifiedName(method); + if (_.indexOf(qual_names, name) !== -1) { + throw new Error('Method ' + name + ' exposed by more than one service'); + } + qual_names.push(name); + }); + }); /** * Create a server with the given handlers for all of the methods. * @constructor - * @param {Object} handlers Map from method names to method handlers. + * @param {Object} service_handlers Map from service names to map from method + * names to handlers * @param {Object} options Options to pass to the underlying server */ - function SurfaceServer(handlers, options) { + function SurfaceServer(service_handlers, options) { var server = new Server(options); this.inner_server = server; - _.each(handlers, function(handler, name) { - var method = methods[name]; - var method_type; - if (method.client_stream) { - if (method.server_stream) { - method_type = 'bidi'; + _.each(services, function(service) { + var service_name = common.fullyQualifiedName(service); + if (service_handlers[service_name] === undefined) { + throw new Error('Handlers for service ' + + service_name + ' not provided.'); + } + var prefix = '/' + common.fullyQualifiedName(service) + '/'; + _.each(service.children, function(method) { + var method_type; + if (method.requestStream) { + if (method.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } } else { - method_type = 'client_stream'; + if (method.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } } - } else { - if (method.server_stream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; + if (service_handlers[service_name][method.name] === undefined) { + throw new Error('Method handler for ' + + common.fullyQualifiedName(method) + ' not provided.'); } - } - var binary_handler = handler_makers[method_type](handler, - method.serialize, - method.deserialize); - server.register('' + prefix + name, binary_handler); + var binary_handler = handler_makers[method_type]( + service_handlers[service_name][method.name], + common.serializeCls(method.resolvedResponseType.build()), + common.deserializeCls(method.resolvedRequestType.build())); + server.register(prefix + method.name, binary_handler); + }); }, this); } diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index 5b34a228ad..45c956d179 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -32,83 +32,13 @@ */ var assert = require('assert'); -var ProtoBuf = require('protobufjs'); var port_picker = require('../port_picker'); -var builder = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto'); -var math = builder.build('math'); +var grpc = require('..'); +var math = grpc.load(__dirname + '/../examples/math.proto').math; -var client = require('../surface_client.js'); -var makeConstructor = client.makeClientConstructor; /** - * Get a function that deserializes a specific type of protobuf. - * @param {function()} cls The constructor of the message type to deserialize - * @return {function(Buffer):cls} The deserialization function - */ -function deserializeCls(cls) { - /** - * Deserialize a buffer to a message object - * @param {Buffer} arg_buf The buffer to deserialize - * @return {cls} The resulting object - */ - return function deserialize(arg_buf) { - return cls.decode(arg_buf); - }; -} - -/** - * Get a function that serializes objects to a buffer by protobuf class. - * @param {function()} Cls The constructor of the message type to serialize - * @return {function(Cls):Buffer} The serialization function - */ -function serializeCls(Cls) { - /** - * Serialize an object to a Buffer - * @param {Object} arg The object to serialize - * @return {Buffer} The serialized object - */ - return function serialize(arg) { - return new Buffer(new Cls(arg).encode().toBuffer()); - }; -} - -/* This function call creates a client constructor for clients that expose the - * four specified methods. This specifies how to serialize messages that the - * client sends and deserialize messages that the server sends, and whether the - * client or the server will send a stream of messages, for each method. This - * also specifies a prefix that will be added to method names when sending them - * on the wire. This function call and all of the preceding code in this file - * are intended to approximate what the generated code will look like for the - * math client */ -var MathClient = makeConstructor({ - Div: { - serialize: serializeCls(math.DivArgs), - deserialize: deserializeCls(math.DivReply), - client_stream: false, - server_stream: false - }, - Fib: { - serialize: serializeCls(math.FibArgs), - deserialize: deserializeCls(math.Num), - client_stream: false, - server_stream: true - }, - Sum: { - serialize: serializeCls(math.Num), - deserialize: deserializeCls(math.Num), - client_stream: true, - server_stream: false - }, - DivMany: { - serialize: serializeCls(math.DivArgs), - deserialize: deserializeCls(math.DivReply), - client_stream: true, - server_stream: true - } -}, '/Math/'); - -/** - * Channel to use to make requests to a running server. + * Client to use to make requests to a running server. */ var math_client; @@ -122,7 +52,7 @@ describe('Math client', function() { before(function(done) { port_picker.nextAvailablePort(function(port) { server.bind(port).listen(); - math_client = new MathClient(port); + math_client = new math.Math(port); done(); }); }); @@ -137,7 +67,7 @@ describe('Math client', function() { assert.equal(value.remainder, 3); }); call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, client.status.OK); + assert.strictEqual(status.code, grpc.status.OK); done(); }); }); @@ -150,7 +80,7 @@ describe('Math client', function() { next_expected += 1; }); call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, client.status.OK); + assert.strictEqual(status.code, grpc.status.OK); done(); }); }); @@ -164,7 +94,7 @@ describe('Math client', function() { } call.end(); call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, client.status.OK); + assert.strictEqual(status.code, grpc.status.OK); done(); }); }); @@ -184,7 +114,7 @@ describe('Math client', function() { } call.end(); call.on('status', function checkStatus(status) { - assert.strictEqual(status.code, client.status.OK); + assert.strictEqual(status.code, grpc.status.OK); done(); }); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js new file mode 100644 index 0000000000..8d0d8ec3bc --- /dev/null +++ b/src/node/test/surface_test.js @@ -0,0 +1,75 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var assert = require('assert'); + +var surface_server = require('../surface_server.js'); + +var ProtoBuf = require('protobufjs'); + +var grpc = require('..'); + +var math_proto = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto'); + +var mathService = math_proto.lookup('math.Math'); + +describe('Surface server constructor', function() { + it('Should fail with conflicting method names', function() { + assert.throws(function() { + grpc.buildServer([mathService, mathService]); + }); + }); + it('Should succeed with a single service', function() { + assert.doesNotThrow(function() { + grpc.buildServer([mathService]); + }); + }); + it('Should fail with missing handlers', function() { + var Server = grpc.buildServer([mathService]); + assert.throws(function() { + new Server({ + 'math.Math': { + 'Div': function() {}, + 'DivMany': function() {}, + 'Fib': function() {} + } + }); + }, /math.Math.Sum/); + }); + it('Should fail with no handlers for the service', function() { + var Server = grpc.buildServer([mathService]); + assert.throws(function() { + new Server({}); + }, /math.Math/); + }); +}); |