diff options
author | Masood Malekghassemi <atash@google.com> | 2016-03-15 15:54:25 -0700 |
---|---|---|
committer | Masood Malekghassemi <atash@google.com> | 2016-03-15 17:59:32 -0700 |
commit | 3d2d5b9d279a48d930e34e398081d1ae784e7d42 (patch) | |
tree | 1d7e6c3590c1ad5771a69442a6f8ca398359c749 /src/python/grpcio | |
parent | 85b460fb516e76843c3b091fb6fb4e801878d275 (diff) |
Don't hold the GIL when calling anything in core
Diffstat (limited to 'src/python/grpcio')
8 files changed, 296 insertions, 194 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 80f4da51e8..d1b9c98ffc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -40,14 +40,17 @@ cdef class Call: def start_batch(self, operations, tag): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") + cdef grpc_call_error result cdef Operations cy_operations = Operations(operations) cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = self operation_tag.batch_operations = cy_operations cpython.Py_INCREF(operation_tag) - return grpc_call_start_batch( - self.c_call, cy_operations.c_ops, cy_operations.c_nops, - <cpython.PyObject *>operation_tag, NULL) + with nogil: + result = grpc_call_start_batch( + self.c_call, cy_operations.c_ops, cy_operations.c_nops, + <cpython.PyObject *>operation_tag, NULL) + return result def cancel( self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, @@ -57,6 +60,8 @@ cdef class Call: if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): raise ValueError("if error_code is specified, so must details " "(and vice-versa)") + cdef grpc_call_error result + cdef char *c_details = NULL if error_code != GRPC_STATUS__DO_NOT_USE: if isinstance(details, bytes): pass @@ -65,25 +70,37 @@ cdef class Call: else: raise TypeError("expected details to be str or bytes") self.references.append(details) - return grpc_call_cancel_with_status( - self.c_call, error_code, details, NULL) + c_details = details + with nogil: + result = grpc_call_cancel_with_status( + self.c_call, error_code, c_details, NULL) + return result else: - return grpc_call_cancel(self.c_call, NULL) + with nogil: + result = grpc_call_cancel(self.c_call, NULL) + return result def set_credentials( self, CallCredentials call_credentials not None): - return grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) + cdef grpc_call_error result + with nogil: + result = grpc_call_set_credentials( + self.c_call, call_credentials.c_credentials) + return result def peer(self): - cdef char *peer = grpc_call_get_peer(self.c_call) + cdef char *peer = NULL + with nogil: + peer = grpc_call_get_peer(self.c_call) result = <bytes>peer - gpr_free(peer) + with nogil: + gpr_free(peer) return result def __dealloc__(self): if self.c_call != NULL: - grpc_call_destroy(self.c_call) + with nogil: + grpc_call_destroy(self.c_call) # The object *should* always be valid from Python. Used for debugging. @property diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index bc03c4dcf1..d612c90791 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -35,6 +35,7 @@ cdef class Channel: def __cinit__(self, target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): cdef grpc_channel_args *c_arguments = NULL + cdef char *c_target = NULL self.c_channel = NULL self.references = [] if arguments is not None: @@ -45,12 +46,15 @@ cdef class Channel: target = target.encode() else: raise TypeError("expected target to be str or bytes") + c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(target, c_arguments, - NULL) + with nogil: + self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, + NULL) else: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, target, c_arguments, NULL) + with nogil: + self.c_channel = grpc_secure_channel_create( + channel_credentials.c_credentials, c_target, c_arguments, NULL) self.references.append(channel_credentials) self.references.append(target) self.references.append(arguments) @@ -66,6 +70,7 @@ cdef class Channel: method = method.encode() else: raise TypeError("expected method to be str or bytes") + cdef char *method_c_string = method cdef char *host_c_string = NULL if host is None: pass @@ -81,31 +86,40 @@ cdef class Channel: cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method, host_c_string, deadline.c_time, - NULL) + with nogil: + operation_call.c_call = grpc_channel_create_call( + self.c_channel, parent_call, flags, + queue.c_completion_queue, method_c_string, host_c_string, + deadline.c_time, NULL) return operation_call def check_connectivity_state(self, bint try_to_connect): - return grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) + cdef grpc_connectivity_state result + with nogil: + result = grpc_channel_check_connectivity_state(self.c_channel, + try_to_connect) + return result def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): cdef OperationTag operation_tag = OperationTag(tag) cpython.Py_INCREF(operation_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + with nogil: + grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, deadline.c_time, + queue.c_completion_queue, <cpython.PyObject *>operation_tag) def target(self): - cdef char * target = grpc_channel_get_target(self.c_channel) + cdef char *target = NULL + with nogil: + target = grpc_channel_get_target(self.c_channel) result = <bytes>target - gpr_free(target) + with nogil: + gpr_free(target) return result def __dealloc__(self): if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) + with nogil: + grpc_channel_destroy(self.c_channel) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index e11138b1cd..09e47d4222 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -36,7 +36,8 @@ import time cdef class CompletionQueue: def __cinit__(self): - self.c_completion_queue = grpc_completion_queue_create(NULL) + with nogil: + self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False self.pluck_condition = threading.Condition() @@ -82,8 +83,9 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if deadline is not None: c_deadline = deadline.c_time cdef grpc_event event @@ -123,7 +125,8 @@ cdef class CompletionQueue: return self._interpret_event(event) def shutdown(self): - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) self.is_shutting_down = True def clear(self): @@ -133,15 +136,19 @@ cdef class CompletionQueue: pass def __dealloc__(self): - cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if self.c_completion_queue != NULL: # Ensure shutdown if not self.is_shutting_down: - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) # Pump the queue while not self.is_shutdown: with nogil: event = grpc_completion_queue_next( self.c_completion_queue, c_deadline, NULL) self._interpret_event(event) - grpc_completion_queue_destroy(self.c_completion_queue) + with nogil: + grpc_completion_queue_destroy(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 3f439c8900..1d7adca23e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,8 @@ cdef class ChannelCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) + with nogil: + grpc_channel_credentials_release(self.c_credentials) cdef class CallCredentials: @@ -63,7 +64,8 @@ cdef class CallCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) + with nogil: + grpc_call_credentials_release(self.c_credentials) cdef class ServerCredentials: @@ -74,7 +76,8 @@ cdef class ServerCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_server_credentials_release(self.c_credentials) + with nogil: + grpc_server_credentials_release(self.c_credentials) cdef class CredentialsMetadataPlugin: @@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state): def channel_credentials_google_default(): cdef ChannelCredentials credentials = ChannelCredentials(); - credentials.c_credentials = grpc_google_default_credentials_create() + with nogil: + credentials.c_credentials = grpc_google_default_credentials_create() return credentials def channel_credentials_ssl(pem_root_certificates, @@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates, c_pem_root_certificates = pem_root_certificates credentials.references.append(pem_root_certificates) if ssl_pem_key_cert_pair is not None: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) credentials.references.append(ssl_pem_key_cert_pair) else: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) return credentials def channel_credentials_composite( @@ -172,8 +178,9 @@ def channel_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef ChannelCredentials credentials = ChannelCredentials() - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_channel_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials @@ -184,16 +191,18 @@ def call_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_call_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials def call_credentials_google_compute_engine(): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) + with nogil: + credentials.c_credentials = ( + grpc_google_compute_engine_credentials_create(NULL)) return credentials def call_credentials_service_account_jwt_access( @@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access( else: raise TypeError("expected json_key to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key, token_lifetime.c_time, NULL)) + cdef char *json_key_c_string = json_key + with nogil: + credentials.c_credentials = ( + grpc_service_account_jwt_access_credentials_create( + json_key_c_string, token_lifetime.c_time, NULL)) credentials.references.append(json_key) return credentials @@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token): else: raise TypeError("expected json_refresh_token to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token, NULL) + cdef char *json_refresh_token_c_string = json_refresh_token + with nogil: + credentials.c_credentials = grpc_google_refresh_token_credentials_create( + json_refresh_token_c_string, NULL) credentials.references.append(json_refresh_token) return credentials @@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector): else: raise TypeError("expected authority_selector to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token, authority_selector, NULL) + cdef char *authorization_token_c_string = authorization_token + cdef char *authority_selector_c_string = authority_selector + with nogil: + credentials.c_credentials = grpc_google_iam_credentials_create( + authorization_token_c_string, authority_selector_c_string, NULL) credentials.references.append(authorization_token) credentials.references.append(authority_selector) return credentials def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(), - NULL)) + cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin() + with nogil: + credentials.c_credentials = ( + grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) # TODO(atash): the following held reference is *probably* never necessary credentials.references.append(plugin) return credentials @@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, credentials.references.append(pem_key_cert_pairs) credentials.references.append(pem_root_certs) credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) - credentials.c_ssl_pem_key_cert_pairs = ( - <grpc_ssl_pem_key_cert_pair *>gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair) * - credentials.c_ssl_pem_key_cert_pairs_count - )) + with nogil: + credentials.c_ssl_pem_key_cert_pairs = ( + <grpc_ssl_pem_key_cert_pair *>gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair) * + credentials.c_ssl_pem_key_cert_pairs_count + )) for i in range(credentials.c_ssl_pem_key_cert_pairs_count): credentials.c_ssl_pem_key_cert_pairs[i] = ( (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index dbf0045710..61165cb021 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h": int pygrpc_load_core(char*) - void *gpr_malloc(size_t size) - void gpr_free(void *ptr) - void *gpr_realloc(void *p, size_t size) + void *gpr_malloc(size_t size) nogil + void gpr_free(void *ptr) nogil + void *gpr_realloc(void *p, size_t size) nogil ctypedef struct gpr_slice: # don't worry about writing out the members of gpr_slice; we never access # them directly. pass - gpr_slice gpr_slice_ref(gpr_slice s) - void gpr_slice_unref(gpr_slice s) - gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) + gpr_slice gpr_slice_ref(gpr_slice s) nogil + void gpr_slice_unref(gpr_slice s) nogil + gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil gpr_slice gpr_slice_new_with_len( - void *p, size_t len, void (*destroy)(void *, size_t)) - gpr_slice gpr_slice_malloc(size_t length) - gpr_slice gpr_slice_from_copied_string(const char *source) - gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) + void *p, size_t len, void (*destroy)(void *, size_t)) nogil + gpr_slice gpr_slice_malloc(size_t length) nogil + gpr_slice gpr_slice_from_copied_string(const char *source) nogil + gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil # Declare functions for function-like macros (because Cython)... - void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) - size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) + void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil + size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil ctypedef enum gpr_clock_type: GPR_CLOCK_MONOTONIC @@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h": int32_t nanoseconds "tv_nsec" gpr_clock_type clock_type - gpr_timespec gpr_time_0(gpr_clock_type type) - gpr_timespec gpr_inf_future(gpr_clock_type type) - gpr_timespec gpr_inf_past(gpr_clock_type type) + gpr_timespec gpr_time_0(gpr_clock_type type) nogil + gpr_timespec gpr_inf_future(gpr_clock_type type) nogil + gpr_timespec gpr_inf_past(gpr_clock_type type) nogil - gpr_timespec gpr_now(gpr_clock_type clock) + gpr_timespec gpr_now(gpr_clock_type clock) nogil gpr_timespec gpr_convert_clock_type(gpr_timespec t, - gpr_clock_type target_clock) + gpr_clock_type target_clock) nogil ctypedef enum grpc_status_code: GRPC_STATUS_OK @@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h": pass grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, - size_t nslices) - size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) - void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) + size_t nslices) nogil + size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil + void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, - grpc_byte_buffer *buffer) + grpc_byte_buffer *buffer) nogil int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, - gpr_slice *slice) - void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) + gpr_slice *slice) nogil + void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_ENABLE_CENSUS @@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h": size_t capacity grpc_metadata *metadata - void grpc_metadata_array_init(grpc_metadata_array *array) - void grpc_metadata_array_destroy(grpc_metadata_array *array) + void grpc_metadata_array_init(grpc_metadata_array *array) nogil + void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: char *method @@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h": size_t host_capacity gpr_timespec deadline - void grpc_call_details_init(grpc_call_details *details) - void grpc_call_details_destroy(grpc_call_details *details) + void grpc_call_details_init(grpc_call_details *details) nogil + void grpc_call_details_destroy(grpc_call_details *details) nogil ctypedef enum grpc_op_type: GRPC_OP_SEND_INITIAL_METADATA @@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h": uint32_t flags grpc_op_data data - void grpc_init() - void grpc_shutdown() + void grpc_init() nogil + void grpc_shutdown() nogil - grpc_completion_queue *grpc_completion_queue_create(void *reserved) + grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) nogil grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) nogil - void grpc_completion_queue_shutdown(grpc_completion_queue *cq) - void grpc_completion_queue_destroy(grpc_completion_queue *cq) + void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil + void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil - grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag, void *reserved) - grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) + grpc_call_error grpc_call_start_batch( + grpc_call *call, const grpc_op *ops, size_t nops, void *tag, + void *reserved) nogil + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, - void *reserved) - char *grpc_call_get_peer(grpc_call *call) - void grpc_call_destroy(grpc_call *call) + void *reserved) nogil + char *grpc_call_get_peer(grpc_call *call) nogil + void grpc_call_destroy(grpc_call *call) nogil grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, - void *reserved) - grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_call *parent_call, - uint32_t propagation_mask, - grpc_completion_queue *completion_queue, - const char *method, const char *host, - gpr_timespec deadline, void *reserved) + void *reserved) nogil + grpc_call *grpc_channel_create_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, const char *method, + const char *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( - grpc_channel *channel, int try_to_connect) + grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, - gpr_timespec deadline, grpc_completion_queue *cq, void *tag) - char *grpc_channel_get_target(grpc_channel *channel) - void grpc_channel_destroy(grpc_channel *channel) + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil + char *grpc_channel_get_target(grpc_channel *channel) nogil + void grpc_channel_destroy(grpc_channel *channel) nogil - grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) + grpc_server *grpc_server_create( + const grpc_channel_args *args, void *reserved) nogil grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void - *tag_new) + *tag_new) nogil void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - void *reserved) - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) - void grpc_server_start(grpc_server *server) + void *reserved) nogil + int grpc_server_add_insecure_http2_port( + grpc_server *server, const char *addr) nogil + void grpc_server_start(grpc_server *server) nogil void grpc_server_shutdown_and_notify( - grpc_server *server, grpc_completion_queue *cq, void *tag) - void grpc_server_cancel_all_calls(grpc_server *server) - void grpc_server_destroy(grpc_server *server) + grpc_server *server, grpc_completion_queue *cq, void *tag) nogil + void grpc_server_cancel_all_calls(grpc_server *server) nogil + void grpc_server_destroy(grpc_server *server) nogil ctypedef struct grpc_ssl_pem_key_cert_pair: const char *private_key @@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h": ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) - void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) + void grpc_set_ssl_roots_override_callback( + grpc_ssl_roots_override_callback cb) nogil - grpc_channel_credentials *grpc_google_default_credentials_create() + grpc_channel_credentials *grpc_google_default_credentials_create() nogil grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, - void *reserved) + void *reserved) nogil grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) - void grpc_channel_credentials_release(grpc_channel_credentials *creds) + void *reserved) nogil + void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_google_compute_engine_credentials_create( - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( const char *json_key, - gpr_timespec token_lifetime, void *reserved) + gpr_timespec token_lifetime, void *reserved) nogil grpc_call_credentials *grpc_google_refresh_token_credentials_create( - const char *json_refresh_token, void *reserved) + const char *json_refresh_token, void *reserved) nogil grpc_call_credentials *grpc_google_iam_credentials_create( const char *authorization_token, const char *authority_selector, - void *reserved) - void grpc_call_credentials_release(grpc_call_credentials *creds) + void *reserved) nogil + void grpc_call_credentials_release(grpc_call_credentials *creds) nogil grpc_channel *grpc_secure_channel_create( grpc_channel_credentials *creds, const char *target, - const grpc_channel_args *args, void *reserved) + const grpc_channel_args *args, void *reserved) nogil ctypedef struct grpc_server_credentials: # We don't care about the internals (and in fact don't know them) @@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h": const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved) - void grpc_server_credentials_release(grpc_server_credentials *creds) + void grpc_server_credentials_release(grpc_server_credentials *creds) nogil int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, - grpc_server_credentials *creds) + grpc_server_credentials *creds) nogil grpc_call_error grpc_call_set_credentials(grpc_call *call, - grpc_call_credentials *creds) + grpc_call_credentials *creds) nogil ctypedef struct grpc_auth_context: # We don't care about the internals (and in fact don't know them) @@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h": const char *type grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( - grpc_metadata_credentials_plugin plugin, void *reserved) + grpc_metadata_credentials_plugin plugin, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index fa4ea99ea9..851389a261 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -107,15 +107,18 @@ cdef class Timespec: def __cinit__(self, time): if time is None: - self.c_time = gpr_now(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_now(GPR_CLOCK_REALTIME) return if isinstance(time, int): time = float(time) if isinstance(time, float): if time == float("+inf"): - self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) elif time == float("-inf"): - self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) else: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 @@ -131,8 +134,10 @@ cdef class Timespec: # TODO(atash) ensure that everywhere a Timespec is created that it's # converted to GPR_CLOCK_REALTIME then and not every time someone wants to # read values off in Python. - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) + cdef gpr_timespec real_time + with nogil: + real_time = ( + gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) return real_time.seconds @property @@ -158,10 +163,12 @@ cdef class Timespec: cdef class CallDetails: def __cinit__(self): - grpc_call_details_init(&self.c_details) + with nogil: + grpc_call_details_init(&self.c_details) def __dealloc__(self): - grpc_call_details_destroy(&self.c_details) + with nogil: + grpc_call_details_destroy(&self.c_details) @property def method(self): @@ -229,10 +236,15 @@ cdef class ByteBuffer: "ByteBuffer, not {}".format(type(data))) cdef char *c_data = data - data_slice = gpr_slice_from_copied_buffer(c_data, len(data)) - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - gpr_slice_unref(data_slice) + cdef gpr_slice data_slice + cdef size_t data_length = len(data) + with nogil: + data_slice = gpr_slice_from_copied_buffer(c_data, data_length) + with nogil: + self.c_byte_buffer = grpc_raw_byte_buffer_create( + &data_slice, 1) + with nogil: + gpr_slice_unref(data_slice) def bytes(self): cdef grpc_byte_buffer_reader reader @@ -240,20 +252,27 @@ cdef class ByteBuffer: cdef size_t data_slice_length cdef void *data_slice_pointer if self.c_byte_buffer != NULL: - grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) + with nogil: + grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) result = b"" - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = gpr_slice_start_ptr(data_slice) - data_slice_length = gpr_slice_length(data_slice) - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_byte_buffer_reader_destroy(&reader) + with nogil: + while grpc_byte_buffer_reader_next(&reader, &data_slice): + data_slice_pointer = gpr_slice_start_ptr(data_slice) + data_slice_length = gpr_slice_length(data_slice) + with gil: + result += (<char *>data_slice_pointer)[:data_slice_length] + with nogil: + grpc_byte_buffer_reader_destroy(&reader) return result else: return None def __len__(self): + cdef size_t result if self.c_byte_buffer != NULL: - return grpc_byte_buffer_length(self.c_byte_buffer) + with nogil: + result = grpc_byte_buffer_length(self.c_byte_buffer) + return result else: return 0 @@ -262,7 +281,8 @@ cdef class ByteBuffer: def __dealloc__(self): if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) + with nogil: + grpc_byte_buffer_destroy(self.c_byte_buffer) cdef class SslPemKeyCertPair: @@ -319,14 +339,15 @@ cdef class ChannelArgs: if not isinstance(arg, ChannelArg): raise TypeError("expected list of ChannelArg") self.c_args.arguments_length = len(self.args) - self.c_args.arguments = <grpc_arg *>gpr_malloc( - self.c_args.arguments_length*sizeof(grpc_arg) - ) + with nogil: + self.c_args.arguments = <grpc_arg *>gpr_malloc( + self.c_args.arguments_length*sizeof(grpc_arg)) for i in range(self.c_args.arguments_length): self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg def __dealloc__(self): - gpr_free(self.c_args.arguments) + with nogil: + gpr_free(self.c_args.arguments) def __len__(self): # self.args is never stale; it's only updated from this file @@ -407,21 +428,24 @@ cdef class Metadata: for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - grpc_metadata_array_init(&self.c_metadata_array) + with nogil: + grpc_metadata_array_init(&self.c_metadata_array) self.c_metadata_array.count = len(self.metadata) self.c_metadata_array.capacity = len(self.metadata) - self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata) - ) + with nogil: + self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( + self.c_metadata_array.count*sizeof(grpc_metadata) + ) for i in range(self.c_metadata_array.count): self.c_metadata_array.metadata[i] = ( (<Metadatum>self.metadata[i]).c_metadata) def __dealloc__(self): # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) TODO(atash): document - # this in the C core - grpc_metadata_array_destroy(&self.c_metadata_array) + # it'd be nice if that were documented somewhere...) + # TODO(atash): document this in the C core + with nogil: + grpc_metadata_array_destroy(&self.c_metadata_array) def __len__(self): return self.c_metadata_array.count @@ -526,7 +550,8 @@ cdef class Operation: # Python. The remaining one(s) are primitive fields filled in by GRPC core. # This means that we need to clean up after receive_status_on_client. if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: - gpr_free(self._received_status_details) + with nogil: + gpr_free(self._received_status_details) def operation_send_initial_metadata(Metadata metadata): cdef Operation op = Operation() @@ -648,8 +673,8 @@ cdef class Operations: if not isinstance(operation, Operation): raise TypeError("expected operations to be iterable of Operation") self.c_nops = len(self.operations) - self.c_ops = <grpc_op *>gpr_malloc( - sizeof(grpc_op)*self.c_nops) + with nogil: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) for i in range(self.c_nops): self.c_ops[i] = (<Operation>(self.operations[i])).c_op @@ -661,7 +686,8 @@ cdef class Operations: return self.operations[i] def __dealloc__(self): - gpr_free(self.c_ops) + with nogil: + gpr_free(self.c_ops) def __iter__(self): return _OperationsIterator(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 5f85923524..a098f11da2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -41,7 +41,8 @@ cdef class Server: if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) - self.c_server = grpc_server_create(c_arguments, NULL) + with nogil: + self.c_server = grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False @@ -53,6 +54,7 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") + cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() @@ -61,19 +63,22 @@ cdef class Server: operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + result = grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag.request_metadata.c_metadata_array, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>operation_tag) + return result def register_completion_queue( self, CompletionQueue queue not None): if self.is_started: raise ValueError("cannot register completion queues after start") - grpc_server_register_completion_queue( - self.c_server, queue.c_completion_queue, NULL) + with nogil: + grpc_server_register_completion_queue( + self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): @@ -82,7 +87,8 @@ cdef class Server: self.backup_shutdown_queue = CompletionQueue() self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True - grpc_server_start(self.c_server) + with nogil: + grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work self.backup_shutdown_queue.pluck(None, Timespec(None)) @@ -95,21 +101,28 @@ cdef class Server: else: raise TypeError("expected address to be a str or bytes") self.references.append(address) + cdef int result + cdef char *address_c_string = address if server_credentials is not None: self.references.append(server_credentials) - return grpc_server_add_secure_http2_port( - self.c_server, address, server_credentials.c_credentials) + with nogil: + result = grpc_server_add_secure_http2_port( + self.c_server, address_c_string, server_credentials.c_credentials) else: - return grpc_server_add_insecure_http2_port(self.c_server, address) + with nogil: + result = grpc_server_add_insecure_http2_port(self.c_server, + address_c_string) + return result cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True operation_tag = OperationTag(tag) operation_tag.shutting_down_server = self cpython.Py_INCREF(operation_tag) - grpc_server_shutdown_and_notify( - self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>operation_tag) def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag @@ -134,7 +147,8 @@ cdef class Server: elif self.is_shutdown: return else: - grpc_server_cancel_all_calls(self.c_server) + with nogil: + grpc_server_cancel_all_calls(self.c_server) def __dealloc__(self): if self.c_server != NULL: @@ -153,5 +167,6 @@ cdef class Server: # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) - grpc_server_destroy(self.c_server) + with nogil: + grpc_server_destroy(self.c_server) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 30cc7a132b..8a0f171ee7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -57,14 +57,17 @@ cdef class _ModuleState: 'grpc._cython', '_windows/grpc_c.64.python') if not pygrpc_load_core(filename): raise ImportError('failed to load core gRPC library') - grpc_init() + with nogil: + grpc_init() self.is_loaded = True - grpc_set_ssl_roots_override_callback( - <grpc_ssl_roots_override_callback>ssl_roots_override_callback) + with nogil: + grpc_set_ssl_roots_override_callback( + <grpc_ssl_roots_override_callback>ssl_roots_override_callback) def __dealloc__(self): if self.is_loaded: - grpc_shutdown() + with nogil: + grpc_shutdown() _module_state = _ModuleState() |