aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/client/client_context.cc2
-rw-r--r--src/cpp/client/client_unary_call.cc1
-rw-r--r--src/cpp/common/call.cc21
-rw-r--r--src/cpp/server/server.cc37
4 files changed, 35 insertions, 26 deletions
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 7bda2d07c3..5c2772f5df 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() {
void ClientContext::AddMetadata(const grpc::string &meta_key,
const grpc::string &meta_value) {
- return;
+ metadata_.insert(std::make_pair(meta_key, meta_value));
}
void ClientContext::StartCancel() {}
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 8598592068..73be3cff8c 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -48,6 +48,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
Status status;
+ buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 22fad2f439..607958df89 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -31,9 +31,10 @@
*
*/
-#include <include/grpc/support/alloc.h>
-#include <include/grpc++/impl/call.h>
-#include <include/grpc++/channel_interface.h>
+#include <grpc/support/alloc.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/client_context.h>
+#include <grpc++/channel_interface.h>
#include "src/cpp/proto/proto_utils.h"
@@ -111,13 +112,13 @@ void FillMetadataMap(grpc_metadata_array* arr,
void CallOpBuffer::AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
+ send_initial_metadata_ = true;
initial_metadata_count_ = metadata->size();
initial_metadata_ = FillMetadata(metadata);
}
-void CallOpBuffer::AddRecvInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- recv_initial_metadata_ = metadata;
+void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) {
+ AddSendInitialMetadata(&ctx->metadata_);
}
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
@@ -147,7 +148,7 @@ void CallOpBuffer::AddServerSendStatus(
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
*nops = 0;
- if (initial_metadata_count_) {
+ if (send_initial_metadata_) {
ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
@@ -240,11 +241,11 @@ void CCallDeleter::operator()(grpc_call* c) {
grpc_call_destroy(c);
}
-Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq)
- : channel_(channel), cq_(cq), call_(call) {}
+Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq)
+ : call_hook_(call_hook), cq_(cq), call_(call) {}
void Call::PerformOps(CallOpBuffer* buffer) {
- channel_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(buffer, this);
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 6d014a55f3..8974850b8c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned,
thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
- server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr);
+ server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
} else {
- server_ = grpc_server_create(nullptr, nullptr);
+ server_ = grpc_server_create(cq_.cq(), nullptr);
}
}
@@ -82,9 +82,6 @@ Server::~Server() {
}
bool Server::RegisterService(RpcService *service) {
- if (!cq_sync_) {
- cq_sync_.reset(new CompletionQueue);
- }
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
void *tag = grpc_server_register_method(server_, method->name(), nullptr);
@@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag {
return mrd;
}
- void Request(grpc_server *server, CompletionQueue *cq) {
+ void Request(grpc_server *server) {
GPR_ASSERT(!in_flight_);
in_flight_ = true;
cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq->cq(),
+ has_request_payload_ ? &request_payload_ : nullptr,
cq_, this));
}
@@ -146,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag {
class CallData {
public:
- explicit CallData(MethodRequestData *mrd)
+ explicit CallData(Server *server, MethodRequestData *mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, nullptr, &cq_),
+ call_(mrd->call_, server, &cq_),
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
@@ -170,7 +167,7 @@ class Server::MethodRequestData final : public CompletionQueueTag {
}
}
if (has_response_payload_) {
- req.reset(method_->AllocateResponseProto());
+ res.reset(method_->AllocateResponseProto());
}
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
@@ -212,9 +209,9 @@ bool Server::Start() {
grpc_server_start(server_);
// Start processing rpcs.
- if (cq_sync_) {
+ if (!methods_.empty()) {
for (auto &m : methods_) {
- m.Request(server_, cq_sync_.get());
+ m.Request(server_);
}
ScheduleCallback();
@@ -238,6 +235,16 @@ void Server::Shutdown() {
}
}
+void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) {
+ static const size_t MAX_OPS = 8;
+ size_t nops = MAX_OPS;
+ grpc_op ops[MAX_OPS];
+ buf->FillOps(ops, &nops);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), ops, nops,
+ buf));
+}
+
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);
@@ -249,12 +256,12 @@ void Server::ScheduleCallback() {
void Server::RunRpc() {
// Wait for one more incoming rpc.
bool ok;
- auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok);
+ auto *mrd = MethodRequestData::Wait(&cq_, &ok);
if (mrd) {
- MethodRequestData::CallData cd(mrd);
+ MethodRequestData::CallData cd(this, mrd);
if (ok) {
- mrd->Request(server_, cq_sync_.get());
+ mrd->Request(server_);
ScheduleCallback();
cd.Run();