aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-12-03 00:55:25 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-12-03 00:55:25 +0000
commit2533365bf41fb6c65053e110aa1e1030b1d64c46 (patch)
treead4eb659e2dcce970283954bdb0706172000d999 /src/python/grpcio
parente0abeeaa6d360a35d6a66da16d047fd2abc2a5d7 (diff)
Fixes to stub and server lifecycle
Context management is implemented. Stub deletion now cancels all RPCs immediately.
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/grpc/beta/_server.py175
-rw-r--r--src/python/grpcio/grpc/beta/_stub.py124
2 files changed, 200 insertions, 99 deletions
diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py
index 05b954d186..4f454437c0 100644
--- a/src/python/grpcio/grpc/beta/_server.py
+++ b/src/python/grpcio/grpc/beta/_server.py
@@ -44,6 +44,12 @@ _DEFAULT_TIMEOUT = 300
_MAXIMUM_TIMEOUT = 24 * 60 * 60
+def _set_event():
+ event = threading.Event()
+ event.set()
+ return event
+
+
class _GRPCServicer(base.Servicer):
def __init__(self, delegate):
@@ -61,86 +67,143 @@ class _GRPCServicer(base.Servicer):
raise
-def _disassemble(grpc_link, end_link, pool, event, grace):
- grpc_link.begin_stop()
- end_link.stop(grace).wait()
- grpc_link.end_stop()
- grpc_link.join_link(utilities.NULL_LINK)
- end_link.join_link(utilities.NULL_LINK)
- if pool is not None:
- pool.shutdown(wait=True)
- event.set()
+class _Server(interfaces.Server):
+ def __init__(
+ self, implementations, multi_implementation, pool, pool_size,
+ default_timeout, maximum_timeout, grpc_link):
+ self._lock = threading.Lock()
+ self._implementations = implementations
+ self._multi_implementation = multi_implementation
+ self._customer_pool = pool
+ self._pool_size = pool_size
+ self._default_timeout = default_timeout
+ self._maximum_timeout = maximum_timeout
+ self._grpc_link = grpc_link
-class Server(interfaces.Server):
+ self._end_link = None
+ self._stop_events = None
+ self._pool = None
- def __init__(self, grpc_link, end_link, pool):
- self._grpc_link = grpc_link
- self._end_link = end_link
- self._pool = pool
+ def _start(self):
+ with self._lock:
+ if self._end_link is not None:
+ raise ValueError('Cannot start already-started server!')
+
+ if self._customer_pool is None:
+ self._pool = logging_pool.pool(self._pool_size)
+ assembly_pool = self._pool
+ else:
+ assembly_pool = self._customer_pool
+
+ servicer = _GRPCServicer(
+ _crust_implementations.servicer(
+ self._implementations, self._multi_implementation, assembly_pool))
+
+ self._end_link = _core_implementations.service_end_link(
+ servicer, self._default_timeout, self._maximum_timeout)
+
+ self._grpc_link.join_link(self._end_link)
+ self._end_link.join_link(self._grpc_link)
+ self._grpc_link.start()
+ self._end_link.start()
+
+ def _dissociate_links_and_shut_down_pool(self):
+ self._grpc_link.end_stop()
+ self._grpc_link.join_link(utilities.NULL_LINK)
+ self._end_link.join_link(utilities.NULL_LINK)
+ self._end_link = None
+ if self._pool is not None:
+ self._pool.shutdown(wait=True)
+ self._pool = None
+
+ def _stop_stopping(self):
+ self._dissociate_links_and_shut_down_pool()
+ for stop_event in self._stop_events:
+ stop_event.set()
+ self._stop_events = None
+
+ def _stop_started(self):
+ self._grpc_link.begin_stop()
+ self._end_link.stop(0).wait()
+ self._dissociate_links_and_shut_down_pool()
+
+ def _foreign_thread_stop(self, end_stop_event, stop_events):
+ end_stop_event.wait()
+ with self._lock:
+ if self._stop_events is stop_events:
+ self._stop_stopping()
+
+ def _schedule_stop(self, grace):
+ with self._lock:
+ if self._end_link is None:
+ return _set_event()
+ server_stop_event = threading.Event()
+ if self._stop_events is None:
+ self._stop_events = [server_stop_event]
+ self._grpc_link.begin_stop()
+ else:
+ self._stop_events.append(server_stop_event)
+ end_stop_event = self._end_link.stop(grace)
+ end_stop_thread = threading.Thread(
+ target=self._foreign_thread_stop,
+ args=(end_stop_event, self._stop_events))
+ end_stop_thread.start()
+ return server_stop_event
+
+ def _stop_now(self):
+ with self._lock:
+ if self._end_link is not None:
+ if self._stop_events is None:
+ self._stop_started()
+ else:
+ self._stop_stopping()
def add_insecure_port(self, address):
- return self._grpc_link.add_port(address, None)
+ with self._lock:
+ if self._end_link is None:
+ return self._grpc_link.add_port(address, None)
+ else:
+ raise ValueError('Can\'t add port to serving server!')
def add_secure_port(self, address, server_credentials):
- return self._grpc_link.add_port(
- address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
-
- def _start(self):
- self._grpc_link.join_link(self._end_link)
- self._end_link.join_link(self._grpc_link)
- self._grpc_link.start()
- self._end_link.start()
-
- def _stop(self, grace):
- stop_event = threading.Event()
- if 0 < grace:
- disassembly_thread = threading.Thread(
- target=_disassemble,
- args=(
- self._grpc_link, self._end_link, self._pool, stop_event, grace,))
- disassembly_thread.start()
- return stop_event
- else:
- _disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0)
- return stop_event
+ with self._lock:
+ if self._end_link is None:
+ return self._grpc_link.add_port(
+ address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
+ else:
+ raise ValueError('Can\'t add port to serving server!')
def start(self):
self._start()
def stop(self, grace):
- return self._stop(grace)
+ if 0 < grace:
+ return self._schedule_stop(grace)
+ else:
+ self._stop_now()
+ return _set_event()
def __enter__(self):
self._start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop(0).wait()
+ self._stop_now()
return False
+ def __del__(self):
+ self._stop_now()
+
def server(
implementations, multi_implementation, request_deserializers,
response_serializers, thread_pool, thread_pool_size, default_timeout,
maximum_timeout):
- if thread_pool is None:
- service_thread_pool = logging_pool.pool(
- _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
- assembly_thread_pool = service_thread_pool
- else:
- service_thread_pool = thread_pool
- assembly_thread_pool = None
-
- servicer = _GRPCServicer(
- _crust_implementations.servicer(
- implementations, multi_implementation, service_thread_pool))
-
grpc_link = service.service_link(request_deserializers, response_serializers)
-
- end_link = _core_implementations.service_end_link(
- servicer,
+ return _Server(
+ implementations, multi_implementation, thread_pool,
+ _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size,
_DEFAULT_TIMEOUT if default_timeout is None else default_timeout,
- _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout)
-
- return Server(grpc_link, end_link, assembly_thread_pool)
+ _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout,
+ grpc_link)
diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py
index 11dab889cd..2af019309a 100644
--- a/src/python/grpcio/grpc/beta/_stub.py
+++ b/src/python/grpcio/grpc/beta/_stub.py
@@ -42,76 +42,114 @@ _DEFAULT_POOL_SIZE = 6
class _AutoIntermediary(object):
- def __init__(self, delegate, on_deletion):
+ def __init__(self, up, down, delegate):
+ self._lock = threading.Lock()
+ self._up = up
+ self._down = down
+ self._in_context = False
self._delegate = delegate
- self._on_deletion = on_deletion
def __getattr__(self, attr):
- return getattr(self._delegate, attr)
+ with self._lock:
+ if self._delegate is None:
+ raise AttributeError('No useful attributes out of context!')
+ else:
+ return getattr(self._delegate, attr)
def __enter__(self):
- return self
+ with self._lock:
+ if self._in_context:
+ raise ValueError('Already in context!')
+ elif self._delegate is None:
+ self._delegate = self._up()
+ self._in_context = True
+ return self
def __exit__(self, exc_type, exc_val, exc_tb):
- return False
+ with self._lock:
+ if not self._in_context:
+ raise ValueError('Not in context!')
+ self._down()
+ self._in_context = False
+ self._delegate = None
+ return False
def __del__(self):
- self._on_deletion()
+ with self._lock:
+ if self._delegate is not None:
+ self._down()
+ self._delegate = None
+
+
+class _StubAssemblyManager(object):
+
+ def __init__(
+ self, thread_pool, thread_pool_size, end_link, grpc_link, stub_creator):
+ self._thread_pool = thread_pool
+ self._pool_size = thread_pool_size
+ self._end_link = end_link
+ self._grpc_link = grpc_link
+ self._stub_creator = stub_creator
+ self._own_pool = None
+
+ def up(self):
+ if self._thread_pool is None:
+ self._own_pool = logging_pool.pool(
+ _DEFAULT_POOL_SIZE if self._pool_size is None else self._pool_size)
+ assembly_pool = self._own_pool
+ else:
+ assembly_pool = self._thread_pool
+ self._end_link.join_link(self._grpc_link)
+ self._grpc_link.join_link(self._end_link)
+ self._end_link.start()
+ self._grpc_link.start()
+ return self._stub_creator(self._end_link, assembly_pool)
+
+ def down(self):
+ self._end_link.stop(0).wait()
+ self._grpc_link.stop()
+ self._end_link.join_link(utilities.NULL_LINK)
+ self._grpc_link.join_link(utilities.NULL_LINK)
+ if self._own_pool is not None:
+ self._own_pool.shutdown(wait=True)
+ self._own_pool = None
def _assemble(
channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size):
+ response_deserializers, thread_pool, thread_pool_size, stub_creator):
end_link = _core_implementations.invocation_end_link()
grpc_link = invocation.invocation_link(
channel, host, metadata_transformer, request_serializers,
response_deserializers)
- if thread_pool is None:
- invocation_pool = logging_pool.pool(
- _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
- assembly_pool = invocation_pool
- else:
- invocation_pool = thread_pool
- assembly_pool = None
- end_link.join_link(grpc_link)
- grpc_link.join_link(end_link)
- end_link.start()
- grpc_link.start()
- return end_link, grpc_link, invocation_pool, assembly_pool
-
-
-def _disassemble(end_link, grpc_link, pool):
- end_link.stop(24 * 60 * 60).wait()
- grpc_link.stop()
- end_link.join_link(utilities.NULL_LINK)
- grpc_link.join_link(utilities.NULL_LINK)
- if pool is not None:
- pool.shutdown(wait=True)
-
-
-def _wrap_assembly(stub, end_link, grpc_link, assembly_pool):
- disassembly_thread = threading.Thread(
- target=_disassemble, args=(end_link, grpc_link, assembly_pool))
- return _AutoIntermediary(stub, disassembly_thread.start)
+ stub_assembly_manager = _StubAssemblyManager(
+ thread_pool, thread_pool_size, end_link, grpc_link, stub_creator)
+ stub = stub_assembly_manager.up()
+ return _AutoIntermediary(
+ stub_assembly_manager.up, stub_assembly_manager.down, stub)
+
+
+def _dynamic_stub_creator(service, cardinalities):
+ def create_dynamic_stub(end_link, invocation_pool):
+ return _crust_implementations.dynamic_stub(
+ end_link, service, cardinalities, invocation_pool)
+ return create_dynamic_stub
def generic_stub(
channel, host, metadata_transformer, request_serializers,
response_deserializers, thread_pool, thread_pool_size):
- end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
+ return _assemble(
channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size)
- stub = _crust_implementations.generic_stub(end_link, invocation_pool)
- return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
+ response_deserializers, thread_pool, thread_pool_size,
+ _crust_implementations.generic_stub)
def dynamic_stub(
channel, host, service, cardinalities, metadata_transformer,
request_serializers, response_deserializers, thread_pool,
thread_pool_size):
- end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
+ return _assemble(
channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size)
- stub = _crust_implementations.dynamic_stub(
- end_link, service, cardinalities, invocation_pool)
- return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
+ response_deserializers, thread_pool, thread_pool_size,
+ _dynamic_stub_creator(service, cardinalities))