diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 23 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 6 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 5 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 14 | ||||
-rw-r--r-- | include/grpc/grpc.h | 69 | ||||
-rw-r--r-- | include/grpc/impl/codegen/grpc_types.h | 7 |
7 files changed, 110 insertions, 16 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 201e52ae07..6218e3ed1b 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -52,7 +52,7 @@ template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq; + CompletionQueue cq(true); // Pluckable completion queue Call call(channel->CreateCall(method, context, &cq)); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 03cecdc21c..0130e9ca0f 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -52,6 +52,7 @@ #include <grpc++/impl/codegen/grpc_library.h> #include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/time.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/atm.h> struct grpc_completion_queue; @@ -102,10 +103,7 @@ class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue() { - cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - InitialAvalanching(); // reserve this for the future shutdown - } + CompletionQueue() : CompletionQueue(false) {} /// Wrap \a take, taking ownership of the instance. /// @@ -149,8 +147,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// \return true if read a regular event, false if the queue is shutting down. bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) != SHUTDOWN); + return (AsyncNextInternal(tag, ok, + g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); } /// Request the shutdown of the queue. @@ -218,6 +217,18 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(bool is_pluck) { + if (is_pluck) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck( + nullptr); + } else { + cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next( + nullptr); + } + InitialAvalanching(); // reserve this for the future shutdown + } + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 754bf14b25..af7abdf898 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -38,6 +38,7 @@ #include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc/byte_buffer.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/grpc_types.h> namespace grpc { @@ -45,7 +46,10 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create_for_next( + void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create_for_pluck( + void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 45ea040303..fd4767a80a 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -36,6 +36,7 @@ #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/status.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/sync.h> @@ -59,7 +60,9 @@ class CoreCodegenInterface { virtual void assert_fail(const char* failed_assertion, const char* file, int line) = 0; - virtual grpc_completion_queue* grpc_completion_queue_create( + virtual grpc_completion_queue* grpc_completion_queue_create_for_next( + void* reserved) = 0; + virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck( void* reserved) = 0; virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95..4e12f3261e 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -137,7 +137,9 @@ class ClientReader final : public ClientReaderInterface<R> { template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; @@ -209,7 +211,9 @@ class ClientWriter : public ClientWriterInterface<W> { template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -292,7 +296,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); @@ -506,7 +512,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; -} +} // namespace internal // class to represent the user API for a bidirectional streaming call template <class W, class R> diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 1b33d48c02..42cf201da4 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -93,8 +93,75 @@ GRPCAPI const char *grpc_version_string(void); /** Return a string specifying what the 'g' in gRPC stands for */ GRPCAPI const char *grpc_g_stands_for(void); +/** Specifies the type of APIs to use to pop events from the completion queue */ +typedef enum { + /** Events are popped out by calling grpc_completion_queue_next() API ONLY */ + GRPC_CQ_NEXT = 1, + + /** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/ + GRPC_CQ_PLUCK +} grpc_cq_completion_type; + +/** Completion queues internally MAY maintain a set of file descriptors in a + structure called 'pollset'. This enum specifies if a completion queue has an + associated pollset and any restrictions on the type of file descriptors that + can be present in the pollset. + + I/O progress can only be made when grpc_completion_queue_next() or + grpc_completion_queue_pluck() are called on the completion queue (unless the + grpc_cq_polling_type is GRPC_CQ_NON_POLLING) and hence it is very important + to actively call these APIs */ +typedef enum { + /** The completion queue will have an associated pollset and there is no + restriction on the type of file descriptors the pollset may contain */ + GRPC_CQ_DEFAULT_POLLING, + + /** Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will + not contain any 'listening file descriptors' (i.e file descriptors used to + listen to incoming channels) */ + GRPC_CQ_NON_LISTENING, + + /** The completion queue will not have an associated pollset. Note that + grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still + be called to pop events from the completion queue; it is not required to + call them actively to make I/O progress */ + GRPC_CQ_NON_POLLING +} grpc_cq_polling_type; + +#define GRPC_CQ_CURRENT_VERSION 1 +typedef struct grpc_completion_queue_attributes { + /* The version number of this structure. More fields might be added to this + structure in future. */ + int version; /* Set to GRPC_CQ_CURRENT_VERSION */ + + grpc_cq_completion_type cq_completion_type; + + grpc_cq_polling_type cq_polling_type; +} grpc_completion_queue_attributes; + +/** The completion queue factory structure is opaque to the callers of grpc */ +typedef struct grpc_completion_queue_factory grpc_completion_queue_factory; + +/** Returns the completion queue factory based on the attributes. MAY return a + NULL if no factory can be found */ +GRPCAPI const grpc_completion_queue_factory * +grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes *attributes); + +/** Helper function to create a completion queue with grpc_cq_completion_type + of GRPC_CQ_NEXT and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */ +GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_next( + void *reserved); + +/** Helper function to create a completion queue with grpc_cq_completion_type + of GRPC_CQ_PLUCK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING */ +GRPCAPI grpc_completion_queue *grpc_completion_queue_create_for_pluck( + void *reserved); + /** Create a completion queue */ -GRPCAPI grpc_completion_queue *grpc_completion_queue_create(void *reserved); +GRPCAPI grpc_completion_queue *grpc_completion_queue_create( + const grpc_completion_queue_factory *factory, + const grpc_completion_queue_attributes *attributes, void *reserved); /** Blocks until an event is available, the completion queue is being shut down, or deadline is reached. diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 887c176f1a..e2332a78cd 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -357,8 +357,11 @@ typedef enum grpc_completion_type { typedef struct grpc_event { /** The type of the completion. */ grpc_completion_type type; - /** non-zero if the operation was successful, 0 upon failure. - Only GRPC_OP_COMPLETE can succeed or fail. */ + /** If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates + whether the operation was successful or not; 0 in case of failure and + non-zero in case of success. + If grpc_completion_type is GRPC_QUEUE_SHUTDOWN or GRPC_QUEUE_TIMEOUT, this + field is guaranteed to be 0 */ int success; /** The tag passed to grpc_call_start_batch etc to start this operation. Only GRPC_OP_COMPLETE has a tag. */ |