aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/_session_cache_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_session_cache_test.py')
-rw-r--r--src/python/grpcio_tests/tests/unit/_session_cache_test.py145
1 files changed, 145 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_session_cache_test.py b/src/python/grpcio_tests/tests/unit/_session_cache_test.py
new file mode 100644
index 0000000000..b4e4670fa7
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_session_cache_test.py
@@ -0,0 +1,145 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests experimental TLS Session Resumption API"""
+
+import pickle
+import unittest
+
+import grpc
+from grpc import _channel
+from grpc.experimental import session_cache
+
+from tests.unit import test_common
+from tests.unit import resources
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x00\x00\x00'
+
+_UNARY_UNARY = '/test/UnaryUnary'
+
+_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
+_ID = 'id'
+_ID_KEY = 'id_key'
+_AUTH_CTX = 'auth_ctx'
+
+_PRIVATE_KEY = resources.private_key()
+_CERTIFICATE_CHAIN = resources.certificate_chain()
+_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
+_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
+_PROPERTY_OPTIONS = ((
+ 'grpc.ssl_target_name_override',
+ _SERVER_HOST_OVERRIDE,
+),)
+
+
+def handle_unary_unary(request, servicer_context):
+ return pickle.dumps({
+ _ID: servicer_context.peer_identities(),
+ _ID_KEY: servicer_context.peer_identity_key(),
+ _AUTH_CTX: servicer_context.auth_context()
+ })
+
+
+def start_secure_server():
+ handler = grpc.method_handlers_generic_handler('test', {
+ 'UnaryUnary':
+ grpc.unary_unary_rpc_method_handler(handle_unary_unary)
+ })
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
+ server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
+ port = server.add_secure_port('[::]:0', server_cred)
+ server.start()
+
+ return server, port
+
+
+class SSLSessionCacheTest(unittest.TestCase):
+
+ def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
+ expect_ssl_session_reused):
+ channel = grpc.secure_channel(
+ 'localhost:{}'.format(port), channel_creds, options=channel_options)
+ response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
+ auth_data = pickle.loads(response)
+ self.assertEqual(expect_ssl_session_reused,
+ auth_data[_AUTH_CTX]['ssl_session_reused'])
+ channel.close()
+
+ def testSSLSessionCacheLRU(self):
+ server_1, port_1 = start_secure_server()
+
+ cache = session_cache.ssl_session_cache_lru(1)
+ channel_creds = grpc.ssl_channel_credentials(
+ root_certificates=_TEST_ROOT_CERTIFICATES)
+ channel_options = _PROPERTY_OPTIONS + (
+ ('grpc.ssl_session_cache', cache),)
+
+ # Initial connection has no session to resume
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Connection to server_1 resumes from initial session
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'true'])
+
+ # Connection to a different server with the same name overwrites the cache entry
+ server_2, port_2 = start_secure_server()
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_2,
+ expect_ssl_session_reused=[b'false'])
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_2,
+ expect_ssl_session_reused=[b'true'])
+ server_2.stop(None)
+
+ # Connection to server_1 now falls back to full TLS handshake
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Re-creating server_1 causes old sessions to become invalid
+ server_1.stop(None)
+ server_1, port_1 = start_secure_server()
+
+ # Old sessions should no longer be valid
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'false'])
+
+ # Resumption should work for subsequent connections
+ self._do_one_shot_client_rpc(
+ channel_creds,
+ channel_options,
+ port_1,
+ expect_ssl_session_reused=[b'true'])
+ server_1.stop(None)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)