aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/iomgr/exec_ctx.c75
-rw-r--r--src/core/iomgr/exec_ctx.h23
-rw-r--r--src/core/iomgr/iomgr.c3
-rw-r--r--src/core/iomgr/pollset.h12
-rw-r--r--src/core/iomgr/workqueue_posix.c4
-rw-r--r--src/cpp/client/secure_credentials.cc16
-rw-r--r--src/cpp/codegen/codegen_init.cc (renamed from src/cpp/codegen/grpc_library.cc)13
-rw-r--r--src/cpp/common/call.cc92
-rw-r--r--src/cpp/common/completion_queue.cc33
-rw-r--r--src/cpp/common/core_codegen.cc (renamed from src/cpp/proto/proto_utils.cc)106
-rw-r--r--src/cpp/common/core_codegen.h71
-rw-r--r--src/cpp/util/string_ref.cc66
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi41
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi46
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi21
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi78
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi148
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi94
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi51
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx11
20 files changed, 571 insertions, 433 deletions
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c
index 1fd79f6eba..893fe4515c 100644
--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -34,9 +34,12 @@
#include "src/core/iomgr/exec_ctx.h"
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
#include "src/core/profiling/timers.h"
+#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
@@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
+
+void grpc_exec_ctx_global_init(void) {}
+void grpc_exec_ctx_global_shutdown(void) {}
+#else
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static int g_threads = 0;
+
+static void run_closure(void *arg) {
+ grpc_closure *closure = arg;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(&g_mu);
+ if (--g_threads == 0) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+static void start_closure(grpc_closure *closure) {
+ gpr_thd_id id;
+ gpr_mu_lock(&g_mu);
+ g_threads++;
+ gpr_mu_unlock(&g_mu);
+ gpr_thd_new(&id, run_closure, closure, NULL);
+}
+
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; }
+
+void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {}
+
+void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ bool success,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
+ if (closure == NULL) return;
+ closure->final_data = success;
+ start_closure(closure);
+}
+
+void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
+ if (list == NULL) return;
+ grpc_closure *p = list->head;
+ while (p) {
+ grpc_closure *start = p;
+ p = grpc_closure_next(start);
+ start_closure(start);
+ }
+ grpc_closure_list r = GRPC_CLOSURE_LIST_INIT;
+ *list = r;
+}
+
+void grpc_exec_ctx_global_init(void) {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
+}
+
+void grpc_exec_ctx_global_shutdown(void) {
+ gpr_mu_lock(&g_mu);
+ while (g_threads != 0) {
+ gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_mu_destroy(&g_mu);
+ gpr_cv_destroy(&g_cv);
+}
+#endif
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index 9a9b2e55fa..1b627a5dcf 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -36,6 +36,14 @@
#include "src/core/iomgr/closure.h"
+/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */
+
+/** A workqueue represents a list of work to be executed asynchronously.
+ Forward declared here to avoid a circular dependency with workqueue.h. */
+struct grpc_workqueue;
+typedef struct grpc_workqueue grpc_workqueue;
+
+#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context.
* A bag of data that collects information along a callstack.
* Generally created at public API entry points, and passed down as
@@ -57,13 +65,15 @@ struct grpc_exec_ctx {
grpc_closure_list closure_list;
};
-/** A workqueue represents a list of work to be executed asynchronously.
- Forward declared here to avoid a circular dependency with workqueue.h. */
-struct grpc_workqueue;
-typedef struct grpc_workqueue grpc_workqueue;
-
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
+#else
+struct grpc_exec_ctx {
+ int unused;
+};
+#define GRPC_EXEC_CTX_INIT \
+ { 0 }
+#endif
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
@@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list,
grpc_workqueue *offload_target_or_null);
+void grpc_exec_ctx_global_init(void);
+void grpc_exec_ctx_global_shutdown(void);
+
#endif
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 9c89c2c08a..3ab4430668 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -43,6 +43,7 @@
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/env.h"
@@ -57,6 +58,7 @@ void grpc_iomgr_init(void) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
+ grpc_exec_ctx_global_init();
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
@@ -138,6 +140,7 @@ void grpc_iomgr_shutdown(void) {
grpc_pollset_global_shutdown();
grpc_iomgr_platform_shutdown();
+ grpc_exec_ctx_global_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 92a0374ddd..ee1debfb71 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -55,7 +55,7 @@ typedef struct grpc_pollset_worker grpc_pollset_worker;
size_t grpc_pollset_size(void);
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
/* Begin shutting down the pollset, and call closure when done.
- * GRPC_POLLSET_MU(pollset) must be held */
+ * pollset's mutex must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
/** Reset the pollset to its initial state (perhaps with some cached objects);
@@ -66,16 +66,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset.
May involve invoking asynchronous callbacks, or actually polling file
descriptors.
- Requires GRPC_POLLSET_MU(pollset) locked.
- May unlock GRPC_POLLSET_MU(pollset) during its execution.
+ Requires pollset's mutex locked.
+ May unlock its mutex during its execution.
worker is a (platform-specific) handle that can be used to wake up
from grpc_pollset_work before any events are received and before the timeout
has expired. It is both initialized and destroyed by grpc_pollset_work.
Initialization of worker is guaranteed to occur BEFORE the
- GRPC_POLLSET_MU(pollset) is released for the first time by
- grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
- not be released by grpc_pollset_work AFTER worker has been destroyed.
+ pollset's mutex is released for the first time by grpc_pollset_work
+ and it is guaranteed that it will not be released by grpc_pollset_work
+ AFTER worker has been destroyed.
Tries not to block past deadline.
May call grpc_closure_list_run on grpc_closure_list, without holding the
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index c096dbfb30..2b42e6d4fb 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -107,7 +107,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
- grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
gpr_mu_unlock(&workqueue->mu);
}
@@ -123,7 +123,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_free(workqueue);
} else {
gpr_mu_lock(&workqueue->mu);
- grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 074dae7ca7..308455527c 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -83,14 +83,14 @@ std::shared_ptr<CallCredentials> WrapCallCredentials(
} // namespace
std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
return WrapChannelCredentials(grpc_google_default_credentials_create());
}
// Builds SSL Credentials given SSL specific options
std::shared_ptr<ChannelCredentials> SslCredentials(
const SslCredentialsOptions& options) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {
options.pem_private_key.c_str(), options.pem_cert_chain.c_str()};
@@ -102,7 +102,7 @@ std::shared_ptr<ChannelCredentials> SslCredentials(
// Builds credentials for use when running in GCE
std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
return WrapCallCredentials(
grpc_google_compute_engine_credentials_create(nullptr));
}
@@ -110,7 +110,7 @@ std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() {
// Builds JWT credentials.
std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials(
const grpc::string& json_key, long token_lifetime_seconds) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create JWTCredentials with non-positive lifetime");
@@ -125,7 +125,7 @@ std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials(
// Builds refresh token credentials.
std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials(
const grpc::string& json_refresh_token) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
return WrapCallCredentials(grpc_google_refresh_token_credentials_create(
json_refresh_token.c_str(), nullptr));
}
@@ -133,7 +133,7 @@ std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials(
// Builds access token credentials.
std::shared_ptr<CallCredentials> AccessTokenCredentials(
const grpc::string& access_token) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
return WrapCallCredentials(
grpc_access_token_credentials_create(access_token.c_str(), nullptr));
}
@@ -142,7 +142,7 @@ std::shared_ptr<CallCredentials> AccessTokenCredentials(
std::shared_ptr<CallCredentials> GoogleIAMCredentials(
const grpc::string& authorization_token,
const grpc::string& authority_selector) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
return WrapCallCredentials(grpc_google_iam_credentials_create(
authorization_token.c_str(), authority_selector.c_str(), nullptr));
}
@@ -224,7 +224,7 @@ MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper(
std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin(
std::unique_ptr<MetadataCredentialsPlugin> plugin) {
- GrpcLibrary init; // To call grpc_init().
+ GrpcLibraryCodegen init; // To call grpc_init().
const char* type = plugin->GetType();
MetadataCredentialsPluginWrapper* wrapper =
new MetadataCredentialsPluginWrapper(std::move(plugin));
diff --git a/src/cpp/codegen/grpc_library.cc b/src/cpp/codegen/codegen_init.cc
index 48acec3f3d..c5d22124b7 100644
--- a/src/cpp/codegen/grpc_library.cc
+++ b/src/cpp/codegen/codegen_init.cc
@@ -31,10 +31,15 @@
*
*/
+#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/grpc_library.h>
-namespace grpc {
+/// Initializes the global gRPC variables for the codegen library. These will
+/// stay null in the absence of of grpc++ library. In this case, no gRPC
+/// features such as the ability to perform calls will be available. Trying to
+/// perform them would result in a segmentation fault when trying to deference
+/// the following nulled globals. These should be associated with actual
+/// as part of the instantiation of a \a grpc::GrpcLibraryInitializer variable.
-GrpcLibraryInterface *g_glip = nullptr;
-
-} // namespace grpc
+grpc::CoreCodegenInterface* grpc::g_core_codegen_interface = nullptr;
+grpc::GrpcLibraryInterface* grpc::g_glip = nullptr;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
deleted file mode 100644
index 5b87c2a806..0000000000
--- a/src/cpp/common/call.cc
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/impl/call.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc++/channel.h>
-#include <grpc++/client_context.h>
-#include <grpc++/support/byte_buffer.h>
-#include "src/core/profiling/timers.h"
-
-namespace grpc {
-
-void FillMetadataMap(
- grpc_metadata_array* arr,
- std::multimap<grpc::string_ref, grpc::string_ref>* metadata) {
- for (size_t i = 0; i < arr->count; i++) {
- // TODO(yangg) handle duplicates?
- metadata->insert(std::pair<grpc::string_ref, grpc::string_ref>(
- arr->metadata[i].key, grpc::string_ref(arr->metadata[i].value,
- arr->metadata[i].value_length)));
- }
- grpc_metadata_array_destroy(arr);
- grpc_metadata_array_init(arr);
-}
-
-// TODO(yangg) if the map is changed before we send, the pointers will be a
-// mess. Make sure it does not happen.
-grpc_metadata* FillMetadataArray(
- const std::multimap<grpc::string, grpc::string>& metadata) {
- if (metadata.empty()) {
- return nullptr;
- }
- grpc_metadata* metadata_array =
- (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
- size_t i = 0;
- for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
- metadata_array[i].key = iter->first.c_str();
- metadata_array[i].value = iter->second.c_str();
- metadata_array[i].value_length = iter->second.size();
- }
- return metadata_array;
-}
-
-Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
- : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
-
-Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
- int max_message_size)
- : call_hook_(call_hook),
- cq_(cq),
- call_(call),
- max_message_size_(max_message_size) {}
-
-void Call::PerformOps(CallOpSetInterface* ops) {
- if (max_message_size_ > 0) {
- ops->set_max_message_size(max_message_size_);
- }
- call_hook_->PerformOpsOnCall(ops, this);
-}
-
-} // namespace grpc
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 4f76dfff1d..729dc33749 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -34,7 +34,6 @@
#include <memory>
-#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
@@ -43,16 +42,13 @@
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue() {
- g_gli_initializer.summon();
- cq_ = grpc_completion_queue_create(nullptr);
-}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
-CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
-
-void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
+void CompletionQueue::Shutdown() {
+ g_gli_initializer.summon();
+ grpc_completion_queue_shutdown(cq_);
+}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
@@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
-bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
- auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- bool ok = ev.success != 0;
- void* ignored = tag;
- GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
-}
-
-void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
- auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- if (ev.type == GRPC_QUEUE_TIMEOUT) return;
- bool ok = ev.success != 0;
- void* ignored = tag;
- // the tag must be swallowed if using TryPluck
- GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
-}
-
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/common/core_codegen.cc
index 79e7bf1801..45e9e278a0 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,28 +31,31 @@
*
*/
-#include <grpc++/impl/proto_utils.h>
+#include "src/cpp/common/core_codegen.h"
-#include <climits>
+#include <stdlib.h>
-#include <grpc/grpc.h>
+#include <grpc++/support/config.h>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
-#include <grpc/support/log.h>
+#include <grpc/grpc.h>
+#include <grpc/impl/codegen/alloc.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/impl/codegen/log.h>
+#include <grpc/support/port_platform.h>
#include <grpc/support/slice.h>
#include <grpc/support/slice_buffer.h>
-#include <grpc/support/port_platform.h>
-#include <grpc++/support/config.h>
#include "src/core/profiling/timers.h"
-const int kMaxBufferLength = 8192;
+namespace {
+
+const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp,
- int block_size = kMaxBufferLength)
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
@@ -161,14 +164,56 @@ class GrpcBufferReader GRPC_FINAL
grpc_byte_buffer_reader reader_;
gpr_slice slice_;
};
+} // namespace
namespace grpc {
-Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) {
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+ void* reserved) {
+ return ::grpc_completion_queue_create(reserved);
+}
+
+void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
+ ::grpc_completion_queue_destroy(cq);
+}
+
+grpc_event CoreCodegen::grpc_completion_queue_pluck(grpc_completion_queue* cq,
+ void* tag,
+ gpr_timespec deadline,
+ void* reserved) {
+ return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
+}
+
+void* CoreCodegen::gpr_malloc(size_t size) { return ::gpr_malloc(size); }
+
+void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); }
+
+void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
+ ::grpc_byte_buffer_destroy(bb);
+}
+
+void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) {
+ ::grpc_metadata_array_init(array);
+}
+
+void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) {
+ ::grpc_metadata_array_destroy(array);
+}
+
+gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) {
+ return ::gpr_inf_future(type);
+}
+
+void CoreCodegen::assert_fail(const char* failed_assertion) {
+ gpr_log(GPR_ERROR, "assertion failed: %s", failed_assertion);
+ abort();
+}
+
+Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) {
GPR_TIMER_SCOPE("SerializeProto", 0);
int byte_size = msg.ByteSize();
- if (byte_size <= kMaxBufferLength) {
+ if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = gpr_slice_malloc(byte_size);
GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
@@ -176,31 +221,36 @@ Status SerializeProto(const grpc::protobuf::Message& msg,
gpr_slice_unref(slice);
return Status::OK;
} else {
- GrpcBufferWriter writer(bp);
+ GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? Status::OK
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
-Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size) {
+Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) {
GPR_TIMER_SCOPE("DeserializeProto", 0);
- if (!buffer) {
+ if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
- GrpcBufferReader reader(buffer);
- ::grpc::protobuf::io::CodedInputStream decoder(&reader);
- if (max_message_size > 0) {
- decoder.SetTotalBytesLimit(max_message_size, max_message_size);
- }
- if (!msg->ParseFromCodedStream(&decoder)) {
- return Status(StatusCode::INTERNAL, msg->InitializationErrorString());
- }
- if (!decoder.ConsumedEntireMessage()) {
- return Status(StatusCode::INTERNAL, "Did not read entire message");
+ Status result = Status::OK;
+ {
+ GrpcBufferReader reader(buffer);
+ ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+ if (max_message_size > 0) {
+ decoder.SetTotalBytesLimit(max_message_size, max_message_size);
+ }
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ result = Status(StatusCode::INTERNAL, "Did not read entire message");
+ }
}
- return Status::OK;
+ grpc_byte_buffer_destroy(buffer);
+ return result;
}
} // namespace grpc
diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h
new file mode 100644
index 0000000000..0d8c6b79f7
--- /dev/null
+++ b/src/cpp/common/core_codegen.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// This file should be compiled as part of grpc++.
+
+#include <grpc++/impl/codegen/core_codegen_interface.h>
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/byte_buffer.h>
+
+namespace grpc {
+
+/// Implementation of the core codegen interface.
+class CoreCodegen : public CoreCodegenInterface {
+ private:
+ Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) override;
+
+ Status DeserializeProto(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) override;
+
+ grpc_completion_queue* grpc_completion_queue_create(void* reserved) override;
+ void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline,
+ void* reserved) override;
+
+ void* gpr_malloc(size_t size) override;
+ void gpr_free(void* p) override;
+
+ void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
+
+ void grpc_metadata_array_init(grpc_metadata_array* array) override;
+ void grpc_metadata_array_destroy(grpc_metadata_array* array) override;
+
+ gpr_timespec gpr_inf_future(gpr_clock_type type) override;
+
+ void assert_fail(const char* failed_assertion) override;
+};
+
+} // namespace grpc
diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc
index 66c79a1818..b55019b5f2 100644
--- a/src/cpp/util/string_ref.cc
+++ b/src/cpp/util/string_ref.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,72 +33,8 @@
#include <grpc++/support/string_ref.h>
-#include <string.h>
-
-#include <algorithm>
-#include <iostream>
-
namespace grpc {
const size_t string_ref::npos = size_t(-1);
-string_ref& string_ref::operator=(const string_ref& rhs) {
- data_ = rhs.data_;
- length_ = rhs.length_;
- return *this;
-}
-
-string_ref::string_ref(const char* s) : data_(s), length_(strlen(s)) {}
-
-string_ref string_ref::substr(size_t pos, size_t n) const {
- if (pos > length_) pos = length_;
- if (n > (length_ - pos)) n = length_ - pos;
- return string_ref(data_ + pos, n);
-}
-
-int string_ref::compare(string_ref x) const {
- size_t min_size = length_ < x.length_ ? length_ : x.length_;
- int r = memcmp(data_, x.data_, min_size);
- if (r < 0) return -1;
- if (r > 0) return 1;
- if (length_ < x.length_) return -1;
- if (length_ > x.length_) return 1;
- return 0;
-}
-
-bool string_ref::starts_with(string_ref x) const {
- return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0);
-}
-
-bool string_ref::ends_with(string_ref x) const {
- return length_ >= x.length_ &&
- (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0);
-}
-
-size_t string_ref::find(string_ref s) const {
- auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend());
- return it == cend() ? npos : std::distance(cbegin(), it);
-}
-
-size_t string_ref::find(char c) const {
- auto it = std::find(cbegin(), cend(), c);
- return it == cend() ? npos : std::distance(cbegin(), it);
-}
-
-bool operator==(string_ref x, string_ref y) { return x.compare(y) == 0; }
-
-bool operator!=(string_ref x, string_ref y) { return x.compare(y) != 0; }
-
-bool operator<(string_ref x, string_ref y) { return x.compare(y) < 0; }
-
-bool operator<=(string_ref x, string_ref y) { return x.compare(y) <= 0; }
-
-bool operator>(string_ref x, string_ref y) { return x.compare(y) > 0; }
-
-bool operator>=(string_ref x, string_ref y) { return x.compare(y) >= 0; }
-
-std::ostream& operator<<(std::ostream& out, const string_ref& string) {
- return out << grpc::string(string.begin(), string.end());
-}
-
} // namespace grpc
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 80f4da51e8..d1b9c98ffc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -40,14 +40,17 @@ cdef class Call:
def start_batch(self, operations, tag):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
+ cdef grpc_call_error result
cdef Operations cy_operations = Operations(operations)
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = self
operation_tag.batch_operations = cy_operations
cpython.Py_INCREF(operation_tag)
- return grpc_call_start_batch(
- self.c_call, cy_operations.c_ops, cy_operations.c_nops,
- <cpython.PyObject *>operation_tag, NULL)
+ with nogil:
+ result = grpc_call_start_batch(
+ self.c_call, cy_operations.c_ops, cy_operations.c_nops,
+ <cpython.PyObject *>operation_tag, NULL)
+ return result
def cancel(
self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE,
@@ -57,6 +60,8 @@ cdef class Call:
if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE):
raise ValueError("if error_code is specified, so must details "
"(and vice-versa)")
+ cdef grpc_call_error result
+ cdef char *c_details = NULL
if error_code != GRPC_STATUS__DO_NOT_USE:
if isinstance(details, bytes):
pass
@@ -65,25 +70,37 @@ cdef class Call:
else:
raise TypeError("expected details to be str or bytes")
self.references.append(details)
- return grpc_call_cancel_with_status(
- self.c_call, error_code, details, NULL)
+ c_details = details
+ with nogil:
+ result = grpc_call_cancel_with_status(
+ self.c_call, error_code, c_details, NULL)
+ return result
else:
- return grpc_call_cancel(self.c_call, NULL)
+ with nogil:
+ result = grpc_call_cancel(self.c_call, NULL)
+ return result
def set_credentials(
self, CallCredentials call_credentials not None):
- return grpc_call_set_credentials(
- self.c_call, call_credentials.c_credentials)
+ cdef grpc_call_error result
+ with nogil:
+ result = grpc_call_set_credentials(
+ self.c_call, call_credentials.c_credentials)
+ return result
def peer(self):
- cdef char *peer = grpc_call_get_peer(self.c_call)
+ cdef char *peer = NULL
+ with nogil:
+ peer = grpc_call_get_peer(self.c_call)
result = <bytes>peer
- gpr_free(peer)
+ with nogil:
+ gpr_free(peer)
return result
def __dealloc__(self):
if self.c_call != NULL:
- grpc_call_destroy(self.c_call)
+ with nogil:
+ grpc_call_destroy(self.c_call)
# The object *should* always be valid from Python. Used for debugging.
@property
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index bc03c4dcf1..d612c90791 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -35,6 +35,7 @@ cdef class Channel:
def __cinit__(self, target, ChannelArgs arguments=None,
ChannelCredentials channel_credentials=None):
cdef grpc_channel_args *c_arguments = NULL
+ cdef char *c_target = NULL
self.c_channel = NULL
self.references = []
if arguments is not None:
@@ -45,12 +46,15 @@ cdef class Channel:
target = target.encode()
else:
raise TypeError("expected target to be str or bytes")
+ c_target = target
if channel_credentials is None:
- self.c_channel = grpc_insecure_channel_create(target, c_arguments,
- NULL)
+ with nogil:
+ self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
+ NULL)
else:
- self.c_channel = grpc_secure_channel_create(
- channel_credentials.c_credentials, target, c_arguments, NULL)
+ with nogil:
+ self.c_channel = grpc_secure_channel_create(
+ channel_credentials.c_credentials, c_target, c_arguments, NULL)
self.references.append(channel_credentials)
self.references.append(target)
self.references.append(arguments)
@@ -66,6 +70,7 @@ cdef class Channel:
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
+ cdef char *method_c_string = method
cdef char *host_c_string = NULL
if host is None:
pass
@@ -81,31 +86,40 @@ cdef class Channel:
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
- operation_call.c_call = grpc_channel_create_call(
- self.c_channel, parent_call, flags,
- queue.c_completion_queue, method, host_c_string, deadline.c_time,
- NULL)
+ with nogil:
+ operation_call.c_call = grpc_channel_create_call(
+ self.c_channel, parent_call, flags,
+ queue.c_completion_queue, method_c_string, host_c_string,
+ deadline.c_time, NULL)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
- return grpc_channel_check_connectivity_state(self.c_channel,
- try_to_connect)
+ cdef grpc_connectivity_state result
+ with nogil:
+ result = grpc_channel_check_connectivity_state(self.c_channel,
+ try_to_connect)
+ return result
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag = OperationTag(tag)
cpython.Py_INCREF(operation_tag)
- grpc_channel_watch_connectivity_state(
- self.c_channel, last_observed_state, deadline.c_time,
- queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+ with nogil:
+ grpc_channel_watch_connectivity_state(
+ self.c_channel, last_observed_state, deadline.c_time,
+ queue.c_completion_queue, <cpython.PyObject *>operation_tag)
def target(self):
- cdef char * target = grpc_channel_get_target(self.c_channel)
+ cdef char *target = NULL
+ with nogil:
+ target = grpc_channel_get_target(self.c_channel)
result = <bytes>target
- gpr_free(target)
+ with nogil:
+ gpr_free(target)
return result
def __dealloc__(self):
if self.c_channel != NULL:
- grpc_channel_destroy(self.c_channel)
+ with nogil:
+ grpc_channel_destroy(self.c_channel)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index e11138b1cd..09e47d4222 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -36,7 +36,8 @@ import time
cdef class CompletionQueue:
def __cinit__(self):
- self.c_completion_queue = grpc_completion_queue_create(NULL)
+ with nogil:
+ self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.pluck_condition = threading.Condition()
@@ -82,8 +83,9 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
- cdef gpr_timespec c_deadline = gpr_inf_future(
- GPR_CLOCK_REALTIME)
+ cdef gpr_timespec c_deadline
+ with nogil:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
@@ -123,7 +125,8 @@ cdef class CompletionQueue:
return self._interpret_event(event)
def shutdown(self):
- grpc_completion_queue_shutdown(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
self.is_shutting_down = True
def clear(self):
@@ -133,15 +136,19 @@ cdef class CompletionQueue:
pass
def __dealloc__(self):
- cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ cdef gpr_timespec c_deadline
+ with nogil:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if self.c_completion_queue != NULL:
# Ensure shutdown
if not self.is_shutting_down:
- grpc_completion_queue_shutdown(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
# Pump the queue
while not self.is_shutdown:
with nogil:
event = grpc_completion_queue_next(
self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
- grpc_completion_queue_destroy(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_destroy(self.c_completion_queue)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 3f439c8900..1d7adca23e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -46,7 +46,8 @@ cdef class ChannelCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_channel_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_channel_credentials_release(self.c_credentials)
cdef class CallCredentials:
@@ -63,7 +64,8 @@ cdef class CallCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_call_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_call_credentials_release(self.c_credentials)
cdef class ServerCredentials:
@@ -74,7 +76,8 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_server_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_server_credentials_release(self.c_credentials)
cdef class CredentialsMetadataPlugin:
@@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state):
def channel_credentials_google_default():
cdef ChannelCredentials credentials = ChannelCredentials();
- credentials.c_credentials = grpc_google_default_credentials_create()
+ with nogil:
+ credentials.c_credentials = grpc_google_default_credentials_create()
return credentials
def channel_credentials_ssl(pem_root_certificates,
@@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates,
c_pem_root_certificates = pem_root_certificates
credentials.references.append(pem_root_certificates)
if ssl_pem_key_cert_pair is not None:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_ssl_credentials_create(
+ c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
credentials.references.append(ssl_pem_key_cert_pair)
else:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, NULL, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_ssl_credentials_create(
+ c_pem_root_certificates, NULL, NULL)
return credentials
def channel_credentials_composite(
@@ -172,8 +178,9 @@ def channel_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef ChannelCredentials credentials = ChannelCredentials()
- credentials.c_credentials = grpc_composite_channel_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_composite_channel_credentials_create(
+ credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
@@ -184,16 +191,18 @@ def call_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_composite_call_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_composite_call_credentials_create(
+ credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def call_credentials_google_compute_engine():
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_google_compute_engine_credentials_create(NULL))
+ with nogil:
+ credentials.c_credentials = (
+ grpc_google_compute_engine_credentials_create(NULL))
return credentials
def call_credentials_service_account_jwt_access(
@@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access(
else:
raise TypeError("expected json_key to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_service_account_jwt_access_credentials_create(
- json_key, token_lifetime.c_time, NULL))
+ cdef char *json_key_c_string = json_key
+ with nogil:
+ credentials.c_credentials = (
+ grpc_service_account_jwt_access_credentials_create(
+ json_key_c_string, token_lifetime.c_time, NULL))
credentials.references.append(json_key)
return credentials
@@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token):
else:
raise TypeError("expected json_refresh_token to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_google_refresh_token_credentials_create(
- json_refresh_token, NULL)
+ cdef char *json_refresh_token_c_string = json_refresh_token
+ with nogil:
+ credentials.c_credentials = grpc_google_refresh_token_credentials_create(
+ json_refresh_token_c_string, NULL)
credentials.references.append(json_refresh_token)
return credentials
@@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector):
else:
raise TypeError("expected authority_selector to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_google_iam_credentials_create(
- authorization_token, authority_selector, NULL)
+ cdef char *authorization_token_c_string = authorization_token
+ cdef char *authority_selector_c_string = authority_selector
+ with nogil:
+ credentials.c_credentials = grpc_google_iam_credentials_create(
+ authorization_token_c_string, authority_selector_c_string, NULL)
credentials.references.append(authorization_token)
credentials.references.append(authority_selector)
return credentials
def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(),
- NULL))
+ cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin()
+ with nogil:
+ credentials.c_credentials = (
+ grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
# TODO(atash): the following held reference is *probably* never necessary
credentials.references.append(plugin)
return credentials
@@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
credentials.references.append(pem_key_cert_pairs)
credentials.references.append(pem_root_certs)
credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
- credentials.c_ssl_pem_key_cert_pairs = (
- <grpc_ssl_pem_key_cert_pair *>gpr_malloc(
- sizeof(grpc_ssl_pem_key_cert_pair) *
- credentials.c_ssl_pem_key_cert_pairs_count
- ))
+ with nogil:
+ credentials.c_ssl_pem_key_cert_pairs = (
+ <grpc_ssl_pem_key_cert_pair *>gpr_malloc(
+ sizeof(grpc_ssl_pem_key_cert_pair) *
+ credentials.c_ssl_pem_key_cert_pairs_count
+ ))
for i in range(credentials.c_ssl_pem_key_cert_pairs_count):
credentials.c_ssl_pem_key_cert_pairs[i] = (
(<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index dbf0045710..61165cb021 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h":
int pygrpc_load_core(char*)
- void *gpr_malloc(size_t size)
- void gpr_free(void *ptr)
- void *gpr_realloc(void *p, size_t size)
+ void *gpr_malloc(size_t size) nogil
+ void gpr_free(void *ptr) nogil
+ void *gpr_realloc(void *p, size_t size) nogil
ctypedef struct gpr_slice:
# don't worry about writing out the members of gpr_slice; we never access
# them directly.
pass
- gpr_slice gpr_slice_ref(gpr_slice s)
- void gpr_slice_unref(gpr_slice s)
- gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *))
+ gpr_slice gpr_slice_ref(gpr_slice s) nogil
+ void gpr_slice_unref(gpr_slice s) nogil
+ gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
gpr_slice gpr_slice_new_with_len(
- void *p, size_t len, void (*destroy)(void *, size_t))
- gpr_slice gpr_slice_malloc(size_t length)
- gpr_slice gpr_slice_from_copied_string(const char *source)
- gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len)
+ void *p, size_t len, void (*destroy)(void *, size_t)) nogil
+ gpr_slice gpr_slice_malloc(size_t length) nogil
+ gpr_slice gpr_slice_from_copied_string(const char *source) nogil
+ gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil
# Declare functions for function-like macros (because Cython)...
- void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s)
- size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s)
+ void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil
+ size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil
ctypedef enum gpr_clock_type:
GPR_CLOCK_MONOTONIC
@@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h":
int32_t nanoseconds "tv_nsec"
gpr_clock_type clock_type
- gpr_timespec gpr_time_0(gpr_clock_type type)
- gpr_timespec gpr_inf_future(gpr_clock_type type)
- gpr_timespec gpr_inf_past(gpr_clock_type type)
+ gpr_timespec gpr_time_0(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_future(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_past(gpr_clock_type type) nogil
- gpr_timespec gpr_now(gpr_clock_type clock)
+ gpr_timespec gpr_now(gpr_clock_type clock) nogil
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
- gpr_clock_type target_clock)
+ gpr_clock_type target_clock) nogil
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
@@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h":
pass
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
- size_t nslices)
- size_t grpc_byte_buffer_length(grpc_byte_buffer *bb)
- void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer)
+ size_t nslices) nogil
+ size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
+ void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer)
+ grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
- gpr_slice *slice)
- void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
+ gpr_slice *slice) nogil
+ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_ENABLE_CENSUS
@@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t capacity
grpc_metadata *metadata
- void grpc_metadata_array_init(grpc_metadata_array *array)
- void grpc_metadata_array_destroy(grpc_metadata_array *array)
+ void grpc_metadata_array_init(grpc_metadata_array *array) nogil
+ void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
char *method
@@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t host_capacity
gpr_timespec deadline
- void grpc_call_details_init(grpc_call_details *details)
- void grpc_call_details_destroy(grpc_call_details *details)
+ void grpc_call_details_init(grpc_call_details *details) nogil
+ void grpc_call_details_destroy(grpc_call_details *details) nogil
ctypedef enum grpc_op_type:
GRPC_OP_SEND_INITIAL_METADATA
@@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h":
uint32_t flags
grpc_op_data data
- void grpc_init()
- void grpc_shutdown()
+ void grpc_init() nogil
+ void grpc_shutdown() nogil
- grpc_completion_queue *grpc_completion_queue_create(void *reserved)
+ grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline,
void *reserved) nogil
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline,
void *reserved) nogil
- void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
- void grpc_completion_queue_destroy(grpc_completion_queue *cq)
+ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil
+ void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
- grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *tag, void *reserved)
- grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved)
+ grpc_call_error grpc_call_start_batch(
+ grpc_call *call, const grpc_op *ops, size_t nops, void *tag,
+ void *reserved) nogil
+ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description,
- void *reserved)
- char *grpc_call_get_peer(grpc_call *call)
- void grpc_call_destroy(grpc_call *call)
+ void *reserved) nogil
+ char *grpc_call_get_peer(grpc_call *call) nogil
+ void grpc_call_destroy(grpc_call *call) nogil
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
- void *reserved)
- grpc_call *grpc_channel_create_call(grpc_channel *channel,
- grpc_call *parent_call,
- uint32_t propagation_mask,
- grpc_completion_queue *completion_queue,
- const char *method, const char *host,
- gpr_timespec deadline, void *reserved)
+ void *reserved) nogil
+ grpc_call *grpc_channel_create_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, const char *method,
+ const char *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
- grpc_channel *channel, int try_to_connect)
+ grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
- gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
- char *grpc_channel_get_target(grpc_channel *channel)
- void grpc_channel_destroy(grpc_channel *channel)
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil
+ char *grpc_channel_get_target(grpc_channel *channel) nogil
+ void grpc_channel_destroy(grpc_channel *channel) nogil
- grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved)
+ grpc_server *grpc_server_create(
+ const grpc_channel_args *args, void *reserved) nogil
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_completion_queue
*cq_bound_to_call, grpc_completion_queue *cq_for_notification, void
- *tag_new)
+ *tag_new) nogil
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
- void *reserved)
- int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr)
- void grpc_server_start(grpc_server *server)
+ void *reserved) nogil
+ int grpc_server_add_insecure_http2_port(
+ grpc_server *server, const char *addr) nogil
+ void grpc_server_start(grpc_server *server) nogil
void grpc_server_shutdown_and_notify(
- grpc_server *server, grpc_completion_queue *cq, void *tag)
- void grpc_server_cancel_all_calls(grpc_server *server)
- void grpc_server_destroy(grpc_server *server)
+ grpc_server *server, grpc_completion_queue *cq, void *tag) nogil
+ void grpc_server_cancel_all_calls(grpc_server *server) nogil
+ void grpc_server_destroy(grpc_server *server) nogil
ctypedef struct grpc_ssl_pem_key_cert_pair:
const char *private_key
@@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h":
ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs)
- void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb)
+ void grpc_set_ssl_roots_override_callback(
+ grpc_ssl_roots_override_callback cb) nogil
- grpc_channel_credentials *grpc_google_default_credentials_create()
+ grpc_channel_credentials *grpc_google_default_credentials_create() nogil
grpc_channel_credentials *grpc_ssl_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair,
- void *reserved)
+ void *reserved) nogil
grpc_channel_credentials *grpc_composite_channel_credentials_create(
grpc_channel_credentials *creds1, grpc_call_credentials *creds2,
- void *reserved)
- void grpc_channel_credentials_release(grpc_channel_credentials *creds)
+ void *reserved) nogil
+ void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
grpc_call_credentials *grpc_composite_call_credentials_create(
grpc_call_credentials *creds1, grpc_call_credentials *creds2,
- void *reserved)
+ void *reserved) nogil
grpc_call_credentials *grpc_google_compute_engine_credentials_create(
- void *reserved)
+ void *reserved) nogil
grpc_call_credentials *grpc_service_account_jwt_access_credentials_create(
const char *json_key,
- gpr_timespec token_lifetime, void *reserved)
+ gpr_timespec token_lifetime, void *reserved) nogil
grpc_call_credentials *grpc_google_refresh_token_credentials_create(
- const char *json_refresh_token, void *reserved)
+ const char *json_refresh_token, void *reserved) nogil
grpc_call_credentials *grpc_google_iam_credentials_create(
const char *authorization_token, const char *authority_selector,
- void *reserved)
- void grpc_call_credentials_release(grpc_call_credentials *creds)
+ void *reserved) nogil
+ void grpc_call_credentials_release(grpc_call_credentials *creds) nogil
grpc_channel *grpc_secure_channel_create(
grpc_channel_credentials *creds, const char *target,
- const grpc_channel_args *args, void *reserved)
+ const grpc_channel_args *args, void *reserved) nogil
ctypedef struct grpc_server_credentials:
# We don't care about the internals (and in fact don't know them)
@@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h":
const char *pem_root_certs,
grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved)
- void grpc_server_credentials_release(grpc_server_credentials *creds)
+ void grpc_server_credentials_release(grpc_server_credentials *creds) nogil
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
- grpc_server_credentials *creds)
+ grpc_server_credentials *creds) nogil
grpc_call_error grpc_call_set_credentials(grpc_call *call,
- grpc_call_credentials *creds)
+ grpc_call_credentials *creds) nogil
ctypedef struct grpc_auth_context:
# We don't care about the internals (and in fact don't know them)
@@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h":
const char *type
grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
- grpc_metadata_credentials_plugin plugin, void *reserved)
+ grpc_metadata_credentials_plugin plugin, void *reserved) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index fa4ea99ea9..851389a261 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -107,15 +107,18 @@ cdef class Timespec:
def __cinit__(self, time):
if time is None:
- self.c_time = gpr_now(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_now(GPR_CLOCK_REALTIME)
return
if isinstance(time, int):
time = float(time)
if isinstance(time, float):
if time == float("+inf"):
- self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
elif time == float("-inf"):
- self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
else:
self.c_time.seconds = time
self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9
@@ -131,8 +134,10 @@ cdef class Timespec:
# TODO(atash) ensure that everywhere a Timespec is created that it's
# converted to GPR_CLOCK_REALTIME then and not every time someone wants to
# read values off in Python.
- cdef gpr_timespec real_time = (
- gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
+ cdef gpr_timespec real_time
+ with nogil:
+ real_time = (
+ gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
return real_time.seconds
@property
@@ -158,10 +163,12 @@ cdef class Timespec:
cdef class CallDetails:
def __cinit__(self):
- grpc_call_details_init(&self.c_details)
+ with nogil:
+ grpc_call_details_init(&self.c_details)
def __dealloc__(self):
- grpc_call_details_destroy(&self.c_details)
+ with nogil:
+ grpc_call_details_destroy(&self.c_details)
@property
def method(self):
@@ -229,10 +236,15 @@ cdef class ByteBuffer:
"ByteBuffer, not {}".format(type(data)))
cdef char *c_data = data
- data_slice = gpr_slice_from_copied_buffer(c_data, len(data))
- self.c_byte_buffer = grpc_raw_byte_buffer_create(
- &data_slice, 1)
- gpr_slice_unref(data_slice)
+ cdef gpr_slice data_slice
+ cdef size_t data_length = len(data)
+ with nogil:
+ data_slice = gpr_slice_from_copied_buffer(c_data, data_length)
+ with nogil:
+ self.c_byte_buffer = grpc_raw_byte_buffer_create(
+ &data_slice, 1)
+ with nogil:
+ gpr_slice_unref(data_slice)
def bytes(self):
cdef grpc_byte_buffer_reader reader
@@ -240,20 +252,27 @@ cdef class ByteBuffer:
cdef size_t data_slice_length
cdef void *data_slice_pointer
if self.c_byte_buffer != NULL:
- grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
+ with nogil:
+ grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
result = b""
- while grpc_byte_buffer_reader_next(&reader, &data_slice):
- data_slice_pointer = gpr_slice_start_ptr(data_slice)
- data_slice_length = gpr_slice_length(data_slice)
- result += (<char *>data_slice_pointer)[:data_slice_length]
- grpc_byte_buffer_reader_destroy(&reader)
+ with nogil:
+ while grpc_byte_buffer_reader_next(&reader, &data_slice):
+ data_slice_pointer = gpr_slice_start_ptr(data_slice)
+ data_slice_length = gpr_slice_length(data_slice)
+ with gil:
+ result += (<char *>data_slice_pointer)[:data_slice_length]
+ with nogil:
+ grpc_byte_buffer_reader_destroy(&reader)
return result
else:
return None
def __len__(self):
+ cdef size_t result
if self.c_byte_buffer != NULL:
- return grpc_byte_buffer_length(self.c_byte_buffer)
+ with nogil:
+ result = grpc_byte_buffer_length(self.c_byte_buffer)
+ return result
else:
return 0
@@ -262,7 +281,8 @@ cdef class ByteBuffer:
def __dealloc__(self):
if self.c_byte_buffer != NULL:
- grpc_byte_buffer_destroy(self.c_byte_buffer)
+ with nogil:
+ grpc_byte_buffer_destroy(self.c_byte_buffer)
cdef class SslPemKeyCertPair:
@@ -319,14 +339,15 @@ cdef class ChannelArgs:
if not isinstance(arg, ChannelArg):
raise TypeError("expected list of ChannelArg")
self.c_args.arguments_length = len(self.args)
- self.c_args.arguments = <grpc_arg *>gpr_malloc(
- self.c_args.arguments_length*sizeof(grpc_arg)
- )
+ with nogil:
+ self.c_args.arguments = <grpc_arg *>gpr_malloc(
+ self.c_args.arguments_length*sizeof(grpc_arg))
for i in range(self.c_args.arguments_length):
self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg
def __dealloc__(self):
- gpr_free(self.c_args.arguments)
+ with nogil:
+ gpr_free(self.c_args.arguments)
def __len__(self):
# self.args is never stale; it's only updated from this file
@@ -407,21 +428,24 @@ cdef class Metadata:
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
- grpc_metadata_array_init(&self.c_metadata_array)
+ with nogil:
+ grpc_metadata_array_init(&self.c_metadata_array)
self.c_metadata_array.count = len(self.metadata)
self.c_metadata_array.capacity = len(self.metadata)
- self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
- self.c_metadata_array.count*sizeof(grpc_metadata)
- )
+ with nogil:
+ self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
+ self.c_metadata_array.count*sizeof(grpc_metadata)
+ )
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i] = (
(<Metadatum>self.metadata[i]).c_metadata)
def __dealloc__(self):
# this frees the allocated memory for the grpc_metadata_array (although
- # it'd be nice if that were documented somewhere...) TODO(atash): document
- # this in the C core
- grpc_metadata_array_destroy(&self.c_metadata_array)
+ # it'd be nice if that were documented somewhere...)
+ # TODO(atash): document this in the C core
+ with nogil:
+ grpc_metadata_array_destroy(&self.c_metadata_array)
def __len__(self):
return self.c_metadata_array.count
@@ -526,7 +550,8 @@ cdef class Operation:
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
- gpr_free(self._received_status_details)
+ with nogil:
+ gpr_free(self._received_status_details)
def operation_send_initial_metadata(Metadata metadata):
cdef Operation op = Operation()
@@ -648,8 +673,8 @@ cdef class Operations:
if not isinstance(operation, Operation):
raise TypeError("expected operations to be iterable of Operation")
self.c_nops = len(self.operations)
- self.c_ops = <grpc_op *>gpr_malloc(
- sizeof(grpc_op)*self.c_nops)
+ with nogil:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops)
for i in range(self.c_nops):
self.c_ops[i] = (<Operation>(self.operations[i])).c_op
@@ -661,7 +686,8 @@ cdef class Operations:
return self.operations[i]
def __dealloc__(self):
- gpr_free(self.c_ops)
+ with nogil:
+ gpr_free(self.c_ops)
def __iter__(self):
return _OperationsIterator(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 5f85923524..a098f11da2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -41,7 +41,8 @@ cdef class Server:
if arguments is not None:
c_arguments = &arguments.c_args
self.references.append(arguments)
- self.c_server = grpc_server_create(c_arguments, NULL)
+ with nogil:
+ self.c_server = grpc_server_create(c_arguments, NULL)
self.is_started = False
self.is_shutting_down = False
self.is_shutdown = False
@@ -53,6 +54,7 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
+ cdef grpc_call_error result
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
@@ -61,19 +63,22 @@ cdef class Server:
operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([])
cpython.Py_INCREF(operation_tag)
- return grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag.request_metadata.c_metadata_array,
- call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ with nogil:
+ result = grpc_server_request_call(
+ self.c_server, &operation_tag.operation_call.c_call,
+ &operation_tag.request_call_details.c_details,
+ &operation_tag.request_metadata.c_metadata_array,
+ call_queue.c_completion_queue, server_queue.c_completion_queue,
+ <cpython.PyObject *>operation_tag)
+ return result
def register_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
- grpc_server_register_completion_queue(
- self.c_server, queue.c_completion_queue, NULL)
+ with nogil:
+ grpc_server_register_completion_queue(
+ self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self):
@@ -82,7 +87,8 @@ cdef class Server:
self.backup_shutdown_queue = CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
- grpc_server_start(self.c_server)
+ with nogil:
+ grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work
self.backup_shutdown_queue.pluck(None, Timespec(None))
@@ -95,21 +101,28 @@ cdef class Server:
else:
raise TypeError("expected address to be a str or bytes")
self.references.append(address)
+ cdef int result
+ cdef char *address_c_string = address
if server_credentials is not None:
self.references.append(server_credentials)
- return grpc_server_add_secure_http2_port(
- self.c_server, address, server_credentials.c_credentials)
+ with nogil:
+ result = grpc_server_add_secure_http2_port(
+ self.c_server, address_c_string, server_credentials.c_credentials)
else:
- return grpc_server_add_insecure_http2_port(self.c_server, address)
+ with nogil:
+ result = grpc_server_add_insecure_http2_port(self.c_server,
+ address_c_string)
+ return result
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
operation_tag = OperationTag(tag)
operation_tag.shutting_down_server = self
cpython.Py_INCREF(operation_tag)
- grpc_server_shutdown_and_notify(
- self.c_server, queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ with nogil:
+ grpc_server_shutdown_and_notify(
+ self.c_server, queue.c_completion_queue,
+ <cpython.PyObject *>operation_tag)
def shutdown(self, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag
@@ -134,7 +147,8 @@ cdef class Server:
elif self.is_shutdown:
return
else:
- grpc_server_cancel_all_calls(self.c_server)
+ with nogil:
+ grpc_server_cancel_all_calls(self.c_server)
def __dealloc__(self):
if self.c_server != NULL:
@@ -153,5 +167,6 @@ cdef class Server:
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
- grpc_server_destroy(self.c_server)
+ with nogil:
+ grpc_server_destroy(self.c_server)
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 30cc7a132b..8a0f171ee7 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -57,14 +57,17 @@ cdef class _ModuleState:
'grpc._cython', '_windows/grpc_c.64.python')
if not pygrpc_load_core(filename):
raise ImportError('failed to load core gRPC library')
- grpc_init()
+ with nogil:
+ grpc_init()
self.is_loaded = True
- grpc_set_ssl_roots_override_callback(
- <grpc_ssl_roots_override_callback>ssl_roots_override_callback)
+ with nogil:
+ grpc_set_ssl_roots_override_callback(
+ <grpc_ssl_roots_override_callback>ssl_roots_override_callback)
def __dealloc__(self):
if self.is_loaded:
- grpc_shutdown()
+ with nogil:
+ grpc_shutdown()
_module_state = _ModuleState()