diff options
Diffstat (limited to 'src/node/src')
-rw-r--r-- | src/node/src/client.js | 951 | ||||
-rw-r--r-- | src/node/src/common.js | 172 | ||||
-rw-r--r-- | src/node/src/constants.js | 236 | ||||
-rw-r--r-- | src/node/src/credentials.js | 207 | ||||
-rw-r--r-- | src/node/src/grpc_extension.js | 32 | ||||
-rw-r--r-- | src/node/src/metadata.js | 172 | ||||
-rw-r--r-- | src/node/src/protobuf_js_5_common.js | 171 | ||||
-rw-r--r-- | src/node/src/protobuf_js_6_common.js | 160 | ||||
-rw-r--r-- | src/node/src/server.js | 965 |
9 files changed, 0 insertions, 3066 deletions
diff --git a/src/node/src/client.js b/src/node/src/client.js deleted file mode 100644 index edc51b7802..0000000000 --- a/src/node/src/client.js +++ /dev/null @@ -1,951 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * Client module - * - * This module contains the factory method for creating Client classes, and the - * method calling code for all types of methods. - * - * @example <caption>Create a client and call a method on it</caption> - * - * var proto_obj = grpc.load(proto_file_path); - * var Client = proto_obj.package.subpackage.ServiceName; - * var client = new Client(server_address, client_credentials); - * var call = client.unaryMethod(arguments, callback); - */ - -'use strict'; - -var _ = require('lodash'); -var arguejs = require('arguejs'); - -var grpc = require('./grpc_extension'); - -var common = require('./common'); - -var Metadata = require('./metadata'); - -var constants = require('./constants'); - -var EventEmitter = require('events').EventEmitter; - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); -var version = require('../../../package.json').version; - -/** - * Initial response metadata sent by the server when it starts processing the - * call - * @event grpc~ClientUnaryCall#metadata - * @type {grpc.Metadata} - */ - -/** - * Status of the call when it has completed. - * @event grpc~ClientUnaryCall#status - * @type grpc~StatusObject - */ - -util.inherits(ClientUnaryCall, EventEmitter); - -/** - * An EventEmitter. Used for unary calls. - * @constructor grpc~ClientUnaryCall - * @extends external:EventEmitter - * @param {grpc.internal~Call} call The call object associated with the request - */ -function ClientUnaryCall(call) { - EventEmitter.call(this); - this.call = call; -} - -util.inherits(ClientWritableStream, Writable); - -/** - * A stream that the client can write to. Used for calls that are streaming from - * the client side. - * @constructor grpc~ClientWritableStream - * @extends external:Writable - * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientWritableStream#cancel - * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientWritableStream#getPeer - * @borrows grpc~ClientUnaryCall#event:metadata as - * grpc~ClientWritableStream#metadata - * @borrows grpc~ClientUnaryCall#event:status as - * grpc~ClientWritableStream#status - * @param {grpc.internal~Call} call The call object to send data with - * @param {grpc~serialize=} [serialize=identity] Serialization - * function for writes. - */ -function ClientWritableStream(call, serialize) { - Writable.call(this, {objectMode: true}); - this.call = call; - this.serialize = common.wrapIgnoreNull(serialize); - this.on('finish', function() { - var batch = {}; - batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(batch, function() {}); - }); -} - -/** - * Write a message to the request stream. If serializing the argument fails, - * the call will be cancelled and the stream will end with an error. - * @name grpc~ClientWritableStream#write - * @kind function - * @override - * @param {*} message The message to write. Must be a valid argument to the - * serialize function of the corresponding method - * @param {grpc.writeFlags} flags Flags to modify how the message is written - * @param {Function} callback Callback for when this chunk of data is flushed - * @return {boolean} As defined for [Writable]{@link external:Writable} - */ - -/** - * Attempt to write the given chunk. Calls the callback when done. This is an - * implementation of a method needed for implementing stream.Writable. - * @private - * @param {*} chunk The chunk to write - * @param {grpc.writeFlags} encoding Used to pass write flags - * @param {function(Error=)} callback Called when the write is complete - */ -function _write(chunk, encoding, callback) { - /* jshint validthis: true */ - var batch = {}; - var message; - var self = this; - if (this.writeFailed) { - /* Once a write fails, just call the callback immediately to let the caller - flush any pending writes. */ - setImmediate(callback); - return; - } - try { - message = this.serialize(chunk); - } catch (e) { - /* Sending this error to the server and emitting it immediately on the - client may put the call in a slightly weird state on the client side, - but passing an object that causes a serialization failure is a misuse - of the API anyway, so that's OK. The primary purpose here is to give the - programmer a useful error and to stop the stream properly */ - this.call.cancelWithStatus(constants.status.INTERNAL, - 'Serialization failure'); - callback(e); - return; - } - if (_.isFinite(encoding)) { - /* Attach the encoding if it is a finite number. This is the closest we - * can get to checking that it is valid flags */ - message.grpcWriteFlags = encoding; - } - batch[grpc.opType.SEND_MESSAGE] = message; - this.call.startBatch(batch, function(err, event) { - if (err) { - /* Assume that the call is complete and that writing failed because a - status was received. In that case, set a flag to discard all future - writes */ - self.writeFailed = true; - } - callback(); - }); -} - -ClientWritableStream.prototype._write = _write; - -util.inherits(ClientReadableStream, Readable); - -/** - * A stream that the client can read from. Used for calls that are streaming - * from the server side. - * @constructor grpc~ClientReadableStream - * @extends external:Readable - * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientReadableStream#cancel - * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientReadableStream#getPeer - * @borrows grpc~ClientUnaryCall#event:metadata as - * grpc~ClientReadableStream#metadata - * @borrows grpc~ClientUnaryCall#event:status as - * grpc~ClientReadableStream#status - * @param {grpc.internal~Call} call The call object to read data with - * @param {grpc~deserialize=} [deserialize=identity] - * Deserialization function for reads - */ -function ClientReadableStream(call, deserialize) { - Readable.call(this, {objectMode: true}); - this.call = call; - this.finished = false; - this.reading = false; - this.deserialize = common.wrapIgnoreNull(deserialize); - /* Status generated from reading messages from the server. Overrides the - * status from the server if not OK */ - this.read_status = null; - /* Status received from the server. */ - this.received_status = null; -} - -/** - * Called when all messages from the server have been processed. The status - * parameter indicates that the call should end with that status. status - * defaults to OK if not provided. - * @param {Object!} status The status that the call should end with - * @private - */ -function _readsDone(status) { - /* jshint validthis: true */ - if (!status) { - status = {code: constants.status.OK, details: 'OK'}; - } - if (status.code !== constants.status.OK) { - this.call.cancelWithStatus(status.code, status.details); - } - this.finished = true; - this.read_status = status; - this._emitStatusIfDone(); -} - -ClientReadableStream.prototype._readsDone = _readsDone; - -/** - * Called to indicate that we have received a status from the server. - * @private - */ -function _receiveStatus(status) { - /* jshint validthis: true */ - this.received_status = status; - this._emitStatusIfDone(); -} - -ClientReadableStream.prototype._receiveStatus = _receiveStatus; - -/** - * If we have both processed all incoming messages and received the status from - * the server, emit the status. Otherwise, do nothing. - * @private - */ -function _emitStatusIfDone() { - /* jshint validthis: true */ - var status; - if (this.read_status && this.received_status) { - if (this.read_status.code !== constants.status.OK) { - status = this.read_status; - } else { - status = this.received_status; - } - if (status.code === constants.status.OK) { - this.push(null); - } else { - var error = new Error(status.details); - error.code = status.code; - error.metadata = status.metadata; - this.emit('error', error); - } - this.emit('status', status); - } -} - -ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; - -/** - * Read the next object from the stream. - * @private - * @param {*} size Ignored because we use objectMode=true - */ -function _read(size) { - /* jshint validthis: true */ - var self = this; - /** - * Callback to be called when a READ event is received. Pushes the data onto - * the read queue and starts reading again if applicable - * @param {grpc.Event} event READ event object - */ - function readCallback(err, event) { - if (err) { - // Something has gone wrong. Stop reading and wait for status - self.finished = true; - self._readsDone(); - return; - } - var data = event.read; - var deserialized; - try { - deserialized = self.deserialize(data); - } catch (e) { - self._readsDone({code: constants.status.INTERNAL, - details: 'Failed to parse server response'}); - return; - } - if (data === null) { - self._readsDone(); - return; - } - if (self.push(deserialized) && data !== null) { - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); - } else { - self.reading = false; - } - } - if (self.finished) { - self.push(null); - } else { - if (!self.reading) { - self.reading = true; - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); - } - } -} - -ClientReadableStream.prototype._read = _read; - -util.inherits(ClientDuplexStream, Duplex); - -/** - * A stream that the client can read from or write to. Used for calls with - * duplex streaming. - * @constructor grpc~ClientDuplexStream - * @extends external:Duplex - * @borrows grpc~ClientUnaryCall#cancel as grpc~ClientDuplexStream#cancel - * @borrows grpc~ClientUnaryCall#getPeer as grpc~ClientDuplexStream#getPeer - * @borrows grpc~ClientWritableStream#write as grpc~ClientDuplexStream#write - * @borrows grpc~ClientUnaryCall#event:metadata as - * grpc~ClientDuplexStream#metadata - * @borrows grpc~ClientUnaryCall#event:status as - * grpc~ClientDuplexStream#status - * @param {grpc.internal~Call} call Call object to proxy - * @param {grpc~serialize=} [serialize=identity] Serialization - * function for requests - * @param {grpc~deserialize=} [deserialize=identity] - * Deserialization function for responses - */ -function ClientDuplexStream(call, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - this.serialize = common.wrapIgnoreNull(serialize); - this.deserialize = common.wrapIgnoreNull(deserialize); - this.call = call; - /* Status generated from reading messages from the server. Overrides the - * status from the server if not OK */ - this.read_status = null; - /* Status received from the server. */ - this.received_status = null; - this.on('finish', function() { - var batch = {}; - batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(batch, function() {}); - }); -} - -ClientDuplexStream.prototype._readsDone = _readsDone; -ClientDuplexStream.prototype._receiveStatus = _receiveStatus; -ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone; -ClientDuplexStream.prototype._read = _read; -ClientDuplexStream.prototype._write = _write; - -/** - * Cancel the ongoing call. Results in the call ending with a CANCELLED status, - * unless it has already ended with some other status. - * @alias grpc~ClientUnaryCall#cancel - */ -function cancel() { - /* jshint validthis: true */ - this.call.cancel(); -} - -ClientUnaryCall.prototype.cancel = cancel; -ClientReadableStream.prototype.cancel = cancel; -ClientWritableStream.prototype.cancel = cancel; -ClientDuplexStream.prototype.cancel = cancel; - -/** - * Get the endpoint this call/stream is connected to. - * @return {string} The URI of the endpoint - * @alias grpc~ClientUnaryCall#getPeer - */ -function getPeer() { - /* jshint validthis: true */ - return this.call.getPeer(); -} - -ClientUnaryCall.prototype.getPeer = getPeer; -ClientReadableStream.prototype.getPeer = getPeer; -ClientWritableStream.prototype.getPeer = getPeer; -ClientDuplexStream.prototype.getPeer = getPeer; - -/** - * Any client call type - * @typedef {(ClientUnaryCall|ClientReadableStream| - * ClientWritableStream|ClientDuplexStream)} - * grpc.Client~Call - */ - -/** - * Options that can be set on a call. - * @typedef {Object} grpc.Client~CallOptions - * @property {grpc~Deadline} deadline The deadline for the entire call to - * complete. - * @property {string} host Server hostname to set on the call. Only meaningful - * if different from the server address used to construct the client. - * @property {grpc.Client~Call} parent Parent call. Used in servers when - * making a call as part of the process of handling a call. Used to - * propagate some information automatically, as specified by - * propagate_flags. - * @property {number} propagate_flags Indicates which properties of a parent - * call should propagate to this call. Bitwise combination of flags in - * {@link grpc.propagate}. - * @property {grpc.credentials~CallCredentials} credentials The credentials that - * should be used to make this particular call. - */ - -/** - * Get a call object built with the provided options. - * @access private - * @param {grpc.Client~CallOptions=} options Options object. - */ -function getCall(channel, method, options) { - var deadline; - var host; - var parent; - var propagate_flags; - var credentials; - if (options) { - deadline = options.deadline; - host = options.host; - parent = _.get(options, 'parent.call'); - propagate_flags = options.propagate_flags; - credentials = options.credentials; - } - if (deadline === undefined) { - deadline = Infinity; - } - var call = new grpc.Call(channel, method, deadline, host, - parent, propagate_flags); - if (credentials) { - call.setCredentials(credentials); - } - return call; -} - -/** - * A generic gRPC client. Primarily useful as a base class for generated clients - * @memberof grpc - * @constructor - * @param {string} address Server address to connect to - * @param {grpc~ChannelCredentials} credentials Credentials to use to connect to - * the server - * @param {Object} options Options to apply to channel creation - */ -function Client(address, credentials, options) { - if (!options) { - options = {}; - } - /* Append the grpc-node user agent string after the application user agent - * string, and put the combination at the beginning of the user agent string - */ - if (options['grpc.primary_user_agent']) { - options['grpc.primary_user_agent'] += ' '; - } else { - options['grpc.primary_user_agent'] = ''; - } - options['grpc.primary_user_agent'] += 'grpc-node/' + version; - /* Private fields use $ as a prefix instead of _ because it is an invalid - * prefix of a method name */ - this.$channel = new grpc.Channel(address, credentials, options); -} - -exports.Client = Client; - -/** - * @callback grpc.Client~requestCallback - * @param {?grpc~ServiceError} error The error, if the call - * failed - * @param {*} value The response value, if the call succeeded - */ - -/** - * Make a unary request to the given method, using the given serialize - * and deserialize functions, with the given argument. - * @param {string} method The name of the method to request - * @param {grpc~serialize} serialize The serialization function for - * inputs - * @param {grpc~deserialize} deserialize The deserialization - * function for outputs - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {grpc.Metadata=} metadata Metadata to add to the call - * @param {grpc.Client~CallOptions=} options Options map - * @param {grpc.Client~requestCallback} callback The callback to - * for when the response is received - * @return {grpc~ClientUnaryCall} An event emitter for stream related events - */ -Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, - argument, metadata, options, - callback) { - /* While the arguments are listed in the function signature, those variables - * are not used directly. Instead, ArgueJS processes the arguments - * object. This allows for simple handling of optional arguments in the - * middle of the argument list, and also provides type checking. */ - var args = arguejs({method: String, serialize: Function, - deserialize: Function, - argument: null, metadata: [Metadata, new Metadata()], - options: [Object], callback: Function}, arguments); - var call = getCall(this.$channel, method, args.options); - var emitter = new ClientUnaryCall(call); - metadata = args.metadata.clone(); - var client_batch = {}; - var message = serialize(args.argument); - if (args.options) { - message.grpcWriteFlags = args.options.flags; - } - - client_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - client_batch[grpc.opType.SEND_MESSAGE] = message; - client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - client_batch[grpc.opType.RECV_MESSAGE] = true; - client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(client_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - var status = response.status; - var error; - var deserialized; - emitter.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - if (status.code === constants.status.OK) { - if (err) { - // Got a batch error, but OK status. Something went wrong - args.callback(err); - return; - } else { - try { - deserialized = deserialize(response.read); - } catch (e) { - /* Change status to indicate bad server response. This will result - * in passing an error to the callback */ - status = { - code: constants.status.INTERNAL, - details: 'Failed to parse server response' - }; - } - } - } - if (status.code !== constants.status.OK) { - error = new Error(status.details); - error.code = status.code; - error.metadata = status.metadata; - args.callback(error); - } else { - args.callback(null, deserialized); - } - emitter.emit('status', status); - }); - return emitter; -}; - -/** - * Make a client stream request to the given method, using the given serialize - * and deserialize functions, with the given argument. - * @param {string} method The name of the method to request - * @param {grpc~serialize} serialize The serialization function for - * inputs - * @param {grpc~deserialize} deserialize The deserialization - * function for outputs - * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to - * the call - * @param {grpc.Client~CallOptions=} options Options map - * @param {grpc.Client~requestCallback} callback The callback to for when the - * response is received - * @return {grpc~ClientWritableStream} An event emitter for stream related - * events - */ -Client.prototype.makeClientStreamRequest = function(method, serialize, - deserialize, metadata, - options, callback) { - /* While the arguments are listed in the function signature, those variables - * are not used directly. Instead, ArgueJS processes the arguments - * object. This allows for simple handling of optional arguments in the - * middle of the argument list, and also provides type checking. */ - var args = arguejs({method:String, serialize: Function, - deserialize: Function, - metadata: [Metadata, new Metadata()], - options: [Object], callback: Function}, arguments); - var call = getCall(this.$channel, method, args.options); - metadata = args.metadata.clone(); - var stream = new ClientWritableStream(call, serialize); - var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - call.startBatch(metadata_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var client_batch = {}; - client_batch[grpc.opType.RECV_MESSAGE] = true; - client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(client_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - var status = response.status; - var error; - var deserialized; - if (status.code === constants.status.OK) { - if (err) { - // Got a batch error, but OK status. Something went wrong - args.callback(err); - return; - } else { - try { - deserialized = deserialize(response.read); - } catch (e) { - /* Change status to indicate bad server response. This will result - * in passing an error to the callback */ - status = { - code: constants.status.INTERNAL, - details: 'Failed to parse server response' - }; - } - } - } - if (status.code !== constants.status.OK) { - error = new Error(response.status.details); - error.code = status.code; - error.metadata = status.metadata; - args.callback(error); - } else { - args.callback(null, deserialized); - } - stream.emit('status', status); - }); - return stream; -}; - -/** - * Make a server stream request to the given method, with the given serialize - * and deserialize function, using the given argument - * @param {string} method The name of the method to request - * @param {grpc~serialize} serialize The serialization function for inputs - * @param {grpc~deserialize} deserialize The deserialization - * function for outputs - * @param {*} argument The argument to the call. Should be serializable with - * serialize - * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to - * the call - * @param {grpc.Client~CallOptions=} options Options map - * @return {grpc~ClientReadableStream} An event emitter for stream related - * events - */ -Client.prototype.makeServerStreamRequest = function(method, serialize, - deserialize, argument, - metadata, options) { - /* While the arguments are listed in the function signature, those variables - * are not used directly. Instead, ArgueJS processes the arguments - * object. */ - var args = arguejs({method:String, serialize: Function, - deserialize: Function, - argument: null, metadata: [Metadata, new Metadata()], - options: [Object]}, arguments); - var call = getCall(this.$channel, method, args.options); - metadata = args.metadata.clone(); - var stream = new ClientReadableStream(call, deserialize); - var start_batch = {}; - var message = serialize(args.argument); - if (args.options) { - message.grpcWriteFlags = args.options.flags; - } - start_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - start_batch[grpc.opType.SEND_MESSAGE] = message; - start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(start_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var status_batch = {}; - status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(status_batch, function(err, response) { - if (err) { - stream.emit('error', err); - return; - } - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream._receiveStatus(response.status); - }); - return stream; -}; - - -/** - * Make a bidirectional stream request with this method on the given channel. - * @param {string} method The name of the method to request - * @param {grpc~serialize} serialize The serialization function for inputs - * @param {grpc~deserialize} deserialize The deserialization - * function for outputs - * @param {grpc.Metadata=} metadata Array of metadata key/value - * pairs to add to the call - * @param {grpc.Client~CallOptions=} options Options map - * @return {grpc~ClientDuplexStream} An event emitter for stream related events - */ -Client.prototype.makeBidiStreamRequest = function(method, serialize, - deserialize, metadata, - options) { - /* While the arguments are listed in the function signature, those variables - * are not used directly. Instead, ArgueJS processes the arguments - * object. */ - var args = arguejs({method:String, serialize: Function, - deserialize: Function, - metadata: [Metadata, new Metadata()], - options: [Object]}, arguments); - var call = getCall(this.$channel, method, args.options); - metadata = args.metadata.clone(); - var stream = new ClientDuplexStream(call, serialize, deserialize); - var start_batch = {}; - start_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - call.startBatch(start_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var status_batch = {}; - status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(status_batch, function(err, response) { - if (err) { - stream.emit('error', err); - return; - } - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream._receiveStatus(response.status); - }); - return stream; -}; - -/** - * Close this client. - */ -Client.prototype.close = function() { - this.$channel.close(); -}; - -/** - * Return the underlying channel object for the specified client - * @return {Channel} The channel - */ -Client.prototype.getChannel = function() { - return this.$channel; -}; - -/** - * Wait for the client to be ready. The callback will be called when the - * client has successfully connected to the server, and it will be called - * with an error if the attempt to connect to the server has unrecoverablly - * failed or if the deadline expires. This function will make the channel - * start connecting if it has not already done so. - * @param {grpc~Deadline} deadline When to stop waiting for a connection. - * @param {function(Error)} callback The callback to call when done attempting - * to connect. - */ -Client.prototype.waitForReady = function(deadline, callback) { - var self = this; - var checkState = function(err) { - if (err) { - callback(new Error('Failed to connect before the deadline')); - return; - } - var new_state = self.$channel.getConnectivityState(true); - if (new_state === grpc.connectivityState.READY) { - callback(); - } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { - callback(new Error('Failed to connect to server')); - } else { - self.$channel.watchConnectivityState(new_state, deadline, checkState); - } - }; - checkState(); -}; - -/** - * Map with short names for each of the requester maker functions. Used in - * makeClientConstructor - * @private - */ -var requester_funcs = { - unary: Client.prototype.makeUnaryRequest, - server_stream: Client.prototype.makeServerStreamRequest, - client_stream: Client.prototype.makeClientStreamRequest, - bidi: Client.prototype.makeBidiStreamRequest -}; - -function getDefaultValues(metadata, options) { - var res = {}; - res.metadata = metadata || new Metadata(); - res.options = options || {}; - return res; -} - -/** - * Map with wrappers for each type of requester function to make it use the old - * argument order with optional arguments after the callback. - * @access private - */ -var deprecated_request_wrap = { - unary: function(makeUnaryRequest) { - return function makeWrappedUnaryRequest(argument, callback, - metadata, options) { - /* jshint validthis: true */ - var opt_args = getDefaultValues(metadata, metadata); - return makeUnaryRequest.call(this, argument, opt_args.metadata, - opt_args.options, callback); - }; - }, - client_stream: function(makeServerStreamRequest) { - return function makeWrappedClientStreamRequest(callback, metadata, - options) { - /* jshint validthis: true */ - var opt_args = getDefaultValues(metadata, options); - return makeServerStreamRequest.call(this, opt_args.metadata, - opt_args.options, callback); - }; - }, - server_stream: _.identity, - bidi: _.identity -}; - -/** - * Creates a constructor for a client with the given methods, as specified in - * the methods argument. The resulting class will have an instance method for - * each method in the service, which is a partial application of one of the - * [Client]{@link grpc.Client} request methods, depending on `requestSerialize` - * and `responseSerialize`, with the `method`, `serialize`, and `deserialize` - * arguments predefined. - * @memberof grpc - * @alias grpc~makeGenericClientConstructor - * @param {grpc~ServiceDefinition} methods An object mapping method names to - * method attributes - * @param {string} serviceName The fully qualified name of the service - * @param {Object} class_options An options object. - * @param {boolean=} [class_options.deprecatedArgumentOrder=false] Indicates - * that the old argument order should be used for methods, with optional - * arguments at the end instead of the callback at the end. This option - * is only a temporary stopgap measure to smooth an API breakage. - * It is deprecated, and new code should not use it. - * @return {function} New client constructor, which is a subclass of - * {@link grpc.Client}, and has the same arguments as that constructor. - */ -exports.makeClientConstructor = function(methods, serviceName, - class_options) { - if (!class_options) { - class_options = {}; - } - - function ServiceClient(address, credentials, options) { - Client.call(this, address, credentials, options); - } - - util.inherits(ServiceClient, Client); - - _.each(methods, function(attrs, name) { - var method_type; - if (_.startsWith(name, '$')) { - throw new Error('Method names cannot start with $'); - } - if (attrs.requestStream) { - if (attrs.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (attrs.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - var serialize = attrs.requestSerialize; - var deserialize = attrs.responseDeserialize; - var method_func = _.partial(requester_funcs[method_type], attrs.path, - serialize, deserialize); - if (class_options.deprecatedArgumentOrder) { - ServiceClient.prototype[name] = deprecated_request_wrap(method_func); - } else { - ServiceClient.prototype[name] = method_func; - } - // Associate all provided attributes with the method - _.assign(ServiceClient.prototype[name], attrs); - }); - - ServiceClient.service = methods; - - return ServiceClient; -}; - -/** - * Return the underlying channel object for the specified client - * @memberof grpc - * @alias grpc~getClientChannel - * @param {Client} client - * @return {Channel} The channel - * @see grpc.Client#getChannel - */ -exports.getClientChannel = function(client) { - return Client.prototype.getChannel.call(client); -}; - -/** - * Wait for the client to be ready. The callback will be called when the - * client has successfully connected to the server, and it will be called - * with an error if the attempt to connect to the server has unrecoverablly - * failed or if the deadline expires. This function will make the channel - * start connecting if it has not already done so. - * @memberof grpc - * @alias grpc~waitForClientReady - * @param {Client} client The client to wait on - * @param {grpc~Deadline} deadline When to stop waiting for a connection. Pass - * Infinity to wait forever. - * @param {function(Error)} callback The callback to call when done attempting - * to connect. - * @see grpc.Client#waitForReady - */ -exports.waitForClientReady = function(client, deadline, callback) { - Client.prototype.waitForReady.call(client, deadline, callback); -}; diff --git a/src/node/src/common.js b/src/node/src/common.js deleted file mode 100644 index 5a444f5e96..0000000000 --- a/src/node/src/common.js +++ /dev/null @@ -1,172 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -'use strict'; - -var _ = require('lodash'); - -/** - * Wrap a function to pass null-like values through without calling it. If no - * function is given, just uses the identity. - * @private - * @param {?function} func The function to wrap - * @return {function} The wrapped function - */ -exports.wrapIgnoreNull = function wrapIgnoreNull(func) { - if (!func) { - return _.identity; - } - return function(arg) { - if (arg === null || arg === undefined) { - return null; - } - return func(arg); - }; -}; - -/** - * The logger object for the gRPC module. Defaults to console. - * @private - */ -exports.logger = console; - -/** - * The current logging verbosity. 0 corresponds to logging everything - * @private - */ -exports.logVerbosity = 0; - -/** - * Log a message if the severity is at least as high as the current verbosity - * @private - * @param {Number} severity A value of the grpc.logVerbosity map - * @param {String} message The message to log - */ -exports.log = function log(severity, message) { - if (severity >= exports.logVerbosity) { - exports.logger.error(message); - } -}; - -/** - * Default options for loading proto files into gRPC - * @alias grpc~defaultLoadOptions - */ -exports.defaultGrpcOptions = { - convertFieldsToCamelCase: false, - binaryAsBase64: false, - longsAsStrings: true, - enumsAsStrings: true, - deprecatedArgumentOrder: false -}; - -// JSDoc definitions that are used in multiple other modules - -/** - * Represents the status of a completed request. If `code` is - * {@link grpc.status}.OK, then the request has completed successfully. - * Otherwise, the request has failed, `details` will contain a description of - * the error. Either way, `metadata` contains the trailing response metadata - * sent by the server when it finishes processing the call. - * @typedef {object} grpc~StatusObject - * @property {number} code The error code, a key of {@link grpc.status} - * @property {string} details Human-readable description of the status - * @property {grpc.Metadata} metadata Trailing metadata sent with the status, - * if applicable - */ - -/** - * Describes how a request has failed. The member `message` will be the same as - * `details` in {@link grpc~StatusObject}, and `code` and `metadata` are the - * same as in that object. - * @typedef {Error} grpc~ServiceError - * @property {number} code The error code, a key of {@link grpc.status} that is - * not `grpc.status.OK` - * @property {grpc.Metadata} metadata Trailing metadata sent with the status, - * if applicable - */ - -/** - * The EventEmitter class in the event standard module - * @external EventEmitter - * @see https://nodejs.org/api/events.html#events_class_eventemitter - */ - -/** - * The Readable class in the stream standard module - * @external Readable - * @see https://nodejs.org/api/stream.html#stream_readable_streams - */ - -/** - * The Writable class in the stream standard module - * @external Writable - * @see https://nodejs.org/api/stream.html#stream_writable_streams - */ - -/** - * The Duplex class in the stream standard module - * @external Duplex - * @see https://nodejs.org/api/stream.html#stream_class_stream_duplex - */ - -/** - * A serialization function - * @callback grpc~serialize - * @param {*} value The value to serialize - * @return {Buffer} The value serialized as a byte sequence - */ - -/** - * A deserialization function - * @callback grpc~deserialize - * @param {Buffer} data The byte sequence to deserialize - * @return {*} The data deserialized as a value - */ - -/** - * The deadline of an operation. If it is a date, the deadline is reached at - * the date and time specified. If it is a finite number, it is treated as - * a number of milliseconds since the Unix Epoch. If it is Infinity, the - * deadline will never be reached. If it is -Infinity, the deadline has already - * passed. - * @typedef {(number|date)} grpc~Deadline - */ - -/** - * An object that completely defines a service method signature. - * @typedef {Object} grpc~MethodDefinition - * @property {string} path The method's URL path - * @property {boolean} requestStream Indicates whether the method accepts - * a stream of requests - * @property {boolean} responseStream Indicates whether the method returns - * a stream of responses - * @property {grpc~serialize} requestSerialize Serialization - * function for request values - * @property {grpc~serialize} responseSerialize Serialization - * function for response values - * @property {grpc~deserialize} requestDeserialize Deserialization - * function for request data - * @property {grpc~deserialize} responseDeserialize Deserialization - * function for repsonse data - */ - -/** - * An object that completely defines a service. - * @typedef {Object.<string, grpc~MethodDefinition>} grpc~ServiceDefinition - */ diff --git a/src/node/src/constants.js b/src/node/src/constants.js deleted file mode 100644 index c90e44d0d3..0000000000 --- a/src/node/src/constants.js +++ /dev/null @@ -1,236 +0,0 @@ -/** - * @license - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* The comments about status codes are copied verbatim (with some formatting - * modifications) from include/grpc/impl/codegen/status.h, for the purpose of - * including them in generated documentation. - */ -/** - * Enum of status codes that gRPC can return - * @memberof grpc - * @alias grpc.status - * @readonly - * @enum {number} - */ -exports.status = { - /** Not an error; returned on success */ - OK: 0, - /** The operation was cancelled (typically by the caller). */ - CANCELLED: 1, - /** - * Unknown error. An example of where this error may be returned is - * if a status value received from another address space belongs to - * an error-space that is not known in this address space. Also - * errors raised by APIs that do not return enough error information - * may be converted to this error. - */ - UNKNOWN: 2, - /** - * Client specified an invalid argument. Note that this differs - * from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments - * that are problematic regardless of the state of the system - * (e.g., a malformed file name). - */ - INVALID_ARGUMENT: 3, - /** - * Deadline expired before operation could complete. For operations - * that change the state of the system, this error may be returned - * even if the operation has completed successfully. For example, a - * successful response from a server could have been delayed long - * enough for the deadline to expire. - */ - DEADLINE_EXCEEDED: 4, - /** Some requested entity (e.g., file or directory) was not found. */ - NOT_FOUND: 5, - /** - * Some entity that we attempted to create (e.g., file or directory) - * already exists. - */ - ALREADY_EXISTS: 6, - /** - * The caller does not have permission to execute the specified - * operation. PERMISSION_DENIED must not be used for rejections - * caused by exhausting some resource (use RESOURCE_EXHAUSTED - * instead for those errors). PERMISSION_DENIED must not be - * used if the caller can not be identified (use UNAUTHENTICATED - * instead for those errors). - */ - PERMISSION_DENIED: 7, - /** - * Some resource has been exhausted, perhaps a per-user quota, or - * perhaps the entire file system is out of space. - */ - RESOURCE_EXHAUSTED: 8, - /** - * Operation was rejected because the system is not in a state - * required for the operation's execution. For example, directory - * to be deleted may be non-empty, an rmdir operation is applied to - * a non-directory, etc. - * - * A litmus test that may help a service implementor in deciding - * between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: - * - * - Use UNAVAILABLE if the client can retry just the failing call. - * - Use ABORTED if the client should retry at a higher-level - * (e.g., restarting a read-modify-write sequence). - * - Use FAILED_PRECONDITION if the client should not retry until - * the system state has been explicitly fixed. E.g., if an "rmdir" - * fails because the directory is non-empty, FAILED_PRECONDITION - * should be returned since the client should not retry unless - * they have first fixed up the directory by deleting files from it. - * - Use FAILED_PRECONDITION if the client performs conditional - * REST Get/Update/Delete on a resource and the resource on the - * server does not match the condition. E.g., conflicting - * read-modify-write on the same resource. - */ - FAILED_PRECONDITION: 9, - /** - * The operation was aborted, typically due to a concurrency issue - * like sequencer check failures, transaction aborts, etc. - * - * See litmus test above for deciding between FAILED_PRECONDITION, - * ABORTED, and UNAVAILABLE. - */ - ABORTED: 10, - /** - * Operation was attempted past the valid range. E.g., seeking or - * reading past end of file. - * - * Unlike INVALID_ARGUMENT, this error indicates a problem that may - * be fixed if the system state changes. For example, a 32-bit file - * system will generate INVALID_ARGUMENT if asked to read at an - * offset that is not in the range [0,2^32-1], but it will generate - * OUT_OF_RANGE if asked to read from an offset past the current - * file size. - * - * There is a fair bit of overlap between FAILED_PRECONDITION and - * OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific - * error) when it applies so that callers who are iterating through - * a space can easily look for an OUT_OF_RANGE error to detect when - * they are done. - */ - OUT_OF_RANGE: 11, - /** Operation is not implemented or not supported/enabled in this service. */ - UNIMPLEMENTED: 12, - /** - * Internal errors. Means some invariants expected by underlying - * system has been broken. If you see one of these errors, - * something is very broken. - */ - INTERNAL: 13, - /** - * The service is currently unavailable. This is a most likely a - * transient condition and may be corrected by retrying with - * a backoff. - * - * See litmus test above for deciding between FAILED_PRECONDITION, - * ABORTED, and UNAVAILABLE. */ - UNAVAILABLE: 14, - /** Unrecoverable data loss or corruption. */ - DATA_LOSS: 15, - /** - * The request does not have valid authentication credentials for the - * operation. - */ - UNAUTHENTICATED: 16 -}; - -/* The comments about propagation bit flags are copied rom - * include/grpc/impl/codegen/propagation_bits.h for the purpose of including - * them in generated documentation. - */ -/** - * Propagation flags: these can be bitwise or-ed to form the propagation option - * for calls. - * - * Users are encouraged to write propagation masks as deltas from the default. - * i.e. write `grpc.propagate.DEFAULTS & ~grpc.propagate.DEADLINE` to disable - * deadline propagation. - * @memberof grpc - * @alias grpc.propagate - * @enum {number} - */ -exports.propagate = { - DEADLINE: 1, - CENSUS_STATS_CONTEXT: 2, - CENSUS_TRACING_CONTEXT: 4, - CANCELLATION: 8, - DEFAULTS: 65535 -}; - -/* Many of the following comments are copied from - * include/grpc/impl/codegen/grpc_types.h - */ -/** - * Call error constants. Call errors almost always indicate bugs in the gRPC - * library, and these error codes are mainly useful for finding those bugs. - * @memberof grpc - * @readonly - * @enum {number} - */ -const callError = { - OK: 0, - ERROR: 1, - NOT_ON_SERVER: 2, - NOT_ON_CLIENT: 3, - ALREADY_INVOKED: 5, - NOT_INVOKED: 6, - ALREADY_FINISHED: 7, - TOO_MANY_OPERATIONS: 8, - INVALID_FLAGS: 9, - INVALID_METADATA: 10, - INVALID_MESSAGE: 11, - NOT_SERVER_COMPLETION_QUEUE: 12, - BATCH_TOO_BIG: 13, - PAYLOAD_TYPE_MISMATCH: 14 -}; - -exports.callError = callError; - -/** - * Write flags: these can be bitwise or-ed to form write options that modify - * how data is written. - * @memberof grpc - * @alias grpc.writeFlags - * @readonly - * @enum {number} - */ -exports.writeFlags = { - /** - * Hint that the write may be buffered and need not go out on the wire - * immediately. GRPC is free to buffer the message until the next non-buffered - * write, or until writes_done, but it need not buffer completely or at all. - */ - BUFFER_HINT: 1, - /** - * Force compression to be disabled for a particular write - */ - NO_COMPRESS: 2 -}; - -/** - * @memberof grpc - * @alias grpc.logVerbosity - * @readonly - * @enum {number} - */ -exports.logVerbosity = { - DEBUG: 0, - INFO: 1, - ERROR: 2 -}; diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js deleted file mode 100644 index d68d888e6a..0000000000 --- a/src/node/src/credentials.js +++ /dev/null @@ -1,207 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * Credentials module - * - * This module contains factory methods for two different credential types: - * CallCredentials and ChannelCredentials. ChannelCredentials are things like - * SSL credentials that can be used to secure a connection, and are used to - * construct a Client object. CallCredentials genrally modify metadata, so they - * can be attached to an individual method call. - * - * CallCredentials can be composed with other CallCredentials to create - * CallCredentials. ChannelCredentials can be composed with CallCredentials - * to create ChannelCredentials. No combined credential can have more than - * one ChannelCredentials. - * - * For example, to create a client secured with SSL that uses Google - * default application credentials to authenticate: - * - * @example - * var channel_creds = credentials.createSsl(root_certs); - * (new GoogleAuth()).getApplicationDefault(function(err, credential) { - * var call_creds = credentials.createFromGoogleCredential(credential); - * var combined_creds = credentials.combineChannelCredentials( - * channel_creds, call_creds); - * var client = new Client(address, combined_creds); - * }); - * - * @namespace grpc.credentials - */ - -'use strict'; - -var grpc = require('./grpc_extension'); - -/** - * This cannot be constructed directly. Instead, instances of this class should - * be created using the factory functions in {@link grpc.credentials} - * @constructor grpc.credentials~CallCredentials - */ -var CallCredentials = grpc.CallCredentials; - -/** - * This cannot be constructed directly. Instead, instances of this class should - * be created using the factory functions in {@link grpc.credentials} - * @constructor grpc.credentials~ChannelCredentials - */ -var ChannelCredentials = grpc.ChannelCredentials; - -var Metadata = require('./metadata.js'); - -var common = require('./common.js'); - -var constants = require('./constants'); - -var _ = require('lodash'); - -/** - * @external GoogleCredential - * @see https://github.com/google/google-auth-library-nodejs - */ - -/** - * Create an SSL Credentials object. If using a client-side certificate, both - * the second and third arguments must be passed. - * @memberof grpc.credentials - * @alias grpc.credentials.createSsl - * @kind function - * @param {Buffer=} root_certs The root certificate data - * @param {Buffer=} private_key The client certificate private key, if - * applicable - * @param {Buffer=} cert_chain The client certificate cert chain, if applicable - * @return {grpc.credentials.ChannelCredentials} The SSL Credentials object - */ -exports.createSsl = ChannelCredentials.createSsl; - -/** - * @callback grpc.credentials~metadataCallback - * @param {Error} error The error, if getting metadata failed - * @param {grpc.Metadata} metadata The metadata - */ - -/** - * @callback grpc.credentials~generateMetadata - * @param {Object} params Parameters that can modify metadata generation - * @param {string} params.service_url The URL of the service that the call is - * going to - * @param {grpc.credentials~metadataCallback} callback - */ - -/** - * Create a gRPC credentials object from a metadata generation function. This - * function gets the service URL and a callback as parameters. The error - * passed to the callback can optionally have a 'code' value attached to it, - * which corresponds to a status code that this library uses. - * @memberof grpc.credentials - * @alias grpc.credentials.createFromMetadataGenerator - * @param {grpc.credentials~generateMetadata} metadata_generator The function - * that generates metadata - * @return {grpc.credentials.CallCredentials} The credentials object - */ -exports.createFromMetadataGenerator = function(metadata_generator) { - return CallCredentials.createFromPlugin(function(service_url, cb_data, - callback) { - metadata_generator({service_url: service_url}, function(error, metadata) { - var code = constants.status.OK; - var message = ''; - if (error) { - message = error.message; - if (error.hasOwnProperty('code') && _.isFinite(error.code)) { - code = error.code; - } else { - code = constants.status.UNAUTHENTICATED; - } - if (!metadata) { - metadata = new Metadata(); - } - } - callback(code, message, metadata._getCoreRepresentation(), cb_data); - }); - }); -}; - -/** - * Create a gRPC credential from a Google credential object. - * @memberof grpc.credentials - * @alias grpc.credentials.createFromGoogleCredential - * @param {external:GoogleCredential} google_credential The Google credential - * object to use - * @return {grpc.credentials.CallCredentials} The resulting credentials object - */ -exports.createFromGoogleCredential = function(google_credential) { - return exports.createFromMetadataGenerator(function(auth_context, callback) { - var service_url = auth_context.service_url; - google_credential.getRequestMetadata(service_url, function(err, header) { - if (err) { - common.log(constants.logVerbosity.INFO, 'Auth error:' + err); - callback(err); - return; - } - var metadata = new Metadata(); - metadata.add('authorization', header.Authorization); - callback(null, metadata); - }); - }); -}; - -/** - * Combine a ChannelCredentials with any number of CallCredentials into a single - * ChannelCredentials object. - * @memberof grpc.credentials - * @alias grpc.credentials.combineChannelCredentials - * @param {ChannelCredentials} channel_credential The ChannelCredentials to - * start with - * @param {...CallCredentials} credentials The CallCredentials to compose - * @return ChannelCredentials A credentials object that combines all of the - * input credentials - */ -exports.combineChannelCredentials = function(channel_credential) { - var current = channel_credential; - for (var i = 1; i < arguments.length; i++) { - current = current.compose(arguments[i]); - } - return current; -}; - -/** - * Combine any number of CallCredentials into a single CallCredentials object - * @memberof grpc.credentials - * @alias grpc.credentials.combineCallCredentials - * @param {...CallCredentials} credentials the CallCredentials to compose - * @return CallCredentials A credentials object that combines all of the input - * credentials - */ -exports.combineCallCredentials = function() { - var current = arguments[0]; - for (var i = 1; i < arguments.length; i++) { - current = current.compose(arguments[i]); - } - return current; -}; - -/** - * Create an insecure credentials object. This is used to create a channel that - * does not use SSL. This cannot be composed with anything. - * @memberof grpc.credentials - * @alias grpc.credentials.createInsecure - * @kind function - * @return {ChannelCredentials} The insecure credentials object - */ -exports.createInsecure = ChannelCredentials.createInsecure; diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js deleted file mode 100644 index af43eacad2..0000000000 --- a/src/node/src/grpc_extension.js +++ /dev/null @@ -1,32 +0,0 @@ -/** - * @license - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * @module - * @private - */ - -'use strict'; - -var binary = require('node-pre-gyp/lib/pre-binding'); -var path = require('path'); -var binding_path = - binary.find(path.resolve(path.join(__dirname, '../../../package.json'))); -var binding = require(binding_path); - -module.exports = binding; diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js deleted file mode 100644 index 46f9e0fead..0000000000 --- a/src/node/src/metadata.js +++ /dev/null @@ -1,172 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -'use strict'; - -var _ = require('lodash'); - -var grpc = require('./grpc_extension'); - -/** - * Class for storing metadata. Keys are normalized to lowercase ASCII. - * @memberof grpc - * @constructor - * @example - * var metadata = new metadata_module.Metadata(); - * metadata.set('key1', 'value1'); - * metadata.add('key1', 'value2'); - * metadata.get('key1') // returns ['value1', 'value2'] - */ -function Metadata() { - this._internal_repr = {}; -} - -function normalizeKey(key) { - key = key.toLowerCase(); - if (grpc.metadataKeyIsLegal(key)) { - return key; - } else { - throw new Error('Metadata key"' + key + '" contains illegal characters'); - } -} - -function validate(key, value) { - if (grpc.metadataKeyIsBinary(key)) { - if (!(value instanceof Buffer)) { - throw new Error('keys that end with \'-bin\' must have Buffer values'); - } - } else { - if (!_.isString(value)) { - throw new Error( - 'keys that don\'t end with \'-bin\' must have String values'); - } - if (!grpc.metadataNonbinValueIsLegal(value)) { - throw new Error('Metadata string value "' + value + - '" contains illegal characters'); - } - } -} - -/** - * Sets the given value for the given key, replacing any other values associated - * with that key. Normalizes the key. - * @param {String} key The key to set - * @param {String|Buffer} value The value to set. Must be a buffer if and only - * if the normalized key ends with '-bin' - */ -Metadata.prototype.set = function(key, value) { - key = normalizeKey(key); - validate(key, value); - this._internal_repr[key] = [value]; -}; - -/** - * Adds the given value for the given key. Normalizes the key. - * @param {String} key The key to add to. - * @param {String|Buffer} value The value to add. Must be a buffer if and only - * if the normalized key ends with '-bin' - */ -Metadata.prototype.add = function(key, value) { - key = normalizeKey(key); - validate(key, value); - if (!this._internal_repr[key]) { - this._internal_repr[key] = []; - } - this._internal_repr[key].push(value); -}; - -/** - * Remove the given key and any associated values. Normalizes the key. - * @param {String} key The key to remove - */ -Metadata.prototype.remove = function(key) { - key = normalizeKey(key); - if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { - delete this._internal_repr[key]; - } -}; - -/** - * Gets a list of all values associated with the key. Normalizes the key. - * @param {String} key The key to get - * @return {Array.<String|Buffer>} The values associated with that key - */ -Metadata.prototype.get = function(key) { - key = normalizeKey(key); - if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { - return this._internal_repr[key]; - } else { - return []; - } -}; - -/** - * Get a map of each key to a single associated value. This reflects the most - * common way that people will want to see metadata. - * @return {Object.<String,String|Buffer>} A key/value mapping of the metadata - */ -Metadata.prototype.getMap = function() { - var result = {}; - _.forOwn(this._internal_repr, function(values, key) { - if(values.length > 0) { - result[key] = values[0]; - } - }); - return result; -}; - -/** - * Clone the metadata object. - * @return {Metadata} The new cloned object - */ -Metadata.prototype.clone = function() { - var copy = new Metadata(); - _.forOwn(this._internal_repr, function(value, key) { - copy._internal_repr[key] = _.clone(value); - }); - return copy; -}; - -/** - * Gets the metadata in the format used by interal code. Intended for internal - * use only. API stability is not guaranteed. - * @private - * @return {Object.<String, Array.<String|Buffer>>} The metadata - */ -Metadata.prototype._getCoreRepresentation = function() { - return this._internal_repr; -}; - -/** - * Creates a Metadata object from a metadata map in the internal format. - * Intended for internal use only. API stability is not guaranteed. - * @private - * @param {Object.<String, Array.<String|Buffer>>} The metadata - * @return {Metadata} The new Metadata object - */ -Metadata._fromCoreRepresentation = function(metadata) { - var newMetadata = new Metadata(); - if (metadata) { - _.forOwn(metadata, function(value, key) { - newMetadata._internal_repr[key] = _.clone(value); - }); - } - return newMetadata; -}; - -module.exports = Metadata; diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js deleted file mode 100644 index 541965fd0a..0000000000 --- a/src/node/src/protobuf_js_5_common.js +++ /dev/null @@ -1,171 +0,0 @@ -/** - * @license - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * @module - * @private - */ - -'use strict'; - -var _ = require('lodash'); -var client = require('./client'); - -/** - * Get a function that deserializes a specific type of protobuf. - * @param {function()} cls The constructor of the message type to deserialize - * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings - * instead of Buffers. Defaults to false - * @param {bool=} longsAsStrings Deserialize long values as strings instead of - * objects. Defaults to true - * @return {function(Buffer):cls} The deserialization function - */ -exports.deserializeCls = function deserializeCls(cls, options) { - /** - * Deserialize a buffer to a message object - * @param {Buffer} arg_buf The buffer to deserialize - * @return {cls} The resulting object - */ - return function deserialize(arg_buf) { - // Convert to a native object with binary fields as Buffers (first argument) - // and longs as strings (second argument) - return cls.decode(arg_buf).toRaw(options.binaryAsBase64, - options.longsAsStrings); - }; -}; - -var deserializeCls = exports.deserializeCls; - -/** - * Get a function that serializes objects to a buffer by protobuf class. - * @param {function()} Cls The constructor of the message type to serialize - * @return {function(Cls):Buffer} The serialization function - */ -exports.serializeCls = function serializeCls(Cls) { - /** - * Serialize an object to a Buffer - * @param {Object} arg The object to serialize - * @return {Buffer} The serialized object - */ - return function serialize(arg) { - return new Buffer(new Cls(arg).encode().toBuffer()); - }; -}; - -var serializeCls = exports.serializeCls; - -/** - * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. - * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of - * @return {string} The fully qualified name of the value - */ -exports.fullyQualifiedName = function fullyQualifiedName(value) { - if (value === null || value === undefined) { - return ''; - } - var name = value.name; - var parent_name = fullyQualifiedName(value.parent); - if (parent_name !== '') { - name = parent_name + '.' + name; - } - return name; -}; - -var fullyQualifiedName = exports.fullyQualifiedName; - -/** - * Return a map from method names to method attributes for the service. - * @param {ProtoBuf.Reflect.Service} service The service to get attributes for - * @param {Object=} options Options to apply to these attributes - * @return {Object} The attributes map - */ -exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, - options) { - var prefix = '/' + fullyQualifiedName(service) + '/'; - var binaryAsBase64, longsAsStrings; - if (options) { - binaryAsBase64 = options.binaryAsBase64; - longsAsStrings = options.longsAsStrings; - } - /* This slightly awkward construction is used to make sure we only use - lodash@3.10.1-compatible functions. A previous version used - _.fromPairs, which would be cleaner, but was introduced in lodash - version 4 */ - return _.zipObject(_.map(service.children, function(method) { - return _.camelCase(method.name); - }), _.map(service.children, function(method) { - return { - originalName: method.name, - path: prefix + method.name, - requestStream: method.requestStream, - responseStream: method.responseStream, - requestType: method.resolvedRequestType, - responseType: method.resolvedResponseType, - requestSerialize: serializeCls(method.resolvedRequestType.build()), - requestDeserialize: deserializeCls(method.resolvedRequestType.build(), - options), - responseSerialize: serializeCls(method.resolvedResponseType.build()), - responseDeserialize: deserializeCls(method.resolvedResponseType.build(), - options) - }; - })); -}; - -var getProtobufServiceAttrs = exports.getProtobufServiceAttrs; - -/** - * Load a gRPC object from an existing ProtoBuf.Reflect object. - * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load. - * @param {Object=} options Options to apply to the loaded object - * @return {Object<string, *>} The resulting gRPC object - */ -exports.loadObject = function loadObject(value, options) { - var result = {}; - if (!value) { - return value; - } - if (value.hasOwnProperty('ns')) { - return loadObject(value.ns, options); - } - if (value.className === 'Namespace') { - _.each(value.children, function(child) { - result[child.name] = loadObject(child, options); - }); - return result; - } else if (value.className === 'Service') { - return client.makeClientConstructor(getProtobufServiceAttrs(value, options), - options); - } else if (value.className === 'Message' || value.className === 'Enum') { - return value.build(); - } else { - return value; - } -}; - -/** - * The primary purpose of this method is to distinguish between reflection - * objects from different versions of ProtoBuf.js. This is just a heuristic, - * checking for properties that are (currently) specific to this version of - * ProtoBuf.js - * @param {Object} obj The object to check - * @return {boolean} Whether the object appears to be a Protobuf.js 5 - * ReflectionObject - */ -exports.isProbablyProtobufJs5 = function isProbablyProtobufJs5(obj) { - return _.isArray(obj.children) && (typeof obj.build === 'function'); -}; diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js deleted file mode 100644 index 0f07251677..0000000000 --- a/src/node/src/protobuf_js_6_common.js +++ /dev/null @@ -1,160 +0,0 @@ -/** - * @license - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * @module - * @private - */ - -'use strict'; - -var _ = require('lodash'); -var client = require('./client'); - -/** - * Get a function that deserializes a specific type of protobuf. - * @param {function()} cls The constructor of the message type to deserialize - * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings - * instead of Buffers. Defaults to false - * @param {bool=} longsAsStrings Deserialize long values as strings instead of - * objects. Defaults to true - * @return {function(Buffer):cls} The deserialization function - */ -exports.deserializeCls = function deserializeCls(cls, options) { - var conversion_options = { - defaults: true, - bytes: options.binaryAsBase64 ? String : Buffer, - longs: options.longsAsStrings ? String : null, - enums: options.enumsAsStrings ? String : null, - oneofs: true - }; - /** - * Deserialize a buffer to a message object - * @param {Buffer} arg_buf The buffer to deserialize - * @return {cls} The resulting object - */ - return function deserialize(arg_buf) { - return cls.toObject(cls.decode(arg_buf), conversion_options); - }; -}; - -var deserializeCls = exports.deserializeCls; - -/** - * Get a function that serializes objects to a buffer by protobuf class. - * @param {function()} Cls The constructor of the message type to serialize - * @return {function(Cls):Buffer} The serialization function - */ -exports.serializeCls = function serializeCls(cls) { - /** - * Serialize an object to a Buffer - * @param {Object} arg The object to serialize - * @return {Buffer} The serialized object - */ - return function serialize(arg) { - var message = cls.fromObject(arg); - return cls.encode(message).finish(); - }; -}; - -var serializeCls = exports.serializeCls; - -/** - * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. - * @param {ProtoBuf.ReflectionObject} value The value to get the name of - * @return {string} The fully qualified name of the value - */ -exports.fullyQualifiedName = function fullyQualifiedName(value) { - if (value === null || value === undefined) { - return ''; - } - var name = value.name; - var parent_fqn = fullyQualifiedName(value.parent); - if (parent_fqn !== '') { - name = parent_fqn + '.' + name; - } - return name; -}; - -var fullyQualifiedName = exports.fullyQualifiedName; - -/** - * Return a map from method names to method attributes for the service. - * @param {ProtoBuf.Service} service The service to get attributes for - * @param {Object=} options Options to apply to these attributes - * @return {Object} The attributes map - */ -exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, - options) { - var prefix = '/' + fullyQualifiedName(service) + '/'; - service.resolveAll(); - return _.zipObject(_.map(service.methods, function(method) { - return _.camelCase(method.name); - }), _.map(service.methods, function(method) { - return { - originalName: method.name, - path: prefix + method.name, - requestStream: !!method.requestStream, - responseStream: !!method.responseStream, - requestType: method.resolvedRequestType, - responseType: method.resolvedResponseType, - requestSerialize: serializeCls(method.resolvedRequestType), - requestDeserialize: deserializeCls(method.resolvedRequestType, options), - responseSerialize: serializeCls(method.resolvedResponseType), - responseDeserialize: deserializeCls(method.resolvedResponseType, options) - }; - })); -}; - -var getProtobufServiceAttrs = exports.getProtobufServiceAttrs; - -exports.loadObject = function loadObject(value, options) { - var result = {}; - if (!value) { - return value; - } - if (value.hasOwnProperty('methods')) { - // It's a service object - var service_attrs = getProtobufServiceAttrs(value, options); - return client.makeClientConstructor(service_attrs); - } - - if (value.hasOwnProperty('nested')) { - // It's a namespace or root object - _.each(value.nested, function(nested, name) { - result[name] = loadObject(nested, options); - }); - return result; - } - - // Otherwise, it's not something we need to change - return value; -}; - -/** - * The primary purpose of this method is to distinguish between reflection - * objects from different versions of ProtoBuf.js. This is just a heuristic, - * checking for properties that are (currently) specific to this version of - * ProtoBuf.js - * @param {Object} obj The object to check - * @return {boolean} Whether the object appears to be a Protobuf.js 6 - * ReflectionObject - */ -exports.isProbablyProtobufJs6 = function isProbablyProtobufJs6(obj) { - return (typeof obj.root === 'object') && (typeof obj.resolve === 'function'); -}; diff --git a/src/node/src/server.js b/src/node/src/server.js deleted file mode 100644 index 8b7c0b6862..0000000000 --- a/src/node/src/server.js +++ /dev/null @@ -1,965 +0,0 @@ -/** - * @license - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -'use strict'; - -var _ = require('lodash'); - -var grpc = require('./grpc_extension'); - -var common = require('./common'); - -var Metadata = require('./metadata'); - -var constants = require('./constants'); - -var stream = require('stream'); - -var Readable = stream.Readable; -var Writable = stream.Writable; -var Duplex = stream.Duplex; -var util = require('util'); - -var EventEmitter = require('events').EventEmitter; - -/** - * Handle an error on a call by sending it as a status - * @private - * @param {grpc.internal~Call} call The call to send the error on - * @param {(Object|Error)} error The error object - */ -function handleError(call, error) { - var statusMetadata = new Metadata(); - var status = { - code: constants.status.UNKNOWN, - details: 'Unknown Error' - }; - if (error.hasOwnProperty('message')) { - status.details = error.message; - } - if (error.hasOwnProperty('code')) { - status.code = error.code; - if (error.hasOwnProperty('details')) { - status.details = error.details; - } - } - if (error.hasOwnProperty('metadata')) { - statusMetadata = error.metadata; - } - status.metadata = statusMetadata._getCoreRepresentation(); - var error_batch = {}; - if (!call.metadataSent) { - error_batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - } - error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; - call.startBatch(error_batch, function(){}); -} - -/** - * Send a response to a unary or client streaming call. - * @private - * @param {grpc.Call} call The call to respond on - * @param {*} value The value to respond with - * @param {grpc~serialize} serialize Serialization function for the - * response - * @param {grpc.Metadata=} metadata Optional trailing metadata to send with - * status - * @param {number=} [flags=0] Flags for modifying how the message is sent. - */ -function sendUnaryResponse(call, value, serialize, metadata, flags) { - var end_batch = {}; - var statusMetadata = new Metadata(); - var status = { - code: constants.status.OK, - details: 'OK' - }; - if (metadata) { - statusMetadata = metadata; - } - var message; - try { - message = serialize(value); - } catch (e) { - e.code = constants.status.INTERNAL; - handleError(call, e); - return; - } - status.metadata = statusMetadata._getCoreRepresentation(); - if (!call.metadataSent) { - end_batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - call.metadataSent = true; - } - message.grpcWriteFlags = flags; - end_batch[grpc.opType.SEND_MESSAGE] = message; - end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; - call.startBatch(end_batch, function (){}); -} - -/** - * Initialize a writable stream. This is used for both the writable and duplex - * stream constructors. - * @private - * @param {Writable} stream The stream to set up - * @param {function(*):Buffer=} Serialization function for responses - */ -function setUpWritable(stream, serialize) { - stream.finished = false; - stream.status = { - code : constants.status.OK, - details : 'OK', - metadata : new Metadata() - }; - stream.serialize = common.wrapIgnoreNull(serialize); - function sendStatus() { - var batch = {}; - if (!stream.call.metadataSent) { - stream.call.metadataSent = true; - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - } - - if (stream.status.metadata) { - stream.status.metadata = stream.status.metadata._getCoreRepresentation(); - } - batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; - stream.call.startBatch(batch, function(){}); - } - stream.on('finish', sendStatus); - /** - * Set the pending status to a given error status. If the error does not have - * code or details properties, the code will be set to grpc.status.UNKNOWN - * and the details will be set to 'Unknown Error'. - * @param {Error} err The error object - */ - function setStatus(err) { - var code = constants.status.UNKNOWN; - var details = 'Unknown Error'; - var metadata = new Metadata(); - if (err.hasOwnProperty('message')) { - details = err.message; - } - if (err.hasOwnProperty('code')) { - code = err.code; - if (err.hasOwnProperty('details')) { - details = err.details; - } - } - if (err.hasOwnProperty('metadata')) { - metadata = err.metadata; - } - stream.status = {code: code, details: details, metadata: metadata}; - } - /** - * Terminate the call. This includes indicating that reads are done, draining - * all pending writes, and sending the given error as a status - * @param {Error} err The error object - * @this GrpcServerStream - */ - function terminateCall(err) { - // Drain readable data - setStatus(err); - stream.end(); - } - stream.on('error', terminateCall); - /** - * Override of Writable#end method that allows for sending metadata with a - * success status. - * @param {Metadata=} metadata Metadata to send with the status - */ - stream.end = function(metadata) { - if (metadata) { - stream.status.metadata = metadata; - } - Writable.prototype.end.call(this); - }; -} - -/** - * Initialize a readable stream. This is used for both the readable and duplex - * stream constructors. - * @private - * @param {Readable} stream The stream to initialize - * @param {grpc~deserialize} deserialize Deserialization function for - * incoming data. - */ -function setUpReadable(stream, deserialize) { - stream.deserialize = common.wrapIgnoreNull(deserialize); - stream.finished = false; - stream.reading = false; - - stream.terminate = function() { - stream.finished = true; - stream.on('data', function() {}); - }; - - stream.on('cancelled', function() { - stream.terminate(); - }); -} - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerUnaryCall~cancelled - */ - -util.inherits(ServerUnaryCall, EventEmitter); - -/** - * An EventEmitter. Used for unary calls. - * @constructor grpc~ServerUnaryCall - * @extends external:EventEmitter - * @param {grpc.internal~Call} call The call object associated with the request - * @param {grpc.Metadata} metadata The request metadata from the client - */ -function ServerUnaryCall(call, metadata) { - EventEmitter.call(this); - this.call = call; - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerUnaryCall#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata - */ - this.metadata = metadata; - /** - * The request message from the client - * @member {*} grpc~ServerUnaryCall#request - */ - this.request = undefined; -} - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerWritableStream~cancelled - */ - -util.inherits(ServerWritableStream, Writable); - -/** - * A stream that the server can write to. Used for calls that are streaming from - * the server side. - * @constructor grpc~ServerWritableStream - * @extends external:Writable - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerWritableStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer - * @param {grpc.internal~Call} call The call object to send data with - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~serialize} serialize Serialization function for writes - */ -function ServerWritableStream(call, metadata, serialize) { - Writable.call(this, {objectMode: true}); - this.call = call; - - this.finished = false; - setUpWritable(this, serialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerWritableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerWritableStream#metadata - */ - this.metadata = metadata; - /** - * The request message from the client - * @member {*} grpc~ServerWritableStream#request - */ - this.request = undefined; -} - -/** - * Start writing a chunk of data. This is an implementation of a method required - * for implementing stream.Writable. - * @private - * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Used to pass write flags - * @param {function(Error=)} callback Callback to indicate that the write is - * complete - */ -function _write(chunk, encoding, callback) { - /* jshint validthis: true */ - var batch = {}; - var self = this; - var message; - try { - message = this.serialize(chunk); - } catch (e) { - e.code = constants.status.INTERNAL; - callback(e); - return; - } - if (!this.call.metadataSent) { - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - this.call.metadataSent = true; - } - if (_.isFinite(encoding)) { - /* Attach the encoding if it is a finite number. This is the closest we - * can get to checking that it is valid flags */ - message.grpcWriteFlags = encoding; - } - batch[grpc.opType.SEND_MESSAGE] = message; - this.call.startBatch(batch, function(err, value) { - if (err) { - self.emit('error', err); - return; - } - callback(); - }); -} - -ServerWritableStream.prototype._write = _write; - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerReadableStream~cancelled - */ - -util.inherits(ServerReadableStream, Readable); - -/** - * A stream that the server can read from. Used for calls that are streaming - * from the client side. - * @constructor grpc~ServerReadableStream - * @extends external:Readable - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerReadableStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer - * @param {grpc.internal~Call} call The call object to read data with - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~deserialize} deserialize Deserialization function for reads - */ -function ServerReadableStream(call, metadata, deserialize) { - Readable.call(this, {objectMode: true}); - this.call = call; - setUpReadable(this, deserialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerReadableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerReadableStream#metadata - */ - this.metadata = metadata; -} - -/** - * Start reading from the gRPC data source. This is an implementation of a - * method required for implementing stream.Readable - * @access private - * @param {number} size Ignored - */ -function _read(size) { - /* jshint validthis: true */ - var self = this; - /** - * Callback to be called when a READ event is received. Pushes the data onto - * the read queue and starts reading again if applicable - * @param {grpc.Event} event READ event object - */ - function readCallback(err, event) { - if (err) { - self.terminate(); - return; - } - if (self.finished) { - self.push(null); - return; - } - var data = event.read; - var deserialized; - try { - deserialized = self.deserialize(data); - } catch (e) { - e.code = constants.status.INTERNAL; - self.emit('error', e); - return; - } - if (self.push(deserialized) && data !== null) { - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); - } else { - self.reading = false; - } - } - if (self.finished) { - self.push(null); - } else { - if (!self.reading) { - self.reading = true; - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(batch, readCallback); - } - } -} - -ServerReadableStream.prototype._read = _read; - -/** - * Emitted when the call has been cancelled. After this has been emitted, the - * call's `cancelled` property will be set to `true`. - * @event grpc~ServerDuplexStream~cancelled - */ - -util.inherits(ServerDuplexStream, Duplex); - -/** - * A stream that the server can read from or write to. Used for calls with - * duplex streaming. - * @constructor grpc~ServerDuplexStream - * @extends external:Duplex - * @borrows grpc~ServerUnaryCall#sendMetadata as - * grpc~ServerDuplexStream#sendMetadata - * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer - * @param {grpc.internal~Call} call Call object to proxy - * @param {grpc.Metadata} metadata The request metadata from the client - * @param {grpc~serialize} serialize Serialization function for requests - * @param {grpc~deserialize} deserialize Deserialization function for - * responses - */ -function ServerDuplexStream(call, metadata, serialize, deserialize) { - Duplex.call(this, {objectMode: true}); - this.call = call; - setUpWritable(this, serialize); - setUpReadable(this, deserialize); - /** - * Indicates if the call has been cancelled - * @member {boolean} grpc~ServerReadableStream#cancelled - */ - this.cancelled = false; - /** - * The request metadata from the client - * @member {grpc.Metadata} grpc~ServerReadableStream#metadata - */ - this.metadata = metadata; -} - -ServerDuplexStream.prototype._read = _read; -ServerDuplexStream.prototype._write = _write; - -/** - * Send the initial metadata for a writable stream. - * @alias grpc~ServerUnaryCall#sendMetadata - * @param {Metadata} responseMetadata Metadata to send - */ -function sendMetadata(responseMetadata) { - /* jshint validthis: true */ - var self = this; - if (!this.call.metadataSent) { - this.call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - this.call.startBatch(batch, function(err) { - if (err) { - self.emit('error', err); - return; - } - }); - } -} - -ServerUnaryCall.prototype.sendMetadata = sendMetadata; -ServerWritableStream.prototype.sendMetadata = sendMetadata; -ServerReadableStream.prototype.sendMetadata = sendMetadata; -ServerDuplexStream.prototype.sendMetadata = sendMetadata; - -/** - * Get the endpoint this call/stream is connected to. - * @alias grpc~ServerUnaryCall#getPeer - * @return {string} The URI of the endpoint - */ -function getPeer() { - /* jshint validthis: true */ - return this.call.getPeer(); -} - -ServerUnaryCall.prototype.getPeer = getPeer; -ServerReadableStream.prototype.getPeer = getPeer; -ServerWritableStream.prototype.getPeer = getPeer; -ServerDuplexStream.prototype.getPeer = getPeer; - -/** - * Wait for the client to close, then emit a cancelled event if the client - * cancelled. - * @private - */ -function waitForCancel() { - /* jshint validthis: true */ - var self = this; - var cancel_batch = {}; - cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - self.call.startBatch(cancel_batch, function(err, result) { - if (err) { - self.emit('error', err); - } - if (result.cancelled) { - self.cancelled = true; - self.emit('cancelled'); - } - }); -} - -ServerUnaryCall.prototype.waitForCancel = waitForCancel; -ServerReadableStream.prototype.waitForCancel = waitForCancel; -ServerWritableStream.prototype.waitForCancel = waitForCancel; -ServerDuplexStream.prototype.waitForCancel = waitForCancel; - -/** - * Callback function passed to server handlers that handle methods with unary - * responses. - * @callback grpc.Server~sendUnaryData - * @param {grpc~ServiceError} error An error, if the call failed - * @param {*} value The response value. Must be a valid argument to the - * `responseSerialize` method of the method that is being handled - * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable - * @param {grpc.writeFlags=} flags Flags to modify writing the response - */ - -/** - * User-provided method to handle unary requests on a server - * @callback grpc.Server~handleUnaryCall - * @param {grpc~ServerUnaryCall} call The call object - * @param {grpc.Server~sendUnaryData} callback The callback to call to respond - * to the request - */ - -/** - * Fully handle a unary call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleUnaryCall} handler.func The handler function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleUnary(call, handler, metadata) { - var emitter = new ServerUnaryCall(call, metadata); - emitter.on('error', function(error) { - handleError(call, error); - }); - emitter.waitForCancel(); - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { - if (err) { - handleError(call, err); - return; - } - try { - emitter.request = handler.deserialize(result.read); - } catch (e) { - e.code = constants.status.INTERNAL; - handleError(call, e); - return; - } - if (emitter.cancelled) { - return; - } - handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { - if (err) { - if (trailer) { - err.metadata = trailer; - } - handleError(call, err); - } else { - sendUnaryResponse(call, value, handler.serialize, trailer, flags); - } - }); - }); -} - -/** - * User provided method to handle server streaming methods on the server. - * @callback grpc.Server~handleServerStreamingCall - * @param {grpc~ServerWritableStream} call The call object - */ - -/** - * Fully handle a server streaming call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleServerStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleServerStreaming(call, handler, metadata) { - var stream = new ServerWritableStream(call, metadata, handler.serialize); - stream.waitForCancel(); - var batch = {}; - batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { - if (err) { - stream.emit('error', err); - return; - } - try { - stream.request = handler.deserialize(result.read); - } catch (e) { - e.code = constants.status.INTERNAL; - stream.emit('error', e); - return; - } - handler.func(stream); - }); -} - -/** - * User provided method to handle client streaming methods on the server. - * @callback grpc.Server~handleClientStreamingCall - * @param {grpc~ServerReadableStream} call The call object - * @param {grpc.Server~sendUnaryData} callback The callback to call to respond - * to the request - */ - -/** - * Fully handle a client streaming call - * @access private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleClientStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {grpc.Metadata} metadata Metadata from the client - */ -function handleClientStreaming(call, handler, metadata) { - var stream = new ServerReadableStream(call, metadata, handler.deserialize); - stream.on('error', function(error) { - handleError(call, error); - }); - stream.waitForCancel(); - handler.func(stream, function(err, value, trailer, flags) { - stream.terminate(); - if (err) { - if (trailer) { - err.metadata = trailer; - } - handleError(call, err); - } else { - sendUnaryResponse(call, value, handler.serialize, trailer, flags); - } - }); -} - -/** - * User provided method to handle bidirectional streaming calls on the server. - * @callback grpc.Server~handleBidiStreamingCall - * @param {grpc~ServerDuplexStream} call The call object - */ - -/** - * Fully handle a bidirectional streaming call - * @private - * @param {grpc.internal~Call} call The call to handle - * @param {Object} handler Request handler object for the method that was called - * @param {grpc~Server.handleBidiStreamingCall} handler.func The handler - * function - * @param {grpc~deserialize} handler.deserialize The deserialization function - * for request data - * @param {grpc~serialize} handler.serialize The serialization function for - * response data - * @param {Metadata} metadata Metadata from the client - */ -function handleBidiStreaming(call, handler, metadata) { - var stream = new ServerDuplexStream(call, metadata, handler.serialize, - handler.deserialize); - stream.waitForCancel(); - handler.func(stream); -} - -var streamHandlers = { - unary: handleUnary, - server_stream: handleServerStreaming, - client_stream: handleClientStreaming, - bidi: handleBidiStreaming -}; - -/** - * Constructs a server object that stores request handlers and delegates - * incoming requests to those handlers - * @memberof grpc - * @constructor - * @param {Object=} options Options that should be passed to the internal server - * implementation - * @example - * var server = new grpc.Server(); - * server.addProtoService(protobuf_service_descriptor, service_implementation); - * server.bind('address:port', server_credential); - * server.start(); - */ -function Server(options) { - this.handlers = {}; - var server = new grpc.Server(options); - this._server = server; - this.started = false; -} - -/** - * Start the server and begin handling requests - */ -Server.prototype.start = function() { - if (this.started) { - throw new Error('Server is already running'); - } - var self = this; - this.started = true; - this._server.start(); - /** - * Handles the SERVER_RPC_NEW event. If there is a handler associated with - * the requested method, use that handler to respond to the request. Then - * wait for the next request - * @param {grpc.internal~Event} event The event to handle with tag - * SERVER_RPC_NEW - */ - function handleNewCall(err, event) { - if (err) { - return; - } - var details = event.new_call; - var call = details.call; - var method = details.method; - var metadata = Metadata._fromCoreRepresentation(details.metadata); - if (method === null) { - return; - } - self._server.requestCall(handleNewCall); - var handler; - if (self.handlers.hasOwnProperty(method)) { - handler = self.handlers[method]; - } else { - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - (new Metadata())._getCoreRepresentation(); - batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { - code: constants.status.UNIMPLEMENTED, - details: '', - metadata: {} - }; - batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - call.startBatch(batch, function() {}); - return; - } - streamHandlers[handler.type](call, handler, metadata); - } - this._server.requestCall(handleNewCall); -}; - -/** - * Unified type for application handlers for all types of calls - * @typedef {(grpc.Server~handleUnaryCall - * |grpc.Server~handleClientStreamingCall - * |grpc.Server~handleServerStreamingCall - * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall - */ - -/** - * Registers a handler to handle the named method. Fails if there already is - * a handler for the given method. Returns true on success - * @param {string} name The name of the method that the provided function should - * handle/respond to. - * @param {grpc.Server~handleCall} handler Function that takes a stream of - * request values and returns a stream of response values - * @param {grpc~serialize} serialize Serialization function for responses - * @param {grpc~deserialize} deserialize Deserialization function for requests - * @param {string} type The streaming type of method that this handles - * @return {boolean} True if the handler was set. False if a handler was already - * set for that name. - */ -Server.prototype.register = function(name, handler, serialize, deserialize, - type) { - if (this.handlers.hasOwnProperty(name)) { - return false; - } - this.handlers[name] = { - func: handler, - serialize: serialize, - deserialize: deserialize, - type: type - }; - return true; -}; - -/** - * Gracefully shuts down the server. The server will stop receiving new calls, - * and any pending calls will complete. The callback will be called when all - * pending calls have completed and the server is fully shut down. This method - * is idempotent with itself and forceShutdown. - * @param {function()} callback The shutdown complete callback - */ -Server.prototype.tryShutdown = function(callback) { - this._server.tryShutdown(callback); -}; - -/** - * Forcibly shuts down the server. The server will stop receiving new calls - * and cancel all pending calls. When it returns, the server has shut down. - * This method is idempotent with itself and tryShutdown, and it will trigger - * any outstanding tryShutdown callbacks. - */ -Server.prototype.forceShutdown = function() { - this._server.forceShutdown(); -}; - -var unimplementedStatusResponse = { - code: constants.status.UNIMPLEMENTED, - details: 'The server does not implement this method' -}; - -var defaultHandler = { - unary: function(call, callback) { - callback(unimplementedStatusResponse); - }, - client_stream: function(call, callback) { - callback(unimplementedStatusResponse); - }, - server_stream: function(call) { - call.emit('error', unimplementedStatusResponse); - }, - bidi: function(call) { - call.emit('error', unimplementedStatusResponse); - } -}; - -/** - * Add a service to the server, with a corresponding implementation. - * @param {grpc~ServiceDefinition} service The service descriptor - * @param {Object<String, grpc.Server~handleCall>} implementation Map of method - * names to method implementation for the provided service. - */ -Server.prototype.addService = function(service, implementation) { - if (!_.isObject(service) || !_.isObject(implementation)) { - throw new Error('addService requires two objects as arguments'); - } - if (_.keys(service).length === 0) { - throw new Error('Cannot add an empty service to a server'); - } - if (this.started) { - throw new Error('Can\'t add a service to a started server.'); - } - var self = this; - _.forOwn(service, function(attrs, name) { - var method_type; - if (attrs.requestStream) { - if (attrs.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (attrs.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - var impl; - if (implementation[name] === undefined) { - /* Handle the case where the method is passed with the name exactly as - written in the proto file, instead of using JavaScript function - naming style */ - if (implementation[attrs.originalName] === undefined) { - common.log(constants.logVerbosity.ERROR, 'Method handler ' + name + - ' for ' + attrs.path + ' expected but not provided'); - impl = defaultHandler[method_type]; - } else { - impl = _.bind(implementation[attrs.originalName], implementation); - } - } else { - impl = _.bind(implementation[name], implementation); - } - var serialize = attrs.responseSerialize; - var deserialize = attrs.requestDeserialize; - var register_success = self.register(attrs.path, impl, serialize, - deserialize, method_type); - if (!register_success) { - throw new Error('Method handler for ' + attrs.path + - ' already provided.'); - } - }); -}; - -/** - * Add a proto service to the server, with a corresponding implementation - * @deprecated Use {@link grpc.Server#addService} instead - * @param {Protobuf.Reflect.Service} service The proto service descriptor - * @param {Object<String, grpc.Server~handleCall>} implementation Map of method - * names to method implementation for the provided service. - */ -Server.prototype.addProtoService = util.deprecate(function(service, - implementation) { - var options; - var protobuf_js_5_common = require('./protobuf_js_5_common'); - var protobuf_js_6_common = require('./protobuf_js_6_common'); - if (protobuf_js_5_common.isProbablyProtobufJs5(service)) { - options = _.defaults(service.grpc_options, common.defaultGrpcOptions); - this.addService( - protobuf_js_5_common.getProtobufServiceAttrs(service, options), - implementation); - } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) { - options = _.defaults(service.grpc_options, common.defaultGrpcOptions); - this.addService( - protobuf_js_6_common.getProtobufServiceAttrs(service, options), - implementation); - } else { - // We assume that this is a service attributes object - this.addService(service, implementation); - } -}, 'Server#addProtoService: Use Server#addService instead'); - -/** - * Binds the server to the given port, with SSL disabled if creds is an - * insecure credentials object - * @param {string} port The port that the server should bind on, in the format - * "address:port" - * @param {grpc.ServerCredentials} creds Server credential object to be used for - * SSL. Pass an insecure credentials object for an insecure port. - */ -Server.prototype.bind = function(port, creds) { - if (this.started) { - throw new Error('Can\'t bind an already running server to an address'); - } - return this._server.addHttp2Port(port, creds); -}; - -exports.Server = Server; |