diff options
-rw-r--r-- | include/grpc++/impl/call.h | 14 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 95 |
2 files changed, 100 insertions, 9 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index 91f3f58443..edc6555b0c 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -34,6 +34,7 @@ #ifndef __GRPCPP_CALL_H__ #define __GRPCPP_CALL_H__ +#include <grpc/grpc.h> #include <grpc++/status.h> #include <grpc++/completion_queue.h> @@ -72,16 +73,25 @@ class CallOpBuffer final : public CompletionQueueTag { // Convert to an array of grpc_op elements void FillOps(grpc_op *ops, size_t *nops); + // Release send buffers. + void ReleaseSendBuffer(); + // Called by completion queue just prior to returning from Next() or Pluck() void FinalizeResult(void *tag, bool *status) override; private: void *return_tag_ = nullptr; - std::multimap<grpc::string, grpc::string>* metadata_ = nullptr; + size_t initial_metadata_count_ = 0; + grpc_metadata* initial_metadata_ = nullptr; const google::protobuf::Message* send_message_ = nullptr; + grpc_byte_buffer* write_buffer_ = nullptr; google::protobuf::Message* recv_message_ = nullptr; + grpc_byte_buffer* recv_message_buf_ = nullptr; bool client_send_close_ = false; - Status* status_ = nullptr; + Status* recv_status_ = nullptr; + grpc_status_code status_code_ = GRPC_STATUS_OK; + char* status_details_ = nullptr; + size_t status_details_capacity_ = 0; }; class CCallDeleter { diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 37e374bad3..1aa79d4615 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,23 +31,64 @@ * */ +#include <include/grpc/support/alloc.h> #include <include/grpc++/impl/call.h> #include <include/grpc++/channel_interface.h> +#include "src/cpp/proto/proto_utils.h" + namespace grpc { void CallOpBuffer::Reset(void* next_return_tag) { return_tag_ = next_return_tag; - metadata_ = nullptr; + initial_metadata_count_ = 0; + if (initial_metadata_) { + gpr_free(initial_metadata_); + } send_message_ = nullptr; + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } recv_message_ = nullptr; + if (recv_message_buf_) { + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } client_send_close_ = false; - status_ = false; + recv_status_ = nullptr; + status_code_ = GRPC_STATUS_OK; + if (status_details_) { + gpr_free(status_details_); + status_details_ = nullptr; + } + status_details_capacity_ = 0; +} + +namespace { +// TODO(yangg) if the map is changed before we send, the pointers will be a +// mess. Make sure it does not happen. +grpc_metadata* FillMetadata( + std::multimap<grpc::string, grpc::string>* metadata) { + if (metadata->empty()) { return nullptr; } + grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc( + metadata->size()* sizeof(grpc_metadata)); + size_t i = 0; + for (auto iter = metadata->cbegin(); + iter != metadata->cend(); + ++iter, ++i) { + metadata_array[i].key = iter->first.c_str(); + metadata_array[i].value = iter->second.c_str(); + metadata_array[i].value_length = iter->second.size(); + } + return metadata_array; } +} // namespace void CallOpBuffer::AddSendInitialMetadata( - std::multimap<igrpc::string, grpc::string>* metadata) { - metadata_ = metadata; + std::multimap<grpc::string, grpc::string>* metadata) { + initial_metadata_count_ = metadata->size(); + initial_metadata_ = FillMetadata(metadata); } void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { @@ -59,16 +100,55 @@ void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) { } void CallOpBuffer::AddClientSendClose() { - client_sent_close_ = true; + client_send_close_ = true; } void CallOpBuffer::AddClientRecvStatus(Status *status) { - status_ = status; + recv_status_ = status; } -void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { +void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { + *nops = 0; + if (initial_metadata_count_) { + ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; + ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; + (*nops)++; + } + if (send_message_) { + bool success = SerializeProto(*send_message_, &write_buffer_); + if (!success) { + // TODO handle parse failure + } + ops[*nops].op = GRPC_OP_SEND_MESSAGE; + ops[*nops].data.send_message = write_buffer_; + (*nops)++; + } + if (recv_message_) { + ops[*nops].op = GRPC_OP_RECV_MESSAGE; + ops[*nops].data.recv_message = &recv_message_buf_; + (*nops)++; + } + if (client_send_close_) { + ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + (*nops)++; + } + if (recv_status_) { + ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + // ops[*nops].data.recv_status_on_client.trailing_metadata = + ops[*nops].data.recv_status_on_client.status = &status_code_; + ops[*nops].data.recv_status_on_client.status_details = &status_details_; + ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; + (*nops)++; + } +} +void CallOpBuffer::ReleaseSendBuffer() { + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } } void CallOpBuffer::FinalizeResult(void *tag, bool *status) { @@ -84,6 +164,7 @@ Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) void Call::PerformOps(CallOpBuffer* buffer) { channel_->PerformOpsOnCall(buffer, this); + buffer->ReleaseSendBuffer(); } } // namespace grpc |