aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/framework/foundation/_logging_pool_test.py
blob: c4ea03177cc88bfdac8497af121b1ce70f0bd205 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for grpc.framework.foundation.logging_pool."""

import threading
import unittest

from grpc.framework.foundation import logging_pool

_POOL_SIZE = 16


class _CallableObject(object):

    def __init__(self):
        self._lock = threading.Lock()
        self._passed_values = []

    def __call__(self, value):
        with self._lock:
            self._passed_values.append(value)

    def passed_values(self):
        with self._lock:
            return tuple(self._passed_values)


class LoggingPoolTest(unittest.TestCase):

    def testUpAndDown(self):
        pool = logging_pool.pool(_POOL_SIZE)
        pool.shutdown(wait=True)

        with logging_pool.pool(_POOL_SIZE) as pool:
            self.assertIsNotNone(pool)

    def testTaskExecuted(self):
        test_list = []

        with logging_pool.pool(_POOL_SIZE) as pool:
            pool.submit(lambda: test_list.append(object())).result()

        self.assertTrue(test_list)

    def testException(self):
        with logging_pool.pool(_POOL_SIZE) as pool:
            raised_exception = pool.submit(lambda: 1 / 0).exception()

        self.assertIsNotNone(raised_exception)

    def testCallableObjectExecuted(self):
        callable_object = _CallableObject()
        passed_object = object()
        with logging_pool.pool(_POOL_SIZE) as pool:
            future = pool.submit(callable_object, passed_object)
        self.assertIsNone(future.result())
        self.assertSequenceEqual((passed_object,),
                                 callable_object.passed_values())


if __name__ == '__main__':
    unittest.main(verbosity=2)