aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_plugin_wrapping.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_plugin_wrapping.py')
-rw-r--r--src/python/grpcio/grpc/_plugin_wrapping.py123
1 files changed, 62 insertions, 61 deletions
diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py
index 7cb5218c22..bb9a42f3ff 100644
--- a/src/python/grpcio/grpc/_plugin_wrapping.py
+++ b/src/python/grpcio/grpc/_plugin_wrapping.py
@@ -36,82 +36,82 @@ from grpc._cython import cygrpc
class AuthMetadataContext(
- collections.namedtuple(
- 'AuthMetadataContext', ('service_url', 'method_name',)),
- grpc.AuthMetadataContext):
- pass
+ collections.namedtuple('AuthMetadataContext', (
+ 'service_url',
+ 'method_name',)), grpc.AuthMetadataContext):
+ pass
class AuthMetadataPluginCallback(grpc.AuthMetadataContext):
- def __init__(self, callback):
- self._callback = callback
+ def __init__(self, callback):
+ self._callback = callback
- def __call__(self, metadata, error):
- self._callback(metadata, error)
+ def __call__(self, metadata, error):
+ self._callback(metadata, error)
class _WrappedCygrpcCallback(object):
- def __init__(self, cygrpc_callback):
- self.is_called = False
- self.error = None
- self.is_called_lock = threading.Lock()
- self.cygrpc_callback = cygrpc_callback
-
- def _invoke_failure(self, error):
- # TODO(atash) translate different Exception superclasses into different
- # status codes.
- self.cygrpc_callback(
- _common.EMPTY_METADATA, cygrpc.StatusCode.internal,
- _common.encode(str(error)))
-
- def _invoke_success(self, metadata):
- try:
- cygrpc_metadata = _common.cygrpc_metadata(metadata)
- except Exception as error:
- self._invoke_failure(error)
- return
- self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
-
- def __call__(self, metadata, error):
- with self.is_called_lock:
- if self.is_called:
- raise RuntimeError('callback should only ever be invoked once')
- if self.error:
- self._invoke_failure(self.error)
- return
- self.is_called = True
- if error is None:
- self._invoke_success(metadata)
- else:
- self._invoke_failure(error)
-
- def notify_failure(self, error):
- with self.is_called_lock:
- if not self.is_called:
- self.error = error
+ def __init__(self, cygrpc_callback):
+ self.is_called = False
+ self.error = None
+ self.is_called_lock = threading.Lock()
+ self.cygrpc_callback = cygrpc_callback
+
+ def _invoke_failure(self, error):
+ # TODO(atash) translate different Exception superclasses into different
+ # status codes.
+ self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal,
+ _common.encode(str(error)))
+
+ def _invoke_success(self, metadata):
+ try:
+ cygrpc_metadata = _common.cygrpc_metadata(metadata)
+ except Exception as error:
+ self._invoke_failure(error)
+ return
+ self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
+
+ def __call__(self, metadata, error):
+ with self.is_called_lock:
+ if self.is_called:
+ raise RuntimeError('callback should only ever be invoked once')
+ if self.error:
+ self._invoke_failure(self.error)
+ return
+ self.is_called = True
+ if error is None:
+ self._invoke_success(metadata)
+ else:
+ self._invoke_failure(error)
+
+ def notify_failure(self, error):
+ with self.is_called_lock:
+ if not self.is_called:
+ self.error = error
class _WrappedPlugin(object):
- def __init__(self, plugin):
- self.plugin = plugin
+ def __init__(self, plugin):
+ self.plugin = plugin
- def __call__(self, context, cygrpc_callback):
- wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
- wrapped_context = AuthMetadataContext(
- _common.decode(context.service_url), _common.decode(context.method_name))
- try:
- self.plugin(
- wrapped_context, AuthMetadataPluginCallback(wrapped_cygrpc_callback))
- except Exception as error:
- wrapped_cygrpc_callback.notify_failure(error)
- raise
+ def __call__(self, context, cygrpc_callback):
+ wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
+ wrapped_context = AuthMetadataContext(
+ _common.decode(context.service_url),
+ _common.decode(context.method_name))
+ try:
+ self.plugin(wrapped_context,
+ AuthMetadataPluginCallback(wrapped_cygrpc_callback))
+ except Exception as error:
+ wrapped_cygrpc_callback.notify_failure(error)
+ raise
def call_credentials_metadata_plugin(plugin, name):
- """
+ """
Args:
plugin: A callable accepting a grpc.AuthMetadataContext
object and a callback (itself accepting a list of metadata key/value
@@ -119,5 +119,6 @@ def call_credentials_metadata_plugin(plugin, name):
called, but need not be called in plugin's invocation.
plugin's invocation must be non-blocking.
"""
- return cygrpc.call_credentials_metadata_plugin(
- cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), _common.encode(name)))
+ return cygrpc.call_credentials_metadata_plugin(
+ cygrpc.CredentialsMetadataPlugin(
+ _WrappedPlugin(plugin), _common.encode(name)))