diff options
Diffstat (limited to 'src/node/src/server.js')
-rw-r--r-- | src/node/src/server.js | 965 |
1 files changed, 0 insertions, 965 deletions
diff --git a/src/node/src/server.js b/src/node/src/server.js deleted file mode 100644 index 8b7c0b6862..0000000000 --- a/src/node/src/server.js +++ /dev/null @@ -1,965 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -'use strict'; - -var _ = require('lodash'); - -var grpc = require('./grpc_extension'); - -var common = require('./common'); - -var Metadata = require('./metadata'); - -var constants = require('./constants'); - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - -var EventEmitter = require('events').EventEmitter; - -/** - * Handle an error on a call by sending it as a status - * @private - * @param {grpc.internal~Call} call The call to send the error on - * @param {(Object|Error)} error The error object - */ -function handleError(call, error) { - var statusMetadata = new Metadata(); - var status = { - code: constants.status.UNKNOWN, - details: 'Unknown Error' - }; - if (error.hasOwnProperty('message')) { - status.details = error.message; - } - if (error.hasOwnProperty('code')) { - status.code = error.code; - if (error.hasOwnProperty('details')) { - status.details = error.details; - } - } - if (error.hasOwnProperty('metadata')) { - statusMetadata = error.metadata; - } - status.metadata = statusMetadata._getCoreRepresentation(); - var error_batch = {}; - if (!call.metadataSent) { - error_batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - } - error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; - call.startBatch(error_batch, function(){}); -} - -/** - * Send a response to a unary or client streaming call. - * @private - * @param {grpc.Call} call The call to respond on - * @param {*} value The value to respond with - * @param {grpc~serialize} serialize Serialization function for the - * response - * @param {grpc.Metadata=} metadata Optional trailing metadata to send with - * status - * @param {number=} [flags=0] Flags for modifying how the message is sent. - */ -function sendUnaryResponse(call, value, serialize, metadata, flags) { - var end_batch = {}; - var statusMetadata = new Metadata(); - var status = { - code: constants.status.OK, - details: 'OK' - }; - if (metadata) { - statusMetadata = metadata; - } - var message; - try { - message = serialize(value); - } catch (e) { - e.code = constants.status.INTERNAL; - handleError(call, e); - return; - } - status.metadata = statusMetadata._getCoreRepresentation(); - if (!call.metadataSent) { - end_batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - call.metadataSent = true; - } - message.grpcWriteFlags = flags; - end_batch[grpc.opType.SEND_MESSAGE] = message; - end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; - call.startBatch(end_batch, function (){}); -} - -/** - * Initialize a writable stream. This is used for both the writable and duplex - * stream constructors. - * @private - * @param {Writable} stream The stream to set up - * @param {function(*):Buffer=} Serialization function for responses - */ -function setUpWritable(stream, serialize) { - stream.finished = false; - stream.status = { - code : constants.status.OK, - details : 'OK', - metadata : new Metadata() - }; - stream.serialize = common.wrapIgnoreNull(serialize); - function sendStatus() { - var batch = {}; - if (!stream.call.metadataSent) { - stream.call.metadataSent = true; - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - } - - if (stream.status.metadata) { - stream.status.metadata = stream.status.metadata._getCoreRepresentation(); - } - batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; - stream.call.startBatch(batch, function(){}); - } - stream.on('finish', sendStatus); - /** - * Set the pending status to a given error status. If the error does not have - * code or details properties, the code will be set to grpc.status.UNKNOWN - * and the details will be set to 'Unknown Error'. - * @param {Error} err The error object - */ - function setStatus(err) { - var code = constants.status.UNKNOWN; - var details = 'Unknown Error'; - var metadata = new Metadata(); - if (err.hasOwnProperty('message')) { - details = err.message; - } - if (err.hasOwnProperty('code')) { - code = err.code; - if (err.hasOwnProperty('details')) { - details = err.details; - } - } - if (err.hasOwnProperty('metadata')) { - metadata = err.metadata; - } - stream.status = {code: code, details: details, metadata: metadata}; - } - /** - * Terminate the call. This includes indicating that reads are done, draining - * all pending writes, and sending the given error as a status - * @param {Error} err The error object - * @this GrpcServerStream - */ - function terminateCall(err) { - // Drain readable data - setStatus(err); - stream.end(); - } - stream.on('error', terminateCall); - /** - * Override of Writable#end method that allows for sending metadata with a - * success status. - * @param {Metadata=} metadata Metadata to send with the status - */ - stream.end = function(metadata) { - if (metadata) { - stream.status.metadata = metadata; - } - Writable.prototype.end.call(this); - }; -} - -/** - * Initialize a readable stream. This is used for both the readable and duplex - * stream constructors. - * @private - * @param {Readable} stream The stream to initialize - * @param {grpc~deserialize} deserialize Deserialization function for - * incoming data. - */ -function setUpReadable(stream, deserialize) { - stream.deserialize = common.wrapIgnoreNull(deserialize); - stream.finished = false; - stream.reading = false; - - stream.terminate = function() { - stream.finished = true; - stream.on('data', function() {}); - }; - - stream.on('cancelled', function() { - stream.terminate(); - }); -} - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerUnaryCall~cancelled - */ - -util.inherits(ServerUnaryCall, EventEmitter); - -/** - * An EventEmitter. Used for unary calls. - * @constructor grpc~ServerUnaryCall - * @extends external:EventEmitter - * @param {grpc.internal~Call} call The call object associated with the request - * @param {grpc.Metadata} metadata The request metadata from the client - */ -function ServerUnaryCall(call, metadata) { - EventEmitter.call(this); - this.call = call; - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerUnaryCall#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata - */ - this.metadata = metadata; - /** - * The request message from the client - * @member {*} grpc~ServerUnaryCall#request - */ - this.request = undefined; -} - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerWritableStream~cancelled - */ - -util.inherits(ServerWritableStream, Writable); - -/** - * A stream that the server can write to. Used for calls that are streaming from - * the server side. - * @constructor grpc~ServerWritableStream - * @extends external:Writable - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerWritableStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer - * @param {grpc.internal~Call} call The call object to send data with - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~serialize} serialize Serialization function for writes - */ -function ServerWritableStream(call, metadata, serialize) { - Writable.call(this, {objectMode: true}); - this.call = call; - - this.finished = false; - setUpWritable(this, serialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerWritableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerWritableStream#metadata - */ - this.metadata = metadata; - /** - * The request message from the client - * @member {*} grpc~ServerWritableStream#request - */ - this.request = undefined; -} - -/** - * Start writing a chunk of data. This is an implementation of a method required - * for implementing stream.Writable. - * @private - * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Used to pass write flags - * @param {function(Error=)} callback Callback to indicate that the write is - * complete - */ -function _write(chunk, encoding, callback) { - /* jshint validthis: true */ - var batch = {}; - var self = this; - var message; - try { - message = this.serialize(chunk); - } catch (e) { - e.code = constants.status.INTERNAL; - callback(e); - return; - } - if (!this.call.metadataSent) { - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - this.call.metadataSent = true; - } - if (_.isFinite(encoding)) { - /* Attach the encoding if it is a finite number. This is the closest we - * can get to checking that it is valid flags */ - message.grpcWriteFlags = encoding; - } - batch[grpc.opType.SEND_MESSAGE] = message; - this.call.startBatch(batch, function(err, value) { - if (err) { - self.emit('error', err); - return; - } - callback(); - }); -} - -ServerWritableStream.prototype._write = _write; - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerReadableStream~cancelled - */ - -util.inherits(ServerReadableStream, Readable); - -/** - * A stream that the server can read from. Used for calls that are streaming - * from the client side. - * @constructor grpc~ServerReadableStream - * @extends external:Readable - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerReadableStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer - * @param {grpc.internal~Call} call The call object to read data with - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~deserialize} deserialize Deserialization function for reads - */ -function ServerReadableStream(call, metadata, deserialize) { - Readable.call(this, {objectMode: true}); - this.call = call; - setUpReadable(this, deserialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerReadableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerReadableStream#metadata - */ - this.metadata = metadata; -} - -/** - * Start reading from the gRPC data source. This is an implementation of a - * method required for implementing stream.Readable - * @access private - * @param {number} size Ignored - */ -function _read(size) { - /* jshint validthis: true */ - var self = this; - /** - * Callback to be called when a READ event is received. Pushes the data onto - * the read queue and starts reading again if applicable - * @param {grpc.Event} event READ event object - */ - function readCallback(err, event) { - if (err) { - self.terminate(); - return; - } - if (self.finished) { - self.push(null); - return; - } - var data = event.read; - var deserialized; - try { - deserialized = self.deserialize(data); - } catch (e) { - e.code = constants.status.INTERNAL; - self.emit('error', e); - return; - } - if (self.push(deserialized) && data !== null) { - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); - } else { - self.reading = false; - } - } - if (self.finished) { - self.push(null); - } else { - if (!self.reading) { - self.reading = true; - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(batch, readCallback); - } - } -} - -ServerReadableStream.prototype._read = _read; - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerDuplexStream~cancelled - */ - -util.inherits(ServerDuplexStream, Duplex); - -/** - * A stream that the server can read from or write to. Used for calls with - * duplex streaming. - * @constructor grpc~ServerDuplexStream - * @extends external:Duplex - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerDuplexStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer - * @param {grpc.internal~Call} call Call object to proxy - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~serialize} serialize Serialization function for requests - * @param {grpc~deserialize} deserialize Deserialization function for - * responses - */ -function ServerDuplexStream(call, metadata, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - this.call = call; - setUpWritable(this, serialize); - setUpReadable(this, deserialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerReadableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerReadableStream#metadata - */ - this.metadata = metadata; -} - -ServerDuplexStream.prototype._read = _read; -ServerDuplexStream.prototype._write = _write; - -/** - * Send the initial metadata for a writable stream. - * @alias grpc~ServerUnaryCall#sendMetadata - * @param {Metadata} responseMetadata Metadata to send - */ -function sendMetadata(responseMetadata) { - /* jshint validthis: true */ - var self = this; - if (!this.call.metadataSent) { - this.call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - this.call.startBatch(batch, function(err) { - if (err) { - self.emit('error', err); - return; - } - }); - } -} - -ServerUnaryCall.prototype.sendMetadata = sendMetadata; -ServerWritableStream.prototype.sendMetadata = sendMetadata; -ServerReadableStream.prototype.sendMetadata = sendMetadata; -ServerDuplexStream.prototype.sendMetadata = sendMetadata; - -/** - * Get the endpoint this call/stream is connected to. - * @alias grpc~ServerUnaryCall#getPeer - * @return {string} The URI of the endpoint - */ -function getPeer() { - /* jshint validthis: true */ - return this.call.getPeer(); -} - -ServerUnaryCall.prototype.getPeer = getPeer; -ServerReadableStream.prototype.getPeer = getPeer; -ServerWritableStream.prototype.getPeer = getPeer; -ServerDuplexStream.prototype.getPeer = getPeer; - -/** - * Wait for the client to close, then emit a cancelled event if the client - * cancelled. - * @private - */ -function waitForCancel() { - /* jshint validthis: true */ - var self = this; - var cancel_batch = {}; - cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - self.call.startBatch(cancel_batch, function(err, result) { - if (err) { - self.emit('error', err); - } - if (result.cancelled) { - self.cancelled = true; - self.emit('cancelled'); - } - }); -} - -ServerUnaryCall.prototype.waitForCancel = waitForCancel; -ServerReadableStream.prototype.waitForCancel = waitForCancel; -ServerWritableStream.prototype.waitForCancel = waitForCancel; -ServerDuplexStream.prototype.waitForCancel = waitForCancel; - -/** - * Callback function passed to server handlers that handle methods with unary - * responses. - * @callback grpc.Server~sendUnaryData - * @param {grpc~ServiceError} error An error, if the call failed - * @param {*} value The response value. Must be a valid argument to the - * `responseSerialize` method of the method that is being handled - * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable - * @param {grpc.writeFlags=} flags Flags to modify writing the response - */ - -/** - * User-provided method to handle unary requests on a server - * @callback grpc.Server~handleUnaryCall - * @param {grpc~ServerUnaryCall} call The call object - * @param {grpc.Server~sendUnaryData} callback The callback to call to respond - * to the request - */ - -/** - * Fully handle a unary call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleUnaryCall} handler.func The handler function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleUnary(call, handler, metadata) { - var emitter = new ServerUnaryCall(call, metadata); - emitter.on('error', function(error) { - handleError(call, error); - }); - emitter.waitForCancel(); - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { - if (err) { - handleError(call, err); - return; - } - try { - emitter.request = handler.deserialize(result.read); - } catch (e) { - e.code = constants.status.INTERNAL; - handleError(call, e); - return; - } - if (emitter.cancelled) { - return; - } - handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { - if (err) { - if (trailer) { - err.metadata = trailer; - } - handleError(call, err); - } else { - sendUnaryResponse(call, value, handler.serialize, trailer, flags); - } - }); - }); -} - -/** - * User provided method to handle server streaming methods on the server. - * @callback grpc.Server~handleServerStreamingCall - * @param {grpc~ServerWritableStream} call The call object - */ - -/** - * Fully handle a server streaming call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleServerStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleServerStreaming(call, handler, metadata) { - var stream = new ServerWritableStream(call, metadata, handler.serialize); - stream.waitForCancel(); - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { - if (err) { - stream.emit('error', err); - return; - } - try { - stream.request = handler.deserialize(result.read); - } catch (e) { - e.code = constants.status.INTERNAL; - stream.emit('error', e); - return; - } - handler.func(stream); - }); -} - -/** - * User provided method to handle client streaming methods on the server. - * @callback grpc.Server~handleClientStreamingCall - * @param {grpc~ServerReadableStream} call The call object - * @param {grpc.Server~sendUnaryData} callback The callback to call to respond - * to the request - */ - -/** - * Fully handle a client streaming call - * @access private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleClientStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleClientStreaming(call, handler, metadata) { - var stream = new ServerReadableStream(call, metadata, handler.deserialize); - stream.on('error', function(error) { - handleError(call, error); - }); - stream.waitForCancel(); - handler.func(stream, function(err, value, trailer, flags) { - stream.terminate(); - if (err) { - if (trailer) { - err.metadata = trailer; - } - handleError(call, err); - } else { - sendUnaryResponse(call, value, handler.serialize, trailer, flags); - } - }); -} - -/** - * User provided method to handle bidirectional streaming calls on the server. - * @callback grpc.Server~handleBidiStreamingCall - * @param {grpc~ServerDuplexStream} call The call object - */ - -/** - * Fully handle a bidirectional streaming call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleBidiStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {Metadata} metadata Metadata from the client - */ -function handleBidiStreaming(call, handler, metadata) { - var stream = new ServerDuplexStream(call, metadata, handler.serialize, - handler.deserialize); - stream.waitForCancel(); - handler.func(stream); -} - -var streamHandlers = { - unary: handleUnary, - server_stream: handleServerStreaming, - client_stream: handleClientStreaming, - bidi: handleBidiStreaming -}; - -/** - * Constructs a server object that stores request handlers and delegates - * incoming requests to those handlers - * @memberof grpc - * @constructor - * @param {Object=} options Options that should be passed to the internal server - * implementation - * @example - * var server = new grpc.Server(); - * server.addProtoService(protobuf_service_descriptor, service_implementation); - * server.bind('address:port', server_credential); - * server.start(); - */ -function Server(options) { - this.handlers = {}; - var server = new grpc.Server(options); - this._server = server; - this.started = false; -} - -/** - * Start the server and begin handling requests - */ -Server.prototype.start = function() { - if (this.started) { - throw new Error('Server is already running'); - } - var self = this; - this.started = true; - this._server.start(); - /** - * Handles the SERVER_RPC_NEW event. If there is a handler associated with - * the requested method, use that handler to respond to the request. Then - * wait for the next request - * @param {grpc.internal~Event} event The event to handle with tag - * SERVER_RPC_NEW - */ - function handleNewCall(err, event) { - if (err) { - return; - } - var details = event.new_call; - var call = details.call; - var method = details.method; - var metadata = Metadata._fromCoreRepresentation(details.metadata); - if (method === null) { - return; - } - self._server.requestCall(handleNewCall); - var handler; - if (self.handlers.hasOwnProperty(method)) { - handler = self.handlers[method]; - } else { - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { - code: constants.status.UNIMPLEMENTED, - details: '', - metadata: {} - }; - batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - call.startBatch(batch, function() {}); - return; - } - streamHandlers[handler.type](call, handler, metadata); - } - this._server.requestCall(handleNewCall); -}; - -/** - * Unified type for application handlers for all types of calls - * @typedef {(grpc.Server~handleUnaryCall - * |grpc.Server~handleClientStreamingCall - * |grpc.Server~handleServerStreamingCall - * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall - */ - -/** - * Registers a handler to handle the named method. Fails if there already is - * a handler for the given method. Returns true on success - * @param {string} name The name of the method that the provided function should - * handle/respond to. - * @param {grpc.Server~handleCall} handler Function that takes a stream of - * request values and returns a stream of response values - * @param {grpc~serialize} serialize Serialization function for responses - * @param {grpc~deserialize} deserialize Deserialization function for requests - * @param {string} type The streaming type of method that this handles - * @return {boolean} True if the handler was set. False if a handler was already - * set for that name. - */ -Server.prototype.register = function(name, handler, serialize, deserialize, - type) { - if (this.handlers.hasOwnProperty(name)) { - return false; - } - this.handlers[name] = { - func: handler, - serialize: serialize, - deserialize: deserialize, - type: type - }; - return true; -}; - -/** - * Gracefully shuts down the server. The server will stop receiving new calls, - * and any pending calls will complete. The callback will be called when all - * pending calls have completed and the server is fully shut down. This method - * is idempotent with itself and forceShutdown. - * @param {function()} callback The shutdown complete callback - */ -Server.prototype.tryShutdown = function(callback) { - this._server.tryShutdown(callback); -}; - -/** - * Forcibly shuts down the server. The server will stop receiving new calls - * and cancel all pending calls. When it returns, the server has shut down. - * This method is idempotent with itself and tryShutdown, and it will trigger - * any outstanding tryShutdown callbacks. - */ -Server.prototype.forceShutdown = function() { - this._server.forceShutdown(); -}; - -var unimplementedStatusResponse = { - code: constants.status.UNIMPLEMENTED, - details: 'The server does not implement this method' -}; - -var defaultHandler = { - unary: function(call, callback) { - callback(unimplementedStatusResponse); - }, - client_stream: function(call, callback) { - callback(unimplementedStatusResponse); - }, - server_stream: function(call) { - call.emit('error', unimplementedStatusResponse); - }, - bidi: function(call) { - call.emit('error', unimplementedStatusResponse); - } -}; - -/** - * Add a service to the server, with a corresponding implementation. - * @param {grpc~ServiceDefinition} service The service descriptor - * @param {Object<String, grpc.Server~handleCall>} implementation Map of method - * names to method implementation for the provided service. - */ -Server.prototype.addService = function(service, implementation) { - if (!_.isObject(service) || !_.isObject(implementation)) { - throw new Error('addService requires two objects as arguments'); - } - if (_.keys(service).length === 0) { - throw new Error('Cannot add an empty service to a server'); - } - if (this.started) { - throw new Error('Can\'t add a service to a started server.'); - } - var self = this; - _.forOwn(service, function(attrs, name) { - var method_type; - if (attrs.requestStream) { - if (attrs.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (attrs.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - var impl; - if (implementation[name] === undefined) { - /* Handle the case where the method is passed with the name exactly as - written in the proto file, instead of using JavaScript function - naming style */ - if (implementation[attrs.originalName] === undefined) { - common.log(constants.logVerbosity.ERROR, 'Method handler ' + name + - ' for ' + attrs.path + ' expected but not provided'); - impl = defaultHandler[method_type]; - } else { - impl = _.bind(implementation[attrs.originalName], implementation); - } - } else { - impl = _.bind(implementation[name], implementation); - } - var serialize = attrs.responseSerialize; - var deserialize = attrs.requestDeserialize; - var register_success = self.register(attrs.path, impl, serialize, - deserialize, method_type); - if (!register_success) { - throw new Error('Method handler for ' + attrs.path + - ' already provided.'); - } - }); -}; - -/** - * Add a proto service to the server, with a corresponding implementation - * @deprecated Use {@link grpc.Server#addService} instead - * @param {Protobuf.Reflect.Service} service The proto service descriptor - * @param {Object<String, grpc.Server~handleCall>} implementation Map of method - * names to method implementation for the provided service. - */ -Server.prototype.addProtoService = util.deprecate(function(service, - implementation) { - var options; - var protobuf_js_5_common = require('./protobuf_js_5_common'); - var protobuf_js_6_common = require('./protobuf_js_6_common'); - if (protobuf_js_5_common.isProbablyProtobufJs5(service)) { - options = _.defaults(service.grpc_options, common.defaultGrpcOptions); - this.addService( - protobuf_js_5_common.getProtobufServiceAttrs(service, options), - implementation); - } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) { - options = _.defaults(service.grpc_options, common.defaultGrpcOptions); - this.addService( - protobuf_js_6_common.getProtobufServiceAttrs(service, options), - implementation); - } else { - // We assume that this is a service attributes object - this.addService(service, implementation); - } -}, 'Server#addProtoService: Use Server#addService instead'); - -/** - * Binds the server to the given port, with SSL disabled if creds is an - * insecure credentials object - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {grpc.ServerCredentials} creds Server credential object to be used for - * SSL. Pass an insecure credentials object for an insecure port. - */ -Server.prototype.bind = function(port, creds) { - if (this.started) { - throw new Error('Can\'t bind an already running server to an address'); - } - return this._server.addHttp2Port(port, creds); -}; - -exports.Server = Server; |