diff options
Diffstat (limited to 'src/cpp/server/server.cc')
-rw-r--r-- | src/cpp/server/server.cc | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index c08506c97f..bd97d707a7 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -422,6 +422,69 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context, } #endif +Server::BaseAsyncRequest::BaseAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) +: server_(server), context_(context), stream_(stream), call_cq_(call_cq), call_(nullptr) { + memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); +} + +Server::BaseAsyncRequest::~BaseAsyncRequest() { +} + +bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { + if (*status) { + for (size_t i = 0; i < initial_metadata_array_.count; i++) { + context_->client_metadata_.insert(std::make_pair( + grpc::string(initial_metadata_array_.metadata[i].key), + grpc::string( + initial_metadata_array_.metadata[i].value, + initial_metadata_array_.metadata[i].value + initial_metadata_array_.metadata[i].value_length))); + } + } + context_->call_ = call_; + context_->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); + if (*status && call_) { + context_->BeginCompletionOp(&call); + } + // just the pointers inside call are copied here + stream_->BindCall(&call); + delete this; + return true; +} + +Server::RegisteredAsyncRequest::RegisteredAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + + +void Server::RegisteredAsyncRequest::IssueRequest(void* registered_method, grpc_byte_buffer** payload, ServerCompletionQueue *notification_cq) { + grpc_server_request_registered_call( + server_->server_, registered_method, &call_, &context_->deadline_, &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), this); +} + +Server::GenericAsyncRequest::GenericAsyncRequest(Server* server, GenericServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, + void* tag) +: BaseAsyncRequest(server, context, stream, call_cq, tag) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + grpc_server_request_call(server->server_, &call_, &call_details_, &initial_metadata_array_, + call_cq->cq(), notification_cq->cq(), this); +} + +bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { + // TODO(yangg) remove the copy here. + static_cast<GenericServerContext*>(context_)->method_ = call_details_.method; + static_cast<GenericServerContext*>(context_)->host_ = call_details_.host; + gpr_free(call_details_.method); + gpr_free(call_details_.host); + return BaseAsyncRequest::FinalizeResult(tag, status); +} + void Server::ScheduleCallback() { { grpc::unique_lock<grpc::mutex> lock(mu_); |