1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example gRPC Python-using client-side application."""
import collections
import enum
import threading
import time
import grpc
from tests.unit.framework.common import test_constants
from tests.testing.proto import requests_pb2
from tests.testing.proto import services_pb2
from tests.testing.proto import services_pb2_grpc
from tests.testing import _application_common
@enum.unique
class Scenario(enum.Enum):
    """Names of the scenarios that this client application knows about.

    Every member's value is a human-readable label; enum.unique rejects
    duplicate labels when the class is created.
    """

    # A single call per scenario.
    UNARY_UNARY = 'unary unary'
    UNARY_STREAM = 'unary stream'
    STREAM_UNARY = 'stream unary'
    STREAM_STREAM = 'stream stream'

    # Several calls in flight at once.
    CONCURRENT_STREAM_UNARY = 'concurrent stream unary'
    CONCURRENT_STREAM_STREAM = 'concurrent stream stream'

    # Cancellation of a call.
    CANCEL_UNARY_UNARY = 'cancel unary unary'
    CANCEL_UNARY_STREAM = 'cancel unary stream'

    # A request stream that never ends on its own.
    INFINITE_REQUEST_STREAM = 'infinite request stream'
class Outcome(collections.namedtuple('Outcome', 'kind code details')):
    """Result of executing one client application scenario.

    Attributes:
      kind: An Outcome.Kind value summarizing how the scenario execution went.
      code: A grpc.StatusCode value. Only meaningful when kind is
        Kind.RPC_ERROR.
      details: A status details string. Only meaningful when kind is
        Kind.RPC_ERROR.
    """

    @enum.unique
    class Kind(enum.Enum):
        """Overall classification of a scenario execution."""

        SATISFACTORY = 'satisfactory'
        UNSATISFACTORY = 'unsatisfactory'
        RPC_ERROR = 'rpc error'
# Shared outcomes for scenarios that complete without raising an RPC error.
# code and details stay None: Outcome documents them as valid only for
# Kind.RPC_ERROR.
_SATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.SATISFACTORY, code=None, details=None)
_UNSATISFACTORY_OUTCOME = Outcome(
    kind=Outcome.Kind.UNSATISFACTORY, code=None, details=None)
class _Pipe(object):
    """A closable FIFO of values that is consumed as a blocking iterator.

    Producers call add() to enqueue a value and close() to signal that no
    further values are coming. Iterating the pipe returns values in the order
    they were added, blocks while the pipe is open but empty, and raises
    StopIteration once the pipe is both closed and drained. All state is
    guarded by a single threading.Condition.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # A deque removes from the front in O(1); list.pop(0) is O(n).
        self._values = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def _next(self):
        """Returns the oldest pending value, waiting for one if necessary.

        Raises:
          StopIteration: If the pipe is closed and no values remain.
        """
        with self._condition:
            # Re-check after every wake-up: add() and close() both notify.
            while not self._values:
                if not self._open:
                    raise StopIteration()
                self._condition.wait()
            return self._values.popleft()

    def __next__(self):  # (Python 3 Iterator Protocol)
        return self._next()

    def next(self):  # (Python 2 Iterator Protocol)
        return self._next()

    def add(self, value):
        """Enqueues value and wakes any consumer blocked in iteration."""
        with self._condition:
            self._values.append(value)
            self._condition.notify_all()

    def close(self):
        """Marks the pipe closed; already-added values remain retrievable."""
        with self._condition:
            self._open = False
            self._condition.notify_all()
def _run_unary_unary(stub):
    """Invokes the stub's UnUn method once and checks the response.

    Args:
      stub: The service stub whose UnUn method is called with
        _application_common.UNARY_UNARY_REQUEST.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
      _application_common.UNARY_UNARY_RESPONSE; _UNSATISFACTORY_OUTCOME
      otherwise.
    """
    received = stub.UnUn(_application_common.UNARY_UNARY_REQUEST)
    as_expected = _application_common.UNARY_UNARY_RESPONSE == received
    return _SATISFACTORY_OUTCOME if as_expected else _UNSATISFACTORY_OUTCOME
