diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/tips/client.cc | 60 | ||||
-rw-r--r-- | examples/tips/client.h | 54 | ||||
-rw-r--r-- | examples/tips/client_main.cc | 73 | ||||
-rw-r--r-- | examples/tips/client_test.cc | 106 | ||||
-rw-r--r-- | examples/tips/empty.proto | 13 | ||||
-rw-r--r-- | examples/tips/label.proto | 48 | ||||
-rw-r--r-- | examples/tips/pubsub.proto | 702 |
7 files changed, 1056 insertions, 0 deletions
diff --git a/examples/tips/client.cc b/examples/tips/client.cc new file mode 100644 index 0000000000..695ff80e8a --- /dev/null +++ b/examples/tips/client.cc @@ -0,0 +1,60 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/client_context.h> + +#include "examples/tips/client.h" + +using tech::pubsub::Topic; +using tech::pubsub::PublisherService; + +namespace grpc { +namespace examples { +namespace tips { + +Client::Client(std::shared_ptr<ChannelInterface> channel) + : stub_(PublisherService::NewStub(channel)) { +} + +Status Client::CreateTopic(grpc::string topic) { + Topic request; + Topic response; + request.set_name(topic); + ClientContext context; + + return stub_->CreateTopic(&context, request, &response); +} + +} // namespace tips +} // namespace examples +} // namespace grpc diff --git a/examples/tips/client.h b/examples/tips/client.h new file mode 100644 index 0000000000..3f4f1fd836 --- /dev/null +++ b/examples/tips/client.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel_interface.h> +#include <grpc++/status.h> + +#include "examples/tips/pubsub.pb.h" + +namespace grpc { +namespace examples { +namespace tips { + +class Client { + public: + Client(std::shared_ptr<grpc::ChannelInterface> channel); + Status CreateTopic(grpc::string topic); + + private: + std::unique_ptr<tech::pubsub::PublisherService::Stub> stub_; +}; + +} // namespace tips +} // namespace examples +} // namespace grpc diff --git a/examples/tips/client_main.cc b/examples/tips/client_main.cc new file mode 100644 index 0000000000..17567b6f17 --- /dev/null +++ b/examples/tips/client_main.cc @@ -0,0 +1,73 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include <google/gflags.h> +#include <grpc++/channel_interface.h> +#include <grpc++/create_channel.h> +#include <grpc++/status.h> + +#include "examples/tips/client.h" +#include "test/cpp/util/create_test_channel.h" + +DEFINE_bool(enable_ssl, true, "Whether to use ssl/tls."); +DEFINE_bool(use_prod_roots, true, "True to use SSL roots for production GFE"); +DEFINE_int32(server_port, 0, "Server port."); +DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); +DEFINE_string(server_host_override, "foo.test.google.com", + "Override the server host which is sent in HTTP header"); + +int main(int argc, char** argv) { + grpc_init(); + google::ParseCommandLineFlags(&argc, &argv, true); + gpr_log(GPR_INFO, "Start TIPS client"); + + GPR_ASSERT(FLAGS_server_port); + const int host_port_buf_size = 1024; + char host_port[host_port_buf_size]; + snprintf(host_port, host_port_buf_size, "%s:%d", FLAGS_server_host.c_str(), + FLAGS_server_port); + + std::shared_ptr<grpc::ChannelInterface> channel( + grpc::CreateTestChannel(host_port, FLAGS_server_host_override, + FLAGS_enable_ssl, FLAGS_use_prod_roots)); + + grpc::examples::tips::Client client(channel); + grpc::Status s = client.CreateTopic("test"); + GPR_ASSERT(s.IsOk()); + + channel.reset(); + grpc_shutdown(); + return 0; +} diff --git a/examples/tips/client_test.cc b/examples/tips/client_test.cc new file mode 100644 index 0000000000..69238f2c6f --- /dev/null +++ b/examples/tips/client_test.cc @@ -0,0 +1,106 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/status.h> +#include <gtest/gtest.h> + +#include "examples/tips/client.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +using grpc::ChannelInterface; + +namespace grpc { +namespace testing { +namespace { + +const char kTopic[] = "test topic"; + +class PublishServiceImpl : public tech::pubsub::PublisherService::Service { + public: + Status CreateTopic(::grpc::ServerContext* context, + const ::tech::pubsub::Topic* request, + ::tech::pubsub::Topic* response) override { + EXPECT_EQ(request->name(), kTopic); + return Status::OK; + } +}; + +class End2endTest : public ::testing::Test { + protected: + void SetUp() override { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + ServerBuilder builder; + builder.AddPort(server_address_.str()); + builder.RegisterService(service_.service()); + server_ = builder.BuildAndStart(); + + channel_ = CreateChannel(server_address_.str(), ChannelArguments()); + } + + void TearDown() override { server_->Shutdown(); } + + std::unique_ptr<Server> server_; + std::ostringstream server_address_; + PublishServiceImpl service_; + + std::shared_ptr<ChannelInterface> channel_; +}; + +TEST_F(End2endTest, CreateTopic) { + grpc::examples::tips::Client client(channel_); + client.CreateTopic(kTopic); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + grpc_init(); + ::testing::InitGoogleTest(&argc, argv); + gpr_log(GPR_INFO, "Start test ..."); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/examples/tips/empty.proto b/examples/tips/empty.proto new file mode 100644 index 0000000000..adf66b5e61 --- /dev/null +++ b/examples/tips/empty.proto @@ -0,0 +1,13 @@ +syntax = "proto2"; + +package proto2; + +// An empty message that you can re-use to avoid defining duplicated empty +// messages in your project. A typical example is to use it as argument or the +// return value of a service API. For instance: +// +// service Foo { +// rpc Bar (proto2.Empty) returns (proto2.Empty) { }; +// }; +// +message Empty {} diff --git a/examples/tips/label.proto b/examples/tips/label.proto new file mode 100644 index 0000000000..e93ac9dea3 --- /dev/null +++ b/examples/tips/label.proto @@ -0,0 +1,48 @@ +// Labels provide a way to associate user-defined metadata with various +// objects. Labels may be used to organize objects into non-hierarchical +// groups; think metadata tags attached to mp3s. + +syntax = "proto2"; + +package tech.label; + +// A key-value pair applied to a given object. +message Label { + // The key of a label is a syntactically valid URL (as per RFC 1738) with + // the "scheme" and initial slashes omitted and with the additional + // restrictions noted below. Each key should be globally unique. The + // "host" portion is called the "namespace" and is not necessarily + // resolvable to a network endpoint. Instead, the namespace indicates what + // system or entity defines the semantics of the label. Namespaces do not + // restrict the set of objects to which a label may be associated. + // + // Keys are defined by the following grammar: + // + // key = hostname "/" kpath + // kpath = ksegment *[ "/" ksegment ] + // ksegment = alphadigit | *[ alphadigit | "-" | "_" | "." ] + // + // where "hostname" and "alphadigit" are defined as in RFC 1738. + // + // Example key: + // spanner.google.com/universe + required string key = 1; + + // The value of the label. + oneof value { + // A string value. + string str_value = 2; + // An integer value. + int64 num_value = 3; + } +} + +// A collection of labels, such as the set of all labels attached to an +// object. Each label in the set must have a different key. +// +// Users should prefer to embed "repeated Label" directly when possible. +// This message should only be used in cases where that isn't possible (e.g. +// with oneof). +message Labels { + repeated Label label = 1; +} diff --git a/examples/tips/pubsub.proto b/examples/tips/pubsub.proto new file mode 100644 index 0000000000..0b3bd5d012 --- /dev/null +++ b/examples/tips/pubsub.proto @@ -0,0 +1,702 @@ +// Specification of the Pubsub API. + +syntax = "proto2"; + +import "examples/tips/empty.proto"; +import "examples/tips/label.proto"; + +package tech.pubsub; + +// ----------------------------------------------------------------------------- +// Overview of the Pubsub API +// ----------------------------------------------------------------------------- + +// This file describes an API for a Pubsub system. This system provides a +// reliable many-to-many communication mechanism between independently written +// publishers and subscribers where the publisher publishes messages to "topics" +// and each subscriber creates a "subscription" and consumes messages from it. +// +// (a) The pubsub system maintains bindings between topics and subscriptions. +// (b) A publisher publishes messages into a topic. +// (c) The pubsub system delivers messages from topics into relevant +// subscriptions. +// (d) A subscriber receives pending messages from its subscription and +// acknowledges or nacks each one to the pubsub system. +// (e) The pubsub system removes acknowledged messages from that subscription. + +// ----------------------------------------------------------------------------- +// Data Model +// ----------------------------------------------------------------------------- + +// The data model consists of the following: +// +// * Topic: A topic is a resource to which messages are published by publishers. +// Topics are named, and the name of the topic is unique within the pubsub +// system. +// +// * Subscription: A subscription records the subscriber's interest in a topic. +// It can optionally include a query to select a subset of interesting +// messages. The pubsub system maintains a logical cursor tracking the +// matching messages which still need to be delivered and acked so that +// they can retried as needed. The set of messages that have not been +// acknowledged is called the subscription backlog. +// +// * Message: A message is a unit of data that flows in the system. It contains +// opaque data from the publisher along with its labels. +// +// * Message Labels (optional): A set of opaque key, value pairs assigned +// by the publisher which the subscriber can use for filtering out messages +// in the topic. For example, a label with key "foo.com/device_type" and +// value "mobile" may be added for messages that are only relevant for a +// mobile subscriber; a subscriber on a phone may decide to create a +// subscription only for messages that have this label. + +// ----------------------------------------------------------------------------- +// Publisher Flow +// ----------------------------------------------------------------------------- + +// A publisher publishes messages to the topic using the Publish request: +// +// PubsubMessage message; +// message.set_data("...."); +// Label label; +// label.set_key("foo.com/key1"); +// label.set_str_value("value1"); +// message.add_label(label); +// PublishRequest request; +// request.set_topic("topicName"); +// request.set_message(message); +// PublisherService.Publish(request); + +// ----------------------------------------------------------------------------- +// Subscriber Flow +// ----------------------------------------------------------------------------- + +// The subscriber part of the API is richer than the publisher part and has a +// number of concepts w.r.t. subscription creation and monitoring: +// +// (1) A subscriber creates a subscription using the CreateSubscription call. +// It may specify an optional "query" to indicate that it wants to receive +// only messages with a certain set of labels using the label query syntax. +// It may also specify an optional truncation policy to indicate when old +// messages from the subcription can be removed. +// +// (2) A subscriber receives messages in one of two ways: via push or pull. +// +// (a) To receive messages via push, the PushConfig field must be specified in +// the Subscription parameter when creating a subscription. The PushConfig +// specifies an endpoint at which the subscriber must expose the +// PushEndpointService. Messages are received via the HandlePubsubEvent +// method. The push subscriber responds to the HandlePubsubEvent method +// with a result code that indicates one of three things: Ack (the message +// has been successfully processed and the Pubsub system may delete it), +// Nack (the message has been rejected, the Pubsub system should resend it +// at a later time), or Push-Back (this is a Nack with the additional +// semantics that the subscriber is overloaded and the pubsub system should +// back off on the rate at which it is invoking HandlePubsubEvent). The +// endpoint may be a load balancer for better scalability. +// +// (b) To receive messages via pull a subscriber calls the Pull method on the +// SubscriberService to get messages from the subscription. For each +// individual message, the subscriber may use the ack_id received in the +// PullResponse to Ack the message, Nack the message, or modify the ack +// deadline with ModifyAckDeadline. See the +// Subscription.ack_deadline_seconds field documentation for details on the +// ack deadline behavior. +// +// Note: Messages may be consumed in parallel by multiple subscribers making +// Pull calls to the same subscription; this will result in the set of +// messages from the subscription being shared and each subscriber +// receiving a subset of the messages. +// +// (4) The subscriber can explicitly truncate the current subscription. +// +// (5) "Truncated" events are delivered when a subscription is +// truncated, whether due to the subscription's truncation policy +// or an explicit request from the subscriber. +// +// Subscription creation: +// +// Subscription subscription; +// subscription.set_topic("topicName"); +// subscription.set_name("subscriptionName"); +// subscription.push_config().set_push_endpoint("machinename:8888"); +// SubscriberService.CreateSubscription(subscription); +// +// Consuming messages via push: +// +// TODO(eschapira): Add HTTP push example. +// +// The port 'machinename:8888' must be bound to a stubby server that implements +// the PushEndpointService with the following method: +// +// int HandlePubsubEvent(PubsubEvent event) { +// if (event.subscription().equals("subscriptionName")) { +// if (event.has_message()) { +// Process(event.message().data()); +// } else if (event.truncated()) { +// ProcessTruncatedEvent(); +// } +// } +// return OK; // This return code implies an acknowledgment +// } +// +// Consuming messages via pull: +// +// The subscription must be created without setting the push_config field. +// +// PullRequest pull_request; +// pull_request.set_subscription("subscriptionName"); +// pull_request.set_return_immediately(false); +// while (true) { +// PullResponse pull_response; +// if (SubscriberService.Pull(pull_request, pull_response) == OK) { +// PubsubEvent event = pull_response.pubsub_event(); +// if (event.has_message()) { +// Process(event.message().data()); +// } else if (event.truncated()) { +// ProcessTruncatedEvent(); +// } +// AcknowledgeRequest ack_request; +// ackRequest.set_subscription("subscriptionName"); +// ackRequest.set_ack_id(pull_response.ack_id()); +// SubscriberService.Acknowledge(ack_request); +// } +// } + +// ----------------------------------------------------------------------------- +// Reliability Semantics +// ----------------------------------------------------------------------------- + +// When a subscriber successfully creates a subscription using +// Subscriber.CreateSubscription, it establishes a "subscription point" with +// respect to that subscription - the subscriber is guaranteed to receive any +// message published after this subscription point that matches the +// subscription's query. Note that messages published before the Subscription +// point may or may not be delivered. +// +// If the system truncates the subscription according to the specified +// truncation policy, the system delivers a subscription status event with the +// "truncated" field set to true. We refer to such events as "truncation +// events". A truncation event: +// +// * Informs the subscriber that part of the subscription messages have been +// discarded. The subscriber may want to recover from the message loss, e.g., +// by resyncing its state with its backend. +// * Establishes a new subscription point, i.e., the subscriber is guaranteed to +// receive all changes published after the trunction event is received (or +// until another truncation event is received). +// +// Note that messages are not delivered in any particular order by the pubsub +// system. Furthermore, the system guarantees at-least-once delivery +// of each message or truncation events until acked. + +// ----------------------------------------------------------------------------- +// Deletion +// ----------------------------------------------------------------------------- + +// Both topics and subscriptions may be deleted. Deletion of a topic implies +// deletion of all attached subscriptions. +// +// When a subscription is deleted directly by calling DeleteSubscription, all +// messages are immediately dropped. If it is a pull subscriber, future pull +// requests will return NOT_FOUND. +// +// When a topic is deleted all corresponding subscriptions are immediately +// deleted, and subscribers experience the same behavior as directly deleting +// the subscription. + +// ----------------------------------------------------------------------------- +// The Publisher service and its protos. +// ----------------------------------------------------------------------------- + +// The service that an application uses to manipulate topics, and to send +// messages to a topic. +service PublisherService { + + // Creates the given topic with the given name. + rpc CreateTopic(Topic) returns (Topic) { + } + + // Adds a message to the topic. Returns NOT_FOUND if the topic does not + // exist. + // (-- For different error code values returned via Stubby, see + // util/task/codes.proto. --) + rpc Publish(PublishRequest) returns (proto2.Empty) { + } + + // Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + // not exist. + rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse) { + } + + // Gets the configuration of a topic. Since the topic only has the name + // attribute, this method is only useful to check the existence of a topic. + // If other attributes are added in the future, they will be returned here. + rpc GetTopic(GetTopicRequest) returns (Topic) { + } + + // Lists matching topics. + rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse) { + } + + // Deletes the topic with the given name. All subscriptions to this topic + // are also deleted. Returns NOT_FOUND if the topic does not exist. + // After a topic is deleted, a new topic may be created with the same name. + rpc DeleteTopic(DeleteTopicRequest) returns (proto2.Empty) { + } +} + +// A topic resource. +message Topic { + // Name of the topic. + optional string name = 1; +} + +// A message data and its labels. +message PubsubMessage { + // The message payload. + optional bytes data = 1; + + // Optional list of labels for this message. Keys in this collection must + // be unique. + //(-- TODO(eschapira): Define how key namespace may be scoped to the topic.--) + repeated tech.label.Label label = 2; + + // ID of this message assigned by the server at publication time. Guaranteed + // to be unique within the topic. This value may be read by a subscriber + // that receives a PubsubMessage via a Pull call or a push delivery. It must + // not be populated by a publisher in a Publish call. + optional string message_id = 3; +} + +// Request for the GetTopic method. +message GetTopicRequest { + // The name of the topic to get. + optional string topic = 1; +} + +// Request for the Publish method. +message PublishRequest { + // The message in the request will be published on this topic. + optional string topic = 1; + + // The message to publish. + optional PubsubMessage message = 2; +} + +// Request for the PublishBatch method. +message PublishBatchRequest { + // The messages in the request will be published on this topic. + optional string topic = 1; + + // The messages to publish. + repeated PubsubMessage messages = 2; +} + +// Response for the PublishBatch method. +message PublishBatchResponse { + // The server-assigned ID of each published message, in the same order as + // the messages in the request. IDs are guaranteed to be unique within + // the topic. + repeated string message_ids = 1; +} + +// Request for the ListTopics method. +message ListTopicsRequest { + // A valid label query expression. + // (-- Which labels are required or supported is implementation-specific. --) + optional string query = 1; + + // Maximum number of topics to return. + // (-- If not specified or <= 0, the implementation will select a reasonable + // value. --) + optional int32 max_results = 2; + + // The value obtained in the last <code>ListTopicsResponse</code> + // for continuation. + optional string page_token = 3; + +} + +// Response for the ListTopics method. +message ListTopicsResponse { + // The resulting topics. + repeated Topic topic = 1; + + // If not empty, indicates that there are more topics that match the request, + // and this value should be passed to the next <code>ListTopicsRequest</code> + // to continue. + optional string next_page_token = 2; +} + +// Request for the Delete method. +message DeleteTopicRequest { + // Name of the topic to delete. + optional string topic = 1; +} + +// ----------------------------------------------------------------------------- +// The Subscriber service and its protos. +// ----------------------------------------------------------------------------- + +// The service that an application uses to manipulate subscriptions and to +// consume messages from a subscription via the pull method. +service SubscriberService { + + // Creates a subscription on a given topic for a given subscriber. + // If the subscription already exists, returns ALREADY_EXISTS. + // If the corresponding topic doesn't exist, returns NOT_FOUND. + // + // If the name is not provided in the request, the server will assign a random + // name for this subscription on the same project as the topic. + rpc CreateSubscription(Subscription) returns (Subscription) { + } + + // Gets the configuration details of a subscription. + rpc GetSubscription(GetSubscriptionRequest) returns (Subscription) { + } + + // Lists matching subscriptions. + rpc ListSubscriptions(ListSubscriptionsRequest) + returns (ListSubscriptionsResponse) { + } + + // Deletes an existing subscription. All pending messages in the subscription + // are immediately dropped. Calls to Pull after deletion will return + // NOT_FOUND. + rpc DeleteSubscription(DeleteSubscriptionRequest) returns (proto2.Empty) { + } + + // Removes all the pending messages in the subscription and releases the + // storage associated with them. Results in a truncation event to be sent to + // the subscriber. Messages added after this call returns are stored in the + // subscription as before. + rpc TruncateSubscription(TruncateSubscriptionRequest) returns (proto2.Empty) { + } + + // + // Push subscriber calls. + // + + // Modifies the <code>PushConfig</code> for a specified subscription. + // This method can be used to suspend the flow of messages to an endpoint + // by clearing the <code>PushConfig</code> field in the request. Messages + // will be accumulated for delivery even if no push configuration is + // defined or while the configuration is modified. + rpc ModifyPushConfig(ModifyPushConfigRequest) returns (proto2.Empty) { + } + + // + // Pull Subscriber calls + // + + // Pulls a single message from the server. + // If return_immediately is true, and no messages are available in the + // subscription, this method returns FAILED_PRECONDITION. The system is free + // to return an UNAVAILABLE error if no messages are available in a + // reasonable amount of time (to reduce system load). + rpc Pull(PullRequest) returns (PullResponse) { + } + + // Pulls messages from the server. Returns an empty list if there are no + // messages available in the backlog. The system is free to return UNAVAILABLE + // if there are too many pull requests outstanding for the given subscription. + rpc PullBatch(PullBatchRequest) returns (PullBatchResponse) { + } + + // Modifies the Ack deadline for a message received from a pull request. + rpc ModifyAckDeadline(ModifyAckDeadlineRequest) returns (proto2.Empty) { + } + + // Acknowledges a particular received message: the Pub/Sub system can remove + // the given message from the subscription. Acknowledging a message whose + // Ack deadline has expired may succeed, but the message could have been + // already redelivered. Acknowledging a message more than once will not + // result in an error. This is only used for messages received via pull. + rpc Acknowledge(AcknowledgeRequest) returns (proto2.Empty) { + } + + // Refuses processing a particular received message. The system will + // redeliver this message to some consumer of the subscription at some + // future time. This is only used for messages received via pull. + rpc Nack(NackRequest) returns (proto2.Empty) { + } +} + +// A subscription resource. +message Subscription { + // Name of the subscription. + optional string name = 1; + + // The name of the topic from which this subscription is receiving messages. + optional string topic = 2; + + // If <code>query</code> is non-empty, only messages on the subscriber's + // topic whose labels match the query will be returned. Otherwise all + // messages on the topic will be returned. + // (-- The query syntax is described in tech/label/proto/label_query.proto --) + optional string query = 3; + + // The subscriber may specify requirements for truncating unacknowledged + // subscription entries. The system will honor the + // <code>CreateSubscription</code> request only if it can meet these + // requirements. If this field is not specified, messages are never truncated + // by the system. + optional TruncationPolicy truncation_policy = 4; + + // Specifies which messages can be truncated by the system. + message TruncationPolicy { + oneof policy { + // If <code>max_bytes</code> is specified, the system is allowed to drop + // old messages to keep the combined size of stored messages under + // <code>max_bytes</code>. This is a hint; the system may keep more than + // this many bytes, but will make a best effort to keep the size from + // growing much beyond this parameter. + int64 max_bytes = 1; + + // If <code>max_age_seconds</code> is specified, the system is allowed to + // drop messages that have been stored for at least this many seconds. + // This is a hint; the system may keep these messages, but will make a + // best effort to remove them when their maximum age is reached. + int64 max_age_seconds = 2; + } + } + + // If push delivery is used with this subscription, this field is + // used to configure it. + optional PushConfig push_config = 5; + + // For either push or pull delivery, the value is the maximum time after a + // subscriber receives a message before the subscriber should acknowledge or + // Nack the message. If the Ack deadline for a message passes without an + // Ack or a Nack, the Pub/Sub system will eventually redeliver the message. + // If a subscriber acknowledges after the deadline, the Pub/Sub system may + // accept the Ack, but it is possible that the message has been already + // delivered again. Multiple Acks to the message are allowed and will + // succeed. + // + // For push delivery, this value is used to set the request timeout for + // the call to the push endpoint. + // + // For pull delivery, this value is used as the initial value for the Ack + // deadline. It may be overridden for a specific pull request (message) with + // <code>ModifyAckDeadline</code>. + // While a message is outstanding (i.e. it has been delivered to a pull + // subscriber and the subscriber has not yet Acked or Nacked), the Pub/Sub + // system will not deliver that message to another pull subscriber + // (on a best-effort basis). + optional int32 ack_deadline_seconds = 6; + + // If this parameter is set to n, the system is allowed to (but not required + // to) delete the subscription when at least n seconds have elapsed since the + // client presence was detected. (Presence is detected through any + // interaction using the subscription ID, including Pull(), Get(), or + // acknowledging a message.) + // + // If this parameter is not set, the subscription will stay live until + // explicitly deleted. + // + // Clients can detect such garbage collection when a Get call or a Pull call + // (for pull subscribers only) returns NOT_FOUND. + optional int64 garbage_collect_seconds = 7; +} + +// Configuration for a push delivery endpoint. +message PushConfig { + // A URL locating the endpoint to which messages should be pushed. + // For example, a Webhook endpoint might use "https://example.com/push". + // (-- An Android application might use "gcm:<REGID>", where <REGID> is a + // GCM registration id allocated for pushing messages to the application. --) + optional string push_endpoint = 1; +} + +// An event indicating a received message or truncation event. +message PubsubEvent { + // The subscription that received the event. + optional string subscription = 1; + + oneof type { + // A received message. + PubsubMessage message = 2; + + // Indicates that this subscription has been truncated. + bool truncated = 3; + + // Indicates that this subscription has been deleted. (Note that pull + // subscribers will always receive NOT_FOUND in response in their pull + // request on the subscription, rather than seeing this boolean.) + bool deleted = 4; + } +} + +// Request for the GetSubscription method. +message GetSubscriptionRequest { + // The name of the subscription to get. + optional string subscription = 1; +} + +// Request for the ListSubscriptions method. +message ListSubscriptionsRequest { + // A valid label query expression. + // (-- Which labels are required or supported is implementation-specific. + // TODO(eschapira): This method must support to query by topic. We must + // define the key URI for the "topic" label. --) + optional string query = 1; + + // Maximum number of subscriptions to return. + // (-- If not specified or <= 0, the implementation will select a reasonable + // value. --) + optional int32 max_results = 3; + + // The value obtained in the last <code>ListSubscriptionsResponse</code> + // for continuation. + optional string page_token = 4; +} + +// Response for the ListSubscriptions method. +message ListSubscriptionsResponse { + // The subscriptions that match the request. + repeated Subscription subscription = 1; + + // If not empty, indicates that there are more subscriptions that match the + // request and this value should be passed to the next + // <code>ListSubscriptionsRequest</code> to continue. + optional string next_page_token = 2; +} + +// Request for the TruncateSubscription method. +message TruncateSubscriptionRequest { + // The subscription that is being truncated. + optional string subscription = 1; +} + +// Request for the DeleteSubscription method. +message DeleteSubscriptionRequest { + // The subscription to delete. + optional string subscription = 1; +} + +// Request for the ModifyPushConfig method. +message ModifyPushConfigRequest { + // The name of the subscription. + optional string subscription = 1; + + // An empty <code>push_config</code> indicates that the Pub/Sub system should + // pause pushing messages from the given subscription. + optional PushConfig push_config = 2; +} + +// ----------------------------------------------------------------------------- +// The protos used by a pull subscriber. +// ----------------------------------------------------------------------------- + +// Request for the Pull method. +message PullRequest { + // The subscription from which a message should be pulled. + optional string subscription = 1; + + // If this is specified as true the system will respond immediately even if + // it is not able to return a message in the Pull response. Otherwise the + // system is allowed to wait until at least one message is available rather + // than returning FAILED_PRECONDITION. The client may cancel the request if + // it does not wish to wait any longer for the response. + optional bool return_immediately = 2; +} + +// Either a <code>PubsubMessage</code> or a truncation event. One of these two +// must be populated. +message PullResponse { + // This ID must be used to acknowledge the received event or message. + optional string ack_id = 1; + + // A pubsub message or truncation event. + optional PubsubEvent pubsub_event = 2; +} + +// Request for the PullBatch method. +message PullBatchRequest { + // The subscription from which messages should be pulled. + optional string subscription = 1; + + // If this is specified as true the system will respond immediately even if + // it is not able to return a message in the Pull response. Otherwise the + // system is allowed to wait until at least one message is available rather + // than returning no messages. The client may cancel the request if it does + // not wish to wait any longer for the response. + optional bool return_immediately = 2; + + // The maximum number of PubsubEvents returned for this request. The Pub/Sub + // system may return fewer than the number of events specified. + optional int32 max_events = 3; +} + +// Response for the PullBatch method. +message PullBatchResponse { + + // Received Pub/Sub messages or status events. The Pub/Sub system will return + // zero messages if there are no more messages available in the backlog. The + // Pub/Sub system may return fewer than the max_events requested even if + // there are more messages available in the backlog. + repeated PullResponse pull_responses = 2; +} + +// Request for the ModifyAckDeadline method. +message ModifyAckDeadlineRequest { + // The name of the subscription from which messages are being pulled. + optional string subscription = 1; + + // The acknowledgment ID. + optional string ack_id = 2; + + // The new Ack deadline. Must be >= 0. + optional int32 ack_deadline_seconds = 3; +} + +// Request for the Acknowledge method. +message AcknowledgeRequest { + // The subscription whose message is being acknowledged. + optional string subscription = 1; + + // The acknowledgment ID for the message being acknowledged. This was + // returned by the Pub/Sub system in the Pull response. + repeated string ack_id = 2; +} + +// Request for the Nack method. +message NackRequest { + // The subscription whose message is being Nacked. + optional string subscription = 1; + + // The acknowledgment ID for the message being refused. This was returned by + // the Pub/Sub system in the Pull response. + repeated string ack_id = 2; +} + +// ----------------------------------------------------------------------------- +// The service and protos used by a push subscriber. +// ----------------------------------------------------------------------------- + +// The service that a subscriber uses to handle messages sent via push +// delivery. +// This service is not currently exported for HTTP clients. +// TODO(eschapira): Explain HTTP subscribers. +service PushEndpointService { + // Sends a <code>PubsubMessage</code> or a subscription status event to a + // push endpoint. + // The push endpoint responds with an empty message and a code from + // util/task/codes.proto. The following codes have a particular meaning to the + // Pub/Sub system: + // OK - This is interpreted by Pub/Sub as Ack. + // ABORTED - This is intepreted by Pub/Sub as a Nack, without implying + // pushback for congestion control. The Pub/Sub system will + // retry this message at a later time. + // UNAVAILABLE - This is intepreted by Pub/Sub as a Nack, with the additional + // semantics of push-back. The Pub/Sub system will use an AIMD + // congestion control algorithm to backoff the rate of sending + // messages from this subscription. + // Any other code, or a failure to respond, will be interpreted in the same + // way as ABORTED; i.e. the system will retry the message at a later time to + // ensure reliable delivery. + rpc HandlePubsubEvent(PubsubEvent) returns (proto2.Empty); +} |