From cc8c27950ce9e931a55b5380210144b0625f7f03 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 15 Oct 2018 12:10:55 -0700 Subject: Change types to use C++ types rather than core types --- include/grpcpp/impl/codegen/async_stream.h | 34 +++---- include/grpcpp/impl/codegen/async_unary_call.h | 15 +-- include/grpcpp/impl/codegen/call.h | 112 +++++++++++----------- include/grpcpp/impl/codegen/client_callback.h | 2 +- include/grpcpp/impl/codegen/client_unary_call.h | 2 +- include/grpcpp/impl/codegen/interceptor.h | 38 ++++---- include/grpcpp/impl/codegen/method_handler_impl.h | 20 ++-- include/grpcpp/impl/codegen/sync_stream.h | 20 ++-- src/cpp/server/server_context.cc | 2 +- 9 files changed, 126 insertions(+), 119 deletions(-) diff --git a/include/grpcpp/impl/codegen/async_stream.h b/include/grpcpp/impl/codegen/async_stream.h index 6e58fd0eef..bfb2df4f23 100644 --- a/include/grpcpp/impl/codegen/async_stream.h +++ b/include/grpcpp/impl/codegen/async_stream.h @@ -276,7 +276,7 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface { } void StartCallInternal(void* tag) { - init_ops_.SendInitialMetadata(context_->send_initial_metadata_, + init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); init_ops_.set_output_tag(tag); call_.PerformOps(&init_ops_); @@ -441,7 +441,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface { } void StartCallInternal(void* tag) { - write_ops_.SendInitialMetadata(context_->send_initial_metadata_, + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); // if corked bit is set in context, we just keep the initial metadata // buffered up to coalesce with later message send. No op is performed. @@ -612,7 +612,7 @@ class ClientAsyncReaderWriter final } void StartCallInternal(void* tag) { - write_ops_.SendInitialMetadata(context_->send_initial_metadata_, + write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); // if corked bit is set in context, we just keep the initial metadata // buffered up to coalesce with later message send. No op is performed. @@ -710,7 +710,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); @@ -739,7 +739,7 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { void Finish(const W& msg, const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); @@ -748,10 +748,10 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { } // The response is dropped if the status is not OK. if (status.ok()) { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, finish_ops_.SendMessage(msg)); } else { - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); } call_.PerformOps(&finish_ops_); } @@ -769,14 +769,14 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface { GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_ops_.SendInitialMetadata(ctx_->initial_metadata_, + finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_ops_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -859,7 +859,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); @@ -904,7 +904,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { EnsureInitialMetadataSent(&write_ops_); options.set_buffer_hint(); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&write_ops_); } @@ -922,7 +922,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { void Finish(const Status& status, void* tag) override { finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -932,7 +932,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface { template void EnsureInitialMetadataSent(T* ops) { if (!ctx_->sent_initial_metadata_) { - ops->SendInitialMetadata(ctx_->initial_metadata_, + ops->SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ops->set_compression_level(ctx_->compression_level()); @@ -1025,7 +1025,7 @@ class ServerAsyncReaderWriter final GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); - meta_ops_.SendInitialMetadata(ctx_->initial_metadata_, + meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_ops_.set_compression_level(ctx_->compression_level()); @@ -1075,7 +1075,7 @@ class ServerAsyncReaderWriter final EnsureInitialMetadataSent(&write_ops_); options.set_buffer_hint(); GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); - write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&write_ops_); } @@ -1094,7 +1094,7 @@ class ServerAsyncReaderWriter final finish_ops_.set_output_tag(tag); EnsureInitialMetadataSent(&finish_ops_); - finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_ops_); } @@ -1106,7 +1106,7 @@ class ServerAsyncReaderWriter final template void EnsureInitialMetadataSent(T* ops) { if (!ctx_->sent_initial_metadata_) { - ops->SendInitialMetadata(ctx_->initial_metadata_, + ops->SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ops->set_compression_level(ctx_->compression_level()); diff --git a/include/grpcpp/impl/codegen/async_unary_call.h b/include/grpcpp/impl/codegen/async_unary_call.h index 60ff8e2f05..744b128141 100644 --- a/include/grpcpp/impl/codegen/async_unary_call.h +++ b/include/grpcpp/impl/codegen/async_unary_call.h @@ -174,7 +174,7 @@ class ClientAsyncResponseReader final } void StartCallInternal() { - single_buf.SendInitialMetadata(context_->send_initial_metadata_, + single_buf.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); } @@ -214,7 +214,7 @@ class ServerAsyncResponseWriter final GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_buf_.set_output_tag(tag); - meta_buf_.SendInitialMetadata(ctx_->initial_metadata_, + meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { meta_buf_.set_compression_level(ctx_->compression_level()); @@ -240,8 +240,9 @@ class ServerAsyncResponseWriter final /// metadata. void Finish(const W& msg, const Status& status, void* tag) { finish_buf_.set_output_tag(tag); + finish_buf_.set_cq_tag(&finish_buf_); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_buf_.set_compression_level(ctx_->compression_level()); @@ -250,10 +251,10 @@ class ServerAsyncResponseWriter final } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, finish_buf_.SendMessage(msg)); } else { - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); } call_.PerformOps(&finish_buf_); } @@ -274,14 +275,14 @@ class ServerAsyncResponseWriter final GPR_CODEGEN_ASSERT(!status.ok()); finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.SendInitialMetadata(ctx_->initial_metadata_, + finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { finish_buf_.set_compression_level(ctx_->compression_level()); } ctx_->sent_initial_metadata_ = true; } - finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 684148efb6..2fcf306c8d 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -45,7 +45,6 @@ namespace grpc { -class ByteBuffer; class CompletionQueue; extern CoreCodegenInterface* g_core_codegen_interface; @@ -226,14 +225,12 @@ class CallOpSendInitialMetadata { maybe_compression_level_.is_set = false; } - void SendInitialMetadata( - const std::multimap& metadata, - uint32_t flags) { + void SendInitialMetadata(std::multimap* metadata, + uint32_t flags) { maybe_compression_level_.is_set = false; send_ = true; flags_ = flags; - initial_metadata_ = - FillMetadataArray(metadata, &initial_metadata_count_, ""); + metadata_map_ = metadata; } void set_compression_level(grpc_compression_level level) { @@ -248,6 +245,8 @@ class CallOpSendInitialMetadata { op->op = GRPC_OP_SEND_INITIAL_METADATA; op->flags = flags_; op->reserved = NULL; + initial_metadata_ = + FillMetadataArray(*metadata_map_, &initial_metadata_count_, ""); op->data.send_initial_metadata.count = initial_metadata_count_; op->data.send_initial_metadata.metadata = initial_metadata_; op->data.send_initial_metadata.maybe_compression_level.is_set = @@ -268,8 +267,7 @@ class CallOpSendInitialMetadata { if (!send_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA); - interceptor_methods->SetSendInitialMetadata(initial_metadata_, - &initial_metadata_count_); + interceptor_methods->SetSendInitialMetadata(metadata_map_); } void SetFinishInterceptionHookPoint( @@ -284,6 +282,7 @@ class CallOpSendInitialMetadata { bool send_; uint32_t flags_; size_t initial_metadata_count_; + std::multimap* metadata_map_; grpc_metadata* initial_metadata_; struct { bool is_set; @@ -322,7 +321,7 @@ class CallOpSendMessage { if (!send_buf_.Valid()) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); - interceptor_methods->SetSendMessage(send_buf_.c_buffer()); + interceptor_methods->SetSendMessage(&send_buf_); } void SetFinishInterceptionHookPoint( @@ -569,11 +568,10 @@ class CallOpServerSendStatus { CallOpServerSendStatus() : send_status_available_(false) {} void ServerSendStatus( - const std::multimap& trailing_metadata, + std::multimap* trailing_metadata, const Status& status) { send_error_details_ = status.error_details(); - trailing_metadata_ = FillMetadataArray( - trailing_metadata, &trailing_metadata_count_, send_error_details_); + metadata_map_ = trailing_metadata; send_status_available_ = true; send_status_code_ = static_cast(status.error_code()); send_error_message_ = status.error_message(); @@ -582,6 +580,8 @@ class CallOpServerSendStatus { protected: void AddOp(grpc_op* ops, size_t* nops) { if (!send_status_available_ || hijacked_) return; + trailing_metadata_ = FillMetadataArray( + *metadata_map_, &trailing_metadata_count_, send_error_details_); grpc_op* op = &ops[(*nops)++]; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = @@ -606,8 +606,7 @@ class CallOpServerSendStatus { if (!send_status_available_) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_SEND_STATUS); - interceptor_methods->SetSendTrailingMetadata(trailing_metadata_, - &trailing_metadata_count_); + interceptor_methods->SetSendTrailingMetadata(metadata_map_); interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_, &send_error_message_); } @@ -627,6 +626,7 @@ class CallOpServerSendStatus { grpc::string send_error_details_; grpc::string send_error_message_; size_t trailing_metadata_count_; + std::multimap* metadata_map_; grpc_metadata* trailing_metadata_; grpc_slice error_message_slice_; }; @@ -655,7 +655,9 @@ class CallOpRecvInitialMetadata { } void SetInterceptionHookPoint( - experimental::InterceptorBatchMethods* interceptor_methods) {} + experimental::InterceptorBatchMethods* interceptor_methods) { + interceptor_methods->SetRecvInitialMetadata(metadata_map_); + } void SetFinishInterceptionHookPoint( experimental::InterceptorBatchMethods* interceptor_methods) { @@ -671,7 +673,6 @@ class CallOpRecvInitialMetadata { if (metadata_map_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA); - interceptor_methods->SetRecvInitialMetadata(metadata_map_->arr()); } private: @@ -723,15 +724,16 @@ class CallOpClientRecvStatus { } void SetInterceptionHookPoint( - experimental::InterceptorBatchMethods* interceptor_methods) {} + experimental::InterceptorBatchMethods* interceptor_methods) { + interceptor_methods->SetRecvStatus(recv_status_); + interceptor_methods->SetRecvTrailingMetadata(metadata_map_); + } void SetFinishInterceptionHookPoint( experimental::InterceptorBatchMethods* interceptor_methods) { if (recv_status_ == nullptr) return; interceptor_methods->AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_STATUS); - interceptor_methods->SetRecvStatus(recv_status_); - interceptor_methods->SetRecvTrailingMetadata(metadata_map_->arr()); recv_status_ = nullptr; } @@ -916,54 +918,54 @@ class InterceptorBatchMethodsImpl hooks_[static_cast(type)] = true; } - virtual void GetSendMessage(grpc_byte_buffer** buf) override { + virtual void GetSendMessage(ByteBuffer** buf) override { *buf = send_message_; } - virtual void GetSendInitialMetadata(grpc_metadata** metadata, - size_t** count) override { + virtual void GetSendInitialMetadata( + std::multimap** metadata) override { *metadata = send_initial_metadata_; - *count = send_initial_metadata_count_; } - virtual void GetSendStatus(grpc_status_code** code, - grpc::string** error_details, - grpc::string** error_message) override { - *code = code_; - *error_details = error_details_; - *error_message = error_message_; + virtual void GetSendStatus(Status* status) override { + *status = Status(static_cast(*code_), *error_message_, + *error_details_); } - virtual void GetSendTrailingMetadata(grpc_metadata** metadata, - size_t** count) override { + virtual void ModifySendStatus(const Status& status) override { + *code_ = static_cast(status.error_code()); + *error_details_ = status.error_details(); + *error_message_ = status.error_message(); + } + + virtual void GetSendTrailingMetadata( + std::multimap** metadata) override { *metadata = send_trailing_metadata_; - *count = send_trailing_metadata_count_; } virtual void GetRecvMessage(void** message) override { *message = recv_message_; } - virtual void GetRecvInitialMetadata(grpc_metadata_array** array) override { - *array = recv_initial_metadata_; + virtual void GetRecvInitialMetadata( + std::multimap** map) override { + *map = recv_initial_metadata_->map(); } virtual void GetRecvStatus(Status** status) override { *status = recv_status_; } - virtual void GetRecvTrailingMetadata(grpc_metadata_array** map) override { - *map = recv_trailing_metadata_; + virtual void GetRecvTrailingMetadata( + std::multimap** map) override { + *map = recv_trailing_metadata_->map(); } - virtual void SetSendMessage(grpc_byte_buffer* buf) override { - send_message_ = buf; - } + virtual void SetSendMessage(ByteBuffer* buf) override { send_message_ = buf; } - virtual void SetSendInitialMetadata(grpc_metadata* metadata, - size_t* count) override { + virtual void SetSendInitialMetadata( + std::multimap* metadata) override { send_initial_metadata_ = metadata; - send_initial_metadata_count_ = count; } virtual void SetSendStatus(grpc_status_code* code, @@ -974,23 +976,22 @@ class InterceptorBatchMethodsImpl error_message_ = error_message; } - virtual void SetSendTrailingMetadata(grpc_metadata* metadata, - size_t* count) override { + virtual void SetSendTrailingMetadata( + std::multimap* metadata) override { send_trailing_metadata_ = metadata; - send_trailing_metadata_count_ = count; } virtual void SetRecvMessage(void* message) override { recv_message_ = message; } - virtual void SetRecvInitialMetadata(grpc_metadata_array* array) override { - recv_initial_metadata_ = array; + virtual void SetRecvInitialMetadata(internal::MetadataMap* map) override { + recv_initial_metadata_ = map; } virtual void SetRecvStatus(Status* status) override { recv_status_ = status; } - virtual void SetRecvTrailingMetadata(grpc_metadata_array* map) override { + virtual void SetRecvTrailingMetadata(internal::MetadataMap* map) override { recv_trailing_metadata_ = map; } @@ -1045,25 +1046,25 @@ class InterceptorBatchMethodsImpl nullptr; // The Call object is present along with CallOpSet object CallOpSetInterface* ops_ = nullptr; - grpc_byte_buffer* send_message_ = nullptr; + ByteBuffer* send_message_ = nullptr; - grpc_metadata* send_initial_metadata_ = nullptr; - size_t* send_initial_metadata_count_ = nullptr; + std::multimap* send_initial_metadata_; grpc_status_code* code_ = nullptr; grpc::string* error_details_ = nullptr; grpc::string* error_message_ = nullptr; + Status send_status_; - grpc_metadata* send_trailing_metadata_ = nullptr; + std::multimap* send_trailing_metadata_ = nullptr; size_t* send_trailing_metadata_count_ = nullptr; void* recv_message_ = nullptr; - grpc_metadata_array* recv_initial_metadata_ = nullptr; + internal::MetadataMap* recv_initial_metadata_ = nullptr; Status* recv_status_ = nullptr; - grpc_metadata_array* recv_trailing_metadata_ = nullptr; + internal::MetadataMap* recv_trailing_metadata_ = nullptr; // void (*hijacking_state_setter_)(); // void (*continue_after_interception_)(); @@ -1104,6 +1105,7 @@ class CallOpSet : public CallOpSetInterface, /* We have already finished intercepting and filling in the results. This * round trip from the core needed to be made because interceptors were * run */ + *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); return true; } @@ -1114,9 +1116,9 @@ class CallOpSet : public CallOpSetInterface, this->Op4::FinishOp(status); this->Op5::FinishOp(status); this->Op6::FinishOp(status); - *tag = return_tag_; if (RunInterceptorsPostRecv()) { + *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); return true; } diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 4d4faea063..ecb00a0769 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -77,7 +77,7 @@ class CallbackUnaryCallImpl { tag->force_run(s); return; } - ops->SendInitialMetadata(context->send_initial_metadata_, + ops->SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); ops->RecvInitialMetadata(context); ops->RecvMessage(result); diff --git a/include/grpcpp/impl/codegen/client_unary_call.h b/include/grpcpp/impl/codegen/client_unary_call.h index e4e8364e07..b1c80764f2 100644 --- a/include/grpcpp/impl/codegen/client_unary_call.h +++ b/include/grpcpp/impl/codegen/client_unary_call.h @@ -61,7 +61,7 @@ class BlockingUnaryCallImpl { if (!status_.ok()) { return; } - ops.SendInitialMetadata(context->send_initial_metadata_, + ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); ops.RecvInitialMetadata(context); ops.RecvMessage(result); diff --git a/include/grpcpp/impl/codegen/interceptor.h b/include/grpcpp/impl/codegen/interceptor.h index 2409388695..e9fd423b83 100644 --- a/include/grpcpp/impl/codegen/interceptor.h +++ b/include/grpcpp/impl/codegen/interceptor.h @@ -20,8 +20,10 @@ #define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_H #include +#include #include #include +#include // struct grpc_byte_buffer; // struct grpc_status_code; @@ -75,45 +77,47 @@ class InterceptorBatchMethods { virtual void AddInterceptionHookPoint(InterceptionHookPoints type) = 0; - virtual void GetSendMessage(grpc_byte_buffer** buf) = 0; + virtual void GetSendMessage(ByteBuffer** buf) = 0; - virtual void GetSendInitialMetadata(grpc_metadata** metadata, - size_t** count) = 0; + virtual void GetSendInitialMetadata( + std::multimap** metadata) = 0; - virtual void GetSendStatus(grpc_status_code** code, - grpc::string** error_details, - grpc::string** error_message) = 0; + virtual void GetSendStatus(Status* status) = 0; - virtual void GetSendTrailingMetadata(grpc_metadata** metadata, - size_t** count) = 0; + virtual void ModifySendStatus(const Status& status) = 0; + + virtual void GetSendTrailingMetadata( + std::multimap** metadata) = 0; virtual void GetRecvMessage(void** message) = 0; - virtual void GetRecvInitialMetadata(grpc_metadata_array** array) = 0; + virtual void GetRecvInitialMetadata( + std::multimap** map) = 0; virtual void GetRecvStatus(Status** status) = 0; - virtual void GetRecvTrailingMetadata(grpc_metadata_array** map) = 0; + virtual void GetRecvTrailingMetadata( + std::multimap** map) = 0; - virtual void SetSendMessage(grpc_byte_buffer* buf) = 0; + virtual void SetSendMessage(ByteBuffer* buf) = 0; - virtual void SetSendInitialMetadata(grpc_metadata* metadata, - size_t* count) = 0; + virtual void SetSendInitialMetadata( + std::multimap* metadata) = 0; virtual void SetSendStatus(grpc_status_code* code, grpc::string* error_details, grpc::string* error_message) = 0; - virtual void SetSendTrailingMetadata(grpc_metadata* metadata, - size_t* count) = 0; + virtual void SetSendTrailingMetadata( + std::multimap* metadata) = 0; virtual void SetRecvMessage(void* message) = 0; - virtual void SetRecvInitialMetadata(grpc_metadata_array* array) = 0; + virtual void SetRecvInitialMetadata(internal::MetadataMap* map) = 0; virtual void SetRecvStatus(Status* status) = 0; - virtual void SetRecvTrailingMetadata(grpc_metadata_array* map) = 0; + virtual void SetRecvTrailingMetadata(internal::MetadataMap* map) = 0; }; } // namespace experimental } // namespace grpc diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 53117f941b..1d3a2c9708 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -73,7 +73,7 @@ class RpcMethodHandler : public MethodHandler { CallOpSet ops; - ops.SendInitialMetadata(param.server_context->initial_metadata_, + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); if (param.server_context->compression_level_set()) { ops.set_compression_level(param.server_context->compression_level()); @@ -81,7 +81,7 @@ class RpcMethodHandler : public MethodHandler { if (status.ok()) { status = ops.SendMessage(rsp); } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); } @@ -117,7 +117,7 @@ class ClientStreamingHandler : public MethodHandler { CallOpServerSendStatus> ops; if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_, + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); if (param.server_context->compression_level_set()) { ops.set_compression_level(param.server_context->compression_level()); @@ -126,7 +126,7 @@ class ClientStreamingHandler : public MethodHandler { if (status.ok()) { status = ops.SendMessage(rsp); } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); param.call->cq()->Pluck(&ops); } @@ -163,13 +163,13 @@ class ServerStreamingHandler : public MethodHandler { CallOpSet ops; if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_, + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); if (param.server_context->compression_level_set()) { ops.set_compression_level(param.server_context->compression_level()); } } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); if (param.server_context->has_pending_ops_) { param.call->cq()->Pluck(¶m.server_context->pending_ops_); @@ -206,7 +206,7 @@ class TemplatedBidiStreamingHandler : public MethodHandler { CallOpSet ops; if (!param.server_context->sent_initial_metadata_) { - ops.SendInitialMetadata(param.server_context->initial_metadata_, + ops.SendInitialMetadata(¶m.server_context->initial_metadata_, param.server_context->initial_metadata_flags()); if (param.server_context->compression_level_set()) { ops.set_compression_level(param.server_context->compression_level()); @@ -218,7 +218,7 @@ class TemplatedBidiStreamingHandler : public MethodHandler { "Service did not provide response message"); } } - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); if (param.server_context->has_pending_ops_) { param.call->cq()->Pluck(¶m.server_context->pending_ops_); @@ -281,14 +281,14 @@ class ErrorMethodHandler : public MethodHandler { static void FillOps(ServerContext* context, T* ops) { Status status(code, ""); if (!context->sent_initial_metadata_) { - ops->SendInitialMetadata(context->initial_metadata_, + ops->SendInitialMetadata(&context->initial_metadata_, context->initial_metadata_flags()); if (context->compression_level_set()) { ops->set_compression_level(context->compression_level()); } context->sent_initial_metadata_ = true; } - ops->ServerSendStatus(context->trailing_metadata_, status); + ops->ServerSendStatus(&context->trailing_metadata_, status); } void RunHandler(const HandlerParameter& param) final { diff --git a/include/grpcpp/impl/codegen/sync_stream.h b/include/grpcpp/impl/codegen/sync_stream.h index cbfcf25d0a..6981076f04 100644 --- a/include/grpcpp/impl/codegen/sync_stream.h +++ b/include/grpcpp/impl/codegen/sync_stream.h @@ -250,7 +250,7 @@ class ClientReader final : public ClientReaderInterface { ::grpc::internal::CallOpSendMessage, ::grpc::internal::CallOpClientSendClose> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, + ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); // TODO(ctiller): don't assert GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); @@ -327,7 +327,7 @@ class ClientWriter : public ClientWriterInterface { ops.ClientSendClose(); } if (context_->initial_metadata_corked_) { - ops.SendInitialMetadata(context_->send_initial_metadata_, + ops.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } @@ -386,7 +386,7 @@ class ClientWriter : public ClientWriterInterface { if (!context_->initial_metadata_corked_) { ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, + ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); @@ -498,7 +498,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { ops.ClientSendClose(); } if (context_->initial_metadata_corked_) { - ops.SendInitialMetadata(context_->send_initial_metadata_, + ops.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags()); context_->set_initial_metadata_corked(false); } @@ -557,7 +557,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface { if (!context_->initial_metadata_corked_) { ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> ops; - ops.SendInitialMetadata(context->send_initial_metadata_, + ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags()); call_.PerformOps(&ops); cq_.Pluck(&ops); @@ -583,7 +583,7 @@ class ServerReader final : public ServerReaderInterface { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); internal::CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_, + ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ops.set_compression_level(ctx_->compression_level()); @@ -635,7 +635,7 @@ class ServerWriter final : public ServerWriterInterface { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); internal::CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_, + ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ops.set_compression_level(ctx_->compression_level()); @@ -660,7 +660,7 @@ class ServerWriter final : public ServerWriterInterface { return false; } if (!ctx_->sent_initial_metadata_) { - ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); @@ -708,7 +708,7 @@ class ServerReaderWriterBody final { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); CallOpSet ops; - ops.SendInitialMetadata(ctx_->initial_metadata_, + ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ops.set_compression_level(ctx_->compression_level()); @@ -738,7 +738,7 @@ class ServerReaderWriterBody final { return false; } if (!ctx_->sent_initial_metadata_) { - ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags()); if (ctx_->compression_level_set()) { ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index dd94a44e1d..a68af4dc43 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -106,7 +106,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { ops.flags = 0; ops.reserved = nullptr; GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), &ops, 1, cq_tag(), nullptr)); + grpc_call_start_batch(call->call(), &ops, 1, this, nullptr)); } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { -- cgit v1.2.3