aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-07 16:08:00 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-07-07 16:08:00 -0700
commit822d2c7bebf9810903bdc63e24dc26eb2640f756 (patch)
treec84efa15bccbb5afddbf8e76fb1dcc19f2326eae
parent772187cdf0ff9dfafd2e693474c51eeddfe4c800 (diff)
Support registering services against specific hosts
-rw-r--r--include/grpc++/server.h4
-rw-r--r--include/grpc++/server_builder.h26
-rw-r--r--src/cpp/client/channel.cc2
-rw-r--r--src/cpp/server/server.cc9
-rw-r--r--src/cpp/server/server_builder.cc18
-rw-r--r--test/cpp/end2end/end2end_test.cc27
-rw-r--r--test/cpp/util/messages.proto1
7 files changed, 72 insertions, 15 deletions
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 6a9e757e77..94ee0b6a4a 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -84,8 +84,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
int max_message_size);
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
- bool RegisterService(RpcService* service);
- bool RegisterAsyncService(AsynchronousService* service);
+ bool RegisterService(const grpc::string *host, RpcService* service);
+ bool RegisterAsyncService(const grpc::string *host, AsynchronousService* service);
void RegisterAsyncGenericService(AsyncGenericService* service);
// Add a listening port. Can be called multiple times.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index ecee475e3e..2003d18ef5 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -69,6 +69,19 @@ class ServerBuilder {
// Register a generic service.
void RegisterAsyncGenericService(AsyncGenericService* service);
+ // Register a service. This call does not take ownership of the service.
+ // The service must exist for the lifetime of the Server instance returned by
+ // BuildAndStart().
+ void RegisterService(const grpc::string& host,
+ SynchronousService* service);
+
+ // Register an asynchronous service. New calls will be delevered to cq.
+ // This call does not take ownership of the service or completion queue.
+ // The service and completion queuemust exist for the lifetime of the Server
+ // instance returned by BuildAndStart().
+ void RegisterAsyncService(const grpc::string& host,
+ AsynchronousService* service);
+
// Set max message size in bytes.
void SetMaxMessageSize(int max_message_size) {
max_message_size_ = max_message_size;
@@ -98,9 +111,18 @@ class ServerBuilder {
int* selected_port;
};
+ typedef std::unique_ptr<grpc::string> HostString;
+ template <class T> struct NamedService {
+ explicit NamedService(T* s) : service(s) {}
+ explicit NamedService(const grpc::string& h, T *s)
+ : host(new grpc::string(h)), service(s) {}
+ HostString host;
+ T* service;
+ };
+
int max_message_size_;
- std::vector<RpcService*> services_;
- std::vector<AsynchronousService*> async_services_;
+ std::vector<NamedService<RpcService>> services_;
+ std::vector<NamedService<AsynchronousService>> async_services_;
std::vector<Port> ports_;
std::vector<ServerCompletionQueue*> cqs_;
std::shared_ptr<ServerCredentials> creds_;
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 72593f877e..406811d47f 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -59,7 +59,7 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
auto c_call =
- method.channel_tag()
+ method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(c_channel_, cq->cq(),
method.channel_tag(),
context->raw_deadline())
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 1437b2dea7..f9d20ff579 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -207,10 +207,11 @@ Server::~Server() {
delete sync_methods_;
}
-bool Server::RegisterService(RpcService* service) {
+bool Server::RegisterService(const grpc::string *host, RpcService* service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod* method = service->GetMethod(i);
- void* tag = grpc_server_register_method(server_, method->name(), nullptr);
+ void* tag = grpc_server_register_method(
+ server_, method->name(), host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
@@ -222,14 +223,14 @@ bool Server::RegisterService(RpcService* service) {
return true;
}
-bool Server::RegisterAsyncService(AsynchronousService* service) {
+bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
- nullptr);
+ host ? host->c_str() : nullptr);
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
service->method_names_[i]);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 3ee1d54e76..fa158c919e 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -51,11 +51,21 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
}
void ServerBuilder::RegisterService(SynchronousService* service) {
- services_.push_back(service->service());
+ services_.emplace_back(service->service());
}
void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
- async_services_.push_back(service);
+ async_services_.emplace_back(service);
+}
+
+void ServerBuilder::RegisterService(
+ const grpc::string& addr, SynchronousService* service) {
+ services_.emplace_back(addr, service->service());
+}
+
+void ServerBuilder::RegisterAsyncService(
+ const grpc::string& addr, AsynchronousService* service) {
+ async_services_.emplace_back(addr, service);
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -97,13 +107,13 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
}
for (auto service = services_.begin(); service != services_.end();
service++) {
- if (!server->RegisterService(*service)) {
+ if (!server->RegisterService(service->host.get(), service->service)) {
return nullptr;
}
}
for (auto service = async_services_.begin();
service != async_services_.end(); service++) {
- if (!server->RegisterAsyncService(*service)) {
+ if (!server->RegisterAsyncService(service->host.get(), service->service)) {
return nullptr;
}
}
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 5e850ea30a..ce12cc8488 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -87,12 +87,16 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() : signal_client_(false), host_(nullptr) {}
+ explicit TestServiceImpl(const grpc::string& host) : signal_client_(false), host_(new grpc::string(host)) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
+ if (host_) {
+ response->mutable_param()->set_host(*host_);
+ }
if (request->has_param() && request->param().client_cancel_after_us()) {
{
std::unique_lock<std::mutex> lock(mu_);
@@ -191,6 +195,7 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
private:
bool signal_client_;
std::mutex mu_;
+ std::unique_ptr<grpc::string> host_;
};
class TestServiceImplDupPkg
@@ -205,7 +210,7 @@ class TestServiceImplDupPkg
class End2endTest : public ::testing::Test {
protected:
- End2endTest() : kMaxMessageSize_(8192), thread_pool_(2) {}
+ End2endTest() : kMaxMessageSize_(8192), special_service_("special"), thread_pool_(2) {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
@@ -215,6 +220,7 @@ class End2endTest : public ::testing::Test {
builder.AddListeningPort(server_address_.str(),
FakeTransportSecurityServerCredentials());
builder.RegisterService(&service_);
+ builder.RegisterService("special", &special_service_);
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
@@ -236,6 +242,7 @@ class End2endTest : public ::testing::Test {
std::ostringstream server_address_;
const int kMaxMessageSize_;
TestServiceImpl service_;
+ TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
ThreadPool thread_pool_;
};
@@ -254,6 +261,22 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
}
}
+TEST_F(End2endTest, SimpleRpcWithHost) {
+ ResetStub();
+
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello");
+
+ ClientContext context;
+ context.set_authority("special");
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(response.has_param());
+ EXPECT_EQ(response.param().host(), "special");
+ EXPECT_TRUE(s.ok());
+}
+
TEST_F(End2endTest, SimpleRpc) {
ResetStub();
SendRpc(stub_.get(), 1);
diff --git a/test/cpp/util/messages.proto b/test/cpp/util/messages.proto
index 062f66c091..dc8572cc9c 100644
--- a/test/cpp/util/messages.proto
+++ b/test/cpp/util/messages.proto
@@ -46,6 +46,7 @@ message EchoRequest {
message ResponseParams {
optional int64 request_deadline = 1;
+ optional string host = 2;
}
message EchoResponse {