diff options
author | Yang Gao <yangg@google.com> | 2015-02-11 00:04:32 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-02-11 00:04:32 -0800 |
commit | d5a04bdc6e5ea6bc81ff15409381323c196b0e0f (patch) | |
tree | f2921aa2451083f54cf7694df561370e18f6cb68 /src/cpp/common/call.cc | |
parent | d608892313fff2983f2aba2602581bee7108c875 (diff) |
Implement FillOps
Diffstat (limited to 'src/cpp/common/call.cc')
-rw-r--r-- | src/cpp/common/call.cc | 95 |
1 files changed, 88 insertions, 7 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 37e374bad3..1aa79d4615 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -31,23 +31,64 @@ * */ +#include <include/grpc/support/alloc.h> #include <include/grpc++/impl/call.h> #include <include/grpc++/channel_interface.h> +#include "src/cpp/proto/proto_utils.h" + namespace grpc { void CallOpBuffer::Reset(void* next_return_tag) { return_tag_ = next_return_tag; - metadata_ = nullptr; + initial_metadata_count_ = 0; + if (initial_metadata_) { + gpr_free(initial_metadata_); + } send_message_ = nullptr; + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } recv_message_ = nullptr; + if (recv_message_buf_) { + grpc_byte_buffer_destroy(recv_message_buf_); + recv_message_buf_ = nullptr; + } client_send_close_ = false; - status_ = false; + recv_status_ = nullptr; + status_code_ = GRPC_STATUS_OK; + if (status_details_) { + gpr_free(status_details_); + status_details_ = nullptr; + } + status_details_capacity_ = 0; +} + +namespace { +// TODO(yangg) if the map is changed before we send, the pointers will be a +// mess. Make sure it does not happen. +grpc_metadata* FillMetadata( + std::multimap<grpc::string, grpc::string>* metadata) { + if (metadata->empty()) { return nullptr; } + grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc( + metadata->size()* sizeof(grpc_metadata)); + size_t i = 0; + for (auto iter = metadata->cbegin(); + iter != metadata->cend(); + ++iter, ++i) { + metadata_array[i].key = iter->first.c_str(); + metadata_array[i].value = iter->second.c_str(); + metadata_array[i].value_length = iter->second.size(); + } + return metadata_array; } +} // namespace void CallOpBuffer::AddSendInitialMetadata( - std::multimap<igrpc::string, grpc::string>* metadata) { - metadata_ = metadata; + std::multimap<grpc::string, grpc::string>* metadata) { + initial_metadata_count_ = metadata->size(); + initial_metadata_ = FillMetadata(metadata); } void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) { @@ -59,16 +100,55 @@ void CallOpBuffer::AddRecvMessage(google::protobuf::Message *message) { } void CallOpBuffer::AddClientSendClose() { - client_sent_close_ = true; + client_send_close_ = true; } void CallOpBuffer::AddClientRecvStatus(Status *status) { - status_ = status; + recv_status_ = status; } -void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { +void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) { + *nops = 0; + if (initial_metadata_count_) { + ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; + ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; + (*nops)++; + } + if (send_message_) { + bool success = SerializeProto(*send_message_, &write_buffer_); + if (!success) { + // TODO handle parse failure + } + ops[*nops].op = GRPC_OP_SEND_MESSAGE; + ops[*nops].data.send_message = write_buffer_; + (*nops)++; + } + if (recv_message_) { + ops[*nops].op = GRPC_OP_RECV_MESSAGE; + ops[*nops].data.recv_message = &recv_message_buf_; + (*nops)++; + } + if (client_send_close_) { + ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + (*nops)++; + } + if (recv_status_) { + ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + // ops[*nops].data.recv_status_on_client.trailing_metadata = + ops[*nops].data.recv_status_on_client.status = &status_code_; + ops[*nops].data.recv_status_on_client.status_details = &status_details_; + ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_; + (*nops)++; + } +} +void CallOpBuffer::ReleaseSendBuffer() { + if (write_buffer_) { + grpc_byte_buffer_destroy(write_buffer_); + write_buffer_ = nullptr; + } } void CallOpBuffer::FinalizeResult(void *tag, bool *status) { @@ -84,6 +164,7 @@ Call::Call(grpc_call* call, ChannelInterface* channel, CompletionQueue* cq) void Call::PerformOps(CallOpBuffer* buffer) { channel_->PerformOpsOnCall(buffer, this); + buffer->ReleaseSendBuffer(); } } // namespace grpc |