aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/surface/completion_queue.c48
-rw-r--r--src/core/lib/surface/completion_queue.h6
-rw-r--r--src/core/lib/surface/completion_queue_factory.c33
-rw-r--r--src/core/lib/surface/server.c9
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/cpp/common/core_codegen.cc9
-rw-r--r--src/cpp/server/server_cc.cc21
-rw-r--r--src/cpp/server/server_context.cc4
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs10
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs9
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c9
-rw-r--r--src/node/ext/completion_queue_threadpool.cc18
-rw-r--r--src/node/ext/completion_queue_uv.cc14
-rw-r--r--src/node/ext/server_generic.cc4
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.m2
-rw-r--r--src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m3
-rw-r--r--src/objective-c/tests/CronetUnitTests/CronetUnitTests.m4
-rw-r--r--src/php/ext/grpc/completion_queue.c5
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi3
-rw-r--r--src/ruby/ext/grpc/rb_channel.c4
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h2
-rw-r--r--src/ruby/ext/grpc/rb_server.c56
27 files changed, 197 insertions, 102 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3273addf1d..35e9f7eb30 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -64,6 +64,10 @@ typedef struct {
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
+
+ grpc_cq_completion_type completion_type;
+ grpc_cq_polling_type polling_type;
+
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -79,6 +83,7 @@ struct grpc_completion_queue {
int shutdown_called;
int is_server_cq;
/** Can the server cq accept incoming channels */
+ /* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -110,13 +115,17 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
-grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type,
+ grpc_cq_polling_type polling_type) {
grpc_completion_queue *cc;
- GPR_ASSERT(!reserved);
- GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
+ GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
- GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
+ GRPC_API_TRACE(
+ "grpc_completion_queue_create_internal(completion_type=%d, "
+ "polling_type=%d)",
+ 2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
@@ -125,6 +134,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->outstanding_tag_capacity = 0;
#endif
+ cc->completion_type = completion_type;
+ cc->polling_type = polling_type;
+
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
@@ -143,11 +155,19 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
- GPR_TIMER_END("grpc_completion_queue_create", 0);
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
return cc;
}
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
+ return cc->completion_type;
+}
+
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
+ return cc->polling_type;
+}
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -347,6 +367,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_event ret;
gpr_timespec now;
+ if (cc->completion_type != GRPC_CQ_NEXT) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_next() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_NEXT");
+ abort();
+ }
+
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
@@ -516,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
+ if (cc->completion_type != GRPC_CQ_PLUCK) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_pluck() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_PLUCK");
+ abort();
+ }
+
if (grpc_cq_pluck_trace) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
@@ -680,10 +714,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
}
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO: sreek - use cc->polling_type field here and add a validation check
+ (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
+ polling_type is set to GRPC_CQ_NON_LISTENING */
cc->is_non_listening_server_cq = 1;
}
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
return (cc->is_non_listening_server_cq == 1);
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 5d73dd7216..1ff3d64293 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -99,4 +99,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
+
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
+
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c
index db67a5192b..d68b84eddd 100644
--- a/src/core/lib/surface/completion_queue_factory.c
+++ b/src/core/lib/surface/completion_queue_factory.c
@@ -36,12 +36,15 @@
#include <grpc/support/log.h>
-/* TODO (sreek) - Currently this does not use the attributes arg. This will be
- added in a future PR */
+/*
+ * == Default completion queue factory implementation ==
+ */
+
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
- const grpc_completion_queue_attributes* attributes) {
- return grpc_completion_queue_create(NULL);
+ const grpc_completion_queue_attributes* attr) {
+ return grpc_completion_queue_create_internal(attr->cq_completion_type,
+ attr->cq_polling_type);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create};
static const grpc_completion_queue_factory g_default_cq_factory = {
"Default Factory", NULL, &default_vtable};
+/*
+ * == Completion queue factory APIs
+ */
+
const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
- /* As we add more fields to grpc_completion_queue_attributes, we may have to
- change this assert to:
- GPR_ASSERT (attributes->version >= 1 &&
- attributes->version <= GRPC_CQ_CURRENT_VERSION) */
- GPR_ASSERT(attributes->version == 1);
+ GPR_ASSERT(attributes->version >= 1 &&
+ attributes->version <= GRPC_CQ_CURRENT_VERSION);
/* The default factory can handle version 1 of the attributes structure. We
may have to change this as more fields are added to the structure */
return &g_default_cq_factory;
}
+/*
+ * == Completion queue creation APIs ==
+ */
+
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
@@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GRPC_CQ_DEFAULT_POLLING};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
+
+grpc_completion_queue* grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attr, void* reserved) {
+ GPR_ASSERT(!reserved);
+ return factory->vtable->create(factory, attr);
+}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 191ee75252..9496f90390 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1000,6 +1000,15 @@ void grpc_server_register_completion_queue(grpc_server *server,
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
+
+ if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
+ gpr_log(GPR_INFO,
+ "Completion queue which is not of type GRPC_CQ_NEXT is being "
+ "registered as a server-completion-queue");
+ /* Ideally we should log an error and abort but ruby-wrapped-language API
+ calls grpc_completion_queue_pluck() on server completion queues */
+ }
+
register_completion_queue(server, cq, false, reserved);
}
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index ba80bd801e..3793845559 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,6 +36,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "3.0.0-dev"; }
+const char *grpc_version_string(void) { return "4.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gentle"; }
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 43dffe7a2a..0dd758ec4e 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -54,9 +54,14 @@ struct grpc_byte_buffer;
namespace grpc {
-grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
- return ::grpc_completion_queue_create(reserved);
+ return ::grpc_completion_queue_create_for_next(reserved);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
+ void* reserved) {
+ return ::grpc_completion_queue_create_for_pluck(reserved);
}
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index ce173a1ee2..bee359a792 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -124,6 +124,14 @@ class ShutdownTag : public CompletionQueueTag {
bool FinalizeResult(void** tag, bool* status) { return false; }
};
+class DummyTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) {
+ *status = true;
+ return true;
+ }
+};
+
class Server::SyncRequest final : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -145,7 +153,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
+ void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -213,10 +221,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
- void* ignored_tag;
- bool ignored_ok;
+
cq_.Shutdown();
- GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
+
+ CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+
+ /* Ensure the cq_ is shutdown */
+ DummyTag ignored_tag;
+ GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
}
private:
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 3a408eb23e..afc9873c14 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -167,6 +167,10 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
+CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<CompletionQueueTag*>(completion_op_);
+}
+
void ServerContext::AddInitialMetadata(const grpc::string& key,
const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
index e9ec59eb3d..8649906bec 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
@@ -43,19 +43,19 @@ namespace Grpc.Core.Internal.Tests
public class CompletionQueueSafeHandleTest
{
[Test]
- public void CreateAndDestroy()
+ public void CreateSyncAndDestroy()
{
GrpcEnvironment.AddRef();
- var cq = CompletionQueueSafeHandle.Create();
+ var cq = CompletionQueueSafeHandle.CreateSync();
cq.Dispose();
GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
- public void CreateAndShutdown()
+ public void CreateAsyncAndShutdown()
{
- GrpcEnvironment.AddRef();
- var cq = CompletionQueueSafeHandle.Create();
+ var env = GrpcEnvironment.AddRef();
+ var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env));
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index d3735c7880..d760717ba6 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -53,7 +53,7 @@ namespace Grpc.Core.Tests
/// (~1.26us .NET Windows)
/// </summary>
[Test]
- public void CompletionQueueCreateDestroyBenchmark()
+ public void CompletionQueueCreateSyncDestroyBenchmark()
{
GrpcEnvironment.AddRef(); // completion queue requires gRPC environment being initialized.
@@ -61,7 +61,7 @@ namespace Grpc.Core.Tests
10, 10,
() =>
{
- CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
+ CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync();
cq.Dispose();
});
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 1f738a3b6f..f037b2351a 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -87,7 +87,7 @@ namespace Grpc.Core.Internal
var profiler = Profilers.ForCurrentThread();
using (profiler.NewScope("AsyncCall.UnaryCall"))
- using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
+ using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
{
byte[] payload = UnsafeSerialize(msg);
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 6c9a31921e..577d7044a5 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -51,14 +51,20 @@ namespace Grpc.Core.Internal
{
}
- public static CompletionQueueSafeHandle Create()
+ /// <summary>
+ /// Create a completion queue that can only be used for Pluck operations.
+ /// </summary>
+ public static CompletionQueueSafeHandle CreateSync()
{
- return Native.grpcsharp_completion_queue_create();
+ return Native.grpcsharp_completion_queue_create_sync();
}
- public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
+ /// <summary>
+ /// Create a completion queue that can only be used for Next operations.
+ /// </summary>
+ public static CompletionQueueSafeHandle CreateAsync(CompletionRegistry completionRegistry)
{
- var cq = Native.grpcsharp_completion_queue_create();
+ var cq = Native.grpcsharp_completion_queue_create_async();
cq.completionRegistry = completionRegistry;
return cq;
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 25a6589f11..07fea812b2 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -197,7 +197,7 @@ namespace Grpc.Core.Internal
for (int i = 0; i < completionQueueCount; i++)
{
var completionRegistry = new CompletionRegistry(environment);
- list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+ list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
}
return list.AsReadOnly();
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index dd65f05217..a98861af61 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -115,7 +115,8 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_sizeof_grpc_event_delegate grpcsharp_sizeof_grpc_event;
- public readonly Delegates.grpcsharp_completion_queue_create_delegate grpcsharp_completion_queue_create;
+ public readonly Delegates.grpcsharp_completion_queue_create_async_delegate grpcsharp_completion_queue_create_async;
+ public readonly Delegates.grpcsharp_completion_queue_create_sync_delegate grpcsharp_completion_queue_create_sync;
public readonly Delegates.grpcsharp_completion_queue_shutdown_delegate grpcsharp_completion_queue_shutdown;
public readonly Delegates.grpcsharp_completion_queue_next_delegate grpcsharp_completion_queue_next;
public readonly Delegates.grpcsharp_completion_queue_pluck_delegate grpcsharp_completion_queue_pluck;
@@ -229,7 +230,8 @@ namespace Grpc.Core.Internal
this.grpcsharp_sizeof_grpc_event = GetMethodDelegate<Delegates.grpcsharp_sizeof_grpc_event_delegate>(library);
- this.grpcsharp_completion_queue_create = GetMethodDelegate<Delegates.grpcsharp_completion_queue_create_delegate>(library);
+ this.grpcsharp_completion_queue_create_async = GetMethodDelegate<Delegates.grpcsharp_completion_queue_create_async_delegate>(library);
+ this.grpcsharp_completion_queue_create_sync = GetMethodDelegate<Delegates.grpcsharp_completion_queue_create_sync_delegate>(library);
this.grpcsharp_completion_queue_shutdown = GetMethodDelegate<Delegates.grpcsharp_completion_queue_shutdown_delegate>(library);
this.grpcsharp_completion_queue_next = GetMethodDelegate<Delegates.grpcsharp_completion_queue_next_delegate>(library);
this.grpcsharp_completion_queue_pluck = GetMethodDelegate<Delegates.grpcsharp_completion_queue_pluck_delegate>(library);
@@ -383,7 +385,8 @@ namespace Grpc.Core.Internal
public delegate int grpcsharp_sizeof_grpc_event_delegate();
- public delegate CompletionQueueSafeHandle grpcsharp_completion_queue_create_delegate();
+ public delegate CompletionQueueSafeHandle grpcsharp_completion_queue_create_async_delegate();
+ public delegate CompletionQueueSafeHandle grpcsharp_completion_queue_create_sync_delegate();
public delegate void grpcsharp_completion_queue_shutdown_delegate(CompletionQueueSafeHandle cq);
public delegate CompletionQueueEvent grpcsharp_completion_queue_next_delegate(CompletionQueueSafeHandle cq);
public delegate CompletionQueueEvent grpcsharp_completion_queue_pluck_delegate(CompletionQueueSafeHandle cq, IntPtr tag);
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 491df4de6a..27de7bc11f 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -354,8 +354,13 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_shutdown(void) { grpc_shutdown(); }
/* Completion queue */
GPR_EXPORT grpc_completion_queue *GPR_CALLTYPE
-grpcsharp_completion_queue_create(void) {
- return grpc_completion_queue_create(NULL);
+grpcsharp_completion_queue_create_async(void) {
+ return grpc_completion_queue_create_for_next(NULL);
+}
+
+GPR_EXPORT grpc_completion_queue *GPR_CALLTYPE
+grpcsharp_completion_queue_create_sync(void) {
+ return grpc_completion_queue_create_for_pluck(NULL);
}
GPR_EXPORT void GPR_CALLTYPE
diff --git a/src/node/ext/completion_queue_threadpool.cc b/src/node/ext/completion_queue_threadpool.cc
index 7b1bdda033..72df5d1d65 100644
--- a/src/node/ext/completion_queue_threadpool.cc
+++ b/src/node/ext/completion_queue_threadpool.cc
@@ -34,14 +34,14 @@
/* I don't like using #ifndef, but I don't see a better way to do this */
#ifndef GRPC_UV
-#include <node.h>
#include <nan.h>
+#include <node.h>
+#include "call.h"
+#include "completion_queue.h"
#include "grpc/grpc.h"
#include "grpc/support/log.h"
#include "grpc/support/time.h"
-#include "completion_queue.h"
-#include "call.h"
namespace grpc {
namespace node {
@@ -111,8 +111,8 @@ CompletionQueueAsyncWorker::CompletionQueueAsyncWorker()
CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
- result =
- grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ result = grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
if (!result.success) {
SetErrorMessage("The async function encountered an error");
}
@@ -141,7 +141,7 @@ void CompletionQueueAsyncWorker::Init(Local<Object> exports) {
Nan::HandleScope scope;
current_threads = 0;
waiting_next_calls = 0;
- queue = grpc_completion_queue_create(NULL);
+ queue = grpc_completion_queue_create_for_next(NULL);
}
void CompletionQueueAsyncWorker::HandleOKCallback() {
@@ -168,9 +168,7 @@ grpc_completion_queue *GetCompletionQueue() {
return CompletionQueueAsyncWorker::GetQueue();
}
-void CompletionQueueNext() {
- CompletionQueueAsyncWorker::Next();
-}
+void CompletionQueueNext() { CompletionQueueAsyncWorker::Next(); }
void CompletionQueueInit(Local<Object> exports) {
CompletionQueueAsyncWorker::Init(exports);
@@ -179,4 +177,4 @@ void CompletionQueueInit(Local<Object> exports) {
} // namespace node
} // namespace grpc
-#endif /* GRPC_UV */
+#endif /* GRPC_UV */
diff --git a/src/node/ext/completion_queue_uv.cc b/src/node/ext/completion_queue_uv.cc
index 0f6f7da460..9b60911d1e 100644
--- a/src/node/ext/completion_queue_uv.cc
+++ b/src/node/ext/completion_queue_uv.cc
@@ -33,10 +33,10 @@
#ifdef GRPC_UV
-#include <uv.h>
+#include <grpc/grpc.h>
#include <node.h>
+#include <uv.h>
#include <v8.h>
-#include <grpc/grpc.h>
#include "call.h"
#include "completion_queue.h"
@@ -57,8 +57,8 @@ void drain_completion_queue(uv_prepare_t *handle) {
grpc_event event;
(void)handle;
do {
- event = grpc_completion_queue_next(
- queue, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL);
+ event = grpc_completion_queue_next(queue, gpr_inf_past(GPR_CLOCK_MONOTONIC),
+ NULL);
if (event.type == GRPC_OP_COMPLETE) {
const char *error_message;
@@ -77,9 +77,7 @@ void drain_completion_queue(uv_prepare_t *handle) {
} while (event.type != GRPC_QUEUE_TIMEOUT);
}
-grpc_completion_queue *GetCompletionQueue() {
- return queue;
-}
+grpc_completion_queue *GetCompletionQueue() { return queue; }
void CompletionQueueNext() {
if (pending_batches == 0) {
@@ -90,7 +88,7 @@ void CompletionQueueNext() {
}
void CompletionQueueInit(Local<Object> exports) {
- queue = grpc_completion_queue_create(NULL);
+ queue = grpc_completion_queue_create_for_next(NULL);
uv_prepare_init(uv_default_loop(), &prepare);
pending_batches = 0;
}
diff --git a/src/node/ext/server_generic.cc b/src/node/ext/server_generic.cc
index 0cf20f754a..24573bd52f 100644
--- a/src/node/ext/server_generic.cc
+++ b/src/node/ext/server_generic.cc
@@ -35,8 +35,8 @@
#include "server.h"
-#include <node.h>
#include <nan.h>
+#include <node.h>
#include "grpc/grpc.h"
#include "grpc/support/time.h"
@@ -44,7 +44,7 @@ namespace grpc {
namespace node {
Server::Server(grpc_server *server) : wrapped_server(server) {
- shutdown_queue = grpc_completion_queue_create(NULL);
+ shutdown_queue = grpc_completion_queue_create_for_pluck(NULL);
grpc_server_register_non_listening_completion_queue(server, shutdown_queue,
NULL);
}
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
index 539b5ab83c..5ff77eac4c 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
@@ -48,7 +48,7 @@
- (instancetype)init {
if ((self = [super init])) {
- _unmanagedQueue = grpc_completion_queue_create(NULL);
+ _unmanagedQueue = grpc_completion_queue_create_for_next(NULL);
// This is for the following block to capture the pointer by value (instead
// of retaining self and doing self->_unmanagedQueue). This is essential
diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
index 3b442645e8..3dd264718c 100644
--- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
+++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
@@ -79,7 +79,8 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(
gpr_join_host_port(&ffd->localaddr, "127.0.0.1", port);
f.fixture_data = ffd;
- f.cq = grpc_completion_queue_create(NULL);
+ f.cq = grpc_completion_queue_create_for_next(NULL);
+ f.shutdown_cq = grpc_completion_queue_create_for_pluck(NULL);
return f;
}
diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
index a76e45416b..6d94116c75 100644
--- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
+++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m
@@ -160,7 +160,7 @@ unsigned int parse_h2_length(const char *field) {
int port = grpc_pick_unused_port_or_die();
char *addr;
gpr_join_host_port(&addr, "127.0.0.1", port);
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
stream_engine *cronetEngine = [Cronet getGlobalEngine];
grpc_channel *client =
grpc_cronet_secure_channel_create(cronetEngine, addr, NULL, NULL);
@@ -295,7 +295,7 @@ unsigned int parse_h2_length(const char *field) {
int port = grpc_pick_unused_port_or_die();
char *addr;
gpr_join_host_port(&addr, "127.0.0.1", port);
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
stream_engine *cronetEngine = [Cronet getGlobalEngine];
grpc_channel *client =
grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL);
diff --git a/src/php/ext/grpc/completion_queue.c b/src/php/ext/grpc/completion_queue.c
index 741204b0b1..c75a524530 100644
--- a/src/php/ext/grpc/completion_queue.c
+++ b/src/php/ext/grpc/completion_queue.c
@@ -38,13 +38,10 @@
grpc_completion_queue *completion_queue;
void grpc_php_init_completion_queue(TSRMLS_D) {
- completion_queue = grpc_completion_queue_create(NULL);
+ completion_queue = grpc_completion_queue_create_for_pluck(NULL);
}
void grpc_php_shutdown_completion_queue(TSRMLS_D) {
grpc_completion_queue_shutdown(completion_queue);
- while (grpc_completion_queue_next(completion_queue,
- gpr_inf_future(GPR_CLOCK_REALTIME),
- NULL).type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(completion_queue);
}
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index d8df6c2ef4..34b2623d34 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -40,7 +40,7 @@ cdef class CompletionQueue:
def __cinit__(self):
grpc_init()
with nogil:
- self.c_completion_queue = grpc_completion_queue_create(NULL)
+ self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
self.is_shutting_down = False
self.is_shutdown = False
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index bbd72424b9..0b2bdef48b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -309,7 +309,8 @@ cdef extern from "grpc/grpc.h":
void grpc_init() nogil
void grpc_shutdown() nogil
- grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil
+ grpc_completion_queue *grpc_completion_queue_create_for_next(void *reserved) nogil
+
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline,
void *reserved) nogil
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 1c20c8813f..05f7160183 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -351,7 +351,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_completion_queue_create(NULL);
+ cq = grpc_completion_queue_create_for_pluck(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -522,7 +522,7 @@ static VALUE run_poll_channels_loop(VALUE arg) {
* TODO(apolcyn) remove this when core handles new RPCs on dead connections.
*/
static void start_poll_channels_loop() {
- channel_polling_cq = grpc_completion_queue_create(NULL);
+ channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
gpr_mu_init(&global_connection_polling_mu);
abort_channel_polling = 0;
rb_thread_create(run_poll_channels_loop, NULL);
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index f5dcd68a8e..e036ff7bd6 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -233,7 +233,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f
typedef grpc_completion_queue *(*grpc_completion_queue_create_for_pluck_type)(void *reserved);
extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import;
#define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import
-typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(void *reserved);
+typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved);
extern grpc_completion_queue_create_type grpc_completion_queue_create_import;
#define grpc_completion_queue_create grpc_completion_queue_create_import
typedef grpc_event(*grpc_completion_queue_next_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved);
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 7b2f5774aa..ef57d5b07e 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -37,15 +37,15 @@
#include "rb_server.h"
#include <grpc/grpc.h>
-#include <grpc/support/atm.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
+#include "rb_byte_buffer.h"
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_completion_queue.h"
-#include "rb_server_credentials.h"
-#include "rb_byte_buffer.h"
#include "rb_grpc.h"
+#include "rb_server_credentials.h"
/* grpc_rb_cServer is the ruby class that proxies grpc_server. */
static VALUE grpc_rb_cServer = Qnil;
@@ -93,9 +93,8 @@ static void grpc_rb_server_free(void *p) {
};
svr = (grpc_rb_server *)p;
- deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(2, GPR_TIMESPAN));
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(2, GPR_TIMESPAN));
destroy_server(svr, deadline);
@@ -104,13 +103,15 @@ static void grpc_rb_server_free(void *p) {
static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server",
- {GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED,
+ grpc_rb_server_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block
- * and we might want to unlock GVL
+ /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
+ * function would block and we might want to unlock GVL
* TODO(yugui) Unlock GVL?
*/
0,
@@ -131,7 +132,7 @@ static VALUE grpc_rb_server_alloc(VALUE cls) {
Initializes server instances. */
static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create_for_pluck(NULL);
grpc_rb_server *wrapper = NULL;
grpc_server *srv = NULL;
grpc_channel_args args;
@@ -163,7 +164,7 @@ typedef struct request_call_stack {
/* grpc_request_call_stack_init ensures the request_call_stack is properly
* initialized */
-static void grpc_request_call_stack_init(request_call_stack* st) {
+static void grpc_request_call_stack_init(request_call_stack *st) {
MEMZERO(st, request_call_stack, 1);
grpc_metadata_array_init(&st->md_ary);
grpc_call_details_init(&st->details);
@@ -171,7 +172,7 @@ static void grpc_request_call_stack_init(request_call_stack* st) {
/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
* cleaned up */
-static void grpc_request_call_stack_cleanup(request_call_stack* st) {
+static void grpc_request_call_stack_cleanup(request_call_stack *st) {
grpc_metadata_array_destroy(&st->md_ary);
grpc_call_details_destroy(&st->details);
}
@@ -187,8 +188,9 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_call_error err;
request_call_stack st;
VALUE result;
- void *tag = (void*)&st;
- grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL);
+ void *tag = (void *)&st;
+ grpc_completion_queue *call_queue =
+ grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
@@ -199,9 +201,8 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_request_call_stack_init(&st);
/* call grpc_server_request_call, then wait for it to complete using
* pluck_event */
- err = grpc_server_request_call(
- s->wrapped, &call, &st.details, &st.md_ary,
- call_queue, s->queue, tag);
+ err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
+ call_queue, s->queue, tag);
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
@@ -218,8 +219,6 @@ static VALUE grpc_rb_server_request_call(VALUE self) {
return Qnil;
}
-
-
/* build the NewServerRpc struct result */
deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
result = rb_struct_new(
@@ -299,8 +298,7 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
return Qnil;
} else if (TYPE(rb_creds) == T_SYMBOL) {
if (id_insecure_server != SYM2ID(rb_creds)) {
- rb_raise(rb_eTypeError,
- "bad creds symbol, want :this_port_is_insecure");
+ rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
return Qnil;
}
recvd_port =
@@ -312,9 +310,8 @@ static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
}
} else {
creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
- recvd_port =
- grpc_server_add_secure_http2_port(s->wrapped, StringValueCStr(port),
- creds);
+ recvd_port = grpc_server_add_secure_http2_port(
+ s->wrapped, StringValueCStr(port), creds);
if (recvd_port == 0) {
rb_raise(rb_eRuntimeError,
"could not add secure port %s to server, not sure why",
@@ -333,18 +330,17 @@ void Init_grpc_server() {
/* Provides a ruby constructor and support for dup/clone. */
rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
- rb_define_method(grpc_rb_cServer, "initialize_copy",
- grpc_rb_cannot_init_copy, 1);
+ rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
+ 1);
/* Add the server methods. */
- rb_define_method(grpc_rb_cServer, "request_call",
- grpc_rb_server_request_call, 0);
+ rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
+ 0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
rb_define_method(grpc_rb_cServer, "add_http2_port",
- grpc_rb_server_add_http2_port,
- 2);
+ grpc_rb_server_add_http2_port, 2);
id_at = rb_intern("at");
id_insecure_server = rb_intern("this_port_is_insecure");
}