diff options
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 12 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 1 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 1 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.c | 1 | ||||
-rw-r--r-- | src/cpp/common/core_codegen.cc | 4 | ||||
-rw-r--r-- | test/core/end2end/tests/resource_quota_server.c | 11 |
6 files changed, 21 insertions, 9 deletions
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index 0cb11b4cca..3c30ccdb1e 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -272,7 +272,7 @@ class CallOpSendInitialMetadata { class CallOpSendMessage { public: - CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + CallOpSendMessage() : send_buf_(nullptr) {} /// Send \a message using \a options for the write. The \a options are cleared /// after use. @@ -295,20 +295,24 @@ class CallOpSendMessage { write_options_.Clear(); } void FinishOp(bool* status) { - if (own_buf_) g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_); + g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_); send_buf_ = nullptr; } private: grpc_byte_buffer* send_buf_; WriteOptions write_options_; - bool own_buf_; }; template <class M> Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { write_options_ = options; - return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); + bool own_buf; + Status result = SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf); + if (!own_buf) { + send_buf_ = g_core_codegen_interface->grpc_byte_buffer_copy(send_buf_); + } + return result; } template <class M> diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 2b15a01845..5f96c8345b 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -68,6 +68,7 @@ class CoreCodegen final : public CoreCodegenInterface { void grpc_call_unref(grpc_call* call) override; virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override; + grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) override; void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index b4c771ac93..7556016f27 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -74,6 +74,7 @@ class CoreCodegenInterface { virtual void gpr_cv_signal(gpr_cv* cv) = 0; virtual void gpr_cv_broadcast(gpr_cv* cv) = 0; + virtual grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) = 0; virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index fb03a10315..08f61629a9 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer); GRPC_ERROR_UNREF(stream->shutdown_error); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index c7c6b6b13b..e81509904f 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu, void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); } void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); } +grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer *bb) { + return ::grpc_byte_buffer_copy(bb); +} + void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 010e20c4c2..34a6a80a31 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -143,6 +143,8 @@ void resource_quota_server(grpc_end2end_test_config config) { malloc(sizeof(grpc_call_details) * NUM_CALLS); grpc_status_code *status = malloc(sizeof(grpc_status_code) * NUM_CALLS); grpc_slice *details = malloc(sizeof(grpc_slice) * NUM_CALLS); + grpc_byte_buffer **request_payload = + malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); grpc_byte_buffer **request_payload_recv = malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); int *was_cancelled = malloc(sizeof(int) * NUM_CALLS); @@ -156,9 +158,6 @@ void resource_quota_server(grpc_end2end_test_config config) { int deadline_exceeded = 0; int unavailable = 0; - grpc_byte_buffer *request_payload = - grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_op ops[6]; grpc_op *op; @@ -167,6 +166,7 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_metadata_array_init(&trailing_metadata_recv[i]); grpc_metadata_array_init(&request_metadata_recv[i]); grpc_call_details_init(&call_details[i]); + request_payload[i] = grpc_raw_byte_buffer_create(&request_payload_slice, 1); request_payload_recv[i] = NULL; was_cancelled[i] = 0; } @@ -195,7 +195,7 @@ void resource_quota_server(grpc_end2end_test_config config) { op->reserved = NULL; op++; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = request_payload; + op->data.send_message.send_message = request_payload[i]; op->flags = 0; op->reserved = NULL; op++; @@ -261,6 +261,7 @@ void resource_quota_server(grpc_end2end_test_config config) { grpc_metadata_array_destroy(&trailing_metadata_recv[call_id]); grpc_call_unref(client_calls[call_id]); grpc_slice_unref(details[call_id]); + grpc_byte_buffer_destroy(request_payload[call_id]); pending_client_calls--; } else if (ev_tag < SERVER_RECV_BASE_TAG) { @@ -351,7 +352,6 @@ void resource_quota_server(grpc_end2end_test_config config) { NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client, deadline_exceeded, unavailable); - grpc_byte_buffer_destroy(request_payload); grpc_slice_unref(request_payload_slice); grpc_resource_quota_unref(resource_quota); @@ -366,6 +366,7 @@ void resource_quota_server(grpc_end2end_test_config config) { free(call_details); free(status); free(details); + free(request_payload); free(request_payload_recv); free(was_cancelled); } |