diff options
-rw-r--r-- | include/grpc++/channel_interface.h | 4 | ||||
-rw-r--r-- | include/grpc++/client_context.h | 7 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 18 | ||||
-rw-r--r-- | include/grpc++/server.h | 11 | ||||
-rw-r--r-- | include/grpc/grpc.h | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 73 | ||||
-rw-r--r-- | src/cpp/client/client_context.cc | 2 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 1 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 21 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 37 |
10 files changed, 102 insertions, 74 deletions
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 3631ea4d5d..b0366faabb 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -35,6 +35,7 @@ #define __GRPCPP_CHANNEL_INTERFACE_H__ #include <grpc++/status.h> +#include <grpc++/impl/call.h> namespace google { namespace protobuf { @@ -52,13 +53,12 @@ class CompletionQueue; class RpcMethod; class CallInterface; -class ChannelInterface { +class ChannelInterface : public CallHook { public: virtual ~ChannelInterface() {} virtual Call CreateCall(const RpcMethod &method, ClientContext *context, CompletionQueue *cq) = 0; - virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; }; } // namespace grpc diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 0cf6bdc647..a6e8ccc67c 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -35,8 +35,8 @@ #define __GRPCPP_CLIENT_CONTEXT_H__ #include <chrono> +#include <map> #include <string> -#include <vector> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -49,6 +49,8 @@ struct grpc_completion_queue; namespace grpc { +class CallOpBuffer; + class ClientContext { public: ClientContext(); @@ -67,6 +69,7 @@ class ClientContext { ClientContext(const ClientContext &); ClientContext &operator=(const ClientContext &); + friend class CallOpBuffer; friend class Channel; friend class StreamContext; @@ -84,7 +87,7 @@ class ClientContext { grpc_call *call_; grpc_completion_queue *cq_; gpr_timespec absolute_deadline_; - std::vector<std::pair<grpc::string, grpc::string> > metadata_; + std::multimap<grpc::string, grpc::string> metadata_; }; } // namespace grpc diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 40939e458f..8fed305ac6 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -52,7 +52,7 @@ struct grpc_op; namespace grpc { -class ChannelInterface; +class Call; class CallOpBuffer final : public CompletionQueueTag { public: @@ -63,6 +63,7 @@ class CallOpBuffer final : public CompletionQueueTag { // Does not take ownership. void AddSendInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); + void AddSendInitialMetadata(ClientContext *ctx); void AddRecvInitialMetadata( std::multimap<grpc::string, grpc::string> *metadata); void AddSendMessage(const google::protobuf::Message &message); @@ -102,12 +103,12 @@ class CallOpBuffer final : public CompletionQueueTag { Status* recv_status_ = nullptr; grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr}; grpc_status_code status_code_ = GRPC_STATUS_OK; - char* status_details_ = nullptr; + char *status_details_ = nullptr; size_t status_details_capacity_ = 0; // Server send status Status* send_status_ = nullptr; size_t trailing_metadata_count_ = 0; - grpc_metadata* trailing_metadata_ = nullptr; + grpc_metadata *trailing_metadata_ = nullptr; }; class CCallDeleter { @@ -115,10 +116,17 @@ class CCallDeleter { void operator()(grpc_call *c); }; +// Channel and Server implement this to allow them to hook performing ops +class CallHook { + public: + virtual ~CallHook() {} + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; +}; + // Straightforward wrapping of the C call object class Call final { public: - Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); + Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq); void PerformOps(CallOpBuffer *buffer); @@ -126,7 +134,7 @@ class Call final { CompletionQueue *cq() { return cq_; } private: - ChannelInterface *channel_; + CallHook *call_hook_; CompletionQueue *cq_; std::unique_ptr<grpc_call, CCallDeleter> call_; }; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index b02c4130d9..98f3f17197 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -41,6 +41,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/config.h> +#include <grpc++/impl/call.h> #include <grpc++/status.h> struct grpc_server; @@ -59,7 +60,7 @@ class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server { +class Server final : private CallHook { public: ~Server(); @@ -72,7 +73,8 @@ class Server { class MethodRequestData; // ServerBuilder use only - Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); + Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, + ServerCredentials* creds); Server(); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. @@ -86,9 +88,10 @@ class Server { void RunRpc(); void ScheduleCallback(); + void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override; + // Completion queue. - std::unique_ptr<CompletionQueue> cq_sync_; - std::unique_ptr<CompletionQueue> cq_async_; + CompletionQueue cq_; // Sever status std::mutex mu_; diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 7733f8bb2a..561bc9a5a9 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -553,7 +553,6 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *cq_when_rpc_available, grpc_completion_queue *cq_bound_to_call, void *tag_new); @@ -564,7 +563,6 @@ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *cq_when_rpc_available, grpc_completion_queue *cq_bound_to_call, void *tag_new); /* Create a server */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 93994e6bdd..3f1c2add55 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,14 +74,12 @@ typedef struct { void *tag; union { struct { - grpc_completion_queue *cq_new; grpc_completion_queue *cq_bind; grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq_new; grpc_completion_queue *cq_bind; grpc_call **call; registered_method *registered_method; @@ -174,8 +172,6 @@ struct call_data { call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; - - grpc_completion_queue *cq_new; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -187,8 +183,7 @@ static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); -static int call_list_join(call_data **root, call_data *call, - call_list list) { +static int call_list_join(call_data **root, call_data *call, call_list list) { GPR_ASSERT(!call->root[list]); call->root[list] = root; if (!*root) { @@ -290,7 +285,10 @@ static void destroy_channel(channel_data *chand) { grpc_iomgr_add_callback(finish_destroy_channel, chand); } -static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) { +static void finish_start_new_rpc_and_unlock(grpc_server *server, + grpc_call_element *elem, + call_data **pending_root, + requested_call_array *array) { requested_call rc; call_data *calld = elem->call_data; if (array->count == 0) { @@ -318,25 +316,32 @@ static void start_new_rpc(grpc_call_element *elem) { /* check for an exact match with host */ hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash); for (i = 0; i < chand->registered_method_max_probes; i++) { - rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } /* check for a wildcard method definition (no host set) */ hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash); for (i = 0; i < chand->registered_method_max_probes; i++) { - rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots]; + rm = &chand->registered_methods[(hash + i) % + chand->registered_method_slots]; if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested); + finish_start_new_rpc_and_unlock(server, elem, + &rm->server_registered_method->pending, + &rm->server_registered_method->requested); return; } } - finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls); + finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], + &server->requested_calls); } static void kill_zombie(void *elem, int success) { @@ -682,9 +687,12 @@ grpc_transport_setup_result grpc_server_setup_transport( memset(chand->registered_methods, 0, alloc); for (rm = s->registered_methods; rm; rm = rm->next) { host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; - method = grpc_mdstr_from_string(mdctx, rm->host); + method = grpc_mdstr_from_string(mdctx, rm->method); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); - for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++); + for (probes = 0; chand->registered_methods[(hash + probes) % slots] + .server_registered_method != NULL; + probes++) + ; if (probes > max_probes) max_probes = probes; crm = &chand->registered_methods[(hash + probes) % slots]; crm->server_registered_method = rm; @@ -829,10 +837,12 @@ static grpc_call_error queue_call_request(grpc_server *server, switch (rc->type) { case LEGACY_CALL: case BATCH_CALL: - calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); + calld = + call_list_remove_head(&server->lists[PENDING_START], PENDING_START); break; case REGISTERED_CALL: - calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START); + calld = call_list_remove_head( + &rc->data.registered.registered_method->pending, PENDING_START); break; } if (calld) { @@ -851,13 +861,12 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq_new, - grpc_completion_queue *cq_bind, void *tag) { + grpc_completion_queue *cq_bind, + void *tag) { requested_call rc; - grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq_new = cq_new; rc.data.batch.cq_bind = cq_bind; rc.data.batch.call = call; rc.data.batch.details = details; @@ -868,13 +877,12 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, - grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind, + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq_new = cq_new; rc.data.registered.cq_bind = cq_bind; rc.data.registered.call = call; rc.data.registered.registered_method = registered_method; @@ -896,7 +904,8 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag); -static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) { +static void publish_was_not_set(grpc_call *call, grpc_op_error status, + void *tag) { abort(); } @@ -942,7 +951,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; - calld->cq_new = rc->data.batch.cq_new; publish = publish_registered_or_batch; break; case REGISTERED_CALL: @@ -957,7 +965,6 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } - calld->cq_new = rc->data.registered.cq_new; publish = publish_registered_or_batch; break; } @@ -976,14 +983,14 @@ static void fail_call(grpc_server *server, requested_call *rc) { case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing, - NULL, GRPC_OP_ERROR); + grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing, - NULL, GRPC_OP_ERROR); + grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); break; } } @@ -1011,9 +1018,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - grpc_cq_end_op_complete(calld->cq_new, tag, call, - do_nothing, NULL, status); + channel_data *chand = elem->channel_data; + grpc_server *server = chand->server; + grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 7bda2d07c3..5c2772f5df 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() { void ClientContext::AddMetadata(const grpc::string &meta_key, const grpc::string &meta_value) { - return; + metadata_.insert(std::make_pair(meta_key, meta_value)); } void ClientContext::StartCancel() {} diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 8598592068..73be3cff8c 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -48,6 +48,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Call call(channel->CreateCall(method, context, &cq)); CallOpBuffer buf; Status status; + buf.AddSendInitialMetadata(context); buf.AddSendMessage(request); buf.AddRecvMessage(result); buf.AddClientSendClose(); diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 22fad2f439..607958df89 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,9 +31,10 @@ * */ -#include <include/grpc/support/alloc.h> -#include <include/grpc++/impl/call.h> -#include <include/grpc++/channel_interface.h> +#include <grpc/support/alloc.h> +#include <grpc++/impl/call.h> +#include <grpc++/client_context.h> +#include <grpc++/channel_interface.h> #include "src/cpp/proto/proto_utils.h" @@ -111,13 +112,13 @@ void FillMetadataMap(grpc_metadata_array* arr, void CallOpBuffer::AddSendInitialMetadata( std::multimap<grpc::string, grpc::string>* metadata) { + send_initial_metadata_ = true; initial_metadata_count_ = metadata->size(); initial_metadata_ = FillMetadata(metadata); } -void CallOpBuffer::AddRecvInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) { - recv_initial_metadata_ = metadata; +void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) { + AddSendInitialMetadata(&ctx->metadata_); } void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { @@ -147,7 +148,7 @@ void CallOpBuffer::AddServerSendStatus( void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { *nops = 0; - if (initial_metadata_count_) { + if (send_initial_metadata_) { ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; @@ -240,11 +241,11 @@ void CCallDeleter::operator()(grpc_call* c) { grpc_call_destroy(c); } -Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) - : channel_(channel), cq_(cq), call_(call) {} +Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq) + : call_hook_(call_hook), cq_(cq), call_(call) {} void Call::PerformOps(CallOpBuffer* buffer) { - channel_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(buffer, this); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 6d014a55f3..8974850b8c 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { - server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); + server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); } else { - server_ = grpc_server_create(nullptr, nullptr); + server_ = grpc_server_create(cq_.cq(), nullptr); } } @@ -82,9 +82,6 @@ Server::~Server() { } bool Server::RegisterService(RpcService *service) { - if (!cq_sync_) { - cq_sync_.reset(new CompletionQueue); - } for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); void *tag = grpc_server_register_method(server_, method->name(), nullptr); @@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag { return mrd; } - void Request(grpc_server *server, CompletionQueue *cq) { + void Request(grpc_server *server) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq->cq(), + has_request_payload_ ? &request_payload_ : nullptr, cq_, this)); } @@ -146,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag { class CallData { public: - explicit CallData(MethodRequestData *mrd) + explicit CallData(Server *server, MethodRequestData *mrd) : cq_(mrd->cq_), - call_(mrd->call_, nullptr, &cq_), + call_(mrd->call_, server, &cq_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), @@ -170,7 +167,7 @@ class Server::MethodRequestData final : public CompletionQueueTag { } } if (has_response_payload_) { - req.reset(method_->AllocateResponseProto()); + res.reset(method_->AllocateResponseProto()); } auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); @@ -212,9 +209,9 @@ bool Server::Start() { grpc_server_start(server_); // Start processing rpcs. - if (cq_sync_) { + if (!methods_.empty()) { for (auto &m : methods_) { - m.Request(server_, cq_sync_.get()); + m.Request(server_); } ScheduleCallback(); @@ -238,6 +235,16 @@ void Server::Shutdown() { } } +void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, + buf)); +} + void Server::ScheduleCallback() { { std::unique_lock<std::mutex> lock(mu_); @@ -249,12 +256,12 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; - auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok); + auto *mrd = MethodRequestData::Wait(&cq_, &ok); if (mrd) { - MethodRequestData::CallData cd(mrd); + MethodRequestData::CallData cd(this, mrd); if (ok) { - mrd->Request(server_, cq_sync_.get()); + mrd->Request(server_); ScheduleCallback(); cd.Run(); |