def _run_unary_stream(stub):
    """Invokes the stub's UnStre method and expects an empty response stream.

    Args:
      stub: The service stub whose UnStre method is called with
        _application_common.UNARY_STREAM_REQUEST.

    Returns:
      _SATISFACTORY_OUTCOME if the response iterator is exhausted on the
      first read; _UNSATISFACTORY_OUTCOME if it yields a response.
    """
    responses = stub.UnStre(_application_common.UNARY_STREAM_REQUEST)
    # A private sentinel distinguishes "stream ended" from any real response.
    exhausted = object()
    if next(responses, exhausted) is exhausted:
        return _SATISFACTORY_OUTCOME
    return _UNSATISFACTORY_OUTCOME
def _run_stream_unary(stub):
    """Sends three requests through StreUn.with_call and checks the result.

    Args:
      stub: The service stub whose StreUn.with_call is invoked with an
        iterator over three copies of
        _application_common.STREAM_UNARY_REQUEST.

    Returns:
      _SATISFACTORY_OUTCOME if the response equals
      _application_common.STREAM_UNARY_RESPONSE and the call's code is
      grpc.StatusCode.OK; _UNSATISFACTORY_OUTCOME otherwise.
    """
    requests = iter((_application_common.STREAM_UNARY_REQUEST,) * 3)
    response, call = stub.StreUn.with_call(requests)
    # The status code is only consulted once the response has matched.
    if not (_application_common.STREAM_UNARY_RESPONSE == response):
        return _UNSATISFACTORY_OUTCOME
    if call.code() is not grpc.StatusCode.OK:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
def _run_stream_stream(stub):
    """Drives a StreStre call: two requests, two responses expected for each.

    Requests are fed through a _Pipe so they can be sent one at a time. After
    each request two responses are read; after the pipe is closed the
    response stream is expected to end.

    Args:
      stub: The service stub whose StreStre method is invoked.

    Returns:
      _SATISFACTORY_OUTCOME if both pairs of responses equal
      _application_common.TWO_STREAM_STREAM_RESPONSES and no further response
      arrives after the request pipe is closed; _UNSATISFACTORY_OUTCOME
      otherwise.
    """
    requests = _Pipe()
    responses = stub.StreStre(iter(requests))

    requests.add(_application_common.STREAM_STREAM_REQUEST)
    first_pair = (next(responses), next(responses))

    requests.add(_application_common.STREAM_STREAM_REQUEST)
    second_pair = (next(responses), next(responses))

    requests.close()
    # A private sentinel distinguishes "stream ended" from any real response.
    exhausted = object()
    got_extra_response = next(responses, exhausted) is not exhausted

    satisfied = (
        first_pair == _application_common.TWO_STREAM_STREAM_RESPONSES and
        second_pair == _application_common.TWO_STREAM_STREAM_RESPONSES and
        not got_extra_response)
    return _SATISFACTORY_OUTCOME if satisfied else _UNSATISFACTORY_OUTCOME
def _run_concurrent_stream_unary(stub):
    """Starts several StreUn.future calls up front and checks each in turn.

    Args:
      stub: The service stub whose StreUn.future is invoked
        test_constants.THREAD_CONCURRENCY times, each with an iterator over
        three copies of _application_common.STREAM_UNARY_REQUEST.

    Returns:
      _SATISFACTORY_OUTCOME if every call has code grpc.StatusCode.OK and a
      result equal to _application_common.STREAM_UNARY_RESPONSE;
      _UNSATISFACTORY_OUTCOME as soon as one call fails either check.
    """
    # Build the whole collection first so every call is started before any
    # of them is inspected.
    pending_calls = [
        stub.StreUn.future(
            iter((_application_common.STREAM_UNARY_REQUEST,) * 3))
        for _ in range(test_constants.THREAD_CONCURRENCY)
    ]
    for pending_call in pending_calls:
        if pending_call.code() is not grpc.StatusCode.OK:
            return _UNSATISFACTORY_OUTCOME
        if _application_common.STREAM_UNARY_RESPONSE != pending_call.result():
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
def _run_concurrent_stream_stream(stub):
    """Runs _run_stream_stream on several threads at once.

    Starts test_constants.RPC_CONCURRENCY threads that each execute
    _run_stream_stream against the shared stub, waits for all of them to
    terminate, and combines their outcomes.

    Args:
      stub: The service stub shared by all worker threads.

    Returns:
      _SATISFACTORY_OUTCOME if every worker produced an outcome whose kind is
      Outcome.Kind.SATISFACTORY; _UNSATISFACTORY_OUTCOME otherwise, including
      when a worker terminated with an exception and so recorded no outcome.
    """
    outcomes = [None] * test_constants.RPC_CONCURRENCY

    def run_stream_stream(index):
        # Each worker writes only its own slot, and the slots are read only
        # after join(), so no additional locking is required.
        outcomes[index] = _run_stream_stream(stub)

    threads = [
        threading.Thread(target=run_stream_stream, args=(index,))
        for index in range(test_constants.RPC_CONCURRENCY)
    ]
    for thread in threads:
        thread.start()
    # Join instead of waiting for every slot to be filled: a worker that
    # raises never fills its slot, which would otherwise block forever.
    for thread in threads:
        thread.join()

    for outcome in outcomes:
        # None marks a worker that ended without recording an outcome.
        if outcome is None or outcome.kind is not Outcome.Kind.SATISFACTORY:
            return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
