diff options
author | 2015-01-30 18:26:16 -0800 | |
---|---|---|
committer | 2015-01-30 18:26:16 -0800 | |
commit | 04f1aa809a60bda7e64e72863890bba69ac0cfbf (patch) | |
tree | 0a708739bf311f465bdda4fd07e7cdb08f81c259 /examples/tips | |
parent | 6de65b01cb933b9b2cd1eb69db6ed763434297aa (diff) |
Implement both Publisher and Subscriber.
Diffstat (limited to 'examples/tips')
-rw-r--r-- | examples/tips/main.cc (renamed from examples/tips/client_main.cc) | 49 | ||||
-rw-r--r-- | examples/tips/publisher.cc (renamed from examples/tips/client.cc) | 30 | ||||
-rw-r--r-- | examples/tips/publisher.h (renamed from examples/tips/client.h) | 14 | ||||
-rw-r--r-- | examples/tips/publisher_test.cc | 144 | ||||
-rw-r--r-- | examples/tips/subscriber.cc | 86 | ||||
-rw-r--r-- | examples/tips/subscriber.h | 64 | ||||
-rw-r--r-- | examples/tips/subscriber_test.cc (renamed from examples/tips/client_test.cc) | 51 |
7 files changed, 404 insertions, 34 deletions
diff --git a/examples/tips/client_main.cc b/examples/tips/main.cc index 5a3a0daab7..94a0bc7995 100644 --- a/examples/tips/client_main.cc +++ b/examples/tips/main.cc @@ -46,7 +46,8 @@ #include <grpc++/credentials.h> #include <grpc++/status.h> -#include "examples/tips/client.h" +#include "examples/tips/publisher.h" +#include "examples/tips/subscriber.h" #include "test/cpp/util/create_test_channel.h" DEFINE_int32(server_port, 443, "Server port."); @@ -54,7 +55,17 @@ DEFINE_string(server_host, "pubsub-staging.googleapis.com", "Server host to connect to"); DEFINE_string(service_account_key_file, "", "Path to service account json key file."); -DEFINE_string(oauth_scope, "", "Scope for OAuth tokens."); +DEFINE_string(oauth_scope, + "https://www.googleapis.com/auth/cloud-platform", + "Scope for OAuth tokens."); + +namespace { + +const char kTopic[] = "/topics/stoked-keyword-656/testtopics"; +const char kSubscriptionName[] = "stoked-keyword-656/testsubscription"; +const char kMessageData[] = "Message Data"; + +} // namespace grpc::string GetServiceAccountJsonKey() { static grpc::string json_key; @@ -94,20 +105,40 @@ int main(int argc, char** argv) { true, // use prod roots creds)); - grpc::examples::tips::Client client(channel); + grpc::examples::tips::Publisher publisher(channel); + grpc::examples::tips::Subscriber subscriber(channel); + + grpc::string topic = kTopic; + + if (publisher.GetTopic(topic).IsOk()) publisher.DeleteTopic(topic); + + grpc::Status s = publisher.CreateTopic(topic); + gpr_log(GPR_INFO, "Create topic returns code %d, %s", + s.code(), s.details().c_str()); + GPR_ASSERT(s.IsOk()); + + s = publisher.GetTopic(topic); + gpr_log(GPR_INFO, "Get topic returns code %d, %s", + s.code(), s.details().c_str()); + GPR_ASSERT(s.IsOk()); - grpc::Status s = client.CreateTopic("/topics/stoked-keyword-656/testtopics"); - gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str()); + s = publisher.Publish(topic, kMessageData); + gpr_log(GPR_INFO, "Publish returns code %d, %s", + s.code(), s.details().c_str()); GPR_ASSERT(s.IsOk()); - s = client.GetTopic("/topics/stoked-keyword-656/testtopics"); - gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str()); + s = subscriber.CreateSubscription(kTopic, kSubscriptionName); + gpr_log(GPR_INFO, "create subscrption returns code %d, %s", + s.code(), s.details().c_str()); GPR_ASSERT(s.IsOk()); - s = client.DeleteTopic("/topics/stoked-keyword-656/testtopics"); - gpr_log(GPR_INFO, "return code %d, %s", s.code(), s.details().c_str()); + s = publisher.DeleteTopic(kTopic); + gpr_log(GPR_INFO, "Delete topic returns code %d, %s", + s.code(), s.details().c_str()); GPR_ASSERT(s.IsOk()); + subscriber.Shutdown(); + publisher.Shutdown(); channel.reset(); grpc_shutdown(); return 0; diff --git a/examples/tips/client.cc b/examples/tips/publisher.cc index f9d53197ed..238ea596a2 100644 --- a/examples/tips/client.cc +++ b/examples/tips/publisher.cc @@ -33,7 +33,7 @@ #include <grpc++/client_context.h> -#include "examples/tips/client.h" +#include "examples/tips/publisher.h" using tech::pubsub::Topic; using tech::pubsub::DeleteTopicRequest; @@ -41,16 +41,22 @@ using tech::pubsub::GetTopicRequest; using tech::pubsub::PublisherService; using tech::pubsub::ListTopicsRequest; using tech::pubsub::ListTopicsResponse; +using tech::pubsub::PublishRequest; +using tech::pubsub::PubsubMessage; namespace grpc { namespace examples { namespace tips { -Client::Client(std::shared_ptr<ChannelInterface> channel) +Publisher::Publisher(std::shared_ptr<ChannelInterface> channel) : stub_(PublisherService::NewStub(channel)) { } -Status Client::CreateTopic(grpc::string topic) { +void Publisher::Shutdown() { + stub_.reset(); +} + +Status Publisher::CreateTopic(grpc::string topic) { Topic request; Topic response; request.set_name(topic); @@ -59,7 +65,7 @@ Status Client::CreateTopic(grpc::string topic) { return stub_->CreateTopic(&context, request, &response); } -Status Client::ListTopics() { +Status Publisher::ListTopics() { ListTopicsRequest request; ListTopicsResponse response; ClientContext context; @@ -67,7 +73,7 @@ Status Client::ListTopics() { return stub_->ListTopics(&context, request, &response); } -Status Client::GetTopic(grpc::string topic) { +Status Publisher::GetTopic(grpc::string topic) { GetTopicRequest request; Topic response; ClientContext context; @@ -77,7 +83,7 @@ Status Client::GetTopic(grpc::string topic) { return stub_->GetTopic(&context, request, &response); } -Status Client::DeleteTopic(grpc::string topic) { +Status Publisher::DeleteTopic(grpc::string topic) { DeleteTopicRequest request; proto2::Empty response; ClientContext context; @@ -87,6 +93,18 @@ Status Client::DeleteTopic(grpc::string topic) { return stub_->DeleteTopic(&context, request, &response); } +Status Publisher::Publish(const grpc::string& topic, + const grpc::string& data) { + PublishRequest request; + proto2::Empty response; + ClientContext context; + + request.mutable_message()->set_data(data); + request.set_topic(topic); + + return stub_->Publish(&context, request, &response); +} + } // namespace tips } // namespace examples } // namespace grpc diff --git a/examples/tips/client.h b/examples/tips/publisher.h index 661ee5c4af..a97dbe6de2 100644 --- a/examples/tips/client.h +++ b/examples/tips/publisher.h @@ -31,8 +31,8 @@ * */ -#ifndef __GRPCPP_EXAMPLES_TIPS_CLIENT_H_ -#define __GRPCPP_EXAMPLES_TIPS_CLIENT_H_ +#ifndef __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_ +#define __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_ #include <grpc++/channel_interface.h> #include <grpc++/status.h> @@ -43,14 +43,18 @@ namespace grpc { namespace examples { namespace tips { -class Client { +class Publisher { public: - Client(std::shared_ptr<grpc::ChannelInterface> channel); + Publisher(std::shared_ptr<grpc::ChannelInterface> channel); + void Shutdown(); + Status CreateTopic(grpc::string topic); Status GetTopic(grpc::string topic); Status DeleteTopic(grpc::string topic); Status ListTopics(); + Status Publish(const grpc::string& topic, const grpc::string& data); + private: std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_; }; @@ -59,4 +63,4 @@ class Client { } // namespace examples } // namespace grpc -#endif // __GRPCPP_EXAMPLES_TIPS_CLIENT_H_ +#endif // __GRPCPP_EXAMPLES_TIPS_PUBLISHER_H_ diff --git a/examples/tips/publisher_test.cc b/examples/tips/publisher_test.cc new file mode 100644 index 0000000000..7f845fed23 --- /dev/null +++ b/examples/tips/publisher_test.cc @@ -0,0 +1,144 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/status.h> +#include <gtest/gtest.h> + +#include "examples/tips/publisher.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +using grpc::ChannelInterface; + +namespace grpc { +namespace testing { +namespace { + +const char kTopic[] = "test topic"; +const char kMessageData[] = "test message data"; + +class PublisherServiceImpl : public tech::pubsub::PublisherService::Service { + public: + Status CreateTopic(::grpc::ServerContext* context, + const ::tech::pubsub::Topic* request, + ::tech::pubsub::Topic* response) override { + EXPECT_EQ(request->name(), kTopic); + return Status::OK; + } + + Status Publish(ServerContext* context, + const ::tech::pubsub::PublishRequest* request, + ::proto2::Empty* response) override { + EXPECT_EQ(request->message().data(), kMessageData); + return Status::OK; + } + + Status GetTopic(ServerContext* context, + const ::tech::pubsub::GetTopicRequest* request, + ::tech::pubsub::Topic* response) override { + EXPECT_EQ(request->topic(), kTopic); + return Status::OK; + } + + Status ListTopics(ServerContext* context, + const ::tech::pubsub::ListTopicsRequest* request, + ::tech::pubsub::ListTopicsResponse* response) override { + return Status::OK; + } + + Status DeleteTopic(ServerContext* context, + const ::tech::pubsub::DeleteTopicRequest* request, + ::proto2::Empty* response) override { + EXPECT_EQ(request->topic(), kTopic); + return Status::OK; + } + +}; + +class PublisherTest : public ::testing::Test { + protected: + // Setup a server and a client for PublisherService. + void SetUp() override { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + ServerBuilder builder; + builder.AddPort(server_address_.str()); + builder.RegisterService(service_.service()); + server_ = builder.BuildAndStart(); + + channel_ = CreateChannel(server_address_.str(), ChannelArguments()); + + publisher_.reset(new grpc::examples::tips::Publisher(channel_)); + } + + void TearDown() override { + server_->Shutdown(); + publisher_->Shutdown(); + } + + std::ostringstream server_address_; + std::unique_ptr<Server> server_; + PublisherServiceImpl service_; + + std::shared_ptr<ChannelInterface> channel_; + + std::unique_ptr<grpc::examples::tips::Publisher> publisher_; +}; + +TEST_F(PublisherTest, TestPublisher) { + EXPECT_TRUE(publisher_->CreateTopic(kTopic).IsOk()); + EXPECT_TRUE(publisher_->Publish(kTopic, kMessageData).IsOk()); + EXPECT_TRUE(publisher_->GetTopic(kTopic).IsOk()); + EXPECT_TRUE(publisher_->ListTopics().IsOk()); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + grpc_init(); + ::testing::InitGoogleTest(&argc, argv); + gpr_log(GPR_INFO, "Start test ..."); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/examples/tips/subscriber.cc b/examples/tips/subscriber.cc new file mode 100644 index 0000000000..a482ad6263 --- /dev/null +++ b/examples/tips/subscriber.cc @@ -0,0 +1,86 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/client_context.h> + +#include "examples/tips/subscriber.h" + +using tech::pubsub::Topic; +using tech::pubsub::DeleteTopicRequest; +using tech::pubsub::GetTopicRequest; +using tech::pubsub::SubscriberService; +using tech::pubsub::ListTopicsRequest; +using tech::pubsub::ListTopicsResponse; +using tech::pubsub::PublishRequest; +using tech::pubsub::PubsubMessage; + +namespace grpc { +namespace examples { +namespace tips { + +Subscriber::Subscriber(std::shared_ptr<ChannelInterface> channel) + : stub_(SubscriberService::NewStub(channel)) { +} + +void Subscriber::Shutdown() { + stub_.reset(); +} + +Status Subscriber::CreateSubscription(const grpc::string& topic, + const grpc::string& name) { + tech::pubsub::Subscription request; + tech::pubsub::Subscription response; + ClientContext context; + + request.set_topic(topic); + request.set_name(name); + + return stub_->CreateSubscription(&context, request, &response); +} + +Status Subscriber::GetSubscription(const grpc::string& name, + grpc::string* topic) { + tech::pubsub::GetSubscriptionRequest request; + tech::pubsub::Subscription response; + ClientContext context; + + request.set_subscription(name); + + Status s = stub_->GetSubscription(&context, request, &response); + *topic = response.topic(); + return s; +} + +} // namespace tips +} // namespace examples +} // namespace grpc diff --git a/examples/tips/subscriber.h b/examples/tips/subscriber.h new file mode 100644 index 0000000000..e0491ff919 --- /dev/null +++ b/examples/tips/subscriber.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_ +#define __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_ + +#include <grpc++/channel_interface.h> +#include <grpc++/status.h> + +#include "examples/tips/pubsub.pb.h" + +namespace grpc { +namespace examples { +namespace tips { + +class Subscriber { + public: + Subscriber(std::shared_ptr<grpc::ChannelInterface> channel); + void Shutdown(); + + Status CreateSubscription(const grpc::string& topic, + const grpc::string& name); + + Status GetSubscription(const grpc::string& name, grpc::string* topic); + + private: + std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_; +}; + +} // namespace tips +} // namespace examples +} // namespace grpc + +#endif // __GRPCPP_EXAMPLES_TIPS_SUBSCRIBER_H_ diff --git a/examples/tips/client_test.cc b/examples/tips/subscriber_test.cc index 69238f2c6f..4894814252 100644 --- a/examples/tips/client_test.cc +++ b/examples/tips/subscriber_test.cc @@ -41,7 +41,7 @@ #include <grpc++/status.h> #include <gtest/gtest.h> -#include "examples/tips/client.h" +#include "examples/tips/subscriber.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -52,43 +52,66 @@ namespace testing { namespace { const char kTopic[] = "test topic"; +const char kSubscriptionName[] = "subscription name"; -class PublishServiceImpl : public tech::pubsub::PublisherService::Service { +class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service { public: - Status CreateTopic(::grpc::ServerContext* context, - const ::tech::pubsub::Topic* request, - ::tech::pubsub::Topic* response) override { - EXPECT_EQ(request->name(), kTopic); + Status CreateSubscription(ServerContext* context, + const tech::pubsub::Subscription* request, + tech::pubsub::Subscription* response) override { + EXPECT_EQ(request->topic(), kTopic); + EXPECT_EQ(request->name(), kSubscriptionName); return Status::OK; } + + Status GetSubscription(ServerContext* context, + const tech::pubsub::GetSubscriptionRequest* request, + tech::pubsub::Subscription* response) override { + EXPECT_EQ(request->subscription(), kSubscriptionName); + response->set_topic(kTopic); + return Status::OK; + } + }; -class End2endTest : public ::testing::Test { +class SubscriberTest : public ::testing::Test { protected: + // Setup a server and a client for SubscriberService. void SetUp() override { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; - // Setup server ServerBuilder builder; builder.AddPort(server_address_.str()); builder.RegisterService(service_.service()); server_ = builder.BuildAndStart(); channel_ = CreateChannel(server_address_.str(), ChannelArguments()); + + subscriber_.reset(new grpc::examples::tips::Subscriber(channel_)); } - void TearDown() override { server_->Shutdown(); } + void TearDown() override { + server_->Shutdown(); + subscriber_->Shutdown(); + } - std::unique_ptr<Server> server_; std::ostringstream server_address_; - PublishServiceImpl service_; + std::unique_ptr<Server> server_; + SubscriberServiceImpl service_; std::shared_ptr<ChannelInterface> channel_; + + std::unique_ptr<grpc::examples::tips::Subscriber> subscriber_; }; -TEST_F(End2endTest, CreateTopic) { - grpc::examples::tips::Client client(channel_); - client.CreateTopic(kTopic); +TEST_F(SubscriberTest, TestSubscriber) { + EXPECT_TRUE(subscriber_->CreateSubscription(kTopic, + kSubscriptionName).IsOk()); + + grpc::string topic; + EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName, + &topic).IsOk()); + EXPECT_EQ(topic, kTopic); } } // namespace |