diff options
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/channel_argument_option.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/async_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 5 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/config_protobuf.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 8 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/metadata_map.h | 38 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/proto_utils.h | 112 | ||||
-rw-r--r-- | include/grpc++/server_builder.h | 5 |
10 files changed, 113 insertions, 69 deletions
diff --git a/include/grpc++/impl/channel_argument_option.h b/include/grpc++/impl/channel_argument_option.h index e918f57e92..f157ec1d7e 100644 --- a/include/grpc++/impl/channel_argument_option.h +++ b/include/grpc++/impl/channel_argument_option.h @@ -28,9 +28,9 @@ namespace grpc { std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( - const grpc::string &name, const grpc::string &value); + const grpc::string& name, const grpc::string& value); std::unique_ptr<ServerBuilderOption> MakeChannelArgumentOption( - const grpc::string &name, int value); + const grpc::string& name, int value); } // namespace grpc diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h index 6d51c78d5c..b9ea5fd19c 100644 --- a/include/grpc++/impl/codegen/async_unary_call.h +++ b/include/grpc++/impl/codegen/async_unary_call.h @@ -297,6 +297,6 @@ class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { public: void operator()(void* p) {} }; -} +} // namespace std #endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 170c562cf3..256dd859d3 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -41,7 +41,7 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, return BlockingUnaryCallImpl<InputMessage, OutputMessage>( channel, method, context, request, result) .status(); -}; +} template <class InputMessage, class OutputMessage> class BlockingUnaryCallImpl { diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 14de30a62e..11cc588879 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -164,8 +164,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// \return true if read a regular event, false if the queue is shutting down. bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) != SHUTDOWN); + return (AsyncNextInternal(tag, ok, + g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); } /// Request the shutdown of the queue. diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h index c5e5bdf0db..7387fa25c6 100644 --- a/include/grpc++/impl/codegen/config_protobuf.h +++ b/include/grpc++/impl/codegen/config_protobuf.h @@ -19,6 +19,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H +#define GRPC_OPEN_SOURCE_PROTO + #ifndef GRPC_CUSTOM_PROTOBUF_INT64 #include <google/protobuf/stubs/common.h> #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index c751c1e734..d7c57bebb9 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -50,6 +50,9 @@ class CoreCodegen final : public CoreCodegenInterface { void* gpr_malloc(size_t size) override; void gpr_free(void* p) override; + void grpc_init() override; + void grpc_shutdown() override; + void gpr_mu_init(gpr_mu* mu) override; void gpr_mu_destroy(gpr_mu* mu) override; void gpr_mu_lock(gpr_mu* mu) override; @@ -89,6 +92,7 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_slice grpc_slice_ref(grpc_slice slice) override; grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override; grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override; + grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; grpc_slice grpc_slice_from_static_buffer(const void* buffer, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index a4c50dab87..1949cdab76 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -63,6 +63,13 @@ class CoreCodegenInterface { virtual void* gpr_malloc(size_t size) = 0; virtual void gpr_free(void* p) = 0; + // These are only to be used to fix edge cases involving grpc_init and + // grpc_shutdown. Calling grpc_init from the codegen interface before + // the real grpc_init is called will cause a crash, so if you use this + // function, ensure that it is not the first call to grpc_init. + virtual void grpc_init() = 0; + virtual void grpc_shutdown() = 0; + virtual void gpr_mu_init(gpr_mu* mu) = 0; virtual void gpr_mu_destroy(gpr_mu* mu) = 0; virtual void gpr_mu_lock(gpr_mu* mu) = 0; @@ -103,6 +110,7 @@ class CoreCodegenInterface { virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0; + virtual grpc_slice grpc_slice_sub(grpc_slice s, size_t begin, size_t end) = 0; virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h index fd4750efdd..8dc7211ba8 100644 --- a/include/grpc++/impl/codegen/metadata_map.h +++ b/include/grpc++/impl/codegen/metadata_map.h @@ -1,20 +1,20 @@ /* -* -* Copyright 2015 gRPC authors. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -*/ + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ #ifndef GRPCXX_IMPL_CODEGEN_METADATA_MAP_H #define GRPCXX_IMPL_CODEGEN_METADATA_MAP_H @@ -41,11 +41,11 @@ class MetadataMap { } } - std::multimap<grpc::string_ref, grpc::string_ref> *map() { return &map_; } - const std::multimap<grpc::string_ref, grpc::string_ref> *map() const { + std::multimap<grpc::string_ref, grpc::string_ref>* map() { return &map_; } + const std::multimap<grpc::string_ref, grpc::string_ref>* map() const { return &map_; } - grpc_metadata_array *arr() { return &arr_; } + grpc_metadata_array* arr() { return &arr_; } private: grpc_metadata_array arr_; diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 67e8f71a89..0f7e115c9a 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -39,8 +39,7 @@ class GrpcBufferWriterPeer; const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; -class GrpcBufferWriter final - : public ::grpc::protobuf::io::ZeroCopyOutputStream { +class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { @@ -88,7 +87,7 @@ class GrpcBufferWriter final grpc::protobuf::int64 ByteCount() const override { return byte_count_; } - private: + protected: friend class GrpcBufferWriterPeer; const int block_size_; int64_t byte_count_; @@ -98,8 +97,7 @@ class GrpcBufferWriter final grpc_slice slice_; }; -class GrpcBufferReader final - : public ::grpc::protobuf::io::ZeroCopyInputStream { +class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { public: explicit GrpcBufferReader(grpc_byte_buffer* buffer) : byte_count_(0), backup_count_(0), status_() { @@ -160,7 +158,7 @@ class GrpcBufferReader final return byte_count_ - backup_count_; } - private: + protected: int64_t byte_count_; int64_t backup_count_; grpc_byte_buffer_reader reader_; @@ -168,57 +166,85 @@ class GrpcBufferReader final Status status_; }; +// BufferWriter must be a subclass of io::ZeroCopyOutputStream. +template <class BufferWriter, class T> +Status GenericSerialize(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp, bool* own_buffer) { + static_assert( + std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value, + "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); + *own_buffer = true; + int byte_size = msg.ByteSize(); + if (byte_size <= kGrpcBufferWriterMaxBufferLength) { + grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); + GPR_CODEGEN_ASSERT( + GRPC_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); + g_core_codegen_interface->grpc_slice_unref(slice); + return g_core_codegen_interface->ok(); + } else { + BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); + return msg.SerializeToZeroCopyStream(&writer) + ? g_core_codegen_interface->ok() + : Status(StatusCode::INTERNAL, "Failed to serialize message"); + } +} + +// BufferReader must be a subclass of io::ZeroCopyInputStream. +template <class BufferReader, class T> +Status GenericDeserialize(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg) { + static_assert( + std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value, + "BufferReader must be a subclass of io::ZeroCopyInputStream"); + if (buffer == nullptr) { + return Status(StatusCode::INTERNAL, "No payload"); + } + Status result = g_core_codegen_interface->ok(); + { + BufferReader reader(buffer); + if (!reader.status().ok()) { + return reader.status(); + } + ::grpc::protobuf::io::CodedInputStream decoder(&reader); + decoder.SetTotalBytesLimit(INT_MAX, INT_MAX); + if (!msg->ParseFromCodedStream(&decoder)) { + result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + result = Status(StatusCode::INTERNAL, "Did not read entire message"); + } + } + g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); + return result; +} + } // namespace internal +// this is needed so the following class does not conflict with protobuf +// serializers that utilize internal-only tools. +#ifdef GRPC_OPEN_SOURCE_PROTO +// This class provides a protobuf serializer. It translates between protobuf +// objects and grpc_byte_buffers. More information about SerializationTraits can +// be found in include/grpc++/impl/codegen/serialization_traits.h. template <class T> class SerializationTraits<T, typename std::enable_if<std::is_base_of< grpc::protobuf::Message, T>::value>::type> { public: static Status Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp, bool* own_buffer) { - *own_buffer = true; - int byte_size = msg.ByteSize(); - if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); - return g_core_codegen_interface->ok(); - } else { - internal::GrpcBufferWriter writer( - bp, internal::kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? g_core_codegen_interface->ok() - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } + return internal::GenericSerialize<internal::GrpcBufferWriter, T>( + msg, bp, own_buffer); } static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) { - if (buffer == nullptr) { - return Status(StatusCode::INTERNAL, "No payload"); - } - Status result = g_core_codegen_interface->ok(); - { - internal::GrpcBufferReader reader(buffer); - if (!reader.status().ok()) { - return reader.status(); - } - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - decoder.SetTotalBytesLimit(INT_MAX, INT_MAX); - if (!msg->ParseFromCodedStream(&decoder)) { - result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - result = Status(StatusCode::INTERNAL, "Did not read entire message"); - } - } - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); - return result; + return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer, + msg); } }; +#endif } // namespace grpc diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 0888bef0d9..bf842baf6f 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -202,7 +202,10 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} + : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())), + min_pollers(1), + max_pollers(2), + cq_timeout_msec(10000) {} /// Number of server completion queues to create to listen to incoming RPCs. int num_cqs; |