aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/anonymous_service.cc47
-rw-r--r--src/cpp/server/server.cc57
-rw-r--r--src/cpp/server/server_builder.cc16
3 files changed, 112 insertions, 8 deletions
diff --git a/src/cpp/server/anonymous_service.cc b/src/cpp/server/anonymous_service.cc
new file mode 100644
index 0000000000..ef20cade34
--- /dev/null
+++ b/src/cpp/server/anonymous_service.cc
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/anonymous_service.h>
+
+#include <grpc++/server.h>
+
+namespace grpc {
+
+void AnonymousService::RequestCall(AnonymousServerContext* ctx,
+ GenericServerReaderWriter* reader_writer,
+ CompletionQueue* cq, void* tag) {
+ server_->RequestAsyncAnonymousCall(ctx, reader_writer, cq, tag);
+}
+
+} // namespace grpc
+
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index e69032a657..d8425f1dfc 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -37,6 +37,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
+#include <grpc++/anonymous_service.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
@@ -226,6 +227,12 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
return true;
}
+void Server::RegisterAnonymousService(AnonymousService* service) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an anonymous service against one server.");
+ service->server_ = this;
+}
+
int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) {
GPR_ASSERT(!started_);
return creds->AddPortToServer(addr, server_);
@@ -289,15 +296,36 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
stream_(stream),
cq_(cq),
ctx_(ctx),
+ anonymous_ctx_(nullptr),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
+ grpc_call_details_init(&call_details_);
grpc_server_request_registered_call(
- server->server_, registered_method, &call_, &deadline_, &array_,
- request ? &payload_ : nullptr, cq->cq(), this);
+ server->server_, registered_method, &call_, &call_details_.deadline,
+ &array_, request ? &payload_ : nullptr, cq->cq(), this);
+ }
+
+ AsyncRequest(Server* server, AnonymousServerContext* ctx,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
+ void* tag)
+ : tag_(tag),
+ request_(nullptr),
+ stream_(stream),
+ cq_(cq),
+ ctx_(nullptr),
+ anonymous_ctx_(ctx),
+ server_(server),
+ call_(nullptr),
+ payload_(nullptr) {
+ memset(&array_, 0, sizeof(array_));
+ grpc_call_details_init(&call_details_);
+ grpc_server_request_call(
+ server->server_, &call_, &call_details_, &array_, cq->cq(), this);
}
+
~AsyncRequest() {
if (payload_) {
grpc_byte_buffer_destroy(payload_);
@@ -315,20 +343,28 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
*status = false;
}
}
+ ServerContext* ctx = ctx_ ? ctx_ : anonymous_ctx_;
+ GPR_ASSERT(ctx);
if (*status) {
- ctx_->deadline_ = Timespec2Timepoint(deadline_);
+ ctx->deadline_ = Timespec2Timepoint(call_details_.deadline);
for (size_t i = 0; i < array_.count; i++) {
- ctx_->client_metadata_.insert(std::make_pair(
+ ctx->client_metadata_.insert(std::make_pair(
grpc::string(array_.metadata[i].key),
grpc::string(
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
+ if (anonymous_ctx_) {
+ anonymous_ctx_->method_.assign(call_details_.method,
+ call_details_.method_capacity);
+ anonymous_ctx_->host_.assign(call_details_.host,
+ call_details_.host_capacity);
+ }
}
- ctx_->call_ = call_;
+ ctx->call_ = call_;
Call call(call_, server_, cq_);
if (orig_status && call_) {
- ctx_->BeginCompletionOp(&call);
+ ctx->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here
stream_->BindCall(&call);
@@ -342,9 +378,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_;
ServerContext* const ctx_;
+ AnonymousServerContext* const anonymous_ctx_;
Server* const server_;
grpc_call* call_;
- gpr_timespec deadline_;
+ grpc_call_details call_details_;
grpc_metadata_array array_;
grpc_byte_buffer* payload_;
};
@@ -356,6 +393,12 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
}
+void Server::RequestAsyncAnonymousCall(AnonymousServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* cq, void* tag) {
+ new AsyncRequest(this, context, stream, cq, tag);
+}
+
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 5de592334d..e3b9cdfd7f 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -41,7 +41,8 @@
namespace grpc {
-ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {}
+ServerBuilder::ServerBuilder()
+ : anonymous_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
@@ -51,6 +52,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.push_back(service);
}
+void ServerBuilder::RegisterAnonymousService(AnonymousService* service) {
+ if (anonymous_service_) {
+ gpr_log(GPR_ERROR,
+ "Adding multiple AnonymousService is unsupported for now. "
+ "Dropping the service %p", service);
+ return;
+ }
+ anonymous_service_ = service;
+}
+
void ServerBuilder::AddPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@@ -84,6 +95,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+ if (anonymous_service_) {
+ server->RegisterAnonymousService(anonymous_service_);
+ }
for (auto& port : ports_) {
int r = server->AddPort(port.addr, port.creds.get());
if (!r) return nullptr;