diff options
Diffstat (limited to 'src/python/grpcio/grpc/_cython')
6 files changed, 47 insertions, 34 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi index e0e068e452..01b8237484 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi @@ -28,19 +28,22 @@ cdef tuple _wrap_grpc_arg(grpc_arg arg) cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg) -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef grpc_arg c_argument cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except * -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: cdef readonly tuple _arguments - cdef list _argument_processors + cdef list _channel_args cdef readonly list _references cdef grpc_channel_args _c_arguments - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except * - cdef un_c(self) + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except * + cdef grpc_channel_args *c_args(self) except * + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi index b7a4277ff6..bf12871015 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi @@ -50,7 +50,7 @@ cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg): return wrapped.arg -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except *: key, value = argument @@ -82,27 +82,34 @@ cdef class _ArgumentProcessor: 'Expected int, bytes, or behavior, got {}'.format(type(value))) -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: def __cinit__(self, arguments): self._arguments = () if arguments is None else tuple(arguments) - self._argument_processors = [] + self._channel_args = [] self._references = [] + self._c_arguments.arguments = NULL - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except *: + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except *: self._c_arguments.arguments_length = len(self._arguments) - if self._c_arguments.arguments_length == 0: - return NULL - else: + if self._c_arguments.arguments_length != 0: self._c_arguments.arguments = <grpc_arg *>gpr_malloc( self._c_arguments.arguments_length * sizeof(grpc_arg)) for index, argument in enumerate(self._arguments): - argument_processor = _ArgumentProcessor() - argument_processor.c(argument, vtable, self._references) - self._c_arguments.arguments[index] = argument_processor.c_argument - self._argument_processors.append(argument_processor) - return &self._c_arguments - - cdef un_c(self): - if self._arguments: + channel_arg = _ChannelArg() + channel_arg.c(argument, vtable, self._references) + self._c_arguments.arguments[index] = channel_arg.c_argument + self._channel_args.append(channel_arg) + + cdef grpc_channel_args *c_args(self) except *: + return &self._c_arguments + + def __dealloc__(self): + if self._c_arguments.arguments != NULL: gpr_free(self._c_arguments.arguments) + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable): + cdef _ChannelArgs channel_args = _ChannelArgs(arguments) + channel_args._c(vtable) + return channel_args diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 135d224095..70d4abb730 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -423,16 +423,15 @@ cdef class Channel: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) if channel_credentials is None: self._state.c_channel = grpc_insecure_channel_create( - <char *>target, c_arguments, NULL) + <char *>target, channel_args.c_args(), NULL) else: c_channel_credentials = channel_credentials.c() self._state.c_channel = grpc_secure_channel_create( - c_channel_credentials, <char *>target, c_arguments, NULL) + c_channel_credentials, <char *>target, channel_args.c_args(), NULL) grpc_channel_credentials_release(c_channel_credentials) self._state.c_call_completion_queue = ( grpc_completion_queue_create_for_next(NULL)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 141116df5d..3c33b46dbb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif c_event.type == GRPC_QUEUE_SHUTDOWN: # NOTE(nathaniel): For now we coopt ConnectivityEvent here. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi index 52cfccb677..4a6fbe0f96 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi @@ -16,7 +16,6 @@ cdef class Server: cdef grpc_arg_pointer_vtable _vtable - cdef readonly _ArgumentsProcessor _arguments_processor cdef grpc_server *c_server cdef bint is_started # start has been called cdef bint is_shutting_down # shutdown has been called diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd..d72648a35d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -29,11 +29,9 @@ cdef class Server: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) - self.c_server = grpc_server_create(c_arguments, NULL) - arguments_processor.un_c() + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) + self.c_server = grpc_server_create(channel_args.c_args(), NULL) self.references.append(arguments) self.is_started = False self.is_shutting_down = False @@ -128,7 +126,10 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - def __dealloc__(self): + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. + def destroy(self): if self.c_server != NULL: if not self.is_started: pass @@ -146,4 +147,8 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) - grpc_shutdown() + self.c_server = NULL + + def __dealloc(self): + if self.c_server == NULL: + grpc_shutdown() |