aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-02-11 13:44:11 -0800
committerGravatar Yang Gao <yangg@google.com>2015-02-11 13:44:11 -0800
commitf1258c4951f9880e6943558c310da9c5629ea6de (patch)
tree88f3facbb5cf841356bbdc9c6685d0817f708e97
parent06ed31e976369cdd0e9698786d10e8b2152e6af3 (diff)
save before the change
-rw-r--r--include/grpc++/impl/call.h19
-rw-r--r--src/cpp/common/call.cc89
2 files changed, 92 insertions, 16 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index 141b16ab5b..40939e458f 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -63,11 +63,15 @@ class CallOpBuffer final : public CompletionQueueTag {
// Does not take ownership.
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
+ void AddRecvInitialMetadata(
+ std::multimap<grpc::string, grpc::string> *metadata);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
- void AddClientRecvStatus(Status *status);
- void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata, const Status& status);
+ void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
+ Status *status);
+ void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
+ const Status& status);
// INTERNAL API:
@@ -79,17 +83,28 @@ class CallOpBuffer final : public CompletionQueueTag {
private:
void *return_tag_ = nullptr;
+ // Send initial metadata
size_t initial_metadata_count_ = 0;
grpc_metadata* initial_metadata_ = nullptr;
+ // Recv initial metadta
+ std::multimap<grpc::string, grpc::string>* recv_initial_metadata_ = nullptr;
+ grpc_metadata_array recv_initial_metadata_arr_ = {0, 0, nullptr};
+ // Send message
const google::protobuf::Message* send_message_ = nullptr;
grpc_byte_buffer* send_message_buf_ = nullptr;
+ // Recv message
google::protobuf::Message* recv_message_ = nullptr;
grpc_byte_buffer* recv_message_buf_ = nullptr;
+ // Client send close
bool client_send_close_ = false;
+ // Client recv status
+ std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_ = nullptr;
Status* recv_status_ = nullptr;
+ grpc_metadata_array recv_trailing_metadata_arr_ = {0, 0, nullptr};
grpc_status_code status_code_ = GRPC_STATUS_OK;
char* status_details_ = nullptr;
size_t status_details_capacity_ = 0;
+ // Server send status
Status* send_status_ = nullptr;
size_t trailing_metadata_count_ = 0;
grpc_metadata* trailing_metadata_ = nullptr;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index b2cd55fe24..22fad2f439 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -41,34 +41,47 @@ namespace grpc {
void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag;
+
initial_metadata_count_ = 0;
- if (initial_metadata_) {
- gpr_free(initial_metadata_);
- }
+ gpr_free(initial_metadata_);
+
+ recv_initial_metadata_ = nullptr;
+ gpr_free(recv_initial_metadata_arr_.metadata);
+ recv_initial_metadata_arr_ = {0, 0, nullptr};
+
send_message_ = nullptr;
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
}
+
recv_message_ = nullptr;
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
}
+
client_send_close_ = false;
+
+ recv_trailing_metadata_ = nullptr;
recv_status_ = nullptr;
+ gpr_free(recv_trailing_metadata_arr_.metadata);
+ recv_trailing_metadata_arr_ = {0, 0, nullptr};
+
status_code_ = GRPC_STATUS_OK;
- if (status_details_) {
- gpr_free(status_details_);
- status_details_ = nullptr;
- }
+ gpr_free(status_details_);
+ status_details_ = nullptr;
status_details_capacity_ = 0;
+
+ send_status_ = nullptr;
+ trailing_metadata_count_ = 0;
+ trailing_metadata_ = nullptr;
}
namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
-grpc_metadata* FillMetadata(
+grpc_metadata* FillMetadataArray(
std::multimap<grpc::string, grpc::string>* metadata) {
if (metadata->empty()) { return nullptr; }
grpc_metadata* metadata_array = (grpc_metadata*)gpr_malloc(
@@ -83,6 +96,17 @@ grpc_metadata* FillMetadata(
}
return metadata_array;
}
+
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata) {
+ for (size_t i = 0; i < arr->count; i++) {
+ // TODO(yangg) handle duplicates?
+ metadata->insert(std::pair<grpc::string, grpc::string>(
+ arr->metadata[i].key, {arr->metadata[i].value, arr->metadata[i].value_length}));
+ }
+ grpc_metadata_array_destroy(arr);
+ grpc_metadata_array_init(&recv_trailing_metadata_arr_);
+}
} // namespace
void CallOpBuffer::AddSendInitialMetadata(
@@ -91,6 +115,11 @@ void CallOpBuffer::AddSendInitialMetadata(
initial_metadata_ = FillMetadata(metadata);
}
+void CallOpBuffer::AddRecvInitialMetadata(
+ std::multimap<grpc::string, grpc::string>* metadata) {
+ recv_initial_metadata_ = metadata;
+}
+
void CallOpBuffer::AddSendMessage(const google::protobuf::Message& message) {
send_message_ = &message;
}
@@ -103,13 +132,17 @@ void CallOpBuffer::AddClientSendClose() {
client_send_close_ = true;
}
-void CallOpBuffer::AddClientRecvStatus(Status *status) {
+void CallOpBuffer::AddClientRecvStatus(
+ std::multimap<grpc::string, grpc::string>* metadata, Status *status) {
+ recv_trailing_metadata_ = metadata;
recv_status_ = status;
}
-void CallOpBuffer::AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata,
- const Status& status) {
-
+void CallOpBuffer::AddServerSendStatus(
+ std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
+ trailing_metadata_count_ = metadata->size();
+ trailing_metadata_ = FillMetadata(metadata);
+ send_status_ = &status;
}
void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
@@ -120,6 +153,11 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
(*nops)++;
}
+ if (recv_initial_metadata_) {
+ ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
+ (*nops)++;
+ }
if (send_message_) {
bool success = SerializeProto(*send_message_, &send_message_buf_);
if (!success) {
@@ -140,10 +178,24 @@ void CallOpBuffer::FillOps(grpc_op *ops, size_t *nops) {
}
if (recv_status_) {
ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- // TODO ops[*nops].data.recv_status_on_client.trailing_metadata =
+ ops[*nops].data.recv_status_on_client.trailing_metadata =
+ &recv_trailing_metadata_arr_;
ops[*nops].data.recv_status_on_client.status = &status_code_;
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
- ops[*nops].data.recv_status_on_client.status_details_capacity = &status_details_capacity_;
+ ops[*nops].data.recv_status_on_client.status_details_capacity =
+ &status_details_capacity_;
+ (*nops)++;
+ }
+ if (send_status_) {
+ ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ ops[*nops].data.send_status_from_server.trailing_metadata_count =
+ trailing_metadata_count_;
+ ops[*nops].data.send_status_from_server.trailing_metadata =
+ trailing_metadata_;
+ ops[*nops].data.send_status_from_server.status =
+ static_cast<grpc_status_code>(send_status_->code());
+ ops[*nops].data.send_status_from_server.status_details =
+ send_status_->details().c_str();
(*nops)++;
}
}
@@ -158,8 +210,16 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
gpr_free(initial_metadata_);
initial_metadata_ = nullptr;
}
+ if (trailing_metadata_count_) {
+ gpr_free(trailing_metadata_);
+ trailing_metadata_ = nullptr;
+ }
// Set user-facing tag.
*tag = return_tag_;
+ // Process received initial metadata
+ if (recv_initial_metadata_) {
+ FillMetadataMap(&recv_initial_metadata_, recv_initial_metadata_);
+ }
// Parse received message if any.
if (recv_message_ && recv_message_buf_) {
*status = DeserializeProto(recv_message_buf_, recv_message_);
@@ -168,6 +228,7 @@ void CallOpBuffer::FinalizeResult(void **tag, bool *status) {
}
// Parse received status.
if (recv_status_) {
+ FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
*recv_status_ = Status(
static_cast<StatusCode>(status_code_),
status_details_ ? grpc::string(status_details_, status_details_capacity_)