aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/node/examples/math_server.js1
-rw-r--r--src/node/index.js10
-rw-r--r--src/node/src/client.js489
-rw-r--r--src/node/src/common.js25
-rw-r--r--src/node/src/server.js506
-rw-r--r--src/node/test/math_client_test.js5
6 files changed, 756 insertions, 280 deletions
diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js
index e1bd11b5a6..e010445389 100644
--- a/src/node/examples/math_server.js
+++ b/src/node/examples/math_server.js
@@ -69,6 +69,7 @@ function mathDiv(call, cb) {
* @param {stream} stream The stream for sending responses.
*/
function mathFib(stream) {
+ console.log(stream);
// Here, call is a standard writable Node object Stream
var previous = 0, current = 1;
for (var i = 0; i < stream.request.limit; i++) {
diff --git a/src/node/index.js b/src/node/index.js
index 0627e7f557..baef4d03c6 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -35,9 +35,9 @@ var _ = require('underscore');
var ProtoBuf = require('protobufjs');
-var surface_client = require('./src/surface_client.js');
+var client = require('./src/client.js');
-var surface_server = require('./src/surface_server.js');
+var server = require('./src/server.js');
var grpc = require('bindings')('grpc');
@@ -54,7 +54,7 @@ function loadObject(value) {
});
return result;
} else if (value.className === 'Service') {
- return surface_client.makeClientConstructor(value);
+ return client.makeClientConstructor(value);
} else if (value.className === 'Message' || value.className === 'Enum') {
return value.build();
} else {
@@ -84,9 +84,9 @@ exports.loadObject = loadObject;
exports.load = load;
/**
- * See docs for surface_server.makeServerConstructor
+ * See docs for server.makeServerConstructor
*/
-exports.buildServer = surface_server.makeServerConstructor;
+exports.buildServer = server.makeServerConstructor;
/**
* Status name to code number mapping
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 3a1c9eef84..88fa9dc2e2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,14 +31,104 @@
*
*/
+var _ = require('underscore');
+
+var capitalize = require('underscore.string/capitalize');
+var decapitalize = require('underscore.string/decapitalize');
+
var grpc = require('bindings')('grpc.node');
-var common = require('./common');
+var common = require('./common.js');
+
+var EventEmitter = require('events').EventEmitter;
+
+var stream = require('stream');
-var Duplex = require('stream').Duplex;
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
var util = require('util');
-util.inherits(GrpcClientStream, Duplex);
+util.inherits(ClientWritableStream, Writable);
+
+function ClientWritableStream(call, serialize) {
+ Writable.call(this, {objectMode: true});
+ this.call = call;
+ this.serialize = common.wrapIgnoreNull(serialize);
+ this.on('finish', function() {
+ var batch = {};
+ batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ call.startBatch(batch, function() {});
+ });
+}
+
+/**
+ * Attempt to write the given chunk. Calls the callback when done. This is an
+ * implementation of a method needed for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Called when the write is complete
+ */
+function _write(chunk, encoding, callback) {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ console.log(batch);
+ this.call.startBatch(batch, function(err, event) {
+ if (err) {
+ throw err;
+ }
+ callback();
+ });
+};
+
+ClientWritableStream.prototype._write = _write;
+
+util.inherits(ClientReadableStream, Readable);
+
+function ClientReadableStream(call, deserialize) {
+ Readable.call(this, {objectMode: true});
+ this.call = call;
+ this.finished = false;
+ this.reading = false;
+ this.serialize = common.wrapIgnoreNull(deserialize);
+}
+
+function _read(size) {
+ var self = this;
+ /**
+ * Callback to be called when a READ event is received. Pushes the data onto
+ * the read queue and starts reading again if applicable
+ * @param {grpc.Event} event READ event object
+ */
+ function readCallback(event) {
+ if (self.finished) {
+ self.push(null);
+ return;
+ }
+ var data = event.data;
+ if (self.push(self.deserialize(data)) && data != null) {
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
+ } else {
+ self.reading = false;
+ }
+ }
+ if (self.finished) {
+ self.push(null);
+ } else {
+ if (!self.reading) {
+ self.reading = true;
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
+ }
+ }
+};
+
+ClientReadableStream.prototype._read = _read;
+
+util.inherits(ClientDuplexStream, Duplex);
/**
* Class for representing a gRPC client side stream as a Node stream. Extends
@@ -49,167 +139,310 @@ util.inherits(GrpcClientStream, Duplex);
* @param {function(Buffer):*=} deserialize Deserialization function for
* responses
*/
-function GrpcClientStream(call, serialize, deserialize) {
+function ClientDuplexStream(call, serialize, deserialize) {
Duplex.call(this, {objectMode: true});
- if (!serialize) {
- serialize = function(value) {
- return value;
- };
- }
- if (!deserialize) {
- deserialize = function(value) {
- return value;
- };
- }
+ this.serialize = common.wrapIgnoreNull(serialize);
+ this.serialize = common.wrapIgnoreNull(deserialize);
var self = this;
var finished = false;
// Indicates that a read is currently pending
var reading = false;
- // Indicates that a write is currently pending
- var writing = false;
- this._call = call;
+ this.call = call;
+}
- /**
- * Serialize a request value to a buffer. Always maps null to null. Otherwise
- * uses the provided serialize function
- * @param {*} value The value to serialize
- * @return {Buffer} The serialized value
- */
- this.serialize = function(value) {
- if (value === null || value === undefined) {
- return null;
- }
- return serialize(value);
- };
+ClientDuplexStream.prototype._read = _read;
+ClientDuplexStream.prototype._write = _write;
+
+function cancel() {
+ this.call.cancel();
+}
+
+ClientReadableStream.prototype.cancel = cancel;
+ClientWritableStream.prototype.cancel = cancel;
+ClientDuplexStream.prototype.cancel = cancel;
+/**
+ * Get a function that can make unary requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeUnaryRequest
+ */
+function makeUnaryRequestFunction(method, serialize, deserialize) {
/**
- * Deserialize a response buffer to a value. Always maps null to null.
- * Otherwise uses the provided deserialize function.
- * @param {Buffer} buffer The buffer to deserialize
- * @return {*} The deserialized value
+ * Make a unary request with this method on the given channel with the given
+ * argument, callback, etc.
+ * @this {Client} Client object. Must have a channel member.
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
*/
- this.deserialize = function(buffer) {
- if (buffer === null) {
- return null;
+ function makeUnaryRequest(argument, callback, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
}
- return deserialize(buffer);
- };
+ var emitter = new EventEmitter();
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ emitter.cancel = function cancel() {
+ call.cancel();
+ };
+ var client_batch = {};
+ client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ emitter.emit('status', response.status);
+ emitter.emit('metadata', response.metadata);
+ callback(null, deserialize(response.read));
+ });
+ return emitter;
+ }
+ return makeUnaryRequest;
+}
+
+/**
+ * Get a function that can make client stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeClientStreamRequest
+ */
+function makeClientStreamRequestFunction(method, serialize, deserialize) {
/**
- * Callback to be called when a READ event is received. Pushes the data onto
- * the read queue and starts reading again if applicable
- * @param {grpc.Event} event READ event object
+ * Make a client stream request with this method on the given channel with the
+ * given callback, etc.
+ * @this {Client} Client object. Must have a channel member.
+ * @param {function(?Error, value=)} callback The callback to for when the
+ * response is received
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
*/
- function readCallback(event) {
- if (finished) {
- self.push(null);
- return;
+ function makeClientStreamRequest(callback, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
}
- var data = event.data;
- if (self.push(self.deserialize(data)) && data != null) {
- self._call.startRead(readCallback);
- } else {
- reading = false;
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
}
+ var stream = new ClientWritableStream(call, serialize);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ call.startBatch(metadata_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ stream.emit('metadata', response.metadata);
+ });
+ var client_batch = {};
+ client_batch[grpc.opType.RECV_MESSAGE] = true;
+ client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(client_batch, function(err, response) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ stream.emit('status', response.status);
+ callback(null, deserialize(response.read));
+ });
+ return stream;
}
- call.invoke(function(event) {
- self.emit('metadata', event.data);
- }, function(event) {
- finished = true;
- self.emit('status', event.data);
- }, 0);
- this.on('finish', function() {
- call.writesDone(function() {});
- });
+ return makeClientStreamRequest;
+}
+
+/**
+ * Get a function that can make server stream requests to the specified method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeServerStreamRequest
+ */
+function makeServerStreamRequestFunction(method, serialize, deserialize) {
/**
- * Start reading if there is not already a pending read. Reading will
- * continue until self.push returns false (indicating reads should slow
- * down) or the read data is null (indicating that there is no more data).
+ * Make a server stream request with this method on the given channel with the
+ * given argument, etc.
+ * @this {SurfaceClient} Client object. Must have a channel member.
+ * @param {*} argument The argument to the call. Should be serializable with
+ * serialize
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
*/
- this.startReading = function() {
- if (finished) {
- self.push(null);
- } else {
- if (!reading) {
- reading = true;
- self._call.startRead(readCallback);
- }
+ function makeServerStreamRequest(argument, metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
}
- };
+ var stream = new ClientReadableStream(call, deserialize);
+ var start_batch = {};
+ console.log('Starting server streaming request on', method);
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
+ call.startBatch(start_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ console.log(response);
+ stream.emit('metadata', response.metadata);
+ });
+ var status_batch = {};
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(status_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('status', response.status);
+ });
+ return stream;
+ }
+ return makeServerStreamRequest;
}
/**
- * Start reading. This is an implementation of a method needed for implementing
- * stream.Readable.
- * @param {number} size Ignored
+ * Get a function that can make bidirectional stream requests to the specified
+ * method.
+ * @param {string} method The name of the method to request
+ * @param {function(*):Buffer} serialize The serialization function for inputs
+ * @param {function(Buffer)} deserialize The deserialization function for
+ * outputs
+ * @return {Function} makeBidiStreamRequest
*/
-GrpcClientStream.prototype._read = function(size) {
- this.startReading();
-};
+function makeBidiStreamRequestFunction(method, serialize, deserialize) {
+ /**
+ * Make a bidirectional stream request with this method on the given channel.
+ * @this {SurfaceClient} Client object. Must have a channel member.
+ * @param {array=} metadata Array of metadata key/value pairs to add to the
+ * call
+ * @param {(number|Date)=} deadline The deadline for processing this request.
+ * Defaults to infinite future
+ * @return {EventEmitter} An event emitter for stream related events
+ */
+ function makeBidiStreamRequest(metadata, deadline) {
+ if (deadline === undefined) {
+ deadline = Infinity;
+ }
+ var call = new grpc.Call(this.channel, method, deadline);
+ if (metadata === null || metadata === undefined) {
+ metadata = {};
+ }
+ var stream = new ClientDuplexStream(call, serialize, deserialize);
+ var start_batch = {};
+ start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
+ call.startBatch(start_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('metadata', response.metadata);
+ });
+ var status_batch = {};
+ status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
+ call.startBatch(status_batch, function(err, response) {
+ if (err) {
+ throw err;
+ }
+ stream.emit('status', response.status);
+ });
+ return stream;
+ }
+ return makeBidiStreamRequest;
+}
-/**
- * Attempt to write the given chunk. Calls the callback when done. This is an
- * implementation of a method needed for implementing stream.Writable.
- * @param {Buffer} chunk The chunk to write
- * @param {string} encoding Ignored
- * @param {function(Error=)} callback Ignored
- */
-GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- var self = this;
- self._call.startWrite(self.serialize(chunk), function(event) {
- callback();
- }, 0);
-};
/**
- * Cancel the ongoing call. If the call has not already finished, it will finish
- * with status CANCELLED.
+ * Map with short names for each of the requester maker functions. Used in
+ * makeClientConstructor
*/
-GrpcClientStream.prototype.cancel = function() {
- this._call.cancel();
+var requester_makers = {
+ unary: makeUnaryRequestFunction,
+ server_stream: makeServerStreamRequestFunction,
+ client_stream: makeClientStreamRequestFunction,
+ bidi: makeBidiStreamRequestFunction
};
/**
- * Make a request on the channel to the given method with the given arguments
- * @param {grpc.Channel} channel The channel on which to make the request
- * @param {string} method The method to request
- * @param {function(*):Buffer} serialize Serialization function for requests
- * @param {function(Buffer):*} deserialize Deserialization function for
- * responses
- * @param {array=} metadata Array of metadata key/value pairs to add to the call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future.
- * @return {stream=} The stream of responses
+ * Creates a constructor for clients for the given service
+ * @param {ProtoBuf.Reflect.Service} service The service to generate a client
+ * for
+ * @return {function(string, Object)} New client constructor
*/
-function makeRequest(channel,
- method,
- serialize,
- deserialize,
- metadata,
- deadline) {
- if (deadline === undefined) {
- deadline = Infinity;
- }
- var call = new grpc.Call(channel, method, deadline);
- if (metadata) {
- call.addMetadata(metadata);
+function makeClientConstructor(service) {
+ var prefix = '/' + common.fullyQualifiedName(service) + '/';
+ /**
+ * Create a client with the given methods
+ * @constructor
+ * @param {string} address The address of the server to connect to
+ * @param {Object} options Options to pass to the underlying channel
+ */
+ function Client(address, options) {
+ this.channel = new grpc.Channel(address, options);
}
- return new GrpcClientStream(call, serialize, deserialize);
+
+ _.each(service.children, function(method) {
+ var method_type;
+ if (method.requestStream) {
+ if (method.responseStream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (method.responseStream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ Client.prototype[decapitalize(method.name)] =
+ requester_makers[method_type](
+ prefix + capitalize(method.name),
+ common.serializeCls(method.resolvedRequestType.build()),
+ common.deserializeCls(method.resolvedResponseType.build()));
+ });
+
+ Client.service = service;
+
+ return Client;
}
-/**
- * See documentation for makeRequest above
- */
-exports.makeRequest = makeRequest;
+exports.makeClientConstructor = makeClientConstructor;
/**
- * Represents a client side gRPC channel associated with a single host.
- */
-exports.Channel = grpc.Channel;
-/**
- * Status name to code number mapping
+ * See docs for client.status
*/
exports.status = grpc.status;
/**
- * Call error name to code number mapping
+ * See docs for client.callError
*/
exports.callError = grpc.callError;
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 54247e3fa1..7560cf1bdd 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -31,6 +31,8 @@
*
*/
+var _ = require('underscore');
+
var capitalize = require('underscore.string/capitalize');
/**
@@ -88,6 +90,24 @@ function fullyQualifiedName(value) {
}
/**
+ * Wrap a function to pass null-like values through without calling it. If no
+ * function is given, just uses the identity;
+ * @param {?function} func The function to wrap
+ * @return {function} The wrapped function
+ */
+function wrapIgnoreNull(func) {
+ if (!func) {
+ return _.identity;
+ }
+ return function(arg) {
+ if (arg === null || arg === undefined) {
+ return null;
+ }
+ return func(arg);
+ };
+}
+
+/**
* See docs for deserializeCls
*/
exports.deserializeCls = deserializeCls;
@@ -101,3 +121,8 @@ exports.serializeCls = serializeCls;
* See docs for fullyQualifiedName
*/
exports.fullyQualifiedName = fullyQualifiedName;
+
+/**
+ * See docs for wrapIgnoreNull
+ */
+exports.wrapIgnoreNull = wrapIgnoreNull;
diff --git a/src/node/src/server.js b/src/node/src/server.js
index e4f71ff05f..2d5396e3b7 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2014, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,80 +33,72 @@
var _ = require('underscore');
+var capitalize = require('underscore.string/capitalize');
+var decapitalize = require('underscore.string/decapitalize');
+
var grpc = require('bindings')('grpc.node');
var common = require('./common');
-var Duplex = require('stream').Duplex;
+var stream = require('stream');
+
+var Readable = stream.Readable;
+var Writable = stream.Writable;
+var Duplex = stream.Duplex;
var util = require('util');
-util.inherits(GrpcServerStream, Duplex);
+var EventEmitter = require('events').EventEmitter;
-/**
- * Class for representing a gRPC server side stream as a Node stream. Extends
- * from stream.Duplex.
- * @constructor
- * @param {grpc.Call} call Call object to proxy
- * @param {function(*):Buffer=} serialize Serialization function for responses
- * @param {function(Buffer):*=} deserialize Deserialization function for
- * requests
- */
-function GrpcServerStream(call, serialize, deserialize) {
- Duplex.call(this, {objectMode: true});
- if (!serialize) {
- serialize = function(value) {
- return value;
- };
- }
- if (!deserialize) {
- deserialize = function(value) {
- return value;
- };
- }
- this._call = call;
- // Indicate that a status has been sent
- var finished = false;
- var self = this;
- var status = {
- 'code' : grpc.status.OK,
- 'details' : 'OK'
- };
+var common = require('./common.js');
- /**
- * Serialize a response value to a buffer. Always maps null to null. Otherwise
- * uses the provided serialize function
- * @param {*} value The value to serialize
- * @return {Buffer} The serialized value
- */
- this.serialize = function(value) {
- if (value === null || value === undefined) {
- return null;
- }
- return serialize(value);
+function handleError(call, error) {
+ var error_batch = {};
+ error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: grpc.status.INTERNAL,
+ details: 'Unknown Error',
+ metadata: {}
};
+ call.startBatch(error_batch, function(){});
+}
- /**
- * Deserialize a request buffer to a value. Always maps null to null.
- * Otherwise uses the provided deserialize function.
- * @param {Buffer} buffer The buffer to deserialize
- * @return {*} The deserialized value
- */
- this.deserialize = function(buffer) {
- if (buffer === null) {
- return null;
+function waitForCancel(call, emitter) {
+ var cancel_batch = {};
+ cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ call.startBatch(cancel_batch, function(err, result) {
+ if (err) {
+ emitter.emit('error', err);
}
- return deserialize(buffer);
+ if (result.cancelled) {
+ emitter.cancelled = true;
+ emitter.emit('cancelled');
+ }
+ });
+}
+
+function sendUnaryResponse(call, value, serialize) {
+ var end_batch = {};
+ end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
+ end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: grpc.status.OK,
+ details: 'OK',
+ metadata: {}
};
+ call.startBatch(end_batch, function (){});
+}
- /**
- * Send the pending status
- */
+function setUpWritable(stream, serialize) {
+ stream.finished = false;
+ stream.status = {
+ 'code' : grpc.status.OK,
+ 'details' : 'OK'
+ };
+ stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
- call.startWriteStatus(status.code, status.details, function() {
- });
- finished = true;
+ var batch = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
+ stream.call.startBatch(batch, function(){});
}
- this.on('finish', sendStatus);
+ stream.on('finish', sendStatus);
/**
* Set the pending status to a given error status. If the error does not have
* code or details properties, the code will be set to grpc.status.INTERNAL
@@ -123,7 +115,7 @@ function GrpcServerStream(call, serialize, deserialize) {
details = err.details;
}
}
- status = {'code': code, 'details': details};
+ stream.status = {'code': code, 'details': details};
}
/**
* Terminate the call. This includes indicating that reads are done, draining
@@ -133,69 +125,196 @@ function GrpcServerStream(call, serialize, deserialize) {
*/
function terminateCall(err) {
// Drain readable data
- this.on('data', function() {});
setStatus(err);
- this.end();
+ stream.end();
}
- this.on('error', terminateCall);
- // Indicates that a read is pending
- var reading = false;
+ stream.on('error', terminateCall);
+}
+
+function setUpReadable(stream, deserialize) {
+ stream.deserialize = common.wrapIgnoreNull(deserialize);
+ stream.finished = false;
+ stream.reading = false;
+
+ stream.terminate = function() {
+ stream.finished = true;
+ stream.on('data', function() {});
+ };
+
+ stream.on('cancelled', function() {
+ stream.terminate();
+ });
+}
+
+util.inherits(ServerWritableStream, Writable);
+
+function ServerWritableStream(call, serialize) {
+ Writable.call(this, {objectMode: true});
+ this.call = call;
+
+ this.finished = false;
+ setUpWritable(this, serialize);
+}
+
+/**
+ * Start writing a chunk of data. This is an implementation of a method required
+ * for implementing stream.Writable.
+ * @param {Buffer} chunk The chunk of data to write
+ * @param {string} encoding Ignored
+ * @param {function(Error=)} callback Callback to indicate that the write is
+ * complete
+ */
+function _write(chunk, encoding, callback) {
+ var batch = {};
+ batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ this.call.startBatch(batch, function(err, value) {
+ if (err) {
+ this.emit('error', err);
+ return;
+ }
+ callback();
+ });
+}
+
+ServerWritableStream.prototype._write = _write;
+
+util.inherits(ServerReadableStream, Readable);
+
+function ServerReadableStream(call, deserialize) {
+ Readable.call(this, {objectMode: true});
+ this.call = call;
+ setUpReadable(this, deserialize);
+}
+
+/**
+ * Start reading from the gRPC data source. This is an implementation of a
+ * method required for implementing stream.Readable
+ * @param {number} size Ignored
+ */
+function _read(size) {
+ var self = this;
/**
* Callback to be called when a READ event is received. Pushes the data onto
* the read queue and starts reading again if applicable
* @param {grpc.Event} event READ event object
*/
- function readCallback(event) {
- if (finished) {
+ function readCallback(err, event) {
+ if (err) {
+ self.terminate();
+ return;
+ }
+ if (self.finished) {
self.push(null);
return;
}
- var data = event.data;
+ var data = event.read;
if (self.push(self.deserialize(data)) && data != null) {
- self._call.startRead(readCallback);
+ var read_batch = {};
+ read_batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(read_batch, readCallback);
} else {
- reading = false;
+ self.reading = false;
}
}
- /**
- * Start reading if there is not already a pending read. Reading will
- * continue until self.push returns false (indicating reads should slow
- * down) or the read data is null (indicating that there is no more data).
- */
- this.startReading = function() {
- if (finished) {
- self.push(null);
- } else {
- if (!reading) {
- reading = true;
- self._call.startRead(readCallback);
+ if (self.finished) {
+ self.push(null);
+ } else {
+ if (!self.reading) {
+ self.reading = true;
+ var batch = {};
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ self.call.startBatch(batch, readCallback);
+ }
+ }
+}
+
+ServerReadableStream.prototype._read = _read;
+
+util.inherits(ServerDuplexStream, Duplex);
+
+function ServerDuplexStream(call, serialize, deserialize) {
+ Duplex.call(this, {objectMode: true});
+ setUpWritable(this, serialize);
+ setUpReadable(this, deserialize);
+}
+
+ServerDuplexStream.prototype._read = _read;
+ServerDuplexStream.prototype._write = _write;
+
+function handleUnary(call, handler, metadata) {
+ var emitter = new EventEmitter();
+ emitter.on('error', function(error) {
+ handleError(call, error);
+ });
+ waitForCancel(call, emitter);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(batch, function(err, result) {
+ if (err) {
+ handleError(call, err);
+ return;
+ }
+ emitter.request = handler.deserialize(result.read);
+ if (emitter.cancelled) {
+ return;
+ }
+ handler.func(emitter, function sendUnaryData(err, value) {
+ if (err) {
+ handleError(call, err);
}
+ sendUnaryResponse(call, value, handler.serialize);
+ });
+ });
+}
+
+function handleServerStreaming(call, handler, metadata) {
+ console.log('Handling server streaming call');
+ var stream = new ServerWritableStream(call, handler.serialize);
+ waitForCancel(call, stream);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ batch[grpc.opType.RECV_MESSAGE] = true;
+ call.startBatch(batch, function(err, result) {
+ if (err) {
+ stream.emit('error', err);
+ return;
}
- };
+ stream.request = result.read;
+ handler.func(stream);
+ });
}
-/**
- * Start reading from the gRPC data source. This is an implementation of a
- * method required for implementing stream.Readable
- * @param {number} size Ignored
- */
-GrpcServerStream.prototype._read = function(size) {
- this.startReading();
-};
+function handleClientStreaming(call, handler, metadata) {
+ var stream = new ServerReadableStream(call, handler.deserialize);
+ waitForCancel(call, stream);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ call.startBatch(metadata_batch, function() {});
+ handler.func(stream, function(err, value) {
+ stream.terminate();
+ if (err) {
+ handleError(call, err);
+ }
+ sendUnaryResponse(call, value, handler.serialize);
+ });
+}
-/**
- * Start writing a chunk of data. This is an implementation of a method required
- * for implementing stream.Writable.
- * @param {Buffer} chunk The chunk of data to write
- * @param {string} encoding Ignored
- * @param {function(Error=)} callback Callback to indicate that the write is
- * complete
- */
-GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
- var self = this;
- self._call.startWrite(self.serialize(chunk), function(event) {
- callback();
- }, 0);
+function handleBidiStreaming(call, handler, metadata) {
+ var stream = new ServerDuplexStream(call, handler.serialize,
+ handler.deserialize);
+ waitForCancel(call, stream);
+ var metadata_batch = {};
+ metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
+ call.startBatch(metadata_batch, function() {});
+ handler.func(stream);
+}
+
+var streamHandlers = {
+ unary: handleUnary,
+ server_stream: handleServerStreaming,
+ client_stream: handleClientStreaming,
+ bidi: handleBidiStreaming
};
/**
@@ -218,7 +337,7 @@ function Server(getMetadata, options) {
* Start the server and begin handling requests
* @this Server
*/
- this.start = function() {
+ this.listen = function() {
console.log('Server starting');
_.each(handlers, function(handler, handler_name) {
console.log('Serving', handler_name);
@@ -233,48 +352,42 @@ function Server(getMetadata, options) {
* wait for the next request
* @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
*/
- function handleNewCall(event) {
- var call = event.call;
- var data = event.data;
- if (data === null) {
+ function handleNewCall(err, event) {
+ console.log('Handling new call');
+ if (err) {
+ return;
+ }
+ var details = event['new call'];
+ var call = details.call;
+ var method = details.method;
+ var metadata = details.metadata;
+ if (method === null) {
return;
}
server.requestCall(handleNewCall);
var handler = undefined;
- var deadline = data.absolute_deadline;
- var cancelled = false;
- call.serverAccept(function(event) {
- if (event.data.code === grpc.status.CANCELLED) {
- cancelled = true;
- if (stream) {
- stream.emit('cancelled');
- }
- }
- }, 0);
- if (handlers.hasOwnProperty(data.method)) {
- handler = handlers[data.method];
+ var deadline = details.deadline;
+ if (handlers.hasOwnProperty(method)) {
+ handler = handlers[method];
+ console.log(handler);
} else {
- call.serverEndInitialMetadata(0);
- call.startWriteStatus(
- grpc.status.UNIMPLEMENTED,
- "This method is not available on this server.",
- function() {});
+ console.log(handlers);
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] = {};
+ batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ code: grpc.status.UNIMPLEMENTED,
+ details: "This method is not available on this server.",
+ metadata: {}
+ };
+ batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ call.startBatch(batch, function() {});
return;
}
+ var response_metadata = {};
if (getMetadata) {
- call.addMetadata(getMetadata(data.method, data.metadata));
- }
- call.serverEndInitialMetadata(0);
- var stream = new GrpcServerStream(call, handler.serialize,
- handler.deserialize);
- Object.defineProperty(stream, 'cancelled', {
- get: function() { return cancelled;}
- });
- try {
- handler.func(stream, data.metadata);
- } catch (e) {
- stream.emit('error', e);
+ response_metadata = getMetadata(method, metadata);
}
+ streamHandlers[handler.type](call, handler, response_metadata);
}
server.requestCall(handleNewCall);
};
@@ -294,17 +407,20 @@ function Server(getMetadata, options) {
* returns a stream of response values
* @param {function(*):Buffer} serialize Serialization function for responses
* @param {function(Buffer):*} deserialize Deserialization function for requests
+ * @param {string} type The streaming type of method that this handles
* @return {boolean} True if the handler was set. False if a handler was already
* set for that name.
*/
-Server.prototype.register = function(name, handler, serialize, deserialize) {
+Server.prototype.register = function(name, handler, serialize, deserialize,
+ type) {
if (this.handlers.hasOwnProperty(name)) {
return false;
}
this.handlers[name] = {
func: handler,
serialize: serialize,
- deserialize: deserialize
+ deserialize: deserialize,
+ type: type
};
return true;
};
@@ -324,6 +440,110 @@ Server.prototype.bind = function(port, secure) {
};
/**
- * See documentation for Server
+ * Creates a constructor for servers with a service defined by the methods
+ * object. The methods object has string keys and values of this form:
+ * {serialize: function, deserialize: function, client_stream: bool,
+ * server_stream: bool}
+ * @param {Object} methods Method descriptor for each method the server should
+ * expose
+ * @param {string} prefix The prefex to prepend to each method name
+ * @return {function(Object, Object)} New server constructor
+ */
+function makeServerConstructor(services) {
+ var qual_names = [];
+ _.each(services, function(service) {
+ _.each(service.children, function(method) {
+ var name = common.fullyQualifiedName(method);
+ if (_.indexOf(qual_names, name) !== -1) {
+ throw new Error('Method ' + name + ' exposed by more than one service');
+ }
+ qual_names.push(name);
+ });
+ });
+ /**
+ * Create a server with the given handlers for all of the methods.
+ * @constructor
+ * @param {Object} service_handlers Map from service names to map from method
+ * names to handlers
+ * @param {function(string, Object<string, Array<Buffer>>):
+ Object<string, Array<Buffer|string>>=} getMetadata Callback that
+ * gets metatada for a given method
+ * @param {Object=} options Options to pass to the underlying server
+ */
+ function SurfaceServer(service_handlers, getMetadata, options) {
+ var server = new Server(getMetadata, options);
+ this.inner_server = server;
+ _.each(services, function(service) {
+ var service_name = common.fullyQualifiedName(service);
+ if (service_handlers[service_name] === undefined) {
+ throw new Error('Handlers for service ' +
+ service_name + ' not provided.');
+ }
+ var prefix = '/' + common.fullyQualifiedName(service) + '/';
+ _.each(service.children, function(method) {
+ var method_type;
+ if (method.requestStream) {
+ if (method.responseStream) {
+ method_type = 'bidi';
+ } else {
+ method_type = 'client_stream';
+ }
+ } else {
+ if (method.responseStream) {
+ method_type = 'server_stream';
+ } else {
+ method_type = 'unary';
+ }
+ }
+ if (service_handlers[service_name][decapitalize(method.name)] ===
+ undefined) {
+ throw new Error('Method handler for ' +
+ common.fullyQualifiedName(method) + ' not provided.');
+ }
+ var serialize = common.serializeCls(
+ method.resolvedResponseType.build());
+ var deserialize = common.deserializeCls(
+ method.resolvedRequestType.build());
+ server.register(
+ prefix + capitalize(method.name),
+ service_handlers[service_name][decapitalize(method.name)],
+ serialize, deserialize, method_type);
+ });
+ }, this);
+ }
+
+ /**
+ * Binds the server to the given port, with SSL enabled if secure is specified
+ * @param {string} port The port that the server should bind on, in the format
+ * "address:port"
+ * @param {boolean=} secure Whether the server should open a secure port
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.bind = function(port, secure) {
+ return this.inner_server.bind(port, secure);
+ };
+
+ /**
+ * Starts the server listening on any bound ports
+ * @return {SurfaceServer} this
+ */
+ SurfaceServer.prototype.listen = function() {
+ this.inner_server.listen();
+ return this;
+ };
+
+ /**
+ * Shuts the server down; tells it to stop listening for new requests and to
+ * kill old requests.
+ */
+ SurfaceServer.prototype.shutdown = function() {
+ this.inner_server.shutdown();
+ };
+
+ return SurfaceServer;
+}
+
+/**
+ * See documentation for makeServerConstructor
*/
-module.exports = Server;
+exports.makeServerConstructor = makeServerConstructor;
diff --git a/src/node/test/math_client_test.js b/src/node/test/math_client_test.js
index 0e365bf870..f347b18ea0 100644
--- a/src/node/test/math_client_test.js
+++ b/src/node/test/math_client_test.js
@@ -63,13 +63,10 @@ describe('Math client', function() {
assert.ifError(err);
assert.equal(value.quotient, 1);
assert.equal(value.remainder, 3);
- });
- call.on('status', function checkStatus(status) {
- assert.strictEqual(status.code, grpc.status.OK);
done();
});
});
- it('should handle a server streaming request', function(done) {
+ it.only('should handle a server streaming request', function(done) {
var call = math_client.fib({limit: 7});
var expected_results = [1, 1, 2, 3, 5, 8, 13];
var next_expected = 0;