aboutsummaryrefslogtreecommitdiffhomepage
path: root/examples/tips
diff options
context:
space:
mode:
authorGravatar Chen Wang <chenw@google.com>2015-02-01 20:44:33 -0800
committerGravatar Chen Wang <chenw@google.com>2015-02-01 20:44:33 -0800
commit0010cdae47270a22fd4442835261c796ee565900 (patch)
tree37cf3e04b660803a7194941bb6b6a217a8d0261d /examples/tips
parent04f1aa809a60bda7e64e72863890bba69ac0cfbf (diff)
Add Pull method to subscriber
Diffstat (limited to 'examples/tips')
-rw-r--r--examples/tips/subscriber.cc23
-rw-r--r--examples/tips/subscriber.h2
-rw-r--r--examples/tips/subscriber_test.cc21
3 files changed, 46 insertions, 0 deletions
diff --git a/examples/tips/subscriber.cc b/examples/tips/subscriber.cc
index a482ad6263..2e2370ee2d 100644
--- a/examples/tips/subscriber.cc
+++ b/examples/tips/subscriber.cc
@@ -81,6 +81,29 @@ Status Subscriber::GetSubscription(const grpc::string& name,
return s;
}
+Status Subscriber::Pull(const grpc::string& name,
+ grpc::string* data) {
+ tech::pubsub::PullRequest request;
+ tech::pubsub::PullResponse response;
+ ClientContext context;
+
+ request.set_subscription(name);
+ Status s = stub_->Pull(&context, request, &response);
+ if (s.IsOk()) {
+ tech::pubsub::PubsubEvent event = response.pubsub_event();
+ if (event.has_message()) {
+ *data = event.message().data();
+ }
+ tech::pubsub::AcknowledgeRequest ack;
+ proto2::Empty empty;
+ ClientContext ack_context;
+ ack.set_subscription(name);
+ ack.add_ack_id(response.ack_id());
+ stub_->Acknowledge(&ack_context, ack, &empty);
+ }
+ return s;
+}
+
} // namespace tips
} // namespace examples
} // namespace grpc
diff --git a/examples/tips/subscriber.h b/examples/tips/subscriber.h
index e0491ff919..38345c0c5a 100644
--- a/examples/tips/subscriber.h
+++ b/examples/tips/subscriber.h
@@ -53,6 +53,8 @@ class Subscriber {
Status GetSubscription(const grpc::string& name, grpc::string* topic);
+ Status Pull(const grpc::string& name, grpc::string* data);
+
private:
std::unique_ptr<tech::pubsub::SubscriberService::Stub> stub_;
};
diff --git a/examples/tips/subscriber_test.cc b/examples/tips/subscriber_test.cc
index 4894814252..4ff93643ae 100644
--- a/examples/tips/subscriber_test.cc
+++ b/examples/tips/subscriber_test.cc
@@ -53,6 +53,7 @@ namespace {
const char kTopic[] = "test topic";
const char kSubscriptionName[] = "subscription name";
+const char kData[] = "Message data";
class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service {
public:
@@ -72,6 +73,21 @@ class SubscriberServiceImpl : public tech::pubsub::SubscriberService::Service {
return Status::OK;
}
+ Status Pull(ServerContext* context,
+ const tech::pubsub::PullRequest* request,
+ tech::pubsub::PullResponse* response) override {
+ EXPECT_EQ(request->subscription(), kSubscriptionName);
+ response->set_ack_id("1");
+ response->mutable_pubsub_event()->mutable_message()->set_data(kData);
+ return Status::OK;
+ }
+
+ Status Acknowledge(ServerContext* context,
+ const tech::pubsub::AcknowledgeRequest* request,
+ proto2::Empty* response) override {
+ return Status::OK;
+ }
+
};
class SubscriberTest : public ::testing::Test {
@@ -108,10 +124,15 @@ TEST_F(SubscriberTest, TestSubscriber) {
EXPECT_TRUE(subscriber_->CreateSubscription(kTopic,
kSubscriptionName).IsOk());
+
grpc::string topic;
EXPECT_TRUE(subscriber_->GetSubscription(kSubscriptionName,
&topic).IsOk());
EXPECT_EQ(topic, kTopic);
+
+ grpc::string data;
+ EXPECT_TRUE(subscriber_->Pull(kSubscriptionName,
+ &data).IsOk());
}
} // namespace