aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-22 13:18:28 -0800
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-02-22 13:18:28 -0800
commit6eade73f58597c51f112abbb8964d5e76d15bb1b (patch)
treed9d34b95df1756477fda9e499c90b09cca6e5c05
parentcc69f8d958cd4785c12bf5a838a21becc8183989 (diff)
parente04e20aaca1601c098aeb035ce4eb38bcb907173 (diff)
Merge pull request #709 from nathanielmanistaatgoogle/thread-pool-less-links
Thread-pool-less construction of GRPC links.
-rw-r--r--src/python/src/grpc/_adapter/fore.py90
-rw-r--r--src/python/src/grpc/_adapter/rear.py75
2 files changed, 165 insertions, 0 deletions
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 051fc083f1..b08b9f48bc 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -41,6 +41,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
+from grpc.framework.foundation import logging_pool
+
+_THREAD_POOL_SIZE = 100
@enum.unique
@@ -353,3 +356,90 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
+
+
+class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated):
+
+ def __init__(
+ self, port, request_deserializers, response_serializers,
+ root_certificates, key_chain_pairs):
+ self._port = port
+ self._request_deserializers = request_deserializers
+ self._response_serializers = response_serializers
+ self._root_certificates = root_certificates
+ self._key_chain_pairs = key_chain_pairs
+
+ self._lock = threading.Lock()
+ self._pool = None
+ self._fore_link = None
+ self._rear_link = null.NULL_REAR_LINK
+
+ def join_rear_link(self, rear_link):
+ with self._lock:
+ self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
+ if self._fore_link is not None:
+ self._fore_link.join_rear_link(rear_link)
+
+ def _start(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._fore_link = ForeLink(
+ self._pool, self._request_deserializers, self._response_serializers,
+ self._root_certificates, self._key_chain_pairs, port=self._port)
+ self._fore_link.join_rear_link(self._rear_link)
+ self._fore_link.start()
+ return self
+
+ def _stop(self):
+ with self._lock:
+ self._fore_link.stop()
+ self._fore_link = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+
+ def __enter__(self):
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop()
+ return False
+
+ def start(self):
+ return self._start()
+
+ def stop(self):
+ self._stop()
+
+ def port(self):
+ with self._lock:
+ return None if self._fore_link is None else self._fore_link.port()
+
+ def accept_back_to_front_ticket(self, ticket):
+ with self._lock:
+ if self._fore_link is not None:
+ self._fore_link.accept_back_to_front_ticket(ticket)
+
+
+def activated_fore_link(
+ port, request_deserializers, response_serializers, root_certificates,
+ key_chain_pairs):
+ """Creates a ForeLink that is also an activated.Activated.
+
+ The returned object is only valid for use between calls to its start and stop
+ methods (or in context when used as a context manager).
+
+ Args:
+ port: The port on which to serve RPCs, or None for a port to be
+ automatically selected.
+ request_deserializers: A dictionary from RPC method names to request object
+ deserializer behaviors.
+ response_serializers: A dictionary from RPC method names to response object
+ serializer behaviors.
+ root_certificates: The PEM-encoded client root certificates as a bytestring
+ or None.
+ key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
+ pairs.
+ """
+ return _ActivatedForeLink(
+ port, request_deserializers, response_serializers, root_certificates,
+ key_chain_pairs)
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index cbcf121d9a..3fbcb24094 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -40,6 +40,9 @@ from grpc.framework.base.packets import interfaces as ticket_interfaces
from grpc.framework.base.packets import null
from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import activated
+from grpc.framework.foundation import logging_pool
+
+_THREAD_POOL_SIZE = 100
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
@@ -361,3 +364,75 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated):
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id)
+
+
+class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
+
+ def __init__(self, host, port, request_serializers, response_deserializers):
+ self._host = host
+ self._port = port
+ self._request_serializers = request_serializers
+ self._response_deserializers = response_deserializers
+
+ self._lock = threading.Lock()
+ self._pool = None
+ self._rear_link = None
+ self._fore_link = null.NULL_FORE_LINK
+
+ def join_fore_link(self, fore_link):
+ with self._lock:
+ self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
+
+ def _start(self):
+ with self._lock:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._rear_link = RearLink(
+ self._host, self._port, self._pool, self._request_serializers,
+ self._response_deserializers)
+ self._rear_link.join_fore_link(self._fore_link)
+ self._rear_link.start()
+ return self
+
+ def _stop(self):
+ with self._lock:
+ self._rear_link.stop()
+ self._rear_link = None
+ self._pool.shutdown(wait=True)
+ self._pool = None
+
+ def __enter__(self):
+ return self._start()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._stop()
+ return False
+
+ def start(self):
+ return self._start()
+
+ def stop(self):
+ self._stop()
+
+ def accept_front_to_back_ticket(self, ticket):
+ with self._lock:
+ if self._rear_link is not None:
+ self._rear_link.accept_front_to_back_ticket(ticket)
+
+
+def activated_rear_link(
+ host, port, request_serializers, response_deserializers):
+ """Creates a RearLink that is also an activated.Activated.
+
+ The returned object is only valid for use between calls to its start and stop
+ methods (or in context when used as a context manager).
+
+ Args:
+ host: The host to which to connect for RPC service.
+ port: The port to which to connect for RPC service.
+ request_serializers: A dictionary from RPC method name to request object
+ serializer behavior.
+ response_deserializers: A dictionary from RPC method name to response
+ object deserializer behavior.
+ """
+ return _ActivatedRearLink(
+ host, port, request_serializers, response_deserializers)