aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_adapter/_low.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_adapter/_low.py')
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index 264c33b484..b13d8dd9dd 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -27,8 +27,11 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import threading
+
from grpc import _grpcio_metadata
from grpc._cython import cygrpc
+from grpc._adapter import _implementations
from grpc._adapter import _types
_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
@@ -37,6 +40,9 @@ ChannelCredentials = cygrpc.ChannelCredentials
CallCredentials = cygrpc.CallCredentials
ServerCredentials = cygrpc.ServerCredentials
+channel_credentials_composite = cygrpc.channel_credentials_composite
+call_credentials_composite = cygrpc.call_credentials_composite
+
def server_credentials_ssl(root_credentials, pair_sequence, force_client_auth):
return cygrpc.server_credentials_ssl(
root_credentials,
@@ -51,6 +57,80 @@ def channel_credentials_ssl(
return cygrpc.channel_credentials_ssl(root_certificates, pair)
+class _WrappedCygrpcCallback(object):
+
+ def __init__(self, cygrpc_callback):
+ self.is_called = False
+ self.error = None
+ self.is_called_lock = threading.Lock()
+ self.cygrpc_callback = cygrpc_callback
+
+ def _invoke_failure(self, error):
+ # TODO(atash) translate different Exception superclasses into different
+ # status codes.
+ self.cygrpc_callback(
+ cygrpc.Metadata([]), cygrpc.StatusCode.internal, error.message)
+
+ def _invoke_success(self, metadata):
+ try:
+ cygrpc_metadata = cygrpc.Metadata(
+ cygrpc.Metadatum(key, value)
+ for key, value in metadata)
+ except Exception as error:
+ self._invoke_failure(error)
+ return
+ self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, '')
+
+ def __call__(self, metadata, error):
+ with self.is_called_lock:
+ if self.is_called:
+ raise RuntimeError('callback should only ever be invoked once')
+ if self.error:
+ self._invoke_failure(self.error)
+ return
+ self.is_called = True
+ if error is None:
+ self._invoke_success(metadata)
+ else:
+ self._invoke_failure(error)
+
+ def notify_failure(self, error):
+ with self.is_called_lock:
+ if not self.is_called:
+ self.error = error
+
+
+class _WrappedPlugin(object):
+
+ def __init__(self, plugin):
+ self.plugin = plugin
+
+ def __call__(self, context, cygrpc_callback):
+ wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
+ wrapped_context = _implementations.AuthMetadataContext(context.service_url,
+ context.method_name)
+ try:
+ self.plugin(
+ wrapped_context,
+ _implementations.AuthMetadataPluginCallback(wrapped_cygrpc_callback))
+ except Exception as error:
+ wrapped_cygrpc_callback.notify_failure(error)
+ raise
+
+
+def call_credentials_metadata_plugin(plugin, name):
+ """
+ Args:
+ plugin: A callable accepting a _types.AuthMetadataContext
+ object and a callback (itself accepting a list of metadata key/value
+ 2-tuples and a None-able exception value). The callback must be eventually
+ called, but need not be called in plugin's invocation.
+ plugin's invocation must be non-blocking.
+ """
+ return cygrpc.call_credentials_metadata_plugin(
+ cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), name))
+
+
class CompletionQueue(_types.CompletionQueue):
def __init__(self):