def _run_cancel_unary_unary(stub):
    """Starts a UnUn.future call, reads its initial metadata, then cancels it.

    Args:
      stub: The service stub whose UnUn.future is invoked with
        _application_common.UNARY_UNARY_REQUEST.

    Returns:
      _SATISFACTORY_OUTCOME if initial metadata was obtained (is not None)
      and cancel() returned a true value; _UNSATISFACTORY_OUTCOME otherwise.
    """
    call = stub.UnUn.future(_application_common.UNARY_UNARY_REQUEST)
    # Metadata is fetched before cancelling, in that order.
    metadata = call.initial_metadata()
    was_cancelled = call.cancel()
    if metadata is None or not was_cancelled:
        return _UNSATISFACTORY_OUTCOME
    return _SATISFACTORY_OUTCOME
def _run_infinite_request_stream(stub):
    """Feeds StreUn.future an endless request stream under a timeout.

    Args:
      stub: The service stub whose StreUn.future is invoked with a generator
        that yields _application_common.STREAM_UNARY_REQUEST forever and with
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT.

    Returns:
      _SATISFACTORY_OUTCOME if the call's code is
      grpc.StatusCode.DEADLINE_EXCEEDED; _UNSATISFACTORY_OUTCOME otherwise.
    """

    def endless_requests():
        # Never terminates by itself; only the call's timeout can end it.
        while True:
            yield _application_common.STREAM_UNARY_REQUEST

    call = stub.StreUn.future(
        endless_requests(),
        timeout=_application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
    timed_out = call.code() is grpc.StatusCode.DEADLINE_EXCEEDED
    return _SATISFACTORY_OUTCOME if timed_out else _UNSATISFACTORY_OUTCOME
# Dispatch table from each Scenario to the function that executes it. Every
# function takes the service stub and returns an Outcome.
# NOTE(review): Scenario.CANCEL_UNARY_STREAM has no entry here, so looking it
# up raises KeyError - confirm whether that scenario is meant to be runnable.
_IMPLEMENTATIONS = {
    Scenario.UNARY_UNARY: _run_unary_unary,
    Scenario.UNARY_STREAM: _run_unary_stream,
    Scenario.STREAM_UNARY: _run_stream_unary,
    Scenario.STREAM_STREAM: _run_stream_stream,
    Scenario.CONCURRENT_STREAM_UNARY: _run_concurrent_stream_unary,
    Scenario.CONCURRENT_STREAM_STREAM: _run_concurrent_stream_stream,
    Scenario.CANCEL_UNARY_UNARY: _run_cancel_unary_unary,
    Scenario.INFINITE_REQUEST_STREAM: _run_infinite_request_stream,
}
def run(scenario, channel):
    """Executes one scenario over the given channel and reports its outcome.

    Args:
      scenario: The Scenario value selecting which implementation to run.
      channel: The channel passed to services_pb2_grpc.FirstServiceStub to
        build the stub used by the scenario.

    Returns:
      The Outcome returned by the scenario implementation, or an Outcome of
      kind Outcome.Kind.RPC_ERROR carrying the error's code and details if
      the implementation raised grpc.RpcError.

    Raises:
      KeyError: If scenario has no entry in _IMPLEMENTATIONS.
    """
    stub = services_pb2_grpc.FirstServiceStub(channel)
    implementation = _IMPLEMENTATIONS[scenario]
    try:
        outcome = implementation(stub)
    except grpc.RpcError as error:
        outcome = Outcome(
            Outcome.Kind.RPC_ERROR, error.code(), error.details())
    return outcome
|