aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/testing/_client_application.py
blob: 7d0d74c8c4a701754fa795c3b3507c62abf6a62d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using client-side application."""

import collections
import enum
import threading
import time

import grpc
from tests.unit.framework.common import test_constants

from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc

from tests.testing import _application_common


@enum.unique
class Scenario(enum.Enum):
    UNARY_UNARY = 'unary unary'
    UNARY_STREAM = 'unary stream'
    STREAM_UNARY = 'stream unary'
    STREAM_STREAM = 'stream stream'
    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'
    CANCEL_UNARY_UNARY = 'cancel unary unary'
    CANCEL_UNARY_STREAM = 'cancel unary stream'
    INFINITE_REQUEST_STREAM = 'infinite request stream'


class Outcome(collections.namedtuple('Outcome', ('kind', 'code', 'details'))):
    """Outcome of a client application scenario.

    Attributes:
      kind: A Kind value describing the overall kind of scenario execution.
      code: A grpc.StatusCode value. Only valid if kind is Kind.RPC_ERROR.
      details: A status details string. Only valid if kind is Kind.RPC_ERROR.
    """

    @enum.unique
    class Kind(enum.Enum):
        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'


_SATISFACTORY_OUTCOME = Outcome(Outcome.Kind.SATISFACTORY, None, None)
_UNSATISFACTORY_OUTCOME = Outcome(Outcome.Kind.UNSATISFACTORY, None, None)


class _Pipe(object):

    def __init__(self):
        self._condition = threading.Condition()
        self._values = []
        self._open = True

    def __iter__(self):
        return self

    def _next(self):
        with self._condition:
            while True:
                if self._values:
                    return self._values.pop(0)
                elif not self._open:
                    raise StopIteration()
                else:
                    self._condition.wait()

    def __next__(self):  # (Python 3 Iterator Protocol)
        return self._next()

    def next(self):  # (Python 2 Iterator Protocol)
        return self._next()

    def add(self, value):
        with self._condition:
            self._values.append(value)
            self._condition.notify_all()

    def close(self):
        with self._condition:
            self._open = False
            self._condition.notify_all()


def _run_unary_unary(stub):
    response = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    if _application_common.UNARY_UNARY_RESPONSE == response:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_unary_stream(stub):
    response_iterator = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    try:
        next(response_iterator)
    except StopIteration:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_stream_unary(stub):
    response, call = stub.StreUn.with_call(
        iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
    if (_application_common.STREAM_UNARY_RESPONSE == response and
            call.code() is grpc.StatusCode.OK):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_stream_stream(stub):
    request_pipe = _Pipe()
    response_iterator = stub.StreStre(iter(request_pipe))
    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
    first_responses = next(response_iterator), next(response_iterator),
    request_pipe.add(_application_common.STREAM_STREAM_REQUEST)
    second_responses = next(response_iterator), next(response_iterator),
    request_pipe.close()
    try:
        next(response_iterator)
    except StopIteration:
        unexpected_extra_response = False
    else:
        unexpected_extra_response = True
    if (first_responses == _application_common.TWO_STREAM_STREAM_RESPONSES and
            second_responses == _application_common.TWO_STREAM_STREAM_RESPONSES
            and not unexpected_extra_response):
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_concurrent_stream_unary(stub):
    future_calls = tuple(
        stub.StreUn.future(
            iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
        for _ in range(test_constants.THREAD_CONCURRENCY))
    for future_call in future_calls:
        if future_call.code() is grpc.StatusCode.OK:
            response = future_call.result()
            if _application_common.STREAM_UNARY_RESPONSE != response:
                return _UNSATISFACTORY_OUTCOME
        else:
            return _UNSATISFACTORY_OUTCOME
    else:
        return _SATISFACTORY_OUTCOME


def _run_concurrent_stream_stream(stub):
    condition = threading.Condition()
    outcomes = [None] * test_constants.RPC_CONCURRENCY

    def run_stream_stream(index):
        outcome = _run_stream_stream(stub)
        with condition:
            outcomes[index] = outcome
            condition.notify()

    for index in range(test_constants.RPC_CONCURRENCY):
        thread = threading.Thread(target=run_stream_stream, args=(index,))
        thread.start()
    with condition:
        while True:
            if all(outcomes):
                for outcome in outcomes:
                    if outcome.kind is not Outcome.Kind.SATISFACTORY:
                        return _UNSATISFACTORY_OUTCOME
                else:
                    return _SATISFACTORY_OUTCOME
            else:
                condition.wait()


def _run_cancel_unary_unary(stub):
    response_future_call = stub.UnUn.future(
        _application_common.UNARY_UNARY_REQUEST)
    initial_metadata = response_future_call.initial_metadata()
    cancelled = response_future_call.cancel()
    if initial_metadata is not None and cancelled:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def _run_infinite_request_stream(stub):

    def infinite_request_iterator():
        while True:
            yield _application_common.STREAM_UNARY_REQUEST

    response_future_call = stub.StreUn.future(
        infinite_request_iterator(),
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
    if response_future_call.code() is grpc.StatusCode.DEADLINE_EXCEEDED:
        return _SATISFACTORY_OUTCOME
    else:
        return _UNSATISFACTORY_OUTCOME


def run(scenario, channel):
    stub = services_pb2_grpc.FirstServiceStub(channel)
    try:
        if scenario is Scenario.UNARY_UNARY:
            return _run_unary_unary(stub)
        elif scenario is Scenario.UNARY_STREAM:
            return _run_unary_stream(stub)
        elif scenario is Scenario.STREAM_UNARY:
            return _run_stream_unary(stub)
        elif scenario is Scenario.STREAM_STREAM:
            return _run_stream_stream(stub)
        elif scenario is Scenario.CONCURRENT_STREAM_UNARY:
            return _run_concurrent_stream_unary(stub)
        elif scenario is Scenario.CONCURRENT_STREAM_STREAM:
            return _run_concurrent_stream_stream(stub)
        elif scenario is Scenario.CANCEL_UNARY_UNARY:
            return _run_cancel_unary_unary(stub)
        elif scenario is Scenario.INFINITE_REQUEST_STREAM:
            return _run_infinite_request_stream(stub)
    except grpc.RpcError as rpc_error:
        return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
                       rpc_error.details())


_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}


def run(scenario, channel):
    stub = services_pb2_grpc.FirstServiceStub(channel)
    try:
        return _IMPLEMENTATIONS[scenario](stub)
    except grpc.RpcError as rpc_error:
        return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(),
                       rpc_error.details())