aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/node/examples/pubsub/pubsub_demo.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/node/examples/pubsub/pubsub_demo.js')
-rw-r--r--src/node/examples/pubsub/pubsub_demo.js285
1 files changed, 0 insertions, 285 deletions
diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js
deleted file mode 100644
index 26301515f0..0000000000
--- a/src/node/examples/pubsub/pubsub_demo.js
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-'use strict';
-
-var async = require('async');
-var fs = require('fs');
-var GoogleAuth = require('google-auth-library');
-var parseArgs = require('minimist');
-var strftime = require('strftime');
-var _ = require('underscore');
-var grpc = require('../..');
-var PROTO_PATH = __dirname + '/pubsub.proto';
-var pubsub = grpc.load(PROTO_PATH).tech.pubsub;
-
-function PubsubRunner(pub, sub, args) {
- this.pub = pub;
- this.sub = sub;
- this.args = args;
-}
-
-PubsubRunner.prototype.getTestTopicName = function() {
- var base_name = '/topics/' + this.args.project_id + '/';
- if (this.args.topic_name) {
- return base_name + this.args.topic_name;
- }
- var now_text = strftime('%Y%m%d%H%M%S%L');
- return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.getTestSubName = function() {
- var base_name = '/subscriptions/' + this.args.project_id + '/';
- if (this.args.sub_name) {
- return base_name + this.args.sub_name;
- }
- var now_text = strftime('%Y%m%d%H%M%S%L');
- return base_name + process.env.USER + '-' + now_text;
-};
-
-PubsubRunner.prototype.listProjectTopics = function(callback) {
- var q = ('cloud.googleapis.com/project in (/projects/' +
- this.args.project_id + ')');
- this.pub.listTopics({query: q}, callback);
-};
-
-PubsubRunner.prototype.topicExists = function(name, callback) {
- this.listProjectTopics(function(err, response) {
- if (err) {
- callback(err);
- } else {
- callback(null, _.some(response.topic, function(t) {
- return t.name === name;
- }));
- }
- });
-};
-
-PubsubRunner.prototype.createTopicIfNeeded = function(name, callback) {
- var self = this;
- this.topicExists(name, function(err, exists) {
- if (err) {
- callback(err);
- } else{
- if (exists) {
- callback(null);
- } else {
- self.pub.createTopic({name: name}, callback);
- }
- }
- });
-};
-
-PubsubRunner.prototype.removeTopic = function(callback) {
- var name = this.getTestTopicName();
- console.log('... removing Topic', name);
- this.pub.deleteTopic({topic: name}, function(err, value) {
- if (err) {
- console.log('Could not delete a topic: rpc failed with', err);
- callback(err);
- } else {
- console.log('removed Topic', name, 'OK');
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.createTopic = function(callback) {
- var name = this.getTestTopicName();
- console.log('... creating Topic', name);
- this.pub.createTopic({name: name}, function(err, value) {
- if (err) {
- console.log('Could not create a topic: rpc failed with', err);
- callback(err);
- } else {
- console.log('created Topic', name, 'OK');
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.listSomeTopics = function(callback) {
- console.log('Listing topics');
- console.log('-------------_');
- this.listProjectTopics(function(err, response) {
- if (err) {
- console.log('Could not list topic: rpc failed with', err);
- callback(err);
- } else {
- _.each(response.topic, function(t) {
- console.log(t.name);
- });
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.checkExists = function(callback) {
- var name = this.getTestTopicName();
- console.log('... checking for topic', name);
- this.topicExists(name, function(err, exists) {
- if (err) {
- console.log('Could not check for a topics: rpc failed with', err);
- callback(err);
- } else {
- if (exists) {
- console.log(name, 'is a topic');
- } else {
- console.log(name, 'is not a topic');
- }
- callback(null);
- }
- });
-};
-
-PubsubRunner.prototype.randomPubSub = function(callback) {
- var self = this;
- var topic_name = this.getTestTopicName();
- var sub_name = this.getTestSubName();
- var subscription = {name: sub_name, topic: topic_name};
- async.waterfall([
- _.bind(this.createTopicIfNeeded, this, topic_name),
- _.bind(this.sub.createSubscription, this.sub, subscription),
- function(resp, cb) {
- var msg_count = _.random(10, 30);
- // Set up msg_count messages to publish
- var message_senders = _.times(msg_count, function(n) {
- return _.bind(self.pub.publish, self.pub, {
- topic: topic_name,
- message: {data: new Buffer('message ' + n)}
- });
- });
- async.parallel(message_senders, function(err, result) {
- cb(err, result, msg_count);
- });
- },
- function(result, msg_count, cb) {
- console.log('Sent', msg_count, 'messages to', topic_name + ',',
- 'checking for them now.');
- var batch_request = {
- subscription: sub_name,
- max_events: msg_count
- };
- self.sub.pullBatch(batch_request, cb);
- },
- function(batch, cb) {
- var ack_id = _.pluck(batch.pull_responses, 'ack_id');
- console.log('Got', ack_id.length, 'messages, acknowledging them...');
- var ack_request = {
- subscription: sub_name,
- ack_id: ack_id
- };
- self.sub.acknowledge(ack_request, cb);
- },
- function(result, cb) {
- console.log(
- 'Test messages were acknowledged OK, deleting the subscription');
- self.sub.deleteSubscription({subscription: sub_name}, cb);
- }
- ], function (err, result) {
- if (err) {
- console.log('Could not do random pub sub: rpc failed with', err);
- }
- callback(err, result);
- });
-};
-
-function main(callback) {
- var argv = parseArgs(process.argv, {
- string: [
- 'host',
- 'oauth_scope',
- 'port',
- 'action',
- 'project_id',
- 'topic_name',
- 'sub_name'
- ],
- default: {
- host: 'pubsub-staging.googleapis.com',
- oauth_scope: 'https://www.googleapis.com/auth/pubsub',
- port: 443,
- action: 'listSomeTopics',
- project_id: 'stoked-keyword-656'
- }
- });
- var valid_actions = [
- 'createTopic',
- 'removeTopic',
- 'listSomeTopics',
- 'checkExists',
- 'randomPubSub'
- ];
- if (_.some(valid_actions, function(action) {
- return action === argv.action;
- })) {
- callback(new Error('Action was not valid'));
- }
- var address = argv.host + ':' + argv.port;
- (new GoogleAuth()).getApplicationDefault(function(err, credential) {
- if (err) {
- callback(err);
- return;
- }
- if (credential.createScopedRequired()) {
- credential = credential.createScoped(argv.oauth_scope);
- }
- var updateMetadata = grpc.getGoogleAuthDelegate(credential);
- var ca_path = process.env.SSL_CERT_FILE;
- fs.readFile(ca_path, function(err, ca_data) {
- if (err) {
- callback(err);
- return;
- }
- var ssl_creds = grpc.Credentials.createSsl(ca_data);
- var options = {
- credentials: ssl_creds,
- 'grpc.ssl_target_name_override': argv.host
- };
- var pub = new pubsub.PublisherService(address, options, updateMetadata);
- var sub = new pubsub.SubscriberService(address, options, updateMetadata);
- var runner = new PubsubRunner(pub, sub, argv);
- runner[argv.action](callback);
- });
- });
-}
-
-if (require.main === module) {
- main(function(err) {
- if (err) {
- throw err;
- }
- });
-}
-
-module.exports = PubsubRunner;