diff options
author | Yang Gao <yangg@google.com> | 2015-03-05 16:39:25 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-03-05 16:39:25 -0800 |
commit | 1c40233814db12cca53857241c7314b8ef14ea54 (patch) | |
tree | e341e1c18fdcdc38d016c5d37c705885b9c2e26c /src/cpp/server | |
parent | c79a57c3a9ffea6f5a1cbea7b061d204a11281db (diff) |
first sets of changes, it builds
Diffstat (limited to 'src/cpp/server')
-rw-r--r-- | src/cpp/server/anonymous_service.cc | 47 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 50 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 13 |
3 files changed, 103 insertions, 7 deletions
diff --git a/src/cpp/server/anonymous_service.cc b/src/cpp/server/anonymous_service.cc new file mode 100644 index 0000000000..ef20cade34 --- /dev/null +++ b/src/cpp/server/anonymous_service.cc @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc++/anonymous_service.h> + +#include <grpc++/server.h> + +namespace grpc { + +void AnonymousService::RequestCall(AnonymousServerContext* ctx, + GenericServerReaderWriter* reader_writer, + CompletionQueue* cq, void* tag) { + server_->RequestAsyncAnonymousCall(ctx, reader_writer, cq, tag); +} + +} // namespace grpc + diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 2a5a7fe5eb..4de1ad290e 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -37,6 +37,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/log.h> +#include <grpc++/anonymous_service.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/rpc_service_method.h> #include <grpc++/impl/service_type.h> @@ -239,6 +240,12 @@ bool Server::RegisterAsyncService(AsynchronousService* service) { return true; } +void Server::RegisterAnonymousService(AnonymousService* service) { + GPR_ASSERT(service->server_ == nullptr && + "Can only register an anonymous service against one server."); + service->server_ = this; +} + int Server::AddPort(const grpc::string& addr) { GPR_ASSERT(!started_); if (secure_) { @@ -306,15 +313,36 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { stream_(stream), cq_(cq), ctx_(ctx), + anonymous_ctx_(nullptr), server_(server), call_(nullptr), payload_(nullptr) { memset(&array_, 0, sizeof(array_)); + grpc_call_details_init(&call_details_); grpc_server_request_registered_call( - server->server_, registered_method, &call_, &deadline_, &array_, - request ? &payload_ : nullptr, cq->cq(), this); + server->server_, registered_method, &call_, &call_details_.deadline, + &array_, request ? &payload_ : nullptr, cq->cq(), this); + } + + AsyncRequest(Server* server, AnonymousServerContext* ctx, + ServerAsyncStreamingInterface* stream, CompletionQueue* cq, + void* tag) + : tag_(tag), + request_(nullptr), + stream_(stream), + cq_(cq), + ctx_(nullptr), + anonymous_ctx_(ctx), + server_(server), + call_(nullptr), + payload_(nullptr) { + memset(&array_, 0, sizeof(array_)); + grpc_call_details_init(&call_details_); + grpc_server_request_call( + server->server_, &call_, &call_details_, &array_, cq->cq(), this); } + ~AsyncRequest() { if (payload_) { grpc_byte_buffer_destroy(payload_); @@ -332,20 +360,22 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { *status = false; } } + ServerContext* ctx = ctx_ ? ctx_ : anonymous_ctx_; + GPR_ASSERT(ctx); if (*status) { - ctx_->deadline_ = Timespec2Timepoint(deadline_); + ctx->deadline_ = Timespec2Timepoint(call_details_.deadline); for (size_t i = 0; i < array_.count; i++) { - ctx_->client_metadata_.insert(std::make_pair( + ctx->client_metadata_.insert(std::make_pair( grpc::string(array_.metadata[i].key), grpc::string( array_.metadata[i].value, array_.metadata[i].value + array_.metadata[i].value_length))); } } - ctx_->call_ = call_; + ctx->call_ = call_; Call call(call_, server_, cq_); if (orig_status && call_) { - ctx_->BeginCompletionOp(&call); + ctx->BeginCompletionOp(&call); } // just the pointers inside call are copied here stream_->BindCall(&call); @@ -359,9 +389,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { ServerAsyncStreamingInterface* const stream_; CompletionQueue* const cq_; ServerContext* const ctx_; + AnonymousServerContext* const anonymous_ctx_; Server* const server_; grpc_call* call_; - gpr_timespec deadline_; + grpc_call_details call_details_; grpc_metadata_array array_; grpc_byte_buffer* payload_; }; @@ -372,6 +403,11 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context, CompletionQueue* cq, void* tag) { new AsyncRequest(this, registered_method, context, request, stream, cq, tag); } +void Server::RequestAsyncAnonymousCall(AnonymousServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* cq, void* tag) { + new AsyncRequest(this, context, stream, cq, tag); +} void Server::ScheduleCallback() { { diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index ae60f3d8b6..cc6f8ca9e5 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -51,6 +51,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) { async_services_.push_back(service); } +void ServerBuilder::RegisterAnonymousService(AnonymousService* service) { + if (anonymous_service_) { + gpr_log(GPR_ERROR, + "Adding multiple AnonymousService is unsupported for now. " + "Dropping the service %p", service); + return; + } + anonymous_service_ = service; +} + void ServerBuilder::AddPort(const grpc::string& addr) { ports_.push_back(addr); } @@ -89,6 +99,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { return nullptr; } } + if (anonymous_service_) { + server->RegisterAnonymousService(anonymous_service_); + } for (auto& port : ports_) { if (!server->AddPort(port)) { return nullptr; |