aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/channel_interface.h4
-rw-r--r--include/grpc++/client_context.h7
-rw-r--r--include/grpc++/impl/call.h18
-rw-r--r--include/grpc++/server.h11
-rw-r--r--include/grpc/grpc.h2
-rw-r--r--src/core/surface/server.c73
-rw-r--r--src/cpp/client/client_context.cc2
-rw-r--r--src/cpp/client/client_unary_call.cc1
-rw-r--r--src/cpp/common/call.cc21
-rw-r--r--src/cpp/server/server.cc37
10 files changed, 102 insertions, 74 deletions
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 3631ea4d5d..b0366faabb 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -35,6 +35,7 @@
#define __GRPCPP_CHANNEL_INTERFACE_H__
#include <grpc++/status.h>
+#include <grpc++/impl/call.h>
namespace google {
namespace protobuf {
@@ -52,13 +53,12 @@ class CompletionQueue;
class RpcMethod;
class CallInterface;
-class ChannelInterface {
+class ChannelInterface : public CallHook {
public:
virtual ~ChannelInterface() {}
virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq) = 0;
- virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
} // namespace grpc
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 0cf6bdc647..a6e8ccc67c 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -35,8 +35,8 @@
#define __GRPCPP_CLIENT_CONTEXT_H__
#include <chrono>
+#include <map>
#include <string>
-#include <vector>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -49,6 +49,8 @@ struct grpc_completion_queue;
namespace grpc {
+class CallOpBuffer;
+
class ClientContext {
public:
ClientContext();
@@ -67,6 +69,7 @@ class ClientContext {
ClientContext(const ClientContext &);
ClientContext &operator=(const ClientContext &);
+ friend class CallOpBuffer;
friend class Channel;
friend class StreamContext;
@@ -84,7 +87,7 @@ class ClientContext {
grpc_call *call_;
grpc_completion_queue *cq_;
gpr_timespec absolute_deadline_;
- std::vector<std::pair<grpc::string, grpc::string> > metadata_;
+ std::multimap<grpc::string, grpc::string> metadata_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index 40939e458f..8fed305ac6 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -52,7 +52,7 @@ struct grpc_op;
namespace grpc {
-class ChannelInterface;
+class Call;
class CallOpBuffer final : public CompletionQueueTag {
public:
@@ -63,6 +63,7 @@ class CallOpBuffer final : public CompletionQueueTag {
// Does not take ownership.
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
+ void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message);
@@ -102,12 +103,12 @@ class CallOpBuffer final : public CompletionQueueTag {
Status* recv_status_ = nullptr;
grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr};
grpc_status_code status_code_ = GRPC_STATUS_OK;
- char* status_details_ = nullptr;
+ char *status_details_ = nullptr;
size_t status_details_capacity_ = 0;
// Server send status
Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0;
- grpc_metadata* trailing_metadata_ = nullptr;
+ grpc_metadata *trailing_metadata_ = nullptr;
};
class CCallDeleter {
@@ -115,10 +116,17 @@ class CCallDeleter {
void operator()(grpc_call *c);
};
+// Channel and Server implement this to allow them to hook performing ops
+class CallHook {
+ public:
+ virtual ~CallHook() {}
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
+};
+
// Straightforward wrapping of the C call object
class Call final {
public:
- Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
+ Call(grpc_call *call, CallHook *call_hook_, CompletionQueue *cq);
void PerformOps(CallOpBuffer *buffer);
@@ -126,7 +134,7 @@ class Call final {
CompletionQueue *cq() { return cq_; }
private:
- ChannelInterface *channel_;
+ CallHook *call_hook_;
CompletionQueue *cq_;
std::unique_ptr<grpc_call, CCallDeleter> call_;
};
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index b02c4130d9..98f3f17197 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -41,6 +41,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
+#include <grpc++/impl/call.h>
#include <grpc++/status.h>
struct grpc_server;
@@ -59,7 +60,7 @@ class ServerCredentials;
class ThreadPoolInterface;
// Currently it only supports handling rpcs in a single thread.
-class Server {
+class Server final : private CallHook {
public:
~Server();
@@ -72,7 +73,8 @@ class Server {
class MethodRequestData;
// ServerBuilder use only
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
+ Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
+ ServerCredentials* creds);
Server();
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
@@ -86,9 +88,10 @@ class Server {
void RunRpc();
void ScheduleCallback();
+ void PerformOpsOnCall(CallOpBuffer* ops, Call* call) override;
+
// Completion queue.
- std::unique_ptr<CompletionQueue> cq_sync_;
- std::unique_ptr<CompletionQueue> cq_async_;
+ CompletionQueue cq_;
// Sever status
std::mutex mu_;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 7733f8bb2a..561bc9a5a9 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -553,7 +553,6 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
- grpc_completion_queue *cq_when_rpc_available,
grpc_completion_queue *cq_bound_to_call,
void *tag_new);
@@ -564,7 +563,6 @@ grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,
grpc_byte_buffer **optional_payload,
- grpc_completion_queue *cq_when_rpc_available,
grpc_completion_queue *cq_bound_to_call, void *tag_new);
/* Create a server */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 93994e6bdd..3f1c2add55 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -74,14 +74,12 @@ typedef struct {
void *tag;
union {
struct {
- grpc_completion_queue *cq_new;
grpc_completion_queue *cq_bind;
grpc_call **call;
grpc_call_details *details;
grpc_metadata_array *initial_metadata;
} batch;
struct {
- grpc_completion_queue *cq_new;
grpc_completion_queue *cq_bind;
grpc_call **call;
registered_method *registered_method;
@@ -174,8 +172,6 @@ struct call_data {
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
-
- grpc_completion_queue *cq_new;
};
#define SERVER_FROM_CALL_ELEM(elem) \
@@ -187,8 +183,7 @@ static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
-static int call_list_join(call_data **root, call_data *call,
- call_list list) {
+static int call_list_join(call_data **root, call_data *call, call_list list) {
GPR_ASSERT(!call->root[list]);
call->root[list] = root;
if (!*root) {
@@ -290,7 +285,10 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
-static void finish_start_new_rpc_and_unlock(grpc_server *server, grpc_call_element *elem, call_data **pending_root, requested_call_array *array) {
+static void finish_start_new_rpc_and_unlock(grpc_server *server,
+ grpc_call_element *elem,
+ call_data **pending_root,
+ requested_call_array *array) {
requested_call rc;
call_data *calld = elem->call_data;
if (array->count == 0) {
@@ -318,25 +316,32 @@ static void start_new_rpc(grpc_call_element *elem) {
/* check for an exact match with host */
hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
for (i = 0; i < chand->registered_method_max_probes; i++) {
- rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots];
+ rm = &chand->registered_methods[(hash + i) %
+ chand->registered_method_slots];
if (!rm) break;
if (rm->host != calld->host) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested);
+ finish_start_new_rpc_and_unlock(server, elem,
+ &rm->server_registered_method->pending,
+ &rm->server_registered_method->requested);
return;
}
/* check for a wildcard method definition (no host set) */
hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
for (i = 0; i < chand->registered_method_max_probes; i++) {
- rm = &chand->registered_methods[(hash + i) % chand->registered_method_slots];
+ rm = &chand->registered_methods[(hash + i) %
+ chand->registered_method_slots];
if (!rm) break;
if (rm->host != NULL) continue;
if (rm->method != calld->path) continue;
- finish_start_new_rpc_and_unlock(server, elem, &rm->server_registered_method->pending, &rm->server_registered_method->requested);
+ finish_start_new_rpc_and_unlock(server, elem,
+ &rm->server_registered_method->pending,
+ &rm->server_registered_method->requested);
return;
}
}
- finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START], &server->requested_calls);
+ finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
+ &server->requested_calls);
}
static void kill_zombie(void *elem, int success) {
@@ -682,9 +687,12 @@ grpc_transport_setup_result grpc_server_setup_transport(
memset(chand->registered_methods, 0, alloc);
for (rm = s->registered_methods; rm; rm = rm->next) {
host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
- method = grpc_mdstr_from_string(mdctx, rm->host);
+ method = grpc_mdstr_from_string(mdctx, rm->method);
hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
- for (probes = 0; chand->registered_methods[(hash + probes) % slots].server_registered_method != NULL; probes++);
+ for (probes = 0; chand->registered_methods[(hash + probes) % slots]
+ .server_registered_method != NULL;
+ probes++)
+ ;
if (probes > max_probes) max_probes = probes;
crm = &chand->registered_methods[(hash + probes) % slots];
crm->server_registered_method = rm;
@@ -829,10 +837,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
switch (rc->type) {
case LEGACY_CALL:
case BATCH_CALL:
- calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
+ calld =
+ call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
break;
case REGISTERED_CALL:
- calld = call_list_remove_head(&rc->data.registered.registered_method->pending, PENDING_START);
+ calld = call_list_remove_head(
+ &rc->data.registered.registered_method->pending, PENDING_START);
break;
}
if (calld) {
@@ -851,13 +861,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_details *details,
grpc_metadata_array *initial_metadata,
- grpc_completion_queue *cq_new,
- grpc_completion_queue *cq_bind, void *tag) {
+ grpc_completion_queue *cq_bind,
+ void *tag) {
requested_call rc;
- grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
- rc.data.batch.cq_new = cq_new;
rc.data.batch.cq_bind = cq_bind;
rc.data.batch.call = call;
rc.data.batch.details = details;
@@ -868,13 +877,12 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
- grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind,
+ grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
- grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
- rc.data.registered.cq_new = cq_new;
rc.data.registered.cq_bind = cq_bind;
rc.data.registered.call = call;
rc.data.registered.registered_method = registered_method;
@@ -896,7 +904,8 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server,
static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag);
-static void publish_was_not_set(grpc_call *call, grpc_op_error status, void *tag) {
+static void publish_was_not_set(grpc_call *call, grpc_op_error status,
+ void *tag) {
abort();
}
@@ -942,7 +951,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
- calld->cq_new = rc->data.batch.cq_new;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
@@ -957,7 +965,6 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
- calld->cq_new = rc->data.registered.cq_new;
publish = publish_registered_or_batch;
break;
}
@@ -976,14 +983,14 @@ static void fail_call(grpc_server *server, requested_call *rc) {
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing,
- NULL, GRPC_OP_ERROR);
+ grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing,
- NULL, GRPC_OP_ERROR);
+ grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
break;
}
}
@@ -1011,9 +1018,9 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- grpc_cq_end_op_complete(calld->cq_new, tag, call,
- do_nothing, NULL, status);
+ channel_data *chand = elem->channel_data;
+ grpc_server *server = chand->server;
+ grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 7bda2d07c3..5c2772f5df 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -72,7 +72,7 @@ system_clock::time_point ClientContext::absolute_deadline() {
void ClientContext::AddMetadata(const grpc::string &meta_key,
const grpc::string &meta_value) {
- return;
+ metadata_.insert(std::make_pair(meta_key, meta_value));
}
void ClientContext::StartCancel() {}
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
index 8598592068..73be3cff8c 100644
--- a/src/cpp/client/client_unary_call.cc
+++ b/src/cpp/client/client_unary_call.cc
@@ -48,6 +48,7 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Call call(channel->CreateCall(method, context, &cq));
CallOpBuffer buf;
Status status;
+ buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 22fad2f439..607958df89 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -31,9 +31,10 @@
*
*/
-#include <include/grpc/support/alloc.h>
-#include <include/grpc++/impl/call.h>
-#include <include/grpc++/channel_interface.h>
+#include <grpc/support/alloc.h>
+#include <grpc++/impl/call.h>
+#include <grpc++/client_context.h>
+#include <grpc++/channel_interface.h>
#include "src/cpp/proto/proto_utils.h"
@@ -111,13 +112,13 @@ void FillMetadataMap(grpc_metadata_array* arr,
void CallOpBuffer::AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
+ send_initial_metadata_ = true;
initial_metadata_count_ = metadata->size();
initial_metadata_ = FillMetadata(metadata);
}
-void CallOpBuffer::AddRecvInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- recv_initial_metadata_ = metadata;
+void CallOpBuffer::AddSendInitialMetadata(ClientContext *ctx) {
+ AddSendInitialMetadata(&ctx->metadata_);
}
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
@@ -147,7 +148,7 @@ void CallOpBuffer::AddServerSendStatus(
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
*nops = 0;
- if (initial_metadata_count_) {
+ if (send_initial_metadata_) {
ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
@@ -240,11 +241,11 @@ void CCallDeleter::operator()(grpc_call* c) {
grpc_call_destroy(c);
}
-Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq)
- : channel_(channel), cq_(cq), call_(call) {}
+Call::Call(grpc_call* call, CallHook *call_hook, CompletionQueue* cq)
+ : call_hook_(call_hook), cq_(cq), call_(call) {}
void Call::PerformOps(CallOpBuffer* buffer) {
- channel_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(buffer, this);
}
} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 6d014a55f3..8974850b8c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -56,9 +56,9 @@ Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned,
thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
- server_ = grpc_secure_server_create(creds->GetRawCreds(), nullptr, nullptr);
+ server_ = grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
} else {
- server_ = grpc_server_create(nullptr, nullptr);
+ server_ = grpc_server_create(cq_.cq(), nullptr);
}
}
@@ -82,9 +82,6 @@ Server::~Server() {
}
bool Server::RegisterService(RpcService *service) {
- if (!cq_sync_) {
- cq_sync_.reset(new CompletionQueue);
- }
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
void *tag = grpc_server_register_method(server_, method->name(), nullptr);
@@ -131,14 +128,14 @@ class Server::MethodRequestData final : public CompletionQueueTag {
return mrd;
}
- void Request(grpc_server *server, CompletionQueue *cq) {
+ void Request(grpc_server *server) {
GPR_ASSERT(!in_flight_);
in_flight_ = true;
cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq->cq(),
+ has_request_payload_ ? &request_payload_ : nullptr,
cq_, this));
}
@@ -146,9 +143,9 @@ class Server::MethodRequestData final : public CompletionQueueTag {
class CallData {
public:
- explicit CallData(MethodRequestData *mrd)
+ explicit CallData(Server *server, MethodRequestData *mrd)
: cq_(mrd->cq_),
- call_(mrd->call_, nullptr, &cq_),
+ call_(mrd->call_, server, &cq_),
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
@@ -170,7 +167,7 @@ class Server::MethodRequestData final : public CompletionQueueTag {
}
}
if (has_response_payload_) {
- req.reset(method_->AllocateResponseProto());
+ res.reset(method_->AllocateResponseProto());
}
auto status = method_->handler()->RunHandler(
MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
@@ -212,9 +209,9 @@ bool Server::Start() {
grpc_server_start(server_);
// Start processing rpcs.
- if (cq_sync_) {
+ if (!methods_.empty()) {
for (auto &m : methods_) {
- m.Request(server_, cq_sync_.get());
+ m.Request(server_);
}
ScheduleCallback();
@@ -238,6 +235,16 @@ void Server::Shutdown() {
}
}
+void Server::PerformOpsOnCall(CallOpBuffer *buf, Call *call) {
+ static const size_t MAX_OPS = 8;
+ size_t nops = MAX_OPS;
+ grpc_op ops[MAX_OPS];
+ buf->FillOps(ops, &nops);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), ops, nops,
+ buf));
+}
+
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);
@@ -249,12 +256,12 @@ void Server::ScheduleCallback() {
void Server::RunRpc() {
// Wait for one more incoming rpc.
bool ok;
- auto *mrd = MethodRequestData::Wait(cq_sync_.get(), &ok);
+ auto *mrd = MethodRequestData::Wait(&cq_, &ok);
if (mrd) {
- MethodRequestData::CallData cd(mrd);
+ MethodRequestData::CallData cd(this, mrd);
if (ok) {
- mrd->Request(server_, cq_sync_.get());
+ mrd->Request(server_);
ScheduleCallback();
cd.Run();