diff options
author | Nathaniel Manista <nathaniel@google.com> | 2015-02-22 03:50:02 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2015-02-22 03:50:02 +0000 |
commit | e04e20aaca1601c098aeb035ce4eb38bcb907173 (patch) | |
tree | a8eb789e47144b1eb21556d696cf50985bf32e23 /src | |
parent | 6e8d15e7de3c7463210f3e62ba9429177722c4df (diff) |
Thread-pool-less construction of GRPC links.
These will be used in generated code in circumstances in which we
don't necessarily want to be asking calling code to have a thread
pool readily available.
Diffstat (limited to 'src')
-rw-r--r-- | src/python/src/grpc/_adapter/fore.py | 90 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/rear.py | 75 |
2 files changed, 165 insertions, 0 deletions
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 051fc083f1..b08b9f48bc 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -41,6 +41,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces from grpc.framework.base.packets import null from grpc.framework.base.packets import packets as tickets from grpc.framework.foundation import activated +from grpc.framework.foundation import logging_pool + +_THREAD_POOL_SIZE = 100 @enum.unique @@ -353,3 +356,90 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): self._complete(ticket.operation_id, ticket.payload) else: self._cancel(ticket.operation_id) + + +class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated): + + def __init__( + self, port, request_deserializers, response_serializers, + root_certificates, key_chain_pairs): + self._port = port + self._request_deserializers = request_deserializers + self._response_serializers = response_serializers + self._root_certificates = root_certificates + self._key_chain_pairs = key_chain_pairs + + self._lock = threading.Lock() + self._pool = None + self._fore_link = None + self._rear_link = null.NULL_REAR_LINK + + def join_rear_link(self, rear_link): + with self._lock: + self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link + if self._fore_link is not None: + self._fore_link.join_rear_link(rear_link) + + def _start(self): + with self._lock: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._fore_link = ForeLink( + self._pool, self._request_deserializers, self._response_serializers, + self._root_certificates, self._key_chain_pairs, port=self._port) + self._fore_link.join_rear_link(self._rear_link) + self._fore_link.start() + return self + + def _stop(self): + with self._lock: + self._fore_link.stop() + self._fore_link = None + self._pool.shutdown(wait=True) + self._pool = None + + def __enter__(self): + return self._start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop() + return False + + def start(self): + return self._start() + + def stop(self): + self._stop() + + def port(self): + with self._lock: + return None if self._fore_link is None else self._fore_link.port() + + def accept_back_to_front_ticket(self, ticket): + with self._lock: + if self._fore_link is not None: + self._fore_link.accept_back_to_front_ticket(ticket) + + +def activated_fore_link( + port, request_deserializers, response_serializers, root_certificates, + key_chain_pairs): + """Creates a ForeLink that is also an activated.Activated. + + The returned object is only valid for use between calls to its start and stop + methods (or in context when used as a context manager). + + Args: + port: The port on which to serve RPCs, or None for a port to be + automatically selected. + request_deserializers: A dictionary from RPC method names to request object + deserializer behaviors. + response_serializers: A dictionary from RPC method names to response object + serializer behaviors. + root_certificates: The PEM-encoded client root certificates as a bytestring + or None. + key_chain_pairs: A sequence of PEM-encoded private key-certificate chain + pairs. + """ + return _ActivatedForeLink( + port, request_deserializers, response_serializers, root_certificates, + key_chain_pairs) diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index cbcf121d9a..3fbcb24094 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -40,6 +40,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces from grpc.framework.base.packets import null from grpc.framework.base.packets import packets as tickets from grpc.framework.foundation import activated +from grpc.framework.foundation import logging_pool + +_THREAD_POOL_SIZE = 100 _INVOCATION_EVENT_KINDS = ( _low.Event.Kind.METADATA_ACCEPTED, @@ -361,3 +364,75 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): else: # NOTE(nathaniel): All other categories are treated as cancellation. self._cancel(ticket.operation_id) + + +class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): + + def __init__(self, host, port, request_serializers, response_deserializers): + self._host = host + self._port = port + self._request_serializers = request_serializers + self._response_deserializers = response_deserializers + + self._lock = threading.Lock() + self._pool = None + self._rear_link = None + self._fore_link = null.NULL_FORE_LINK + + def join_fore_link(self, fore_link): + with self._lock: + self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link + + def _start(self): + with self._lock: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._rear_link = RearLink( + self._host, self._port, self._pool, self._request_serializers, + self._response_deserializers) + self._rear_link.join_fore_link(self._fore_link) + self._rear_link.start() + return self + + def _stop(self): + with self._lock: + self._rear_link.stop() + self._rear_link = None + self._pool.shutdown(wait=True) + self._pool = None + + def __enter__(self): + return self._start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop() + return False + + def start(self): + return self._start() + + def stop(self): + self._stop() + + def accept_front_to_back_ticket(self, ticket): + with self._lock: + if self._rear_link is not None: + self._rear_link.accept_front_to_back_ticket(ticket) + + +def activated_rear_link( + host, port, request_serializers, response_deserializers): + """Creates a RearLink that is also an activated.Activated. + + The returned object is only valid for use between calls to its start and stop + methods (or in context when used as a context manager). + + Args: + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + request_serializers: A dictionary from RPC method name to request object + serializer behavior. + response_deserializers: A dictionary from RPC method name to response + object deserializer behavior. + """ + return _ActivatedRearLink( + host, port, request_serializers, response_deserializers) |