diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/cpp/qps/server_async.cc | 106 |
1 files changed, 65 insertions, 41 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 85a47ff71d..b1e393dd40 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -38,16 +38,16 @@ #include <thread> #include <gflags/gflags.h> -#include <grpc/grpc.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> #include <grpc++/generic/async_generic_service.h> -#include <grpc++/support/config.h> +#include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> #include <grpc++/server_context.h> -#include <grpc++/security/server_credentials.h> +#include <grpc++/support/config.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> #include <gtest/gtest.h> #include "test/cpp/qps/server.h" @@ -56,15 +56,25 @@ namespace grpc { namespace testing { -template <class RequestType, class ResponseType, class ServiceType, class ServerContextType> +template <class RequestType, class ResponseType, class ServiceType, + class ServerContextType> class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, - std::function<void(ServerBuilder *, ServiceType *)> register_service, - std::function<void(ServiceType *, ServerContextType *, RequestType *, ServerAsyncResponseWriter<ResponseType>*, CompletionQueue *, ServerCompletionQueue *, void *)> request_unary_function, - std::function<void(ServiceType *, ServerContextType *, ServerAsyncReaderWriter<ResponseType, RequestType>*, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, - std::function<grpc::Status(const ServerConfig&, const RequestType *, ResponseType *)> process_rpc) - : Server(config) { + AsyncQpsServerTest( + const ServerConfig &config, + std::function<void(ServerBuilder *, ServiceType *)> register_service, + std::function<void(ServiceType *, ServerContextType *, RequestType *, + ServerAsyncResponseWriter<ResponseType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_unary_function, + std::function<void(ServiceType *, ServerContextType *, + ServerAsyncReaderWriter<ResponseType, RequestType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_function, + std::function<grpc::Status(const ServerConfig &, const RequestType *, + ResponseType *)> + process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -85,22 +95,23 @@ class AsyncQpsServerTest : public Server { using namespace std::placeholders; auto process_rpc_bound = std::bind(process_rpc, config, _1, _2); - + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - if (request_unary_function) { - auto request_unary = std::bind( - request_unary_function, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - contexts_.push_front(new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); - } - if (request_streaming_function) { - auto request_streaming = std::bind( - request_streaming_function, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front(new ServerRpcContextStreamingImpl( - request_streaming, process_rpc_bound)); - } + if (request_unary_function) { + auto request_unary = + std::bind(request_unary_function, &async_service_, _1, _2, _3, + srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front( + new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = + std::bind(request_streaming_function, &async_service_, _1, _2, + srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } @@ -173,7 +184,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, - void *)> request_method, + void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), @@ -231,9 +243,10 @@ class AsyncQpsServerTest : public Server { class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function<void(ServerContextType *, grpc::ServerAsyncReaderWriter< - ResponseType, RequestType> *, - void *)> request_method, + std::function<void( + ServerContextType *, + grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) : srv_ctx_(new ServerContextType), @@ -337,38 +350,49 @@ class AsyncQpsServerTest : public Server { }; static void RegisterBenchmarkService(ServerBuilder *builder, - BenchmarkService::AsyncService *service) { + BenchmarkService::AsyncService *service) { builder->RegisterAsyncService(service); } static void RegisterGenericService(ServerBuilder *builder, - grpc::AsyncGenericService *service) { + grpc::AsyncGenericService *service) { builder->RegisterAsyncGenericService(service); } -template<class RequestType, class ResponseType> +template <class RequestType, class ResponseType> Status ProcessRPC(const ServerConfig &config, const RequestType *request, - ResponseType *response) { + ResponseType *response) { if (request->response_size() > 0) { if (!Server::SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { + response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } } return Status::OK; } -template<> +template <> Status ProcessRPC(const ServerConfig &config, const ByteBuffer *request, - ByteBuffer *response) { + ByteBuffer *response) { return Status::OK; } - std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr<Server>(new AsyncQpsServerTest<SimpleRequest,SimpleResponse,BenchmarkService::AsyncService,grpc::ServerContext>(config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, ProcessRPC<SimpleRequest,SimpleResponse>)); + return std::unique_ptr<Server>( + new AsyncQpsServerTest<SimpleRequest, SimpleResponse, + BenchmarkService::AsyncService, + grpc::ServerContext>( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, + ProcessRPC<SimpleRequest, SimpleResponse>)); } std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { - return std::unique_ptr<Server>(new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,grpc::GenericServerContext>(config, RegisterGenericService, nullptr, &grpc::AsyncGenericService::RequestCall, ProcessRPC<ByteBuffer, ByteBuffer>)); + return std::unique_ptr<Server>( + new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService, + grpc::GenericServerContext>( + config, RegisterGenericService, nullptr, + &grpc::AsyncGenericService::RequestCall, + ProcessRPC<ByteBuffer, ByteBuffer>)); } } // namespace testing |