aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/examples
diff options
context:
space:
mode:
authorGravatar murgatroid99 <mlumish@google.com>2015-02-23 10:26:01 -0800
committerGravatar murgatroid99 <mlumish@google.com>2015-02-23 10:26:01 -0800
commit2822e9ad7598a68597e49e8f1875486aa12bf267 (patch)
tree8b91fc187e6d2b3a6c39e23bc831ee3471ee2e23 /src/node/examples
parent9c3604774deac203a0eaaeefcc7d1a9ec7b04797 (diff)
Added pubsub demo client
Diffstat (limited to 'src/node/examples')
-rw-r--r--src/node/examples/pubsub/pubsub.proto4
-rw-r--r--src/node/examples/pubsub/pubsub_demo.js62
2 files changed, 35 insertions, 31 deletions
diff --git a/src/node/examples/pubsub/pubsub.proto b/src/node/examples/pubsub/pubsub.proto
index ef88981bd5..41a354773f 100644
--- a/src/node/examples/pubsub/pubsub.proto
+++ b/src/node/examples/pubsub/pubsub.proto
@@ -34,8 +34,8 @@
syntax = "proto2";
-import "examples/pubsub/empty.proto";
-import "examples/pubsub/label.proto";
+import "empty.proto";
+import "label.proto";
package tech.pubsub;
diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js
index d61fe2a7f0..a9b6acbd7e 100644
--- a/src/node/examples/pubsub/pubsub_demo.js
+++ b/src/node/examples/pubsub/pubsub_demo.js
@@ -28,7 +28,10 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
var async = require('async');
+var fs = require('fs');
+var GoogleAuth = require('googleauth');
var parseArgs = require('minimist');
+var strftime = require('strftime');
var _ = require('underscore');
var grpc = require('../..');
var PROTO_PATH = __dirname + '/pubsub.proto';
@@ -45,7 +48,7 @@ PubsubRunner.prototype.getTestTopicName = function() {
if (this.args.topic_name) {
return base_name + this.args.topic_name;
}
- var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L');
+ var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
@@ -54,7 +57,7 @@ PubsubRunner.prototype.getTestSubName = function() {
if (this.args.sub_name) {
return base_name + this.args.sub_name;
}
- var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L');
+ var now_text = strftime('%Y%m%d%H%M%S%L');
return base_name + process.env.USER + '-' + now_text;
};
@@ -77,6 +80,7 @@ PubsubRunner.prototype.topicExists = function(name, callback) {
};
PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
+ var self = this;
this.topicExists(name, function(err, exists) {
if (err) {
callback(err);
@@ -84,7 +88,7 @@ PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
if (exists) {
callback(null);
} else {
- this.pub.createTopic({name: name}, callback);
+ self.pub.createTopic({name: name}, callback);
}
}
});
@@ -153,45 +157,48 @@ PubsubRunner.prototype.checkExists = function(callback) {
};
PubsubRunner.prototype.randomPubSub = function(callback) {
+ var self = this;
var topic_name = this.getTestTopicName();
var sub_name = this.getTestSubName();
var subscription = {name: sub_name, topic: topic_name};
async.waterfall([
_.bind(this.createTopicIfNeeded, this, topic_name),
- _.bind(this.sub.createSubscription, this, subscription),
+ _.bind(this.sub.createSubscription, this.sub, subscription),
function(resp, cb) {
var msg_count = _.random(10, 30);
// Set up msg_count messages to publish
var message_senders = _.times(msg_count, function(n) {
- return _.bind(this.pub.publish, this.pub, {
+ return _.bind(self.pub.publish, self.pub, {
topic: topic_name,
- message: {data: 'message ' + n}
+ message: {data: new Buffer('message ' + n)}
});
});
- async.parallel(message_senders, cb);
+ async.parallel(message_senders, function(err, result) {
+ cb(err, result, msg_count);
+ });
},
- function(result, cb) {
+ function(result, msg_count, cb) {
console.log('Sent', msg_count, 'messages to', topic_name + ',',
'checking for them now.');
var batch_request = {
subscription: sub_name,
max_events: msg_count
};
- this.sub.pull_batch(batch_request, cb);
+ self.sub.pullBatch(batch_request, cb);
},
function(batch, cb) {
- var ack_ids = _.pluck(batch.pull_responses, 'ack_id');
- console.log('Got', ack_ids.length, 'messages, acknowledging them...');
+ var ack_id = _.pluck(batch.pull_responses, 'ack_id');
+ console.log('Got', ack_id.length, 'messages, acknowledging them...');
var ack_request = {
subscription: sub_name,
- ack_ids: ack_ids
+ ack_id: ack_id
};
- this.sub.acknowledge(ack_request, cb);
+ self.sub.acknowledge(ack_request, cb);
},
function(result, cb) {
console.log(
'Test messages were acknowledged OK, deleting the subscription');
- this.sub.delete({subscription: sub_name}, cb);
+ self.sub.deleteSubscription({subscription: sub_name}, cb);
}
], function (err, result) {
if (err) {
@@ -213,23 +220,23 @@ function main(callback) {
'sub_name'
],
default: {
- host: 'pubsub-testing.googleapis.com',
+ host: 'pubsub-staging.googleapis.com',
oauth_scope: 'https://www.googleapis.com/auth/pubsub',
port: 443,
- action: 'all',
- project: 'stoked-keyword-656'
+ action: 'listSomeTopics',
+ project_id: 'stoked-keyword-656'
}
});
var valid_actions = [
- 'removeTopic',
'createTopic',
- 'listSomeTopic',
+ 'removeTopic',
+ 'listSomeTopics',
'checkExists',
'randomPubSub'
];
- if (!(argv.action === 'all' || _.some(valid_actions, function(action) {
+ if (_.some(valid_actions, function(action) {
return action === argv.action;
- }))) {
+ })) {
callback(new Error('Action was not valid'));
}
var address = argv.host + ':' + argv.port;
@@ -249,17 +256,14 @@ function main(callback) {
return;
}
var ssl_creds = grpc.Credentials.createSsl(ca_data);
- var options = {credentials: ssl_creds};
+ var options = {
+ credentials: ssl_creds,
+ 'grpc.ssl_target_name_override': argv.host
+ };
var pub = new pubsub.PublisherService(address, options, updateMetadata);
var sub = new pubsub.SubscriberService(address, options, updateMetadata);
var runner = new PubsubRunner(pub, sub, argv);
- if (argv.action === 'all') {
- async.series(_.map(valid_actions, function(name) {
- _.bind(runner[name], runner);
- }), callback);
- } else {
- runner[argv.action](callback);
- }
+ runner[argv.action](callback);
});
});
}