aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/interop
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-12-11 15:06:32 -0800
committerGravatar murgatroid99 <mlumish@google.com>2015-12-11 15:23:17 -0800
commit221ae636774a8af761758fcf6c0fec63131eabb4 (patch)
tree9c68aaea138437f1fe1f90c68f436a86c8c94afc /src/node/interop
parent65af7af148671c123ecdba66fcf8265945d1dfcd (diff)
Added support for ResponseParameters.interval_us to the Node interop server
Diffstat (limited to 'src/node/interop')
-rw-r--r--src/node/interop/async_delay_queue.js79
-rw-r--r--src/node/interop/interop_server.js23
2 files changed, 98 insertions, 4 deletions
diff --git a/src/node/interop/async_delay_queue.js b/src/node/interop/async_delay_queue.js
new file mode 100644
index 0000000000..2bd3ca4da3
--- /dev/null
+++ b/src/node/interop/async_delay_queue.js
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var _ = require('lodash');
+
+/**
+ * This class represents a queue of callbacks that must happen sequentially, each
+ * with a specific delay after the previous event.
+ */
+function AsyncDelayQueue() {
+ this.queue = [];
+
+ this.callback_pending = false;
+}
+
+/**
+ * Run the next callback after its corresponding delay, if there are any
+ * remaining.
+ */
+AsyncDelayQueue.prototype.runNext = function() {
+ var next = this.queue.shift();
+ var continueCallback = _.bind(this.runNext, this);
+ if (next) {
+ this.callback_pending = true;
+ setTimeout(function() {
+ next.callback(continueCallback);
+ }, next.delay);
+ } else {
+ this.callback_pending = false;
+ }
+};
+
+/**
+ * Add a callback to be called with a specific delay after now or after the
+ * current last item in the queue or current pending callback, whichever is
+ * latest.
+ * @param {function(function())} callback The callback
+ * @param {Number} The delay to apply, in milliseconds
+ */
+AsyncDelayQueue.prototype.add = function(callback, delay) {
+ this.queue.push({callback: callback, delay: delay});
+ if (!this.callback_pending) {
+ this.runNext();
+ }
+};
+
+module.exports = AsyncDelayQueue;
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 5321005c86..9526b5d183 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -36,6 +36,7 @@
var fs = require('fs');
var path = require('path');
var _ = require('lodash');
+var AsyncDelayQueue = require('./async_delay_queue');
var grpc = require('..');
var testProto = grpc.load({
root: __dirname + '/../../..',
@@ -155,6 +156,7 @@ function handleStreamingInput(call, callback) {
*/
function handleStreamingOutput(call) {
echoHeader(call);
+ var delay_queue = new AsyncDelayQueue();
var req = call.request;
if (req.response_status) {
var status = req.response_status;
@@ -163,9 +165,15 @@ function handleStreamingOutput(call) {
return;
}
_.each(req.response_parameters, function(resp_param) {
- call.write({payload: getPayload(req.response_type, resp_param.size)});
+ delay_queue.add(function(next) {
+ call.write({payload: getPayload(req.response_type, resp_param.size)});
+ next();
+ }, resp_param.interval_us);
+ });
+ delay_queue.add(function(next) {
+ call.end(getEchoTrailer(call));
+ next();
});
- call.end(getEchoTrailer(call));
}
/**
@@ -175,6 +183,7 @@ function handleStreamingOutput(call) {
*/
function handleFullDuplex(call) {
echoHeader(call);
+ var delay_queue = new AsyncDelayQueue();
call.on('data', function(value) {
if (value.response_status) {
var status = value.response_status;
@@ -183,11 +192,17 @@ function handleFullDuplex(call) {
return;
}
_.each(value.response_parameters, function(resp_param) {
- call.write({payload: getPayload(value.response_type, resp_param.size)});
+ delay_queue.add(function(next) {
+ call.write({payload: getPayload(value.response_type, resp_param.size)});
+ next();
+ }, resp_param.interval_us);
});
});
call.on('end', function() {
- call.end(getEchoTrailer(call));
+ delay_queue.add(function(next) {
+ call.end(getEchoTrailer(call));
+ next();
+ });
});
}