diff options
author | murgatroid99 <mlumish@google.com> | 2015-02-23 10:26:01 -0800 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2015-02-23 10:26:01 -0800 |
commit | 2822e9ad7598a68597e49e8f1875486aa12bf267 (patch) | |
tree | 8b91fc187e6d2b3a6c39e23bc831ee3471ee2e23 /src/node/examples | |
parent | 9c3604774deac203a0eaaeefcc7d1a9ec7b04797 (diff) |
Added pubsub demo client
Diffstat (limited to 'src/node/examples')
-rw-r--r-- | src/node/examples/pubsub/pubsub.proto | 4 | ||||
-rw-r--r-- | src/node/examples/pubsub/pubsub_demo.js | 62 |
2 files changed, 35 insertions, 31 deletions
diff --git a/src/node/examples/pubsub/pubsub.proto b/src/node/examples/pubsub/pubsub.proto index ef88981bd5..41a354773f 100644 --- a/src/node/examples/pubsub/pubsub.proto +++ b/src/node/examples/pubsub/pubsub.proto @@ -34,8 +34,8 @@ syntax = "proto2"; -import "examples/pubsub/empty.proto"; -import "examples/pubsub/label.proto"; +import "empty.proto"; +import "label.proto"; package tech.pubsub; diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js index d61fe2a7f0..a9b6acbd7e 100644 --- a/src/node/examples/pubsub/pubsub_demo.js +++ b/src/node/examples/pubsub/pubsub_demo.js @@ -28,7 +28,10 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. var async = require('async'); +var fs = require('fs'); +var GoogleAuth = require('googleauth'); var parseArgs = require('minimist'); +var strftime = require('strftime'); var _ = require('underscore'); var grpc = require('../..'); var PROTO_PATH = __dirname + '/pubsub.proto'; @@ -45,7 +48,7 @@ PubsubRunner.prototype.getTestTopicName = function() { if (this.args.topic_name) { return base_name + this.args.topic_name; } - var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L'); + var now_text = strftime('%Y%m%d%H%M%S%L'); return base_name + process.env.USER + '-' + now_text; }; @@ -54,7 +57,7 @@ PubsubRunner.prototype.getTestSubName = function() { if (this.args.sub_name) { return base_name + this.args.sub_name; } - var now_text = new Date().toLocaleFormat('%Y%m%d%H%M%S%L'); + var now_text = strftime('%Y%m%d%H%M%S%L'); return base_name + process.env.USER + '-' + now_text; }; @@ -77,6 +80,7 @@ PubsubRunner.prototype.topicExists = function(name, callback) { }; PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { + var self = this; this.topicExists(name, function(err, exists) { if (err) { callback(err); @@ -84,7 +88,7 @@ PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { if (exists) { callback(null); } else { - this.pub.createTopic({name: name}, callback); + self.pub.createTopic({name: name}, callback); } } }); @@ -153,45 +157,48 @@ PubsubRunner.prototype.checkExists = function(callback) { }; PubsubRunner.prototype.randomPubSub = function(callback) { + var self = this; var topic_name = this.getTestTopicName(); var sub_name = this.getTestSubName(); var subscription = {name: sub_name, topic: topic_name}; async.waterfall([ _.bind(this.createTopicIfNeeded, this, topic_name), - _.bind(this.sub.createSubscription, this, subscription), + _.bind(this.sub.createSubscription, this.sub, subscription), function(resp, cb) { var msg_count = _.random(10, 30); // Set up msg_count messages to publish var message_senders = _.times(msg_count, function(n) { - return _.bind(this.pub.publish, this.pub, { + return _.bind(self.pub.publish, self.pub, { topic: topic_name, - message: {data: 'message ' + n} + message: {data: new Buffer('message ' + n)} }); }); - async.parallel(message_senders, cb); + async.parallel(message_senders, function(err, result) { + cb(err, result, msg_count); + }); }, - function(result, cb) { + function(result, msg_count, cb) { console.log('Sent', msg_count, 'messages to', topic_name + ',', 'checking for them now.'); var batch_request = { subscription: sub_name, max_events: msg_count }; - this.sub.pull_batch(batch_request, cb); + self.sub.pullBatch(batch_request, cb); }, function(batch, cb) { - var ack_ids = _.pluck(batch.pull_responses, 'ack_id'); - console.log('Got', ack_ids.length, 'messages, acknowledging them...'); + var ack_id = _.pluck(batch.pull_responses, 'ack_id'); + console.log('Got', ack_id.length, 'messages, acknowledging them...'); var ack_request = { subscription: sub_name, - ack_ids: ack_ids + ack_id: ack_id }; - this.sub.acknowledge(ack_request, cb); + self.sub.acknowledge(ack_request, cb); }, function(result, cb) { console.log( 'Test messages were acknowledged OK, deleting the subscription'); - this.sub.delete({subscription: sub_name}, cb); + self.sub.deleteSubscription({subscription: sub_name}, cb); } ], function (err, result) { if (err) { @@ -213,23 +220,23 @@ function main(callback) { 'sub_name' ], default: { - host: 'pubsub-testing.googleapis.com', + host: 'pubsub-staging.googleapis.com', oauth_scope: 'https://www.googleapis.com/auth/pubsub', port: 443, - action: 'all', - project: 'stoked-keyword-656' + action: 'listSomeTopics', + project_id: 'stoked-keyword-656' } }); var valid_actions = [ - 'removeTopic', 'createTopic', - 'listSomeTopic', + 'removeTopic', + 'listSomeTopics', 'checkExists', 'randomPubSub' ]; - if (!(argv.action === 'all' || _.some(valid_actions, function(action) { + if (_.some(valid_actions, function(action) { return action === argv.action; - }))) { + })) { callback(new Error('Action was not valid')); } var address = argv.host + ':' + argv.port; @@ -249,17 +256,14 @@ function main(callback) { return; } var ssl_creds = grpc.Credentials.createSsl(ca_data); - var options = {credentials: ssl_creds}; + var options = { + credentials: ssl_creds, + 'grpc.ssl_target_name_override': argv.host + }; var pub = new pubsub.PublisherService(address, options, updateMetadata); var sub = new pubsub.SubscriberService(address, options, updateMetadata); var runner = new PubsubRunner(pub, sub, argv); - if (argv.action === 'all') { - async.series(_.map(valid_actions, function(name) { - _.bind(runner[name], runner); - }), callback); - } else { - runner[argv.action](callback); - } + runner[argv.action](callback); }); }); } |