diff options
Diffstat (limited to 'include/grpc++/impl')
-rw-r--r-- | include/grpc++/impl/codegen/client_unary_call.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/completion_queue.h | 40 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 6 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 4 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/server_context.h | 3 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 14 |
6 files changed, 58 insertions, 11 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 201e52ae07..a5a4f3d739 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -52,7 +52,7 @@ template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq; + CompletionQueue cq(true); // Pluckable completion queue Call call(channel->CreateCall(method, context, &cq)); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 03cecdc21c..61617f2bdc 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -102,10 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue() { - cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - InitialAvalanching(); // reserve this for the future shutdown - } + CompletionQueue() : CompletionQueue(false) {} /// Wrap \a take, taking ownership of the instance. /// @@ -218,6 +215,18 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(bool is_pluck) { + if (is_pluck) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck( + nullptr); + } else { + cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next( + nullptr); + } + InitialAvalanching(); // reserve this for the future shutdown + } + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. @@ -237,6 +246,12 @@ class CompletionQueue : private GrpcLibraryCodegen { /// Performs a single polling pluck on \a tag. /// \warning Must not be mixed with calls to \a Next. + /// + /// TODO: sreek - This calls tag->FinalizeResult() even if the cq_ is already + /// shutdown. This is most likely a bug and if it is a bug, then change this + /// implementation to simple call the other TryPluck function with a zero + /// timeout. i.e: + /// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME)) void TryPluck(CompletionQueueTag* tag) { auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME); auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( @@ -248,6 +263,23 @@ class CompletionQueue : private GrpcLibraryCodegen { GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); } + /// Performs a single polling pluck on \a tag. Calls tag->FinalizeResult if + /// the pluck() was successful and returned the tag. + /// + /// This exects tag->FinalizeResult (if called) to return 'false' i.e expects + /// that the tag is internal not something that is returned to the user. + void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) { + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) { + return; + } + + bool ok = ev.success != 0; + void* ignored = tag; + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } + grpc_completion_queue* cq_; // owned gpr_atm avalanches_in_flight_; diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 6bf95129b8..65151590b2 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -38,6 +38,7 @@ #include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc/byte_buffer.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/grpc_types.h> namespace grpc { @@ -45,7 +46,10 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create_for_next( + void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create_for_pluck( + void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index e111d59364..529bef687b 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -59,7 +59,9 @@ class CoreCodegenInterface { virtual void assert_fail(const char* failed_assertion, const char* file, int line) = 0; - virtual grpc_completion_queue* grpc_completion_queue_create( + virtual grpc_completion_queue* grpc_completion_queue_create_for_next( + void* reserved) = 0; + virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck( void* reserved) = 0; virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index 91f0be06e7..ada304d571 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -40,6 +40,7 @@ #include <grpc/impl/codegen/compression_types.h> +#include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/create_auth_context.h> #include <grpc++/impl/codegen/metadata_map.h> @@ -211,6 +212,8 @@ class ServerContext { class CompletionOp; void BeginCompletionOp(Call* call); + // Return the tag queued by BeginCompletionOp() + CompletionQueueTag* GetCompletionOpTag(); ServerContext(gpr_timespec deadline, grpc_metadata_array* arr); diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index ae3b8e441d..328d5cb1e8 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -155,7 +155,9 @@ class ClientReader final : public ClientReaderInterface<R> { template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; @@ -227,7 +229,9 @@ class ClientWriter : public ClientWriterInterface<W> { template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -325,7 +329,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(true), // Pluckable cq + call_(channel->CreateCall(method, context, &cq_)) { if (!context_->initial_metadata_corked_) { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_, @@ -562,7 +568,7 @@ class ServerReaderWriterBody final { Call* const call_; ServerContext* const ctx_; }; -} +} // namespace internal // class to represent the user API for a bidirectional streaming call template <class W, class R> |