diff options
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 36 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 9 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 6 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/proto_utils.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 12 | ||||
-rw-r--r-- | include/grpc++/test/mock_stream.h | 163 |
7 files changed, 209 insertions, 23 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index a5a4f3d739..4bf35ae778 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -52,7 +52,9 @@ template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq(true); // Pluckable completion queue + CompletionQueue cq(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue Call call(channel->CreateCall(method, context, &cq)); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 61617f2bdc..c8ab726b0f 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,7 +102,9 @@ class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue() : CompletionQueue(false) {} + CompletionQueue() + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING}) {} /// Wrap \a take, taking ownership of the instance. /// @@ -182,6 +184,16 @@ class CompletionQueue : private GrpcLibraryCodegen { }; void CompleteAvalanching(); + protected: + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(const grpc_completion_queue_attributes& attributes) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create( + g_core_codegen_interface->grpc_completion_queue_factory_lookup( + &attributes), + &attributes, NULL); + InitialAvalanching(); // reserve this for the future shutdown + } + private: // Friend synchronous wrappers so that they can access Pluck(), which is // a semi-private API geared towards the synchronous implementation. @@ -215,18 +227,6 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); - /// Private constructor of CompletionQueue only visible to friend classes - CompletionQueue(bool is_pluck) { - if (is_pluck) { - cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck( - nullptr); - } else { - cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next( - nullptr); - } - InitialAvalanching(); // reserve this for the future shutdown - } - NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. @@ -289,17 +289,19 @@ class CompletionQueue : private GrpcLibraryCodegen { /// by servers. Instantiated by \a ServerBuilder. class ServerCompletionQueue : public CompletionQueue { public: - bool IsFrequentlyPolled() { return is_frequently_polled_; } + bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } private: - bool is_frequently_polled_; + grpc_cq_polling_type polling_type_; friend class ServerBuilder; /// \param is_frequently_polled Informs the GRPC library about whether the /// server completion queue would be actively polled (by calling Next() or /// AsyncNext()). By default all server completion queues are assumed to be /// frequently polled. - ServerCompletionQueue(bool is_frequently_polled = true) - : is_frequently_polled_(is_frequently_polled) {} + ServerCompletionQueue(grpc_cq_polling_type polling_type) + : CompletionQueue(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type}), + polling_type_(polling_type) {} }; } // namespace grpc diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 86601076bd..a1593729eb 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -44,8 +44,15 @@ namespace grpc { /// Implementation of the core codegen interface. -class CoreCodegen : public CoreCodegenInterface { +class CoreCodegen final : public CoreCodegenInterface { private: + virtual const grpc_completion_queue_factory* + grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) override; + virtual grpc_completion_queue* grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes, + void* reserved) override; grpc_completion_queue* grpc_completion_queue_create_for_next( void* reserved) override; grpc_completion_queue* grpc_completion_queue_create_for_pluck( diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index a0665f4e06..7cc3a82476 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -59,6 +59,12 @@ class CoreCodegenInterface { virtual void assert_fail(const char* failed_assertion, const char* file, int line) = 0; + virtual const grpc_completion_queue_factory* + grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) = 0; + virtual grpc_completion_queue* grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes, void* reserved) = 0; virtual grpc_completion_queue* grpc_completion_queue_create_for_next( void* reserved) = 0; virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck( diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 6df9de4fd2..8c0c32bfc5 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -52,7 +52,7 @@ namespace internal { class GrpcBufferWriterPeer; -const int kGrpcBufferWriterMaxBufferLength = 8192; +const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; class GrpcBufferWriter final : public ::grpc::protobuf::io::ZeroCopyOutputStream { diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 328d5cb1e8..a010924cef 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -156,7 +156,9 @@ class ClientReader final : public ClientReaderInterface<R> { ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) : context_(context), - cq_(true), // Pluckable cq + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> @@ -230,7 +232,9 @@ class ClientWriter : public ClientWriterInterface<W> { ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) : context_(context), - cq_(true), // Pluckable cq + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -330,7 +334,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), - cq_(true), // Pluckable cq + cq_(grpc_completion_queue_attributes{ + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, + GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq call_(channel->CreateCall(method, context, &cq_)) { if (!context_->initial_metadata_corked_) { CallOpSet<CallOpSendInitialMetadata> ops; diff --git a/include/grpc++/test/mock_stream.h b/include/grpc++/test/mock_stream.h new file mode 100644 index 0000000000..f2de9472d6 --- /dev/null +++ b/include/grpc++/test/mock_stream.h @@ -0,0 +1,163 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_TEST_MOCK_STREAM_H +#define GRPCXX_TEST_MOCK_STREAM_H + +#include <stdint.h> + +#include <gmock/gmock.h> +#include <grpc++/impl/codegen/call.h> +#include <grpc++/support/async_stream.h> +#include <grpc++/support/async_unary_call.h> +#include <grpc++/support/sync_stream.h> + +namespace grpc { +namespace testing { + +template <class R> +class MockClientReader : public ClientReaderInterface<R> { + public: + MockClientReader() = default; + + // ClientStreamingInterface + MOCK_METHOD0_T(Finish, Status()); + + // ReaderInterface + MOCK_METHOD1_T(NextMessageSize, bool(uint32_t*)); + MOCK_METHOD1_T(Read, bool(R*)); + + // ClientReaderInterface + MOCK_METHOD0_T(WaitForInitialMetadata, void()); +}; + +template <class W> +class MockClientWriter : public ClientWriterInterface<W> { + public: + MockClientWriter() = default; + + // ClientStreamingInterface + MOCK_METHOD0_T(Finish, Status()); + + // WriterInterface + MOCK_METHOD2_T(Write, bool(const W&, const WriteOptions)); + + // ClientWriterInterface + MOCK_METHOD0_T(WritesDone, bool()); +}; + +template <class W, class R> +class MockClientReaderWriter : public ClientReaderWriterInterface<W, R> { + public: + MockClientReaderWriter() = default; + + // ClientStreamingInterface + MOCK_METHOD0_T(Finish, Status()); + + // ReaderInterface + MOCK_METHOD1_T(NextMessageSize, bool(uint32_t*)); + MOCK_METHOD1_T(Read, bool(R*)); + + // WriterInterface + MOCK_METHOD2_T(Write, bool(const W&, const WriteOptions)); + + // ClientReaderWriterInterface + MOCK_METHOD0_T(WaitForInitialMetadata, void()); + MOCK_METHOD0_T(WritesDone, bool()); +}; + +// TODO: We do not support mocking an async RPC for now. + +template <class R> +class MockClientAsyncResponseReader + : public ClientAsyncResponseReaderInterface<R> { + public: + MockClientAsyncResponseReader() = default; + + MOCK_METHOD1_T(ReadInitialMetadata, void(void*)); + MOCK_METHOD3_T(Finish, void(R*, Status*, void*)); +}; + +template <class R> +class MockClientAsyncReader : public ClientAsyncReaderInterface<R> { + public: + MockClientAsyncReader() = default; + + // ClientAsyncStreamingInterface + MOCK_METHOD1_T(ReadInitialMetadata, void(void*)); + MOCK_METHOD2_T(Finish, void(Status*, void*)); + + // AsyncReaderInterface + MOCK_METHOD2_T(Read, void(R*, void*)); +}; + +template <class W> +class MockClientAsyncWriter : public ClientAsyncWriterInterface<W> { + public: + MockClientAsyncWriter() = default; + + // ClientAsyncStreamingInterface + MOCK_METHOD1_T(ReadInitialMetadata, void(void*)); + MOCK_METHOD2_T(Finish, void(Status*, void*)); + + // AsyncWriterInterface + MOCK_METHOD2_T(Write, void(const W&, void*)); + + // ClientAsyncWriterInterface + MOCK_METHOD1_T(WritesDone, void(void*)); +}; + +template <class W, class R> +class MockClientAsyncReaderWriter + : public ClientAsyncReaderWriterInterface<W, R> { + public: + MockClientAsyncReaderWriter() = default; + + // ClientAsyncStreamingInterface + MOCK_METHOD1_T(ReadInitialMetadata, void(void*)); + MOCK_METHOD2_T(Finish, void(Status*, void*)); + + // AsyncWriterInterface + MOCK_METHOD2_T(Write, void(const W&, void*)); + + // AsyncReaderInterface + MOCK_METHOD2_T(Read, void(R*, void*)); + + // ClientAsyncReaderWriterInterface + MOCK_METHOD1_T(WritesDone, void(void*)); +}; + +} // namespace testing +} // namespace grpc + +#endif // GRPCXX_TEST_MOCK_STREAM_H |