diff options
Diffstat (limited to 'src/cpp/common/call.cc')
-rw-r--r-- | src/cpp/common/call.cc | 307 |
1 files changed, 16 insertions, 291 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index edce6396bd..0a5c976e01 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -39,107 +39,32 @@ #include <grpc++/channel_interface.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { -CallOpBuffer::CallOpBuffer() - : return_tag_(this), - send_initial_metadata_(false), - initial_metadata_count_(0), - initial_metadata_(nullptr), - recv_initial_metadata_(nullptr), - send_message_(nullptr), - send_message_buffer_(nullptr), - send_buf_(nullptr), - recv_message_(nullptr), - recv_message_buffer_(nullptr), - recv_buf_(nullptr), - max_message_size_(-1), - client_send_close_(false), - recv_trailing_metadata_(nullptr), - recv_status_(nullptr), - status_code_(GRPC_STATUS_OK), - status_details_(nullptr), - status_details_capacity_(0), - send_status_available_(false), - send_status_code_(GRPC_STATUS_OK), - trailing_metadata_count_(0), - trailing_metadata_(nullptr), - cancelled_buf_(0), - recv_closed_(nullptr) { - memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_)); - memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); - recv_trailing_metadata_arr_.metadata = nullptr; - recv_initial_metadata_arr_.metadata = nullptr; -} - -void CallOpBuffer::Reset(void* next_return_tag) { - return_tag_ = next_return_tag; - - send_initial_metadata_ = false; - initial_metadata_count_ = 0; - gpr_free(initial_metadata_); - - recv_initial_metadata_ = nullptr; - recv_initial_metadata_arr_.count = 0; - - if (send_buf_ && send_message_) { - grpc_byte_buffer_destroy(send_buf_); - } - send_message_ = nullptr; - send_message_buffer_ = nullptr; - send_buf_ = nullptr; - - got_message = false; - if (recv_buf_ && recv_message_) { - grpc_byte_buffer_destroy(recv_buf_); - } - recv_message_ = nullptr; - recv_message_buffer_ = nullptr; - recv_buf_ = nullptr; - - client_send_close_ = false; - - recv_trailing_metadata_ = nullptr; - recv_status_ = nullptr; - recv_trailing_metadata_arr_.count = 0; - - status_code_ = GRPC_STATUS_OK; - - send_status_available_ = false; - send_status_code_ = GRPC_STATUS_OK; - send_status_details_.clear(); - trailing_metadata_count_ = 0; - trailing_metadata_ = nullptr; - - recv_closed_ = nullptr; -} - -CallOpBuffer::~CallOpBuffer() { - gpr_free(status_details_); - gpr_free(recv_initial_metadata_arr_.metadata); - gpr_free(recv_trailing_metadata_arr_.metadata); - if (recv_buf_ && recv_message_) { - grpc_byte_buffer_destroy(recv_buf_); - } - if (send_buf_ && send_message_) { - grpc_byte_buffer_destroy(send_buf_); +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap<grpc::string, grpc::string>* metadata) { + for (size_t i = 0; i < arr->count; i++) { + // TODO(yangg) handle duplicates? + metadata->insert(std::pair<grpc::string, grpc::string>( + arr->metadata[i].key, + grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); } + grpc_metadata_array_destroy(arr); + grpc_metadata_array_init(arr); } -namespace { // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. grpc_metadata* FillMetadataArray( - std::multimap<grpc::string, grpc::string>* metadata) { - if (metadata->empty()) { + const std::multimap<grpc::string, grpc::string>& metadata) { + if (metadata.empty()) { return nullptr; } grpc_metadata* metadata_array = - (grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata)); + (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata)); size_t i = 0; - for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) { + for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { metadata_array[i].key = iter->first.c_str(); metadata_array[i].value = iter->second.c_str(); metadata_array[i].value_length = iter->second.size(); @@ -147,206 +72,6 @@ grpc_metadata* FillMetadataArray( return metadata_array; } -void FillMetadataMap(grpc_metadata_array* arr, - std::multimap<grpc::string, grpc::string>* metadata) { - for (size_t i = 0; i < arr->count; i++) { - // TODO(yangg) handle duplicates? - metadata->insert(std::pair<grpc::string, grpc::string>( - arr->metadata[i].key, - grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); - } - grpc_metadata_array_destroy(arr); - grpc_metadata_array_init(arr); -} -} // namespace - -void CallOpBuffer::AddSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) { - send_initial_metadata_ = true; - initial_metadata_count_ = metadata->size(); - initial_metadata_ = FillMetadataArray(metadata); -} - -void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) { - ctx->initial_metadata_received_ = true; - recv_initial_metadata_ = &ctx->recv_initial_metadata_; -} - -void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) { - AddSendInitialMetadata(&ctx->send_initial_metadata_); -} - -void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) { - send_message_ = &message; -} - -void CallOpBuffer::AddSendMessage(const ByteBuffer& message) { - send_message_buffer_ = &message; -} - -void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) { - recv_message_ = message; - recv_message_->Clear(); -} - -void CallOpBuffer::AddRecvMessage(ByteBuffer* message) { - recv_message_buffer_ = message; - recv_message_buffer_->Clear(); -} - -void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } - -void CallOpBuffer::AddServerRecvClose(bool* cancelled) { - recv_closed_ = cancelled; -} - -void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) { - recv_trailing_metadata_ = &context->trailing_metadata_; - recv_status_ = status; -} - -void CallOpBuffer::AddServerSendStatus( - std::multimap<grpc::string, grpc::string>* metadata, const Status& status) { - if (metadata != NULL) { - trailing_metadata_count_ = metadata->size(); - trailing_metadata_ = FillMetadataArray(metadata); - } else { - trailing_metadata_count_ = 0; - } - send_status_available_ = true; - send_status_code_ = static_cast<grpc_status_code>(status.error_code()); - send_status_details_ = status.error_message(); -} - -void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { - *nops = 0; - if (send_initial_metadata_) { - ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; - ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_initial_metadata_) { - ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; - ops[*nops].flags = 0; - (*nops)++; - } - if (send_message_ || send_message_buffer_) { - if (send_message_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0); - bool success = SerializeProto(*send_message_, &send_buf_); - if (!success) { - abort(); - // TODO handle parse failure - } - GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0); - } else { - send_buf_ = send_message_buffer_->buffer(); - } - ops[*nops].op = GRPC_OP_SEND_MESSAGE; - ops[*nops].data.send_message = send_buf_; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_message_ || recv_message_buffer_) { - ops[*nops].op = GRPC_OP_RECV_MESSAGE; - ops[*nops].data.recv_message = &recv_buf_; - ops[*nops].flags = 0; - (*nops)++; - } - if (client_send_close_) { - ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_status_) { - ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[*nops].data.recv_status_on_client.trailing_metadata = - &recv_trailing_metadata_arr_; - ops[*nops].data.recv_status_on_client.status = &status_code_; - ops[*nops].data.recv_status_on_client.status_details = &status_details_; - ops[*nops].data.recv_status_on_client.status_details_capacity = - &status_details_capacity_; - ops[*nops].flags = 0; - (*nops)++; - } - if (send_status_available_) { - ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER; - ops[*nops].data.send_status_from_server.trailing_metadata_count = - trailing_metadata_count_; - ops[*nops].data.send_status_from_server.trailing_metadata = - trailing_metadata_; - ops[*nops].data.send_status_from_server.status = send_status_code_; - ops[*nops].data.send_status_from_server.status_details = - send_status_details_.empty() ? nullptr : send_status_details_.c_str(); - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_closed_) { - ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_; - ops[*nops].flags = 0; - (*nops)++; - } -} - -bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { - // Release send buffers. - if (send_buf_ && send_message_) { - if (send_message_) { - grpc_byte_buffer_destroy(send_buf_); - } - send_buf_ = nullptr; - } - if (initial_metadata_) { - gpr_free(initial_metadata_); - initial_metadata_ = nullptr; - } - if (trailing_metadata_count_) { - gpr_free(trailing_metadata_); - trailing_metadata_ = nullptr; - } - // Set user-facing tag. - *tag = return_tag_; - // Process received initial metadata - if (recv_initial_metadata_) { - FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); - } - // Parse received message if any. - if (recv_message_ || recv_message_buffer_) { - if (recv_buf_) { - got_message = *status; - if (recv_message_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0); - *status = *status && - DeserializeProto(recv_buf_, recv_message_, max_message_size_); - grpc_byte_buffer_destroy(recv_buf_); - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0); - } else { - recv_message_buffer_->set_buffer(recv_buf_); - } - recv_buf_ = nullptr; - } else { - // Read failed - got_message = false; - *status = false; - } - } - // Parse received status. - if (recv_status_) { - FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); - *recv_status_ = Status( - static_cast<StatusCode>(status_code_), - status_details_ ? grpc::string(status_details_) : grpc::string()); - } - if (recv_closed_) { - *recv_closed_ = cancelled_buf_ != 0; - } - return true; -} - Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} @@ -357,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, call_(call), max_message_size_(max_message_size) {} -void Call::PerformOps(CallOpBuffer* buffer) { +void Call::PerformOps(CallOpSetInterface* ops) { if (max_message_size_ > 0) { - buffer->set_max_message_size(max_message_size_); + ops->set_max_message_size(max_message_size_); } - call_hook_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(ops, this); } } // namespace grpc |