diff options
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/client/client_context.cc | 2 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 1 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 21 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 37 |
4 files changed, 35 insertions, 26 deletions
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 7bda2d07c3..5c2772f5df 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() { void ClientContext::AddMetadata(const grpc::string &meta_key, const grpc::string &meta_value) { - return; + metadata_.insert(std::make_pair(meta_key, meta_value)); } void ClientContext::StartCancel() {} diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc index 8598592068..73be3cff8c 100644 --- a/src/cpp/client/client_unary_call.cc +++ b/src/cpp/client/client_unary_call.cc @@ -48,6 +48,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, Call call(channel->CreateCall(method, context, &cq)); CallOpBuffer buf; Status status; + buf.AddSendInitialMetadata(context); buf.AddSendMessage(request); buf.AddRecvMessage(result); buf.AddClientSendClose(); diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 22fad2f439..607958df89 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,9 +31,10 @@ * */ -#include <include/grpc/support/alloc.h> -#include <include/grpc++/impl/call.h> -#include <include/grpc++/channel_interface.h> +#include <grpc/support/alloc.h> +#include <grpc++/impl/call.h> +#include <grpc++/client_context.h> +#include <grpc++/channel_interface.h> #include "src/cpp/proto/proto_utils.h" @@ -111,13 +112,13 @@ void FillMetadataMap(grpc_metadata_array* arr, void CallOpBuffer::AddSendInitialMetadata( std::multimap<grpc::string, grpc::string>* metadata) { + send_initial_metadata_ = true; initial_metadata_count_ = metadata->size(); initial_metadata_ = FillMetadata(metadata); } -void CallOpBuffer::AddRecvInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) { - recv_initial_metadata_ = metadata; +void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) { + AddSendInitialMetadata(&ctx->metadata_); } void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { @@ -147,7 +148,7 @@ void CallOpBuffer::AddServerSendStatus( void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { *nops = 0; - if (initial_metadata_count_) { + if (send_initial_metadata_) { ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; @@ -240,11 +241,11 @@ void CCallDeleter::operator()(grpc_call* c) { grpc_call_destroy(c); } -Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) - : channel_(channel), cq_(cq), call_(call) {} +Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq) + : call_hook_(call_hook), cq_(cq), call_(call) {} void Call::PerformOps(CallOpBuffer* buffer) { - channel_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(buffer, this); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 6d014a55f3..8974850b8c 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { - server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr); + server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr); } else { - server_ = grpc_server_create(nullptr, nullptr); + server_ = grpc_server_create(cq_.cq(), nullptr); } } @@ -82,9 +82,6 @@ Server::~Server() { } bool Server::RegisterService(RpcService *service) { - if (!cq_sync_) { - cq_sync_.reset(new CompletionQueue); - } for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); void *tag = grpc_server_register_method(server_, method->name(), nullptr); @@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag { return mrd; } - void Request(grpc_server *server, CompletionQueue *cq) { + void Request(grpc_server *server) { GPR_ASSERT(!in_flight_); in_flight_ = true; cq_ = grpc_completion_queue_create(); GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call( server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq->cq(), + has_request_payload_ ? &request_payload_ : nullptr, cq_, this)); } @@ -146,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag { class CallData { public: - explicit CallData(MethodRequestData *mrd) + explicit CallData(Server *server, MethodRequestData *mrd) : cq_(mrd->cq_), - call_(mrd->call_, nullptr, &cq_), + call_(mrd->call_, server, &cq_), ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), @@ -170,7 +167,7 @@ class Server::MethodRequestData final : public CompletionQueueTag { } } if (has_response_payload_) { - req.reset(method_->AllocateResponseProto()); + res.reset(method_->AllocateResponseProto()); } auto status = method_->handler()->RunHandler( MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); @@ -212,9 +209,9 @@ bool Server::Start() { grpc_server_start(server_); // Start processing rpcs. - if (cq_sync_) { + if (!methods_.empty()) { for (auto &m : methods_) { - m.Request(server_, cq_sync_.get()); + m.Request(server_); } ScheduleCallback(); @@ -238,6 +235,16 @@ void Server::Shutdown() { } } +void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, + buf)); +} + void Server::ScheduleCallback() { { std::unique_lock<std::mutex> lock(mu_); @@ -249,12 +256,12 @@ void Server::ScheduleCallback() { void Server::RunRpc() { // Wait for one more incoming rpc. bool ok; - auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok); + auto *mrd = MethodRequestData::Wait(&cq_, &ok); if (mrd) { - MethodRequestData::CallData cd(mrd); + MethodRequestData::CallData cd(this, mrd); if (ok) { - mrd->Request(server_, cq_sync_.get()); + mrd->Request(server_); ScheduleCallback(); cd.Run(); |