aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_health_checking
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-12-26 12:29:52 -0800
committerGravatar Eric Gribkoff <ericgribkoff@google.com>2018-12-26 12:39:58 -0800
commit71e7e6ddc73175df0793748e290e29321934fd7c (patch)
tree16f7cb46689c76ce49a0598db6a146923fe2d35a /src/python/grpcio_health_checking
parentd8662f5704ec6f03122943f9baa5ed07b88a1fdf (diff)
Add Watch method to health check service
Diffstat (limited to 'src/python/grpcio_health_checking')
-rw-r--r--src/python/grpcio_health_checking/grpc_health/v1/health.py76
1 files changed, 69 insertions, 7 deletions
diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py
index 0583659428..75c480b0a7 100644
--- a/src/python/grpcio_health_checking/grpc_health/v1/health.py
+++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py
@@ -23,15 +23,61 @@ from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
+class _Watcher():
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._responses = list()
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ with self._condition:
+ while not self._responses and self._open:
+ self._condition.wait()
+ if self._responses:
+ return self._responses.pop(0)
+ else:
+ raise StopIteration()
+
+ def next(self):
+ return self._next()
+
+ def __next__(self):
+ return self._next()
+
+ def add(self, response):
+ with self._condition:
+ self._responses.append(response)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+
class HealthServicer(_health_pb2_grpc.HealthServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self):
- self._server_status_lock = threading.Lock()
+ self._lock = threading.RLock()
self._server_status = {}
+ self._watchers = {}
+
+ def _on_close_callback(self, watcher, service):
+
+ def callback():
+ with self._lock:
+ self._watchers[service].remove(watcher)
+ watcher.close()
+
+ return callback
def Check(self, request, context):
- with self._server_status_lock:
+ with self._lock:
status = self._server_status.get(request.service)
if status is None:
context.set_code(grpc.StatusCode.NOT_FOUND)
@@ -39,14 +85,30 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
else:
return _health_pb2.HealthCheckResponse(status=status)
+ def Watch(self, request, context):
+ service = request.service
+ with self._lock:
+ status = self._server_status.get(service)
+ if status is None:
+ status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member
+ watcher = _Watcher()
+ watcher.add(_health_pb2.HealthCheckResponse(status=status))
+ if service not in self._watchers:
+ self._watchers[service] = set()
+ self._watchers[service].add(watcher)
+ context.add_callback(self._on_close_callback(watcher, service))
+ return watcher
+
def set(self, service, status):
"""Sets the status of a service.
Args:
- service: string, the name of the service.
- NOTE, '' must be set.
- status: HealthCheckResponse.status enum value indicating
- the status of the service
+ service: string, the name of the service. NOTE, '' must be set.
+ status: HealthCheckResponse.status enum value indicating the status of
+ the service
"""
- with self._server_status_lock:
+ with self._lock:
self._server_status[service] = status
+ if service in self._watchers:
+ for watcher in self._watchers[service]:
+ watcher.add(_health_pb2.HealthCheckResponse(status=status))