diff options
author | Abhishek Kumar <abhikumar@google.com> | 2015-07-20 13:56:11 -0700 |
---|---|---|
committer | Abhishek Kumar <abhikumar@google.com> | 2015-07-20 13:56:11 -0700 |
commit | 2355066c1ba5b1d5299ee83910df2d776aa07343 (patch) | |
tree | 2baff050e719ac384a840caffa66d9f93438101b /src | |
parent | 58df9eec679575de1023e9d5c15e7e0bfe27c419 (diff) | |
parent | ef9fd53c8010f03e101f16114b5c6ff1b5d76a44 (diff) |
Merge pull request #2464 from murgatroid99/node_server_construction_change
Changed to newer, simpler server construction interface
Diffstat (limited to 'src')
-rw-r--r-- | src/node/README.md | 4 | ||||
-rw-r--r-- | src/node/examples/math_server.js | 18 | ||||
-rw-r--r-- | src/node/examples/route_guide_server.js | 16 | ||||
-rw-r--r-- | src/node/examples/stock_server.js | 15 | ||||
-rw-r--r-- | src/node/index.js | 6 | ||||
-rw-r--r-- | src/node/interop/interop_server.js | 20 | ||||
-rw-r--r-- | src/node/src/server.js | 255 | ||||
-rw-r--r-- | src/node/test/health_test.js | 9 | ||||
-rw-r--r-- | src/node/test/interop_sanity_test.js | 2 | ||||
-rw-r--r-- | src/node/test/math_client_test.js | 2 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 268 |
11 files changed, 319 insertions, 296 deletions
diff --git a/src/node/README.md b/src/node/README.md index 2f4c49096d..78781dab14 100644 --- a/src/node/README.md +++ b/src/node/README.md @@ -54,10 +54,10 @@ function loadObject(reflectionObject) Returns the same structure that `load` returns, but takes a reflection object from `ProtoBuf.js` instead of a file name. ```javascript -function buildServer(serviceArray) +function Server([serverOpions]) ``` -Takes an array of service objects and returns a constructor for a server that handles requests to all of those services. +Constructs a server to which service/implementation pairs can be added. ```javascript diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index 0a86e7eaff..b1f8a6323f 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -36,8 +36,6 @@ var grpc = require('..'); var math = grpc.load(__dirname + '/math.proto').math; -var Server = grpc.buildServer([math.Math.service]); - /** * Server function for division. Provides the /Math/DivMany and /Math/Div * functions (Div is just DivMany with only one stream element). For each @@ -108,19 +106,17 @@ function mathDivMany(stream) { stream.end(); }); } - -var server = new Server({ - 'math.Math' : { - div: mathDiv, - fib: mathFib, - sum: mathSum, - divMany: mathDivMany - } +var server = new grpc.Server(); +server.addProtoService(math.Math.service, { + div: mathDiv, + fib: mathFib, + sum: mathSum, + divMany: mathDivMany }); if (require.main === module) { server.bind('0.0.0.0:50051'); - server.listen(); + server.start(); } /** diff --git a/src/node/examples/route_guide_server.js b/src/node/examples/route_guide_server.js index c777eab7bc..70044a322c 100644 --- a/src/node/examples/route_guide_server.js +++ b/src/node/examples/route_guide_server.js @@ -40,8 +40,6 @@ var _ = require('lodash'); var grpc = require('..'); var examples = grpc.load(__dirname + '/route_guide.proto').examples; -var Server = grpc.buildServer([examples.RouteGuide.service]); - var COORD_FACTOR = 1e7; /** @@ -228,14 +226,14 @@ function routeChat(call) { * @return {Server} The new server object */ function getServer() { - return new Server({ - 'examples.RouteGuide' : { - getFeature: getFeature, - listFeatures: listFeatures, - recordRoute: recordRoute, - routeChat: routeChat - } + var server = new grpc.Server(); + server.addProtoService(examples.RouteGuide.service, { + getFeature: getFeature, + listFeatures: listFeatures, + recordRoute: recordRoute, + routeChat: routeChat }); + return server; } if (require.main === module) { diff --git a/src/node/examples/stock_server.js b/src/node/examples/stock_server.js index caaf9f99ba..f2eb6ad4ab 100644 --- a/src/node/examples/stock_server.js +++ b/src/node/examples/stock_server.js @@ -37,8 +37,6 @@ var _ = require('lodash'); var grpc = require('..'); var examples = grpc.load(__dirname + '/stock.proto').examples; -var StockServer = grpc.buildServer([examples.Stock.service]); - function getLastTradePrice(call, callback) { callback(null, {symbol: call.request.symbol, price: 88}); } @@ -73,13 +71,12 @@ function getLastTradePriceMultiple(call) { }); } -var stockServer = new StockServer({ - 'examples.Stock' : { - getLastTradePrice: getLastTradePrice, - getLastTradePriceMultiple: getLastTradePriceMultiple, - watchFutureTrades: watchFutureTrades, - getHighestTradePrice: getHighestTradePrice - } +var stockServer = new grpc.Server(); +stockServer.addProtoService(examples.Stock.service, { + getLastTradePrice: getLastTradePrice, + getLastTradePriceMultiple: getLastTradePriceMultiple, + watchFutureTrades: watchFutureTrades, + getHighestTradePrice: getHighestTradePrice }); if (require.main === module) { diff --git a/src/node/index.js b/src/node/index.js index b6a4e2d0ee..d81e780443 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -133,9 +133,9 @@ exports.loadObject = loadObject; exports.load = load; /** - * See docs for server.makeServerConstructor + * See docs for Server */ -exports.buildServer = server.makeProtobufServerConstructor; +exports.Server = server.Server; /** * Status name to code number mapping @@ -159,5 +159,3 @@ exports.ServerCredentials = grpc.ServerCredentials; exports.getGoogleAuthDelegate = getGoogleAuthDelegate; exports.makeGenericClientConstructor = client.makeClientConstructor; - -exports.makeGenericServerConstructor = server.makeServerConstructor; diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 0baa78a094..505c6bb537 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -38,7 +38,6 @@ var path = require('path'); var _ = require('lodash'); var grpc = require('..'); var testProto = grpc.load(__dirname + '/test.proto').grpc.testing; -var Server = grpc.buildServer([testProto.TestService.service]); /** * Create a buffer filled with size zeroes @@ -173,16 +172,15 @@ function getServer(port, tls) { key_data, pem_data); } - var server = new Server({ - 'grpc.testing.TestService' : { - emptyCall: handleEmpty, - unaryCall: handleUnary, - streamingOutputCall: handleStreamingOutput, - streamingInputCall: handleStreamingInput, - fullDuplexCall: handleFullDuplex, - halfDuplexCall: handleHalfDuplex - } - }, null, options); + var server = new grpc.Server(options); + server.addProtoService(testProto.TestService.service, { + emptyCall: handleEmpty, + unaryCall: handleUnary, + streamingOutputCall: handleStreamingOutput, + streamingInputCall: handleStreamingInput, + fullDuplexCall: handleFullDuplex, + halfDuplexCall: handleHalfDuplex + }); var port_num = server.bind('0.0.0.0:' + port, server_creds); return {server: server, port: port_num}; } diff --git a/src/node/src/server.js b/src/node/src/server.js index 00be400e61..0a3a0031bd 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -72,6 +72,9 @@ function handleError(call, error) { status.metadata = error.metadata; } var error_batch = {}; + if (!call.metadataSent) { + error_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + } error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); } @@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) { if (metadata) { status.metadata = metadata; } + if (!call.metadataSent) { + end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + call.metadataSent = true; + } end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); @@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) { stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { var batch = {}; + if (!stream.call.metadataSent) { + stream.call.metadataSent = true; + batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + } batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); } @@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) { function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; + if (!this.call.metadataSent) { + batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + this.call.metadataSent = true; + } batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); this.call.startBatch(batch, function(err, value) { if (err) { @@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) { ServerWritableStream.prototype._write = _write; +function sendMetadata(responseMetadata) { + /* jshint validthis: true */ + if (!this.call.metadataSent) { + this.call.metadataSent = true; + var batch = []; + batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + this.call.startBatch(batch, function(err) { + if (err) { + this.emit('error', err); + return; + } + }); + } +} + +ServerWritableStream.prototype.sendMetadata = sendMetadata; + util.inherits(ServerReadableStream, Readable); /** @@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) { ServerDuplexStream.prototype._read = _read; ServerDuplexStream.prototype._write = _write; +ServerDuplexStream.prototype.sendMetadata = sendMetadata; /** * Fully handle a unary call @@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write; */ function handleUnary(call, handler, metadata) { var emitter = new EventEmitter(); + emitter.sendMetadata = function(responseMetadata) { + if (!call.metadataSent) { + call.metadataSent = true; + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + call.startBatch(batch, function() {}); + } + }; emitter.on('error', function(error) { handleError(call, error); }); + emitter.metadata = metadata; waitForCancel(call, emitter); var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { if (err) { @@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) { function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); waitForCancel(call, stream); + stream.metadata = metadata; var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { if (err) { @@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) { */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); + stream.sendMetadata = function(responseMetadata) { + if (!call.metadataSent) { + call.metadataSent = true; + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + call.startBatch(batch, function() {}); + } + }; stream.on('error', function(error) { handleError(call, error); }); waitForCancel(call, stream); - var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; - call.startBatch(metadata_batch, function() {}); + stream.metadata = metadata; handler.func(stream, function(err, value, trailer) { stream.terminate(); if (err) { @@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, handler.deserialize); waitForCancel(call, stream); - var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; - call.startBatch(metadata_batch, function() {}); + stream.metadata = metadata; handler.func(stream); } @@ -466,29 +511,28 @@ var streamHandlers = { * Constructs a server object that stores request handlers and delegates * incoming requests to those handlers * @constructor - * @param {function(string, Object<string, Array<Buffer>>): - Object<string, Array<Buffer|string>>=} getMetadata Callback that gets - * metatada for a given method * @param {Object=} options Options that should be passed to the internal server * implementation */ -function Server(getMetadata, options) { +function Server(options) { this.handlers = {}; var handlers = this.handlers; var server = new grpc.Server(options); this._server = server; + this.started = false; /** * Start the server and begin handling requests * @this Server */ - this.listen = function() { + this.start = function() { + if (this.started) { + throw new Error('Server is already running'); + } + this.started = true; console.log('Server starting'); _.each(handlers, function(handler, handler_name) { console.log('Serving', handler_name); }); - if (this.started) { - throw 'Server is already running'; - } server.start(); /** * Handles the SERVER_RPC_NEW event. If there is a handler associated with @@ -523,11 +567,7 @@ function Server(getMetadata, options) { call.startBatch(batch, function() {}); return; } - var response_metadata = {}; - if (getMetadata) { - response_metadata = getMetadata(method, metadata); - } - streamHandlers[handler.type](call, handler, response_metadata); + streamHandlers[handler.type](call, handler, metadata); } server.requestCall(handleNewCall); }; @@ -565,6 +605,47 @@ Server.prototype.register = function(name, handler, serialize, deserialize, return true; }; +Server.prototype.addService = function(service, implementation) { + if (this.started) { + throw new Error('Can\'t add a service to a started server.'); + } + var self = this; + _.each(service, function(attrs, name) { + var method_type; + if (attrs.requestStream) { + if (attrs.responseStream) { + method_type = 'bidi'; + } else { + method_type = 'client_stream'; + } + } else { + if (attrs.responseStream) { + method_type = 'server_stream'; + } else { + method_type = 'unary'; + } + } + if (implementation[name] === undefined) { + throw new Error('Method handler for ' + attrs.path + + ' not provided.'); + } + var serialize = attrs.responseSerialize; + var deserialize = attrs.requestDeserialize; + var register_success = self.register(attrs.path, + _.bind(implementation[name], + implementation), + serialize, deserialize, method_type); + if (!register_success) { + throw new Error('Method handler for ' + attrs.path + + ' already provided.'); + } + }); +}; + +Server.prototype.addProtoService = function(service, implementation) { + this.addService(common.getProtobufServiceAttrs(service), implementation); +}; + /** * Binds the server to the given port, with SSL enabled if creds is given * @param {string} port The port that the server should bind on, in the format @@ -573,6 +654,9 @@ Server.prototype.register = function(name, handler, serialize, deserialize, * nothing for an insecure port */ Server.prototype.bind = function(port, creds) { + if (this.started) { + throw new Error('Can\'t bind an already running server to an address'); + } if (creds) { return this._server.addSecureHttp2Port(port, creds); } else { @@ -581,131 +665,6 @@ Server.prototype.bind = function(port, creds) { }; /** - * Create a constructor for servers with services defined by service_attr_map. - * That is an object that maps (namespaced) service names to objects that in - * turn map method names to objects with the following keys: - * path: The path on the server for accessing the method. For example, for - * protocol buffers, we use "/service_name/method_name" - * requestStream: bool indicating whether the client sends a stream - * resonseStream: bool indicating whether the server sends a stream - * requestDeserialize: function to deserialize request objects - * responseSerialize: function to serialize response objects - * @param {Object} service_attr_map An object mapping service names to method - * attribute map objects - * @return {function(Object, function, Object=)} New server constructor - */ -function makeServerConstructor(service_attr_map) { - /** - * Create a server with the given handlers for all of the methods. - * @constructor - * @param {Object} service_handlers Map from service names to map from method - * names to handlers - * @param {function(string, Object<string, Array<Buffer>>): - Object<string, Array<Buffer|string>>=} getMetadata Callback that - * gets metatada for a given method - * @param {Object=} options Options to pass to the underlying server - */ - function SurfaceServer(service_handlers, getMetadata, options) { - var server = new Server(getMetadata, options); - this.inner_server = server; - _.each(service_attr_map, function(service_attrs, service_name) { - if (service_handlers[service_name] === undefined) { - throw new Error('Handlers for service ' + - service_name + ' not provided.'); - } - _.each(service_attrs, function(attrs, name) { - var method_type; - if (attrs.requestStream) { - if (attrs.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (attrs.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - if (service_handlers[service_name][name] === undefined) { - throw new Error('Method handler for ' + attrs.path + - ' not provided.'); - } - var serialize = attrs.responseSerialize; - var deserialize = attrs.requestDeserialize; - server.register(attrs.path, _.bind(service_handlers[service_name][name], - service_handlers[service_name]), - serialize, deserialize, method_type); - }); - }, this); - } - - /** - * Binds the server to the given port, with SSL enabled if creds is supplied - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {boolean=} creds Credentials to use for SSL - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.bind = function(port, creds) { - return this.inner_server.bind(port, creds); - }; - - /** - * Starts the server listening on any bound ports - * @return {SurfaceServer} this - */ - SurfaceServer.prototype.listen = function() { - this.inner_server.listen(); - return this; - }; - - /** - * Shuts the server down; tells it to stop listening for new requests and to - * kill old requests. - */ - SurfaceServer.prototype.shutdown = function() { - this.inner_server.shutdown(); - }; - - return SurfaceServer; -} - -/** - * Create a constructor for servers that serve the given services. - * @param {Array<ProtoBuf.Reflect.Service>} services The services that the - * servers will serve - * @return {function(Object, function, Object=)} New server constructor - */ -function makeProtobufServerConstructor(services) { - var qual_names = []; - var service_attr_map = {}; - _.each(services, function(service) { - var service_name = common.fullyQualifiedName(service); - _.each(service.children, function(method) { - var name = common.fullyQualifiedName(method); - if (_.indexOf(qual_names, name) !== -1) { - throw new Error('Method ' + name + ' exposed by more than one service'); - } - qual_names.push(name); - }); - var method_attrs = common.getProtobufServiceAttrs(service); - if (!service_attr_map.hasOwnProperty(service_name)) { - service_attr_map[service_name] = {}; - } - service_attr_map[service_name] = _.extend(service_attr_map[service_name], - method_attrs); - }); - return makeServerConstructor(service_attr_map); -} - -/** - * See documentation for makeServerConstructor - */ -exports.makeServerConstructor = makeServerConstructor; - -/** - * See documentation for makeProtobufServerConstructor + * See documentation for Server */ -exports.makeProtobufServerConstructor = makeProtobufServerConstructor; +exports.Server = Server; diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js index 4d1a5082e0..bb700cc46c 100644 --- a/src/node/test/health_test.js +++ b/src/node/test/health_test.js @@ -49,14 +49,13 @@ describe('Health Checking', function() { 'grpc.test.TestService': 'SERVING' } }; - var HealthServer = grpc.buildServer([health.service]); - var healthServer = new HealthServer({ - 'grpc.health.v1alpha.Health': new health.Implementation(statusMap) - }); + var healthServer = new grpc.Server(); + healthServer.addProtoService(health.service, + new health.Implementation(statusMap)); var healthClient; before(function() { var port_num = healthServer.bind('0.0.0.0:0'); - healthServer.listen(); + healthServer.start(); healthClient = new health.Client('localhost:' + port_num); }); after(function() { diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index fcd8eb6403..0a5eb29c0c 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -46,7 +46,7 @@ describe('Interop tests', function() { before(function(done) { var server_obj = interop_server.getServer(0, true); server = server_obj.server; - server.listen(); + server.start(); port = 'localhost:' + server_obj.port; done(); }); diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js index 3461922e66..f2751857ff 100644 --- a/src/node/test/math_client_test.js +++ b/src/node/test/math_client_test.js @@ -52,7 +52,7 @@ var server = require('../examples/math_server.js'); describe('Math client', function() { before(function(done) { var port_num = server.bind('0.0.0.0:0'); - server.listen(); + server.start(); math_client = new math.Math('localhost:' + port_num); done(); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 125957277f..6ede0444c9 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -69,34 +69,45 @@ describe('File loader', function() { }); }); }); -describe('Surface server constructor', function() { - it('Should fail with conflicting method names', function() { - assert.throws(function() { - grpc.buildServer([mathService, mathService]); - }); +describe('Server.prototype.addProtoService', function() { + var server; + var dummyImpls = { + 'div': function() {}, + 'divMany': function() {}, + 'fib': function() {}, + 'sum': function() {} + }; + beforeEach(function() { + server = new grpc.Server(); + }); + afterEach(function() { + server.shutdown(); }); it('Should succeed with a single service', function() { assert.doesNotThrow(function() { - grpc.buildServer([mathService]); + server.addProtoService(mathService, dummyImpls); + }); + }); + it('Should fail with conflicting method names', function() { + server.addProtoService(mathService, dummyImpls); + assert.throws(function() { + server.addProtoService(mathService, dummyImpls); }); }); it('Should fail with missing handlers', function() { - var Server = grpc.buildServer([mathService]); assert.throws(function() { - new Server({ - 'math.Math': { - 'div': function() {}, - 'divMany': function() {}, - 'fib': function() {} - } + server.addProtoService(mathService, { + 'div': function() {}, + 'divMany': function() {}, + 'fib': function() {} }); }, /math.Math.Sum/); }); - it('Should fail with no handlers for the service', function() { - var Server = grpc.buildServer([mathService]); + it('Should fail if the server has been started', function() { + server.start(); assert.throws(function() { - new Server({}); - }, /math.Math/); + server.addProtoService(mathService, dummyImpls); + }); }); }); describe('Echo service', function() { @@ -105,18 +116,16 @@ describe('Echo service', function() { before(function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto'); var echo_service = test_proto.lookup('EchoService'); - var Server = grpc.buildServer([echo_service]); - server = new Server({ - 'EchoService': { - echo: function(call, callback) { - callback(null, call.request); - } + server = new grpc.Server(); + server.addProtoService(echo_service, { + echo: function(call, callback) { + callback(null, call.request); } }); var port = server.bind('localhost:0'); var Client = surface_client.makeProtobufClientConstructor(echo_service); client = new Client('localhost:' + port); - server.listen(); + server.start(); }); after(function() { server.shutdown(); @@ -151,18 +160,14 @@ describe('Generic client and server', function() { var client; var server; before(function() { - var Server = grpc.makeGenericServerConstructor({ - string: string_service_attrs - }); - server = new Server({ - string: { - capitalize: function(call, callback) { - callback(null, _.capitalize(call.request)); - } + server = new grpc.Server(); + server.addService(string_service_attrs, { + capitalize: function(call, callback) { + callback(null, _.capitalize(call.request)); } }); var port = server.bind('localhost:0'); - server.listen(); + server.start(); var Client = grpc.makeGenericClientConstructor(string_service_attrs); client = new Client('localhost:' + port); }); @@ -178,6 +183,82 @@ describe('Generic client and server', function() { }); }); }); +describe('Echo metadata', function() { + var client; + var server; + before(function() { + var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_service = test_proto.lookup('TestService'); + server = new grpc.Server(); + server.addProtoService(test_service, { + unary: function(call, cb) { + call.sendMetadata(call.metadata); + cb(null, {}); + }, + clientStream: function(stream, cb){ + stream.on('data', function(data) {}); + stream.on('end', function() { + stream.sendMetadata(stream.metadata); + cb(null, {}); + }); + }, + serverStream: function(stream) { + stream.sendMetadata(stream.metadata); + stream.end(); + }, + bidiStream: function(stream) { + stream.on('data', function(data) {}); + stream.on('end', function() { + stream.sendMetadata(stream.metadata); + stream.end(); + }); + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeProtobufClientConstructor(test_service); + client = new Client('localhost:' + port); + server.start(); + }); + after(function() { + server.shutdown(); + }); + it('with unary call', function(done) { + var call = client.unary({}, function(err, data) { + assert.ifError(err); + }, {key: ['value']}); + call.on('metadata', function(metadata) { + assert.deepEqual(metadata.key, ['value']); + done(); + }); + }); + it('with client stream call', function(done) { + var call = client.clientStream(function(err, data) { + assert.ifError(err); + }, {key: ['value']}); + call.on('metadata', function(metadata) { + assert.deepEqual(metadata.key, ['value']); + done(); + }); + call.end(); + }); + it('with server stream call', function(done) { + var call = client.serverStream({}, {key: ['value']}); + call.on('data', function() {}); + call.on('metadata', function(metadata) { + assert.deepEqual(metadata.key, ['value']); + done(); + }); + }); + it('with bidi stream call', function(done) { + var call = client.bidiStream({key: ['value']}); + call.on('data', function() {}); + call.on('metadata', function(metadata) { + assert.deepEqual(metadata.key, ['value']); + done(); + }); + call.end(); + }); +}); describe('Other conditions', function() { var client; var server; @@ -185,72 +266,70 @@ describe('Other conditions', function() { before(function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); - var Server = grpc.buildServer([test_service]); - server = new Server({ - TestService: { - unary: function(call, cb) { - var req = call.request; - if (req.error) { + server = new grpc.Server(); + server.addProtoService(test_service, { + unary: function(call, cb) { + var req = call.request; + if (req.error) { + cb(new Error('Requested error'), null, {metadata: ['yes']}); + } else { + cb(null, {count: 1}, {metadata: ['yes']}); + } + }, + clientStream: function(stream, cb){ + var count = 0; + var errored; + stream.on('data', function(data) { + if (data.error) { + errored = true; cb(new Error('Requested error'), null, {metadata: ['yes']}); } else { - cb(null, {count: 1}, {metadata: ['yes']}); + count += 1; } - }, - clientStream: function(stream, cb){ - var count = 0; - var errored; - stream.on('data', function(data) { - if (data.error) { - errored = true; - cb(new Error('Requested error'), null, {metadata: ['yes']}); - } else { - count += 1; - } - }); - stream.on('end', function() { - if (!errored) { - cb(null, {count: count}, {metadata: ['yes']}); - } - }); - }, - serverStream: function(stream) { - var req = stream.request; - if (req.error) { + }); + stream.on('end', function() { + if (!errored) { + cb(null, {count: count}, {metadata: ['yes']}); + } + }); + }, + serverStream: function(stream) { + var req = stream.request; + if (req.error) { + var err = new Error('Requested error'); + err.metadata = {metadata: ['yes']}; + stream.emit('error', err); + } else { + for (var i = 0; i < 5; i++) { + stream.write({count: i}); + } + stream.end({metadata: ['yes']}); + } + }, + bidiStream: function(stream) { + var count = 0; + stream.on('data', function(data) { + if (data.error) { var err = new Error('Requested error'); - err.metadata = {metadata: ['yes']}; + err.metadata = { + metadata: ['yes'], + count: ['' + count] + }; stream.emit('error', err); } else { - for (var i = 0; i < 5; i++) { - stream.write({count: i}); - } - stream.end({metadata: ['yes']}); + stream.write({count: count}); + count += 1; } - }, - bidiStream: function(stream) { - var count = 0; - stream.on('data', function(data) { - if (data.error) { - var err = new Error('Requested error'); - err.metadata = { - metadata: ['yes'], - count: ['' + count] - }; - stream.emit('error', err); - } else { - stream.write({count: count}); - count += 1; - } - }); - stream.on('end', function() { - stream.end({metadata: ['yes']}); - }); - } + }); + stream.on('end', function() { + stream.end({metadata: ['yes']}); + }); } }); port = server.bind('localhost:0'); var Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port); - server.listen(); + server.start(); }); after(function() { server.shutdown(); @@ -465,18 +544,17 @@ describe('Cancelling surface client', function() { var client; var server; before(function() { - var Server = grpc.buildServer([mathService]); - server = new Server({ - 'math.Math': { - 'div': function(stream) {}, - 'divMany': function(stream) {}, - 'fib': function(stream) {}, - 'sum': function(stream) {} - } + server = new grpc.Server(); + server.addProtoService(mathService, { + 'div': function(stream) {}, + 'divMany': function(stream) {}, + 'fib': function(stream) {}, + 'sum': function(stream) {} }); var port = server.bind('localhost:0'); var Client = surface_client.makeProtobufClientConstructor(mathService); client = new Client('localhost:' + port); + server.start(); }); after(function() { server.shutdown(); |