diff options
Diffstat (limited to 'src/node/examples/pubsub/pubsub_demo.js')
-rw-r--r-- | src/node/examples/pubsub/pubsub_demo.js | 285 |
1 files changed, 0 insertions, 285 deletions
diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js deleted file mode 100644 index 26301515f0..0000000000 --- a/src/node/examples/pubsub/pubsub_demo.js +++ /dev/null @@ -1,285 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -'use strict'; - -var async = require('async'); -var fs = require('fs'); -var GoogleAuth = require('google-auth-library'); -var parseArgs = require('minimist'); -var strftime = require('strftime'); -var _ = require('underscore'); -var grpc = require('../..'); -var PROTO_PATH = __dirname + '/pubsub.proto'; -var pubsub = grpc.load(PROTO_PATH).tech.pubsub; - -function PubsubRunner(pub, sub, args) { - this.pub = pub; - this.sub = sub; - this.args = args; -} - -PubsubRunner.prototype.getTestTopicName = function() { - var base_name = '/topics/' + this.args.project_id + '/'; - if (this.args.topic_name) { - return base_name + this.args.topic_name; - } - var now_text = strftime('%Y%m%d%H%M%S%L'); - return base_name + process.env.USER + '-' + now_text; -}; - -PubsubRunner.prototype.getTestSubName = function() { - var base_name = '/subscriptions/' + this.args.project_id + '/'; - if (this.args.sub_name) { - return base_name + this.args.sub_name; - } - var now_text = strftime('%Y%m%d%H%M%S%L'); - return base_name + process.env.USER + '-' + now_text; -}; - -PubsubRunner.prototype.listProjectTopics = function(callback) { - var q = ('cloud.googleapis.com/project in (/projects/' + - this.args.project_id + ')'); - this.pub.listTopics({query: q}, callback); -}; - -PubsubRunner.prototype.topicExists = function(name, callback) { - this.listProjectTopics(function(err, response) { - if (err) { - callback(err); - } else { - callback(null, _.some(response.topic, function(t) { - return t.name === name; - })); - } - }); -}; - -PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) { - var self = this; - this.topicExists(name, function(err, exists) { - if (err) { - callback(err); - } else{ - if (exists) { - callback(null); - } else { - self.pub.createTopic({name: name}, callback); - } - } - }); -}; - -PubsubRunner.prototype.removeTopic = function(callback) { - var name = this.getTestTopicName(); - console.log('... removing Topic', name); - this.pub.deleteTopic({topic: name}, function(err, value) { - if (err) { - console.log('Could not delete a topic: rpc failed with', err); - callback(err); - } else { - console.log('removed Topic', name, 'OK'); - callback(null); - } - }); -}; - -PubsubRunner.prototype.createTopic = function(callback) { - var name = this.getTestTopicName(); - console.log('... creating Topic', name); - this.pub.createTopic({name: name}, function(err, value) { - if (err) { - console.log('Could not create a topic: rpc failed with', err); - callback(err); - } else { - console.log('created Topic', name, 'OK'); - callback(null); - } - }); -}; - -PubsubRunner.prototype.listSomeTopics = function(callback) { - console.log('Listing topics'); - console.log('-------------_'); - this.listProjectTopics(function(err, response) { - if (err) { - console.log('Could not list topic: rpc failed with', err); - callback(err); - } else { - _.each(response.topic, function(t) { - console.log(t.name); - }); - callback(null); - } - }); -}; - -PubsubRunner.prototype.checkExists = function(callback) { - var name = this.getTestTopicName(); - console.log('... checking for topic', name); - this.topicExists(name, function(err, exists) { - if (err) { - console.log('Could not check for a topics: rpc failed with', err); - callback(err); - } else { - if (exists) { - console.log(name, 'is a topic'); - } else { - console.log(name, 'is not a topic'); - } - callback(null); - } - }); -}; - -PubsubRunner.prototype.randomPubSub = function(callback) { - var self = this; - var topic_name = this.getTestTopicName(); - var sub_name = this.getTestSubName(); - var subscription = {name: sub_name, topic: topic_name}; - async.waterfall([ - _.bind(this.createTopicIfNeeded, this, topic_name), - _.bind(this.sub.createSubscription, this.sub, subscription), - function(resp, cb) { - var msg_count = _.random(10, 30); - // Set up msg_count messages to publish - var message_senders = _.times(msg_count, function(n) { - return _.bind(self.pub.publish, self.pub, { - topic: topic_name, - message: {data: new Buffer('message ' + n)} - }); - }); - async.parallel(message_senders, function(err, result) { - cb(err, result, msg_count); - }); - }, - function(result, msg_count, cb) { - console.log('Sent', msg_count, 'messages to', topic_name + ',', - 'checking for them now.'); - var batch_request = { - subscription: sub_name, - max_events: msg_count - }; - self.sub.pullBatch(batch_request, cb); - }, - function(batch, cb) { - var ack_id = _.pluck(batch.pull_responses, 'ack_id'); - console.log('Got', ack_id.length, 'messages, acknowledging them...'); - var ack_request = { - subscription: sub_name, - ack_id: ack_id - }; - self.sub.acknowledge(ack_request, cb); - }, - function(result, cb) { - console.log( - 'Test messages were acknowledged OK, deleting the subscription'); - self.sub.deleteSubscription({subscription: sub_name}, cb); - } - ], function (err, result) { - if (err) { - console.log('Could not do random pub sub: rpc failed with', err); - } - callback(err, result); - }); -}; - -function main(callback) { - var argv = parseArgs(process.argv, { - string: [ - 'host', - 'oauth_scope', - 'port', - 'action', - 'project_id', - 'topic_name', - 'sub_name' - ], - default: { - host: 'pubsub-staging.googleapis.com', - oauth_scope: 'https://www.googleapis.com/auth/pubsub', - port: 443, - action: 'listSomeTopics', - project_id: 'stoked-keyword-656' - } - }); - var valid_actions = [ - 'createTopic', - 'removeTopic', - 'listSomeTopics', - 'checkExists', - 'randomPubSub' - ]; - if (_.some(valid_actions, function(action) { - return action === argv.action; - })) { - callback(new Error('Action was not valid')); - } - var address = argv.host + ':' + argv.port; - (new GoogleAuth()).getApplicationDefault(function(err, credential) { - if (err) { - callback(err); - return; - } - if (credential.createScopedRequired()) { - credential = credential.createScoped(argv.oauth_scope); - } - var updateMetadata = grpc.getGoogleAuthDelegate(credential); - var ca_path = process.env.SSL_CERT_FILE; - fs.readFile(ca_path, function(err, ca_data) { - if (err) { - callback(err); - return; - } - var ssl_creds = grpc.Credentials.createSsl(ca_data); - var options = { - credentials: ssl_creds, - 'grpc.ssl_target_name_override': argv.host - }; - var pub = new pubsub.PublisherService(address, options, updateMetadata); - var sub = new pubsub.SubscriberService(address, options, updateMetadata); - var runner = new PubsubRunner(pub, sub, argv); - runner[argv.action](callback); - }); - }); -} - -if (require.main === module) { - main(function(err) { - if (err) { - throw err; - } - }); -} - -module.exports = PubsubRunner; |