aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r--src/cpp/server/server.cc63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index c08506c97f..bd97d707a7 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -422,6 +422,69 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context,
}
#endif
+Server::BaseAsyncRequest::BaseAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+: server_(server), context_(context), stream_(stream), call_cq_(call_cq), call_(nullptr) {
+ memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
+}
+
+Server::BaseAsyncRequest::~BaseAsyncRequest() {
+}
+
+bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ if (*status) {
+ for (size_t i = 0; i < initial_metadata_array_.count; i++) {
+ context_->client_metadata_.insert(std::make_pair(
+ grpc::string(initial_metadata_array_.metadata[i].key),
+ grpc::string(
+ initial_metadata_array_.metadata[i].value,
+ initial_metadata_array_.metadata[i].value + initial_metadata_array_.metadata[i].value_length)));
+ }
+ }
+ context_->call_ = call_;
+ context_->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
+ if (*status && call_) {
+ context_->BeginCompletionOp(&call);
+ }
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call);
+ delete this;
+ return true;
+}
+
+Server::RegisteredAsyncRequest::RegisteredAsyncRequest(Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+
+
+void Server::RegisteredAsyncRequest::IssueRequest(void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue *notification_cq) {
+ grpc_server_request_registered_call(
+ server_->server_, registered_method, &call_, &context_->deadline_, &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), this);
+}
+
+Server::GenericAsyncRequest::GenericAsyncRequest(Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream,
+ CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq,
+ void* tag)
+: BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
+ grpc_server_request_call(server->server_, &call_, &call_details_, &initial_metadata_array_,
+ call_cq->cq(), notification_cq->cq(), this);
+}
+
+bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ // TODO(yangg) remove the copy here.
+ static_cast<GenericServerContext*>(context_)->method_ = call_details_.method;
+ static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
+ gpr_free(call_details_.method);
+ gpr_free(call_details_.host);
+ return BaseAsyncRequest::FinalizeResult(tag, status);
+}
+
void Server::ScheduleCallback() {
{
grpc::unique_lock<grpc::mutex> lock(mu_);