# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. cimport cpython import time cdef class Server: def __cinit__(self, ChannelArgs arguments=None): cdef grpc_channel_args *c_arguments = NULL self.references = [] self.registered_completion_queues = [] if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) with nogil: self.c_server = grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False def request_call( self, CompletionQueue call_queue not None, CompletionQueue server_queue not None, tag): if not self.is_started or self.is_shutting_down: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() operation_tag.request_metadata = Metadata([]) operation_tag.references.extend([self, call_queue, server_queue]) operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) with nogil: result = grpc_server_request_call( self.c_server, &operation_tag.operation_call.c_call, &operation_tag.request_call_details.c_details, &operation_tag.request_metadata.c_metadata_array, call_queue.c_completion_queue, server_queue.c_completion_queue, operation_tag) return result def register_completion_queue( self, CompletionQueue queue not None): if self.is_started: raise ValueError("cannot register completion queues after start") with nogil: grpc_server_register_completion_queue( self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): if self.is_started: raise ValueError("the server has already started") self.backup_shutdown_queue = CompletionQueue() self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work self.backup_shutdown_queue.pluck(None, Timespec(None)) def add_http2_port(self, address, ServerCredentials server_credentials=None): if isinstance(address, bytes): pass elif isinstance(address, basestring): address = address.encode() else: raise TypeError("expected address to be a str or bytes") self.references.append(address) cdef int result cdef char *address_c_string = address if server_credentials is not None: self.references.append(server_credentials) with nogil: result = grpc_server_add_secure_http2_port( self.c_server, address_c_string, server_credentials.c_credentials) else: with nogil: result = grpc_server_add_insecure_http2_port(self.c_server, address_c_string) return result cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True operation_tag = OperationTag(tag) operation_tag.shutting_down_server = self cpython.Py_INCREF(operation_tag) with nogil: grpc_server_shutdown_and_notify( self.c_server, queue.c_completion_queue, operation_tag) def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag if queue.is_shutting_down: raise ValueError("queue must be live") elif not self.is_started: raise ValueError("the server hasn't started yet") elif self.is_shutting_down: return elif queue not in self.registered_completion_queues: raise ValueError("expected registered completion queue") else: self._c_shutdown(queue, tag) cdef notify_shutdown_complete(self): # called only by a completion queue on receiving our shutdown operation tag self.is_shutdown = True def cancel_all_calls(self): if not self.is_shutting_down: raise RuntimeError("the server must be shutting down to cancel all calls") elif self.is_shutdown: return else: with nogil: grpc_server_cancel_all_calls(self.c_server) def __dealloc__(self): if self.c_server != NULL: if not self.is_started: pass elif self.is_shutdown: pass elif not self.is_shutting_down: # the user didn't call shutdown - use our backup queue self._c_shutdown(self.backup_shutdown_queue, None) # and now we wait while not self.is_shutdown: self.backup_shutdown_queue.poll() else: # We're in the process of shutting down, but have not shutdown; can't do # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) with nogil: grpc_server_destroy(self.c_server)