aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/src/server.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/node/src/server.js')
-rw-r--r--src/node/src/server.js255
1 files changed, 107 insertions, 148 deletions
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 00be400e61..0a3a0031bd 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -72,6 +72,9 @@ function handleError(call, error) {
status.metadata = error.metadata;
}
var error_batch = {};
+ if (!call.metadataSent) {
+ error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ }
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
}
@@ -115,6 +118,10 @@ function sendUnaryResponse(call, value, serialize, metadata) {
if (metadata) {
status.metadata = metadata;
}
+ if (!call.metadataSent) {
+ end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ call.metadataSent = true;
+ }
end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
@@ -136,6 +143,10 @@ function setUpWritable(stream, serialize) {
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
+ if (!stream.call.metadataSent) {
+ stream.call.metadataSent = true;
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ }
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
}
@@ -239,6 +250,10 @@ function ServerWritableStream(call, serialize) {
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
+ if (!this.call.metadataSent) {
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ this.call.metadataSent = true;
+ }
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, value) {
if (err) {
@@ -251,6 +266,23 @@ function _write(chunk, encoding, callback) {
ServerWritableStream.prototype._write = _write;
+function sendMetadata(responseMetadata) {
+ /* jshint validthis: true */
+ if (!this.call.metadataSent) {
+ this.call.metadataSent = true;
+ var batch = [];
+ batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+ this.call.startBatch(batch, function(err) {
+ if (err) {
+ this.emit('error', err);
+ return;
+ }
+ });
+ }
+}
+
+ServerWritableStream.prototype.sendMetadata = sendMetadata;
+
util.inherits(ServerReadableStream, Readable);
/**
@@ -339,6 +371,7 @@ function ServerDuplexStream(call, serialize, deserialize) {
ServerDuplexStream.prototype._read = _read;
ServerDuplexStream.prototype._write = _write;
+ServerDuplexStream.prototype.sendMetadata = sendMetadata;
/**
* Fully handle a unary call
@@ -348,12 +381,20 @@ ServerDuplexStream.prototype._write = _write;
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
+ emitter.sendMetadata = function(responseMetadata) {
+ if (!call.metadataSent) {
+ call.metadataSent = true;
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+ call.startBatch(batch, function() {});
+ }
+ };
emitter.on('error', function(error) {
handleError(call, error);
});
+ emitter.metadata = metadata;
waitForCancel(call, emitter);
var batch = {};
- batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@@ -392,8 +433,8 @@ function handleUnary(call, handler, metadata) {
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
waitForCancel(call, stream);
+ stream.metadata = metadata;
var batch = {};
- batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
if (err) {
@@ -419,13 +460,19 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
+ stream.sendMetadata = function(responseMetadata) {
+ if (!call.metadataSent) {
+ call.metadataSent = true;
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
+ call.startBatch(batch, function() {});
+ }
+ };
stream.on('error', function(error) {
handleError(call, error);
});
waitForCancel(call, stream);
- var metadata_batch = {};
- metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
- call.startBatch(metadata_batch, function() {});
+ stream.metadata = metadata;
handler.func(stream, function(err, value, trailer) {
stream.terminate();
if (err) {
@@ -449,9 +496,7 @@ function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
handler.deserialize);
waitForCancel(call, stream);
- var metadata_batch = {};
- metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
- call.startBatch(metadata_batch, function() {});
+ stream.metadata = metadata;
handler.func(stream);
}
@@ -466,29 +511,28 @@ var streamHandlers = {
* Constructs a server object that stores request handlers and delegates
* incoming requests to those handlers
* @constructor
- * @param {function(string, Object<string, Array<Buffer>>):
- Object<string, Array<Buffer|string>>=} getMetadata Callback that gets
- * metatada for a given method
* @param {Object=} options Options that should be passed to the internal server
* implementation
*/
-function Server(getMetadata, options) {
+function Server(options) {
this.handlers = {};
var handlers = this.handlers;
var server = new grpc.Server(options);
this._server = server;
+ this.started = false;
/**
* Start the server and begin handling requests
* @this Server
*/
- this.listen = function() {
+ this.start = function() {
+ if (this.started) {
+ throw new Error('Server is already running');
+ }
+ this.started = true;
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
});
- if (this.started) {
- throw 'Server is already running';
- }
server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
@@ -523,11 +567,7 @@ function Server(getMetadata, options) {
call.startBatch(batch, function() {});
return;
}
- var response_metadata = {};
- if (getMetadata) {
- response_metadata = getMetadata(method, metadata);
- }
- streamHandlers[handler.type](call, handler, response_metadata);
+ streamHandlers[handler.type](call, handler, metadata);
}
server.requestCall(handleNewCall);
};
@@ -565,6 +605,47 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
return true;
};
+Server.prototype.addService = function(service, implementation) {
+ if (this.started) {
+ throw new Error('Can\'t add a service to a started server.');
+ }
+ var self = this;
+ _.each(service, function(attrs, name) {
+ var method_type;
+ if (attrs.requestStream) {
+ if (attrs.responseStream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (attrs.responseStream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ if (implementation[name] === undefined) {
+ throw new Error('Method handler for ' + attrs.path +
+ ' not provided.');
+ }
+ var serialize = attrs.responseSerialize;
+ var deserialize = attrs.requestDeserialize;
+ var register_success = self.register(attrs.path,
+ _.bind(implementation[name],
+ implementation),
+ serialize, deserialize, method_type);
+ if (!register_success) {
+ throw new Error('Method handler for ' + attrs.path +
+ ' already provided.');
+ }
+ });
+};
+
+Server.prototype.addProtoService = function(service, implementation) {
+ this.addService(common.getProtobufServiceAttrs(service), implementation);
+};
+
/**
* Binds the server to the given port, with SSL enabled if creds is given
* @param {string} port The port that the server should bind on, in the format
@@ -573,6 +654,9 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
* nothing for an insecure port
*/
Server.prototype.bind = function(port, creds) {
+ if (this.started) {
+ throw new Error('Can\'t bind an already running server to an address');
+ }
if (creds) {
return this._server.addSecureHttp2Port(port, creds);
} else {
@@ -581,131 +665,6 @@ Server.prototype.bind = function(port, creds) {
};
/**
- * Create a constructor for servers with services defined by service_attr_map.
- * That is an object that maps (namespaced) service names to objects that in
- * turn map method names to objects with the following keys:
- * path: The path on the server for accessing the method. For example, for
- * protocol buffers, we use "/service_name/method_name"
- * requestStream: bool indicating whether the client sends a stream
- * resonseStream: bool indicating whether the server sends a stream
- * requestDeserialize: function to deserialize request objects
- * responseSerialize: function to serialize response objects
- * @param {Object} service_attr_map An object mapping service names to method
- * attribute map objects
- * @return {function(Object, function, Object=)} New server constructor
- */
-function makeServerConstructor(service_attr_map) {
- /**
- * Create a server with the given handlers for all of the methods.
- * @constructor
- * @param {Object} service_handlers Map from service names to map from method
- * names to handlers
- * @param {function(string, Object<string, Array<Buffer>>):
- Object<string, Array<Buffer|string>>=} getMetadata Callback that
- * gets metatada for a given method
- * @param {Object=} options Options to pass to the underlying server
- */
- function SurfaceServer(service_handlers, getMetadata, options) {
- var server = new Server(getMetadata, options);
- this.inner_server = server;
- _.each(service_attr_map, function(service_attrs, service_name) {
- if (service_handlers[service_name] === undefined) {
- throw new Error('Handlers for service ' +
- service_name + ' not provided.');
- }
- _.each(service_attrs, function(attrs, name) {
- var method_type;
- if (attrs.requestStream) {
- if (attrs.responseStream) {
- method_type = 'bidi';
- } else {
- method_type = 'client_stream';
- }
- } else {
- if (attrs.responseStream) {
- method_type = 'server_stream';
- } else {
- method_type = 'unary';
- }
- }
- if (service_handlers[service_name][name] === undefined) {
- throw new Error('Method handler for ' + attrs.path +
- ' not provided.');
- }
- var serialize = attrs.responseSerialize;
- var deserialize = attrs.requestDeserialize;
- server.register(attrs.path, _.bind(service_handlers[service_name][name],
- service_handlers[service_name]),
- serialize, deserialize, method_type);
- });
- }, this);
- }
-
- /**
- * Binds the server to the given port, with SSL enabled if creds is supplied
- * @param {string} port The port that the server should bind on, in the format
- * "address:port"
- * @param {boolean=} creds Credentials to use for SSL
- * @return {SurfaceServer} this
- */
- SurfaceServer.prototype.bind = function(port, creds) {
- return this.inner_server.bind(port, creds);
- };
-
- /**
- * Starts the server listening on any bound ports
- * @return {SurfaceServer} this
- */
- SurfaceServer.prototype.listen = function() {
- this.inner_server.listen();
- return this;
- };
-
- /**
- * Shuts the server down; tells it to stop listening for new requests and to
- * kill old requests.
- */
- SurfaceServer.prototype.shutdown = function() {
- this.inner_server.shutdown();
- };
-
- return SurfaceServer;
-}
-
-/**
- * Create a constructor for servers that serve the given services.
- * @param {Array<ProtoBuf.Reflect.Service>} services The services that the
- * servers will serve
- * @return {function(Object, function, Object=)} New server constructor
- */
-function makeProtobufServerConstructor(services) {
- var qual_names = [];
- var service_attr_map = {};
- _.each(services, function(service) {
- var service_name = common.fullyQualifiedName(service);
- _.each(service.children, function(method) {
- var name = common.fullyQualifiedName(method);
- if (_.indexOf(qual_names, name) !== -1) {
- throw new Error('Method ' + name + ' exposed by more than one service');
- }
- qual_names.push(name);
- });
- var method_attrs = common.getProtobufServiceAttrs(service);
- if (!service_attr_map.hasOwnProperty(service_name)) {
- service_attr_map[service_name] = {};
- }
- service_attr_map[service_name] = _.extend(service_attr_map[service_name],
- method_attrs);
- });
- return makeServerConstructor(service_attr_map);
-}
-
-/**
- * See documentation for makeServerConstructor
- */
-exports.makeServerConstructor = makeServerConstructor;
-
-/**
- * See documentation for makeProtobufServerConstructor
+ * See documentation for Server
*/
-exports.makeProtobufServerConstructor = makeProtobufServerConstructor;
+exports.Server = Server;