diff options
29 files changed, 609 insertions, 337 deletions
@@ -141,7 +141,6 @@ GRPCXX_SRCS = [ "src/cpp/server/server_posix.cc", "src/cpp/thread_manager/thread_manager.cc", "src/cpp/util/byte_buffer_cc.cc", - "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", "src/cpp/util/string_ref.cc", "src/cpp/util/time_cc.cc", @@ -245,6 +244,8 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", "include/grpcpp/support/config.h", + "include/grpcpp/support/proto_buffer_reader.h", + "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", @@ -1871,6 +1872,8 @@ grpc_cc_library( language = "c++", public_hdrs = [ "include/grpc++/impl/codegen/proto_utils.h", + "include/grpcpp/impl/codegen/proto_buffer_reader.h", + "include/grpcpp/impl/codegen/proto_buffer_writer.h", "include/grpcpp/impl/codegen/proto_utils.h", ], deps = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index d94422f741..18400ea22a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2716,7 +2716,6 @@ add_library(grpc++ src/cpp/server/server_posix.cc src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc - src/cpp/util/slice_cc.cc src/cpp/util/status.cc src/cpp/util/string_ref.cc src/cpp/util/time_cc.cc @@ -2841,6 +2840,8 @@ foreach(_hdr include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h include/grpcpp/support/config.h + include/grpcpp/support/proto_buffer_reader.h + include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -2959,6 +2960,8 @@ foreach(_hdr include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/time.h include/grpc++/impl/codegen/proto_utils.h + include/grpcpp/impl/codegen/proto_buffer_reader.h + include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_utils.h include/grpc++/impl/codegen/config_protobuf.h include/grpcpp/impl/codegen/config_protobuf.h @@ -3067,7 +3070,6 @@ add_library(grpc++_cronet src/cpp/server/server_posix.cc src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc - src/cpp/util/slice_cc.cc src/cpp/util/status.cc src/cpp/util/string_ref.cc src/cpp/util/time_cc.cc @@ -3402,6 +3404,8 @@ foreach(_hdr include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h include/grpcpp/support/config.h + include/grpcpp/support/proto_buffer_reader.h + include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -3945,6 +3949,8 @@ foreach(_hdr include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_windows.h include/grpc++/impl/codegen/proto_utils.h + include/grpcpp/impl/codegen/proto_buffer_reader.h + include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_utils.h include/grpc++/impl/codegen/config_protobuf.h include/grpcpp/impl/codegen/config_protobuf.h @@ -4118,6 +4124,8 @@ foreach(_hdr include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_windows.h include/grpc++/impl/codegen/proto_utils.h + include/grpcpp/impl/codegen/proto_buffer_reader.h + include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_utils.h include/grpc++/impl/codegen/config_protobuf.h include/grpcpp/impl/codegen/config_protobuf.h @@ -4166,7 +4174,6 @@ add_library(grpc++_unsecure src/cpp/server/server_posix.cc src/cpp/thread_manager/thread_manager.cc src/cpp/util/byte_buffer_cc.cc - src/cpp/util/slice_cc.cc src/cpp/util/status.cc src/cpp/util/string_ref.cc src/cpp/util/time_cc.cc @@ -4290,6 +4297,8 @@ foreach(_hdr include/grpcpp/support/byte_buffer.h include/grpcpp/support/channel_arguments.h include/grpcpp/support/config.h + include/grpcpp/support/proto_buffer_reader.h + include/grpcpp/support/proto_buffer_writer.h include/grpcpp/support/slice.h include/grpcpp/support/status.h include/grpcpp/support/status_code_enum.h @@ -5019,7 +5019,6 @@ LIBGRPC++_SRC = \ src/cpp/server/server_posix.cc \ src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ - src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ src/cpp/util/string_ref.cc \ src/cpp/util/time_cc.cc \ @@ -5109,6 +5108,8 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/config.h \ + include/grpcpp/support/proto_buffer_reader.h \ + include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -5227,6 +5228,8 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ include/grpc++/impl/codegen/proto_utils.h \ + include/grpcpp/impl/codegen/proto_buffer_reader.h \ + include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/config_protobuf.h \ include/grpcpp/impl/codegen/config_protobuf.h \ @@ -5380,7 +5383,6 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/server/server_posix.cc \ src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ - src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ src/cpp/util/string_ref.cc \ src/cpp/util/time_cc.cc \ @@ -5679,6 +5681,8 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/config.h \ + include/grpcpp/support/proto_buffer_reader.h \ + include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -6207,6 +6211,8 @@ PUBLIC_HEADERS_CXX += \ include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_windows.h \ include/grpc++/impl/codegen/proto_utils.h \ + include/grpcpp/impl/codegen/proto_buffer_reader.h \ + include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/config_protobuf.h \ include/grpcpp/impl/codegen/config_protobuf.h \ @@ -6357,6 +6363,8 @@ PUBLIC_HEADERS_CXX += \ include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_windows.h \ include/grpc++/impl/codegen/proto_utils.h \ + include/grpcpp/impl/codegen/proto_buffer_reader.h \ + include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/config_protobuf.h \ include/grpcpp/impl/codegen/config_protobuf.h \ @@ -6444,7 +6452,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/server/server_posix.cc \ src/cpp/thread_manager/thread_manager.cc \ src/cpp/util/byte_buffer_cc.cc \ - src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ src/cpp/util/string_ref.cc \ src/cpp/util/time_cc.cc \ @@ -6534,6 +6541,8 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/config.h \ + include/grpcpp/support/proto_buffer_reader.h \ + include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ diff --git a/build.yaml b/build.yaml index aa5a40d849..594df9526c 100644 --- a/build.yaml +++ b/build.yaml @@ -1164,6 +1164,8 @@ filegroups: language: c++ public_headers: - include/grpc++/impl/codegen/proto_utils.h + - include/grpcpp/impl/codegen/proto_buffer_reader.h + - include/grpcpp/impl/codegen/proto_buffer_writer.h - include/grpcpp/impl/codegen/proto_utils.h uses: - grpc++_codegen_base @@ -1254,6 +1256,8 @@ filegroups: - include/grpcpp/support/byte_buffer.h - include/grpcpp/support/channel_arguments.h - include/grpcpp/support/config.h + - include/grpcpp/support/proto_buffer_reader.h + - include/grpcpp/support/proto_buffer_writer.h - include/grpcpp/support/slice.h - include/grpcpp/support/status.h - include/grpcpp/support/status_code_enum.h @@ -1300,7 +1304,6 @@ filegroups: - src/cpp/server/server_posix.cc - src/cpp/thread_manager/thread_manager.cc - src/cpp/util/byte_buffer_cc.cc - - src/cpp/util/slice_cc.cc - src/cpp/util/status.cc - src/cpp/util/string_ref.cc - src/cpp/util/time_cc.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index c5a4e50cd3..990b0a4f9d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -112,6 +112,8 @@ Pod::Spec.new do |s| 'include/grpcpp/support/byte_buffer.h', 'include/grpcpp/support/channel_arguments.h', 'include/grpcpp/support/config.h', + 'include/grpcpp/support/proto_buffer_reader.h', + 'include/grpcpp/support/proto_buffer_writer.h', 'include/grpcpp/support/slice.h', 'include/grpcpp/support/status.h', 'include/grpcpp/support/status_code_enum.h', @@ -206,7 +208,6 @@ Pod::Spec.new do |s| 'src/cpp/server/server_posix.cc', 'src/cpp/thread_manager/thread_manager.cc', 'src/cpp/util/byte_buffer_cc.cc', - 'src/cpp/util/slice_cc.cc', 'src/cpp/util/status.cc', 'src/cpp/util/string_ref.cc', 'src/cpp/util/time_cc.cc', @@ -1342,7 +1342,6 @@ 'src/cpp/server/server_posix.cc', 'src/cpp/thread_manager/thread_manager.cc', 'src/cpp/util/byte_buffer_cc.cc', - 'src/cpp/util/slice_cc.cc', 'src/cpp/util/status.cc', 'src/cpp/util/string_ref.cc', 'src/cpp/util/time_cc.cc', @@ -1490,7 +1489,6 @@ 'src/cpp/server/server_posix.cc', 'src/cpp/thread_manager/thread_manager.cc', 'src/cpp/util/byte_buffer_cc.cc', - 'src/cpp/util/slice_cc.cc', 'src/cpp/util/status.cc', 'src/cpp/util/string_ref.cc', 'src/cpp/util/time_cc.cc', diff --git a/include/grpcpp/impl/codegen/byte_buffer.h b/include/grpcpp/impl/codegen/byte_buffer.h index e637efeb85..fda13a04e6 100644 --- a/include/grpcpp/impl/codegen/byte_buffer.h +++ b/include/grpcpp/impl/codegen/byte_buffer.h @@ -32,6 +32,8 @@ namespace grpc { class ServerInterface; +class ByteBuffer; +class ServerInterface; namespace internal { class CallOpSendMessage; @@ -45,6 +47,7 @@ template <class ServiceType, class RequestType, class ResponseType> class ServerStreamingHandler; template <class R> class DeserializeFuncType; +class GrpcByteBufferPeer; } // namespace internal /// A sequence of bytes. class ByteBuffer final { @@ -53,7 +56,30 @@ class ByteBuffer final { ByteBuffer() : buffer_(nullptr) {} /// Construct buffer from \a slices, of which there are \a nslices. - ByteBuffer(const Slice* slices, size_t nslices); + ByteBuffer(const Slice* slices, size_t nslices) { + // The following assertions check that the representation of a grpc::Slice + // is identical to that of a grpc_slice: it has a grpc_slice field, and + // nothing else. + static_assert(std::is_same<decltype(slices[0].slice_), grpc_slice>::value, + "Slice must have same representation as grpc_slice"); + static_assert(sizeof(Slice) == sizeof(grpc_slice), + "Slice must have same representation as grpc_slice"); + // The following assertions check that the representation of a ByteBuffer is + // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, + // and nothing else. + static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), + "ByteBuffer must have same representation as " + "grpc_byte_buffer*"); + // The const_cast is legal if grpc_raw_byte_buffer_create() does no more + // than its advertised side effect of increasing the reference count of the + // slices it processes, and such an increase does not affect the semantics + // seen by the caller of this constructor. + buffer_ = g_core_codegen_interface->grpc_raw_byte_buffer_create( + reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); + } /// Constuct a byte buffer by referencing elements of existing buffer /// \a buf. Wrapper of core function grpc_byte_buffer_copy @@ -90,10 +116,18 @@ class ByteBuffer final { void Release() { buffer_ = nullptr; } /// Buffer size in bytes. - size_t Length() const; + size_t Length() const { + return buffer_ == nullptr + ? 0 + : g_core_codegen_interface->grpc_byte_buffer_length(buffer_); + } /// Swap the state of *this and *other. - void Swap(ByteBuffer* other); + void Swap(ByteBuffer* other) { + grpc_byte_buffer* tmp = other->buffer_; + other->buffer_ = buffer_; + buffer_ = tmp; + } /// Is this ByteBuffer valid? bool Valid() const { return (buffer_ != nullptr); } @@ -112,6 +146,9 @@ class ByteBuffer final { friend class internal::ServerStreamingHandler; template <class R> friend class internal::DeserializeFuncType; + friend class GrpcProtoBufferReader; + friend class GrpcProtoBufferWriter; + friend class internal::GrpcByteBufferPeer; grpc_byte_buffer* buffer_; diff --git a/include/grpcpp/impl/codegen/core_codegen.h b/include/grpcpp/impl/codegen/core_codegen.h index 0ca4ad524c..e9df96bf04 100644 --- a/include/grpcpp/impl/codegen/core_codegen.h +++ b/include/grpcpp/impl/codegen/core_codegen.h @@ -73,6 +73,7 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) override; void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; + size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) override; int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) override; @@ -86,6 +87,8 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_slice grpc_slice_new_with_user_data(void* p, size_t len, void (*destroy)(void*), void* user_data) override; + grpc_slice grpc_slice_new_with_len(void* p, size_t len, + void (*destroy)(void*, size_t)) override; grpc_slice grpc_empty_slice() override; grpc_slice grpc_slice_malloc(size_t length) override; void grpc_slice_unref(grpc_slice slice) override; diff --git a/include/grpcpp/impl/codegen/core_codegen_interface.h b/include/grpcpp/impl/codegen/core_codegen_interface.h index d72f579c8e..1167a188a2 100644 --- a/include/grpcpp/impl/codegen/core_codegen_interface.h +++ b/include/grpcpp/impl/codegen/core_codegen_interface.h @@ -81,6 +81,8 @@ class CoreCodegenInterface { virtual grpc_byte_buffer* grpc_byte_buffer_copy(grpc_byte_buffer* bb) = 0; virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0; + virtual size_t grpc_byte_buffer_length(grpc_byte_buffer* bb) + GRPC_MUST_USE_RESULT = 0; virtual int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) @@ -95,6 +97,9 @@ class CoreCodegenInterface { virtual grpc_slice grpc_slice_new_with_user_data(void* p, size_t len, void (*destroy)(void*), void* user_data) = 0; + virtual grpc_slice grpc_slice_new_with_len(void* p, size_t len, + void (*destroy)(void*, + size_t)) = 0; virtual grpc_call_error grpc_call_cancel_with_status(grpc_call* call, grpc_status_code status, const char* description, diff --git a/include/grpcpp/impl/codegen/proto_buffer_reader.h b/include/grpcpp/impl/codegen/proto_buffer_reader.h new file mode 100644 index 0000000000..149e05c74f --- /dev/null +++ b/include/grpcpp/impl/codegen/proto_buffer_reader.h @@ -0,0 +1,151 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_READER_H +#define GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_READER_H + +#include <type_traits> + +#include <grpc/impl/codegen/byte_buffer_reader.h> +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/impl/codegen/slice.h> +#include <grpcpp/impl/codegen/byte_buffer.h> +#include <grpcpp/impl/codegen/config_protobuf.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/serialization_traits.h> +#include <grpcpp/impl/codegen/status.h> + +/// This header provides an object that reads bytes directly from a +/// grpc::ByteBuffer, via the ZeroCopyInputStream interface + +namespace grpc { + +extern CoreCodegenInterface* g_core_codegen_interface; + +/// This is a specialization of the protobuf class ZeroCopyInputStream +/// The principle is to get one chunk of data at a time from the proto layer, +/// with options to backup (re-see some bytes) or skip (forward past some bytes) +/// +/// Read more about ZeroCopyInputStream interface here: +/// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream +class GrpcProtoBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { + public: + /// Constructs buffer reader from \a buffer. Will set \a status() to non ok + /// if \a buffer is invalid (the internal buffer has not been initialized). + explicit GrpcProtoBufferReader(ByteBuffer* buffer) + : byte_count_(0), backup_count_(0), status_() { + /// Implemented through a grpc_byte_buffer_reader which iterates + /// over the slices that make up a byte buffer + if (!buffer->Valid() || + !g_core_codegen_interface->grpc_byte_buffer_reader_init( + &reader_, buffer->c_buffer())) { + status_ = Status(StatusCode::INTERNAL, + "Couldn't initialize byte buffer reader"); + } + } + + ~GrpcProtoBufferReader() { + if (status_.ok()) { + g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_); + } + } + + /// Give the proto library a chunk of data from the stream. The caller + /// may safely read from data[0, size - 1]. + bool Next(const void** data, int* size) override { + if (!status_.ok()) { + return false; + } + /// If we have backed up previously, we need to return the backed-up slice + if (backup_count_ > 0) { + *data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) - + backup_count_; + GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX); + *size = (int)backup_count_; + backup_count_ = 0; + return true; + } + /// Otherwise get the next slice from the byte buffer reader + if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_, + &slice_)) { + return false; + } + g_core_codegen_interface->grpc_slice_unref(slice_); + *data = GRPC_SLICE_START_PTR(slice_); + // On win x64, int is only 32bit + GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); + byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); + return true; + } + + /// Returns the status of the buffer reader. + Status status() const { return status_; } + + /// The proto library calls this to indicate that we should back up \a count + /// bytes that have already been returned by the last call of Next. + /// So do the backup and have that ready for a later Next. + void BackUp(int count) override { + GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); + backup_count_ = count; + } + + /// The proto library calls this to skip over \a count bytes. Implement this + /// using Next and BackUp combined. + bool Skip(int count) override { + const void* data; + int size; + while (Next(&data, &size)) { + if (size >= count) { + BackUp(size - count); + return true; + } + // size < count; + count -= size; + } + // error or we have too large count; + return false; + } + + /// Returns the total number of bytes read since this object was created. + grpc::protobuf::int64 ByteCount() const override { + return byte_count_ - backup_count_; + } + + // These protected members are needed to support internal optimizations. + // they expose internal bits of grpc core that are NOT stable. If you have + // a use case needs to use one of these functions, please send an email to + // https://groups.google.com/forum/#!forum/grpc-io. + protected: + void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } + int64_t backup_count() { return backup_count_; } + void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; } + grpc_byte_buffer_reader* reader() { return &reader_; } + grpc_slice* slice() { return &slice_; } + + private: + int64_t byte_count_; ///< total bytes read since object creation + int64_t backup_count_; ///< how far backed up in the stream we are + grpc_byte_buffer_reader reader_; ///< internal object to read \a grpc_slice + ///< from the \a grpc_byte_buffer + grpc_slice slice_; ///< current slice passed back to the caller + Status status_; ///< status of the entire object +}; + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_READER_H diff --git a/include/grpcpp/impl/codegen/proto_buffer_writer.h b/include/grpcpp/impl/codegen/proto_buffer_writer.h new file mode 100644 index 0000000000..faf99800cd --- /dev/null +++ b/include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -0,0 +1,168 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H +#define GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H + +#include <type_traits> + +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/impl/codegen/slice.h> +#include <grpcpp/impl/codegen/byte_buffer.h> +#include <grpcpp/impl/codegen/config_protobuf.h> +#include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/serialization_traits.h> +#include <grpcpp/impl/codegen/status.h> + +/// This header provides an object that writes bytes directly into a +/// grpc::ByteBuffer, via the ZeroCopyOutputStream interface + +namespace grpc { + +extern CoreCodegenInterface* g_core_codegen_interface; + +// Forward declaration for testing use only +namespace internal { +class GrpcProtoBufferWriterPeer; +} // namespace internal + +const int kGrpcProtoBufferWriterMaxBufferLength = 1024 * 1024; + +/// This is a specialization of the protobuf class ZeroCopyOutputStream. +/// The principle is to give the proto layer one buffer of bytes at a time +/// that it can use to serialize the next portion of the message, with the +/// option to "backup" if more buffer is given than required at the last buffer. +/// +/// Read more about ZeroCopyOutputStream interface here: +/// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream +class GrpcProtoBufferWriter + : public ::grpc::protobuf::io::ZeroCopyOutputStream { + public: + /// Constructor for this derived class + /// + /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created + /// \param block_size How big are the chunks to allocate at a time + /// \param total_size How many total bytes are required for this proto + GrpcProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size) + : block_size_(block_size), + total_size_(total_size), + byte_count_(0), + have_backup_(false) { + GPR_CODEGEN_ASSERT(!byte_buffer->Valid()); + /// Create an empty raw byte buffer and look at its underlying slice buffer + grpc_byte_buffer* bp = + g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); + byte_buffer->set_buffer(bp); + slice_buffer_ = &bp->data.raw.slice_buffer; + } + + ~GrpcProtoBufferWriter() { + if (have_backup_) { + g_core_codegen_interface->grpc_slice_unref(backup_slice_); + } + } + + /// Give the proto library the next buffer of bytes and its size. It is + /// safe for the caller to write from data[0, size - 1]. + bool Next(void** data, int* size) override { + // Protobuf should not ask for more memory than total_size_. + GPR_CODEGEN_ASSERT(byte_count_ < total_size_); + // 1. Use the remaining backup slice if we have one + // 2. Otherwise allocate a slice, up to the remaining length needed + // or our maximum allocation size + // 3. Provide the slice start and size available + // 4. Add the slice being returned to the slice buffer + size_t remain = total_size_ - byte_count_; + if (have_backup_) { + /// If we have a backup slice, we should use it first + slice_ = backup_slice_; + have_backup_ = false; + if (GRPC_SLICE_LENGTH(slice_) > remain) { + GRPC_SLICE_SET_LENGTH(slice_, remain); + } + } else { + // When less than a whole block is needed, only allocate that much. + // But make sure the allocated slice is not inlined. + size_t allocate_length = + remain > static_cast<size_t>(block_size_) ? block_size_ : remain; + slice_ = g_core_codegen_interface->grpc_slice_malloc( + allocate_length > GRPC_SLICE_INLINED_SIZE + ? allocate_length + : GRPC_SLICE_INLINED_SIZE + 1); + } + *data = GRPC_SLICE_START_PTR(slice_); + // On win x64, int is only 32bit + GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); + byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); + g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); + return true; + } + + /// Backup by \a count bytes because Next returned more bytes than needed + /// (only used in the last buffer). \a count must be less than or equal too + /// the last buffer returned from next. + void BackUp(int count) override { + /// 1. Remove the partially-used last slice from the slice buffer + /// 2. Split it into the needed (if any) and unneeded part + /// 3. Add the needed part back to the slice buffer + /// 4. Mark that we still have the remaining part (for later use/unref) + GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_))); + g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); + if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { + backup_slice_ = slice_; + } else { + backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( + &slice_, GRPC_SLICE_LENGTH(slice_) - count); + g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); + } + // It's dangerous to keep an inlined grpc_slice as the backup slice, since + // on a following Next() call, a reference will be returned to this slice + // via GRPC_SLICE_START_PTR, which will not be an address held by + // slice_buffer_. + have_backup_ = backup_slice_.refcount != NULL; + byte_count_ -= count; + } + + /// Returns the total number of bytes written since this object was created. + grpc::protobuf::int64 ByteCount() const override { return byte_count_; } + + // These protected members are needed to support internal optimizations. + // they expose internal bits of grpc core that are NOT stable. If you have + // a use case needs to use one of these functions, please send an email to + // https://groups.google.com/forum/#!forum/grpc-io. + protected: + grpc_slice_buffer* slice_buffer() { return slice_buffer_; } + void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; } + + private: + // friend for testing purposes only + friend class internal::GrpcProtoBufferWriterPeer; + const int block_size_; ///< size to alloc for each new \a grpc_slice needed + const int total_size_; ///< byte size of proto being serialized + int64_t byte_count_; ///< bytes written since this object was created + grpc_slice_buffer* + slice_buffer_; ///< internal buffer of slices holding the serialized data + bool have_backup_; ///< if we are holding a backup slice or not + grpc_slice backup_slice_; ///< holds space we can still write to, if the + ///< caller has called BackUp + grpc_slice slice_; ///< current slice passed back to the caller +}; + +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H diff --git a/include/grpcpp/impl/codegen/proto_utils.h b/include/grpcpp/impl/codegen/proto_utils.h index 81438ee1d5..ec1dccaa74 100644 --- a/include/grpcpp/impl/codegen/proto_utils.h +++ b/include/grpcpp/impl/codegen/proto_utils.h @@ -24,203 +24,62 @@ #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/slice.h> +#include <grpcpp/impl/codegen/byte_buffer.h> #include <grpcpp/impl/codegen/config_protobuf.h> #include <grpcpp/impl/codegen/core_codegen_interface.h> +#include <grpcpp/impl/codegen/proto_buffer_reader.h> +#include <grpcpp/impl/codegen/proto_buffer_writer.h> #include <grpcpp/impl/codegen/serialization_traits.h> +#include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/status.h> +/// This header provides serialization and deserialization between gRPC +/// messages serialized using protobuf and the C++ objects they represent. + namespace grpc { extern CoreCodegenInterface* g_core_codegen_interface; -namespace internal { - -class GrpcBufferWriterPeer; - -const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; - -class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { - public: - GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size) - : block_size_(block_size), - total_size_(total_size), - byte_count_(0), - have_backup_(false) { - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); - slice_buffer_ = &(*bp)->data.raw.slice_buffer; - } - - ~GrpcBufferWriter() override { - if (have_backup_) { - g_core_codegen_interface->grpc_slice_unref(backup_slice_); - } - } - - bool Next(void** data, int* size) override { - // Protobuf should not ask for more memory than total_size_. - GPR_CODEGEN_ASSERT(byte_count_ < total_size_); - size_t remain = total_size_ - byte_count_; - if (have_backup_) { - slice_ = backup_slice_; - have_backup_ = false; - if (GRPC_SLICE_LENGTH(slice_) > remain) { - GRPC_SLICE_SET_LENGTH(slice_, remain); - } - } else { - // When less than a whole block is needed, only allocate that much. - // But make sure the allocated slice is not inlined. - size_t allocate_length = - remain > static_cast<size_t>(block_size_) ? block_size_ : remain; - slice_ = g_core_codegen_interface->grpc_slice_malloc( - allocate_length > GRPC_SLICE_INLINED_SIZE - ? allocate_length - : GRPC_SLICE_INLINED_SIZE + 1); - } - *data = GRPC_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); - g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); - return true; - } - - void BackUp(int count) override { - g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); - if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { - backup_slice_ = slice_; - } else { - backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( - &slice_, GRPC_SLICE_LENGTH(slice_) - count); - g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); - } - // It's dangerous to keep an inlined grpc_slice as the backup slice, since - // on a following Next() call, a reference will be returned to this slice - // via GRPC_SLICE_START_PTR, which will not be an adddress held by - // slice_buffer_. - have_backup_ = backup_slice_.refcount != NULL; - byte_count_ -= count; - } - - grpc::protobuf::int64 ByteCount() const override { return byte_count_; } - - protected: - friend class GrpcBufferWriterPeer; - const int block_size_; - const int total_size_; - int64_t byte_count_; - grpc_slice_buffer* slice_buffer_; - bool have_backup_; - grpc_slice backup_slice_; - grpc_slice slice_; -}; - -class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { - public: - explicit GrpcBufferReader(grpc_byte_buffer* buffer) - : byte_count_(0), backup_count_(0), status_() { - if (!g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, - buffer)) { - status_ = Status(StatusCode::INTERNAL, - "Couldn't initialize byte buffer reader"); - } - } - ~GrpcBufferReader() override { - g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_); - } - - bool Next(const void** data, int* size) override { - if (!status_.ok()) { - return false; - } - if (backup_count_ > 0) { - *data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) - - backup_count_; - GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX); - *size = (int)backup_count_; - backup_count_ = 0; - return true; - } - if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_, - &slice_)) { - return false; - } - g_core_codegen_interface->grpc_slice_unref(slice_); - *data = GRPC_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); - return true; - } - - Status status() const { return status_; } - - void BackUp(int count) override { backup_count_ = count; } - - bool Skip(int count) override { - const void* data; - int size; - while (Next(&data, &size)) { - if (size >= count) { - BackUp(size - count); - return true; - } - // size < count; - count -= size; - } - // error or we have too large count; - return false; - } - - grpc::protobuf::int64 ByteCount() const override { - return byte_count_ - backup_count_; - } - - protected: - int64_t byte_count_; - int64_t backup_count_; - grpc_byte_buffer_reader reader_; - grpc_slice slice_; - Status status_; -}; - -// BufferWriter must be a subclass of io::ZeroCopyOutputStream. -template <class BufferWriter, class T> -Status GenericSerialize(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp, bool* own_buffer) { - static_assert( - std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value, - "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); +// ProtoBufferWriter must be a subclass of ::protobuf::io::ZeroCopyOutputStream. +template <class ProtoBufferWriter, class T> +Status GenericSerialize(const grpc::protobuf::Message& msg, ByteBuffer* bb, + bool* own_buffer) { + static_assert(std::is_base_of<protobuf::io::ZeroCopyOutputStream, + ProtoBufferWriter>::value, + "ProtoBufferWriter must be a subclass of " + "::protobuf::io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); + Slice slice(byte_size); + // We serialize directly into the allocated slices memory + GPR_CODEGEN_ASSERT(slice.end() == msg.SerializeWithCachedSizesToArray( + const_cast<uint8_t*>(slice.begin()))); + ByteBuffer tmp(&slice, 1); + bb->Swap(&tmp); return g_core_codegen_interface->ok(); } - BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); + ProtoBufferWriter writer(bb, kGrpcProtoBufferWriterMaxBufferLength, + byte_size); return msg.SerializeToZeroCopyStream(&writer) ? g_core_codegen_interface->ok() : Status(StatusCode::INTERNAL, "Failed to serialize message"); } -// BufferReader must be a subclass of io::ZeroCopyInputStream. -template <class BufferReader, class T> -Status GenericDeserialize(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg) { - static_assert( - std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value, - "BufferReader must be a subclass of io::ZeroCopyInputStream"); +// BufferReader must be a subclass of ::protobuf::io::ZeroCopyInputStream. +template <class ProtoBufferReader, class T> +Status GenericDeserialize(ByteBuffer* buffer, grpc::protobuf::Message* msg) { + static_assert(std::is_base_of<protobuf::io::ZeroCopyInputStream, + ProtoBufferReader>::value, + "ProtoBufferReader must be a subclass of " + "::protobuf::io::ZeroCopyInputStream"); if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); } Status result = g_core_codegen_interface->ok(); { - BufferReader reader(buffer); + ProtoBufferReader reader(buffer); if (!reader.status().ok()) { return reader.status(); } @@ -233,12 +92,10 @@ Status GenericDeserialize(grpc_byte_buffer* buffer, result = Status(StatusCode::INTERNAL, "Did not read entire message"); } } - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); + buffer->Clear(); return result; } -} // namespace internal - // this is needed so the following class does not conflict with protobuf // serializers that utilize internal-only tools. #ifdef GRPC_OPEN_SOURCE_PROTO @@ -249,16 +106,13 @@ template <class T> class SerializationTraits<T, typename std::enable_if<std::is_base_of< grpc::protobuf::Message, T>::value>::type> { public: - static Status Serialize(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp, bool* own_buffer) { - return internal::GenericSerialize<internal::GrpcBufferWriter, T>( - msg, bp, own_buffer); + static Status Serialize(const grpc::protobuf::Message& msg, ByteBuffer* bb, + bool* own_buffer) { + return GenericSerialize<GrpcProtoBufferWriter, T>(msg, bb, own_buffer); } - static Status Deserialize(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg) { - return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer, - msg); + static Status Deserialize(ByteBuffer* buffer, grpc::protobuf::Message* msg) { + return GenericDeserialize<GrpcProtoBufferReader, T>(buffer, msg); } }; #endif diff --git a/include/grpcpp/impl/codegen/slice.h b/include/grpcpp/impl/codegen/slice.h index fcccd4b68e..8966559dc8 100644 --- a/include/grpcpp/impl/codegen/slice.h +++ b/include/grpcpp/impl/codegen/slice.h @@ -35,34 +35,43 @@ namespace grpc { class Slice final { public: /// Construct an empty slice. - Slice(); + Slice() : slice_(g_core_codegen_interface->grpc_empty_slice()) {} /// Destructor - drops one reference. - ~Slice(); + ~Slice() { g_core_codegen_interface->grpc_slice_unref(slice_); } enum AddRef { ADD_REF }; /// Construct a slice from \a slice, adding a reference. - Slice(grpc_slice slice, AddRef); + Slice(grpc_slice slice, AddRef) + : slice_(g_core_codegen_interface->grpc_slice_ref(slice)) {} enum StealRef { STEAL_REF }; /// Construct a slice from \a slice, stealing a reference. - Slice(grpc_slice slice, StealRef); + Slice(grpc_slice slice, StealRef) : slice_(slice) {} /// Allocate a slice of specified size - Slice(size_t len); + Slice(size_t len) + : slice_(g_core_codegen_interface->grpc_slice_malloc(len)) {} /// Construct a slice from a copied buffer - Slice(const void* buf, size_t len); + Slice(const void* buf, size_t len) + : slice_(g_core_codegen_interface->grpc_slice_from_copied_buffer( + reinterpret_cast<const char*>(buf), len)) {} /// Construct a slice from a copied string - Slice(const grpc::string& str); + Slice(const grpc::string& str) + : slice_(g_core_codegen_interface->grpc_slice_from_copied_buffer( + str.c_str(), str.length())) {} enum StaticSlice { STATIC_SLICE }; /// Construct a slice from a static buffer - Slice(const void* buf, size_t len, StaticSlice); + Slice(const void* buf, size_t len, StaticSlice) + : slice_(g_core_codegen_interface->grpc_slice_from_static_buffer( + reinterpret_cast<const char*>(buf), len)) {} /// Copy constructor, adds a reference. - Slice(const Slice& other); + Slice(const Slice& other) + : slice_(g_core_codegen_interface->grpc_slice_ref(other.slice_)) {} /// Assignment, reference count is unchanged. Slice& operator=(Slice other) { @@ -75,14 +84,18 @@ class Slice final { /// user data pointer passed in at destruction. Can be the same as buf or /// different (e.g., if data is part of a larger structure that must be /// destroyed when the data is no longer needed) - Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data); + Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data) + : slice_(g_core_codegen_interface->grpc_slice_new_with_user_data( + buf, len, destroy, user_data)) {} /// Specialization of above for common case where buf == user_data Slice(void* buf, size_t len, void (*destroy)(void*)) : Slice(buf, len, destroy, buf) {} /// Similar to the above but has a destroy that also takes slice length - Slice(void* buf, size_t len, void (*destroy)(void*, size_t)); + Slice(void* buf, size_t len, void (*destroy)(void*, size_t)) + : slice_(g_core_codegen_interface->grpc_slice_new_with_len(buf, len, + destroy)) {} /// Byte size. size_t size() const { return GRPC_SLICE_LENGTH(slice_); } @@ -94,7 +107,9 @@ class Slice final { const uint8_t* end() const { return GRPC_SLICE_END_PTR(slice_); } /// Raw C slice. Caller needs to call grpc_slice_unref when done. - grpc_slice c_slice() const; + grpc_slice c_slice() const { + return g_core_codegen_interface->grpc_slice_ref(slice_); + } private: friend class ByteBuffer; diff --git a/include/grpcpp/support/proto_buffer_reader.h b/include/grpcpp/support/proto_buffer_reader.h new file mode 100644 index 0000000000..4cdb65d531 --- /dev/null +++ b/include/grpcpp/support/proto_buffer_reader.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H +#define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H + +#include <grpcpp/impl/codegen/proto_buffer_reader.h> + +#endif // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H diff --git a/include/grpcpp/support/proto_buffer_writer.h b/include/grpcpp/support/proto_buffer_writer.h new file mode 100644 index 0000000000..01cf29c457 --- /dev/null +++ b/include/grpcpp/support/proto_buffer_writer.h @@ -0,0 +1,24 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H +#define GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H + +#include <grpcpp/impl/codegen/proto_buffer_writer.h> + +#endif // GRPCPP_SUPPORT_PROTO_BUFFER_WRITER_H diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 0ef7c03056..44d8cf2b1e 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -59,7 +59,10 @@ //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 #define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1 +// TODO(juanlishen): We use a greater-than-one value here as a workaround fix to +// a keepalive ping timeout issue. We may want to revert https://github +// .com/grpc/grpc/pull/14943 once we figure out the root cause. +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -198,6 +201,7 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; + gpr_atm worker_count; pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; @@ -685,6 +689,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); + gpr_atm_no_barrier_store(&pollset->worker_count, 0); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); pollset->kicked_without_poller = false; pollset->shutdown_closure = nullptr; @@ -758,8 +763,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, pollable* pollable_obj, bool drain) { GPR_TIMER_SCOPE("pollable_process_events", 0); static const char* err_desc = "pollset_process_events"; + // Use a simple heuristic to determine how many fd events to process + // per loop iteration. (events/workers) + int handle_count = 1; + int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count); + GPR_ASSERT(worker_count > 0); + handle_count = + (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count; + if (handle_count == 0) { + handle_count = 1; + } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) { + handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL; + } grpc_error* error = GRPC_ERROR_NONE; - for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && + for (int i = 0; (drain || i < handle_count) && pollable_obj->event_cursor != pollable_obj->event_count; i++) { int n = pollable_obj->event_cursor++; @@ -884,6 +901,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, GPR_TIMER_SCOPE("begin_worker", 0); bool do_poll = (pollset->shutdown_closure == nullptr && !pollset->already_shutdown); + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1); if (worker_hdl != nullptr) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -964,6 +982,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1); } #ifndef NDEBUG diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index aa9788da76..619aacadaa 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -98,6 +98,10 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) { + return ::grpc_byte_buffer_length(bb); +} + grpc_call_error CoreCodegen::grpc_call_cancel_with_status( grpc_call* call, grpc_status_code status, const char* description, void* reserved) { @@ -135,6 +139,12 @@ grpc_slice CoreCodegen::grpc_slice_new_with_user_data(void* p, size_t len, return ::grpc_slice_new_with_user_data(p, len, destroy, user_data); } +grpc_slice CoreCodegen::grpc_slice_new_with_len(void* p, size_t len, + void (*destroy)(void*, + size_t)) { + return ::grpc_slice_new_with_len(p, len, destroy); +} + grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); } grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) { diff --git a/src/cpp/util/byte_buffer_cc.cc b/src/cpp/util/byte_buffer_cc.cc index fbc1768bcc..d976b69440 100644 --- a/src/cpp/util/byte_buffer_cc.cc +++ b/src/cpp/util/byte_buffer_cc.cc @@ -23,34 +23,6 @@ namespace grpc { -static internal::GrpcLibraryInitializer g_gli_initializer; - -ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { - // The following assertions check that the representation of a grpc::Slice is - // identical to that of a grpc_slice: it has a grpc_slice field, and nothing - // else. - static_assert(std::is_same<decltype(slices[0].slice_), grpc_slice>::value, - "Slice must have same representation as grpc_slice"); - static_assert(sizeof(Slice) == sizeof(grpc_slice), - "Slice must have same representation as grpc_slice"); - // The following assertions check that the representation of a ByteBuffer is - // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, - // and nothing else. - static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - g_gli_initializer.summon(); // Make sure that initializer linked in - // The const_cast is legal if grpc_raw_byte_buffer_create() does no more - // than its advertised side effect of increasing the reference count of the - // slices it processes, and such an increase does not affect the semantics - // seen by the caller of this constructor. - buffer_ = grpc_raw_byte_buffer_create( - reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); -} - Status ByteBuffer::Dump(std::vector<Slice>* slices) const { slices->clear(); if (!buffer_) { @@ -69,14 +41,6 @@ Status ByteBuffer::Dump(std::vector<Slice>* slices) const { return Status::OK; } -size_t ByteBuffer::Length() const { - if (buffer_) { - return grpc_byte_buffer_length(buffer_); - } else { - return 0; - } -} - ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {} @@ -90,10 +54,4 @@ ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { return *this; } -void ByteBuffer::Swap(ByteBuffer* other) { - grpc_byte_buffer* tmp = other->buffer_; - other->buffer_ = buffer_; - buffer_ = tmp; -} - } // namespace grpc diff --git a/src/cpp/util/slice_cc.cc b/src/cpp/util/slice_cc.cc deleted file mode 100644 index c72dbdbfc0..0000000000 --- a/src/cpp/util/slice_cc.cc +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/slice.h> -#include <grpcpp/support/slice.h> - -namespace grpc { - -Slice::Slice() : slice_(grpc_empty_slice()) {} - -Slice::~Slice() { grpc_slice_unref(slice_); } - -Slice::Slice(grpc_slice slice, AddRef) : slice_(grpc_slice_ref(slice)) {} - -Slice::Slice(grpc_slice slice, StealRef) : slice_(slice) {} - -Slice::Slice(size_t len) : slice_(grpc_slice_malloc(len)) {} - -Slice::Slice(const void* buf, size_t len) - : slice_( - grpc_slice_from_copied_buffer(static_cast<const char*>(buf), len)) {} - -Slice::Slice(const grpc::string& str) - : slice_(grpc_slice_from_copied_buffer(str.c_str(), str.length())) {} - -Slice::Slice(const void* buf, size_t len, StaticSlice) - : slice_( - grpc_slice_from_static_buffer(static_cast<const char*>(buf), len)) {} - -Slice::Slice(const Slice& other) : slice_(grpc_slice_ref(other.slice_)) {} - -Slice::Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data) - : slice_(grpc_slice_new_with_user_data(buf, len, destroy, user_data)) {} - -Slice::Slice(void* buf, size_t len, void (*destroy)(void*, size_t)) - : slice_(grpc_slice_new_with_len(buf, len, destroy)) {} - -grpc_slice Slice::c_slice() const { return grpc_slice_ref(slice_); } - -} // namespace grpc diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 8f078cfbed..8ec2073d98 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -54,8 +54,14 @@ def run_multiple_killed_watches(num_threads, sleep_time) end def main + STDERR.puts '10 iterations, sleep 0.1 before killing thread' run_multiple_killed_watches(10, 0.1) + STDERR.puts '1000 iterations, sleep 0.001 before killing thread' run_multiple_killed_watches(1000, 0.001) + STDERR.puts '10000 iterations, sleep 0.00001 before killing thread' + run_multiple_killed_watches(10_000, 0.00001) + STDERR.puts '20000 iterations, sleep 0.00001 before killing thread' + run_multiple_killed_watches(20_000, 0.00001) end main diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index e8e87e4f15..4760f33e38 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -84,7 +84,7 @@ if grpc_config == 'gcov' end if grpc_config == 'dbg' - $CFLAGS << ' -O0' + $CFLAGS << ' -O0 -ggdb3' end $LDFLAGS << ' -Wl,-wrap,memcpy' if RUBY_PLATFORM =~ /linux/ diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index e8bfeb32a0..3f0dc530cf 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -315,7 +315,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE* argv, } typedef struct watch_state_stack { - grpc_channel* channel; + bg_watched_channel* bg_wrapped; gpr_timespec deadline; int last_state; } watch_state_stack; @@ -328,15 +328,15 @@ static void* wait_for_watch_state_op_complete_without_gvl(void* arg) { gpr_mu_lock(&global_connection_polling_mu); // its unsafe to do a "watch" after "channel polling abort" because the cq has // been shut down. - if (abort_channel_polling) { + if (abort_channel_polling || stack->bg_wrapped->channel_destroyed) { gpr_mu_unlock(&global_connection_polling_mu); return (void*)0; } op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = WATCH_STATE_API; - grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, - stack->deadline, channel_polling_cq, - op); + grpc_channel_watch_connectivity_state(stack->bg_wrapped->channel, + stack->last_state, stack->deadline, + channel_polling_cq, op); while (!op->op.api_callback_args.called_back) { gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, @@ -388,7 +388,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } - stack.channel = wrapper->bg_wrapped->channel; + stack.bg_wrapped = wrapper->bg_wrapped; stack.deadline = grpc_rb_time_timeval(deadline, 0), stack.last_state = NUM2LONG(last_state); diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc index 836d3d8076..53e2a2864c 100644 --- a/test/cpp/codegen/proto_utils_test.cc +++ b/test/cpp/codegen/proto_utils_test.cc @@ -24,32 +24,43 @@ #include <gtest/gtest.h> namespace grpc { + namespace internal { -// Provide access to GrpcBufferWriter internals. -class GrpcBufferWriterPeer { +// Provide access to GrpcProtoBufferWriter internals. +class GrpcProtoBufferWriterPeer { public: - explicit GrpcBufferWriterPeer(internal::GrpcBufferWriter* writer) + explicit GrpcProtoBufferWriterPeer(GrpcProtoBufferWriter* writer) : writer_(writer) {} bool have_backup() const { return writer_->have_backup_; } const grpc_slice& backup_slice() const { return writer_->backup_slice_; } const grpc_slice& slice() const { return writer_->slice_; } private: - GrpcBufferWriter* writer_; + GrpcProtoBufferWriter* writer_; +}; + +// Provide access to ByteBuffer internals. +class GrpcByteBufferPeer { + public: + explicit GrpcByteBufferPeer(ByteBuffer* bb) : bb_(bb) {} + grpc_byte_buffer* c_buffer() { return bb_->c_buffer(); } + + private: + ByteBuffer* bb_; }; class ProtoUtilsTest : public ::testing::Test {}; // Regression test for a memory corruption bug where a series of -// GrpcBufferWriter Next()/Backup() invocations could result in a dangling +// GrpcProtoBufferWriter Next()/Backup() invocations could result in a dangling // pointer returned by Next() due to the interaction between grpc_slice inlining // and GRPC_SLICE_START_PTR. TEST_F(ProtoUtilsTest, TinyBackupThenNext) { - grpc_byte_buffer* bp; + ByteBuffer bp; const int block_size = 1024; - GrpcBufferWriter writer(&bp, block_size, 8192); - GrpcBufferWriterPeer peer(&writer); + GrpcProtoBufferWriter writer(&bp, block_size, 8192); + GrpcProtoBufferWriterPeer peer(&writer); void* data; int size; @@ -63,17 +74,14 @@ TEST_F(ProtoUtilsTest, TinyBackupThenNext) { ASSERT_TRUE(writer.Next(&data, &size)); EXPECT_TRUE(peer.slice().refcount != nullptr); EXPECT_EQ(block_size, size); - - // Cleanup. - g_core_codegen_interface->grpc_byte_buffer_destroy(bp); } namespace { // Set backup_size to 0 to indicate no backup is needed. void BufferWriterTest(int block_size, int total_size, int backup_size) { - grpc_byte_buffer* bp; - GrpcBufferWriter writer(&bp, block_size, total_size); + ByteBuffer bb; + GrpcProtoBufferWriter writer(&bb, block_size, total_size); int written_size = 0; void* data; @@ -110,10 +118,11 @@ void BufferWriterTest(int block_size, int total_size, int backup_size) { writer.BackUp(backup_size); } } - EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size); + EXPECT_EQ(bb.Length(), (size_t)total_size); grpc_byte_buffer_reader reader; - grpc_byte_buffer_reader_init(&reader, bp); + GrpcByteBufferPeer peer(&bb); + grpc_byte_buffer_reader_init(&reader, peer.c_buffer()); int read_bytes = 0; while (read_bytes < total_size) { grpc_slice s; @@ -126,7 +135,6 @@ void BufferWriterTest(int block_size, int total_size, int backup_size) { } EXPECT_EQ(read_bytes, total_size); grpc_byte_buffer_reader_destroy(&reader); - grpc_byte_buffer_destroy(bp); } TEST(WriterTest, TinyBlockTinyBackup) { @@ -154,7 +162,7 @@ TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); } } // namespace grpc int main(int argc, char** argv) { - // Ensure the GrpcBufferWriter internals are initialized. + // Ensure the GrpcProtoBufferWriter internals are initialized. grpc::internal::GrpcLibraryInitializer init; init.summon(); grpc::GrpcLibraryCodegen lib; diff --git a/test/cpp/util/byte_buffer_test.cc b/test/cpp/util/byte_buffer_test.cc index 605ef15123..47a5b7f03a 100644 --- a/test/cpp/util/byte_buffer_test.cc +++ b/test/cpp/util/byte_buffer_test.cc @@ -16,7 +16,8 @@ * */ -#include <grpcpp/support/byte_buffer.h> +#include <grpc++/support/byte_buffer.h> +#include <grpcpp/impl/grpc_library.h> #include <cstring> #include <vector> @@ -27,6 +28,9 @@ #include <gtest/gtest.h> namespace grpc { + +static internal::GrpcLibraryInitializer g_gli_initializer; + namespace { const char* kContent1 = "hello xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; diff --git a/test/cpp/util/slice_test.cc b/test/cpp/util/slice_test.cc index 8e06062360..dc1910038f 100644 --- a/test/cpp/util/slice_test.cc +++ b/test/cpp/util/slice_test.cc @@ -16,13 +16,17 @@ * */ -#include <grpcpp/support/slice.h> +#include <grpc++/support/slice.h> +#include <grpcpp/impl/grpc_library.h> #include <grpc/grpc.h> #include <grpc/slice.h> #include <gtest/gtest.h> namespace grpc { + +static internal::GrpcLibraryInitializer g_gli_initializer; + namespace { const char* kContent = "hello xxxxxxxxxxxxxxxxxxxx world"; diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 047b27ea57..22f225ec54 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -954,6 +954,8 @@ include/grpcpp/impl/codegen/create_auth_context.h \ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ +include/grpcpp/impl/codegen/proto_buffer_reader.h \ +include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_utils.h \ include/grpcpp/impl/codegen/rpc_method.h \ include/grpcpp/impl/codegen/rpc_service_method.h \ @@ -992,6 +994,8 @@ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/proto_buffer_reader.h \ +include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 3e2c192864..63d238d741 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -956,6 +956,8 @@ include/grpcpp/impl/codegen/create_auth_context.h \ include/grpcpp/impl/codegen/grpc_library.h \ include/grpcpp/impl/codegen/metadata_map.h \ include/grpcpp/impl/codegen/method_handler_impl.h \ +include/grpcpp/impl/codegen/proto_buffer_reader.h \ +include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_utils.h \ include/grpcpp/impl/codegen/rpc_method.h \ include/grpcpp/impl/codegen/rpc_service_method.h \ @@ -994,6 +996,8 @@ include/grpcpp/support/async_unary_call.h \ include/grpcpp/support/byte_buffer.h \ include/grpcpp/support/channel_arguments.h \ include/grpcpp/support/config.h \ +include/grpcpp/support/proto_buffer_reader.h \ +include/grpcpp/support/proto_buffer_writer.h \ include/grpcpp/support/slice.h \ include/grpcpp/support/status.h \ include/grpcpp/support/status_code_enum.h \ @@ -1209,7 +1213,6 @@ src/cpp/server/thread_pool_interface.h \ src/cpp/thread_manager/thread_manager.cc \ src/cpp/thread_manager/thread_manager.h \ src/cpp/util/byte_buffer_cc.cc \ -src/cpp/util/slice_cc.cc \ src/cpp/util/status.cc \ src/cpp/util/string_ref.cc \ src/cpp/util/time_cc.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index c7ea43fb09..c79996f818 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -10746,6 +10746,8 @@ ], "headers": [ "include/grpc++/impl/codegen/proto_utils.h", + "include/grpcpp/impl/codegen/proto_buffer_reader.h", + "include/grpcpp/impl/codegen/proto_buffer_writer.h", "include/grpcpp/impl/codegen/proto_utils.h" ], "is_filegroup": true, @@ -10753,6 +10755,8 @@ "name": "grpc++_codegen_proto", "src": [ "include/grpc++/impl/codegen/proto_utils.h", + "include/grpcpp/impl/codegen/proto_buffer_reader.h", + "include/grpcpp/impl/codegen/proto_buffer_writer.h", "include/grpcpp/impl/codegen/proto_utils.h" ], "third_party": false, @@ -10851,6 +10855,8 @@ "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", "include/grpcpp/support/config.h", + "include/grpcpp/support/proto_buffer_reader.h", + "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", @@ -10953,6 +10959,8 @@ "include/grpcpp/support/byte_buffer.h", "include/grpcpp/support/channel_arguments.h", "include/grpcpp/support/config.h", + "include/grpcpp/support/proto_buffer_reader.h", + "include/grpcpp/support/proto_buffer_writer.h", "include/grpcpp/support/slice.h", "include/grpcpp/support/status.h", "include/grpcpp/support/status_code_enum.h", @@ -10997,7 +11005,6 @@ "src/cpp/thread_manager/thread_manager.cc", "src/cpp/thread_manager/thread_manager.h", "src/cpp/util/byte_buffer_cc.cc", - "src/cpp/util/slice_cc.cc", "src/cpp/util/status.cc", "src/cpp/util/string_ref.cc", "src/cpp/util/time_cc.cc" diff --git a/tools/run_tests/python_utils/upload_test_results.py b/tools/run_tests/python_utils/upload_test_results.py index a2dd1c66cf..09dcd57ad4 100644 --- a/tools/run_tests/python_utils/upload_test_results.py +++ b/tools/run_tests/python_utils/upload_test_results.py @@ -74,7 +74,7 @@ def _get_build_metadata(test_results): build_id = os.getenv('BUILD_ID') or os.getenv('KOKORO_BUILD_NUMBER') build_url = os.getenv('BUILD_URL') if os.getenv('KOKORO_BUILD_ID'): - build_url = 'https://sponge.corp.google.com/invocation?id=%s' % os.getenv( + build_url = 'https://source.cloud.google.com/results/invocations/%s' % os.getenv( 'KOKORO_BUILD_ID') job_name = os.getenv('JOB_BASE_NAME') or os.getenv('KOKORO_JOB_NAME') |