diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/testing/_time_test.py')
-rw-r--r-- | src/python/grpcio_tests/tests/testing/_time_test.py | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/testing/_time_test.py b/src/python/grpcio_tests/tests/testing/_time_test.py new file mode 100644 index 0000000000..797394ae20 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_time_test.py @@ -0,0 +1,165 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import threading +import time +import unittest + +import grpc_testing + +_QUANTUM = 0.3 +_MANY = 10000 +# Tests that run in real time can either wait for the scheduler to +# eventually run what needs to be run (and risk timing out) or declare +# that the scheduler didn't schedule work reasonably fast enough. We +# choose the latter for this test. +_PATHOLOGICAL_SCHEDULING = 'pathological thread scheduling!' + + +class _TimeNoter(object): + + def __init__(self, time): + self._condition = threading.Condition() + self._time = time + self._call_times = [] + + def __call__(self): + with self._condition: + self._call_times.append(self._time.time()) + + def call_times(self): + with self._condition: + return tuple(self._call_times) + + +class TimeTest(object): + + def test_sleep_for(self): + start_time = self._time.time() + self._time.sleep_for(_QUANTUM) + end_time = self._time.time() + + self.assertLessEqual(start_time + _QUANTUM, end_time) + + def test_sleep_until(self): + start_time = self._time.time() + self._time.sleep_until(start_time + _QUANTUM) + end_time = self._time.time() + + self.assertLessEqual(start_time + _QUANTUM, end_time) + + def test_call_in(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + self._time.call_in(time_noter, _QUANTUM) + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + + def test_call_at(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + self._time.call_at(time_noter, self._time.time() + _QUANTUM) + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertTrue(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + + def test_cancel(self): + time_noter = _TimeNoter(self._time) + + future = self._time.call_in(time_noter, _QUANTUM * 2) + self._time.sleep_for(_QUANTUM) + cancelled = future.cancel() + self._time.sleep_for(_QUANTUM * 2) + call_times = time_noter.call_times() + + self.assertFalse(call_times, msg=_PATHOLOGICAL_SCHEDULING) + self.assertTrue(cancelled) + self.assertTrue(future.cancelled()) + + def test_many(self): + test_events = tuple(threading.Event() for _ in range(_MANY)) + possibly_cancelled_futures = {} + background_noise_futures = [] + + for test_event in test_events: + possibly_cancelled_futures[test_event] = self._time.call_in( + test_event.set, _QUANTUM * (2 + random.random())) + for _ in range(_MANY): + background_noise_futures.append( + self._time.call_in(threading.Event().set, _QUANTUM * 1000 * + random.random())) + self._time.sleep_for(_QUANTUM) + cancelled = set() + for test_event, test_future in possibly_cancelled_futures.items(): + if bool(random.randint(0, 1)) and test_future.cancel(): + cancelled.add(test_event) + self._time.sleep_for(_QUANTUM * 3) + + for test_event in test_events: + (self.assertFalse if test_event in cancelled else + self.assertTrue)(test_event.is_set()) + for background_noise_future in background_noise_futures: + background_noise_future.cancel() + + def test_same_behavior_used_several_times(self): + time_noter = _TimeNoter(self._time) + + start_time = self._time.time() + first_future_at_one = self._time.call_in(time_noter, _QUANTUM) + second_future_at_one = self._time.call_in(time_noter, _QUANTUM) + first_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3) + second_future_at_three = self._time.call_in(time_noter, _QUANTUM * 3) + self._time.sleep_for(_QUANTUM * 2) + first_future_at_one_cancelled = first_future_at_one.cancel() + second_future_at_one_cancelled = second_future_at_one.cancel() + first_future_at_three_cancelled = first_future_at_three.cancel() + self._time.sleep_for(_QUANTUM * 2) + second_future_at_three_cancelled = second_future_at_three.cancel() + first_future_at_three_cancelled_again = first_future_at_three.cancel() + call_times = time_noter.call_times() + + self.assertEqual(3, len(call_times), msg=_PATHOLOGICAL_SCHEDULING) + self.assertFalse(first_future_at_one_cancelled) + self.assertFalse(second_future_at_one_cancelled) + self.assertTrue(first_future_at_three_cancelled) + self.assertFalse(second_future_at_three_cancelled) + self.assertTrue(first_future_at_three_cancelled_again) + self.assertLessEqual(start_time + _QUANTUM, call_times[0]) + self.assertLessEqual(start_time + _QUANTUM, call_times[1]) + self.assertLessEqual(start_time + _QUANTUM * 3, call_times[2]) + + +class StrictRealTimeTest(TimeTest, unittest.TestCase): + + def setUp(self): + self._time = grpc_testing.strict_real_time() + + +class StrictFakeTimeTest(TimeTest, unittest.TestCase): + + def setUp(self): + self._time = grpc_testing.strict_fake_time( + random.randint(0, int(time.time()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |