aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_adapter/_types.py
blob: f9e18f0bb3f2ca534866a703755e54322eccff63 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import abc
import collections
import enum

import six

from grpc._cython import cygrpc


class GrpcChannelArgumentKeys(enum.Enum):
  """GRPC-specific argument keys, mirroring those used in grpc_channel_args.

  Each member's value is the literal key string.
  """

  SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'


@enum.unique
class CallError(enum.IntEnum):
  """Python-side mirror of grpc_call_error in the C core.

  Every member takes its value from the matching cygrpc.CallError attribute;
  enum.unique rejects the class if two members end up with the same value.
  """

  OK = cygrpc.CallError.ok
  ERROR = cygrpc.CallError.error
  ERROR_NOT_ON_SERVER = cygrpc.CallError.not_on_server
  ERROR_NOT_ON_CLIENT = cygrpc.CallError.not_on_client
  ERROR_ALREADY_ACCEPTED = cygrpc.CallError.already_accepted
  ERROR_ALREADY_INVOKED = cygrpc.CallError.already_invoked
  ERROR_NOT_INVOKED = cygrpc.CallError.not_invoked
  ERROR_ALREADY_FINISHED = cygrpc.CallError.already_finished
  ERROR_TOO_MANY_OPERATIONS = cygrpc.CallError.too_many_operations
  ERROR_INVALID_FLAGS = cygrpc.CallError.invalid_flags
  ERROR_INVALID_METADATA = cygrpc.CallError.invalid_metadata


@enum.unique
class StatusCode(enum.IntEnum):
  """Python-side mirror of grpc_status_code in the C core.

  Every member takes its value from the matching cygrpc.StatusCode attribute;
  enum.unique rejects the class if two members end up with the same value.
  """

  OK = cygrpc.StatusCode.ok
  CANCELLED = cygrpc.StatusCode.cancelled
  UNKNOWN = cygrpc.StatusCode.unknown
  INVALID_ARGUMENT = cygrpc.StatusCode.invalid_argument
  DEADLINE_EXCEEDED = cygrpc.StatusCode.deadline_exceeded
  NOT_FOUND = cygrpc.StatusCode.not_found
  ALREADY_EXISTS = cygrpc.StatusCode.already_exists
  PERMISSION_DENIED = cygrpc.StatusCode.permission_denied
  RESOURCE_EXHAUSTED = cygrpc.StatusCode.resource_exhausted
  FAILED_PRECONDITION = cygrpc.StatusCode.failed_precondition
  ABORTED = cygrpc.StatusCode.aborted
  OUT_OF_RANGE = cygrpc.StatusCode.out_of_range
  UNIMPLEMENTED = cygrpc.StatusCode.unimplemented
  INTERNAL = cygrpc.StatusCode.internal
  UNAVAILABLE = cygrpc.StatusCode.unavailable
  DATA_LOSS = cygrpc.StatusCode.data_loss
  UNAUTHENTICATED = cygrpc.StatusCode.unauthenticated


@enum.unique
class OpWriteFlags(enum.IntEnum):
  """Python-side mirror of the write-flag constants defined in the C core.

  Every member takes its value from the matching cygrpc.WriteFlag attribute.
  """

  WRITE_BUFFER_HINT = cygrpc.WriteFlag.buffer_hint
  WRITE_NO_COMPRESS = cygrpc.WriteFlag.no_compress


@enum.unique
class OpType(enum.IntEnum):
  """Python-side mirror of grpc_op_type in the C core.

  Every member takes its value from the matching cygrpc.OperationType
  attribute. Note that the RECV_* members map onto the `receive_*` attributes.
  """

  SEND_INITIAL_METADATA = cygrpc.OperationType.send_initial_metadata
  SEND_MESSAGE = cygrpc.OperationType.send_message
  SEND_CLOSE_FROM_CLIENT = cygrpc.OperationType.send_close_from_client
  SEND_STATUS_FROM_SERVER = cygrpc.OperationType.send_status_from_server
  RECV_INITIAL_METADATA = cygrpc.OperationType.receive_initial_metadata
  RECV_MESSAGE = cygrpc.OperationType.receive_message
  RECV_STATUS_ON_CLIENT = cygrpc.OperationType.receive_status_on_client
  RECV_CLOSE_ON_SERVER = cygrpc.OperationType.receive_close_on_server


@enum.unique
class EventType(enum.IntEnum):
  """Python-side mirror of grpc_completion_type in the C core.

  Every member takes its value from the matching cygrpc.CompletionType
  attribute. Note that OP_COMPLETE maps onto `operation_complete`.
  """

  QUEUE_SHUTDOWN = cygrpc.CompletionType.queue_shutdown
  QUEUE_TIMEOUT = cygrpc.CompletionType.queue_timeout
  OP_COMPLETE = cygrpc.CompletionType.operation_complete


@enum.unique
class ConnectivityState(enum.IntEnum):
  """Python-side mirror of grpc_connectivity_state in the C core.

  Every member takes its value from the matching cygrpc.ConnectivityState
  attribute.
  """

  IDLE = cygrpc.ConnectivityState.idle
  CONNECTING = cygrpc.ConnectivityState.connecting
  READY = cygrpc.ConnectivityState.ready
  TRANSIENT_FAILURE = cygrpc.ConnectivityState.transient_failure
  FATAL_FAILURE = cygrpc.ConnectivityState.fatal_failure


class Status(collections.namedtuple('Status', ('code', 'details'))):
  """Final status with which a GRPC call ended.

  Attributes:
    code (StatusCode): the status code of the call.
    details (str): the details accompanying the status code.
  """


class CallDetails(collections.namedtuple(
    'CallDetails', ('method', 'host', 'deadline'))):
  """Information given to the server about a call made by a client.

  Attributes:
    method (str): the method of the call.
    host (str): the host of the call.
    deadline (float): the deadline of the call.
  """


class OpArgs(collections.namedtuple(
    'OpArgs', (
        'type',
        'initial_metadata',
        'trailing_metadata',
        'message',
        'status',
        'write_flags',
    ))):
  """Describes a single operation to be performed on a GRPC call.

  The static factory methods below build an instance for each OpType, filling
  every field that does not apply to that operation with None (and
  write_flags with 0 unless the caller supplies flags).

  Attributes:
    type (OpType): which operation this is.
    initial_metadata (sequence of 2-sequence of str): Only valid if type ==
      OpType.SEND_INITIAL_METADATA, else is None.
    trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
      OpType.SEND_STATUS_FROM_SERVER, else is None.
    message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
    status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
      is None.
    write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values.
  """

  @staticmethod
  def send_initial_metadata(initial_metadata):
    """Build a SEND_INITIAL_METADATA operation carrying `initial_metadata`."""
    return OpArgs(
        type=OpType.SEND_INITIAL_METADATA,
        initial_metadata=initial_metadata,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)

  @staticmethod
  def send_message(message, flags):
    """Build a SEND_MESSAGE operation for `message` with write flags `flags`."""
    return OpArgs(
        type=OpType.SEND_MESSAGE,
        initial_metadata=None,
        trailing_metadata=None,
        message=message,
        status=None,
        write_flags=flags)

  @staticmethod
  def send_close_from_client():
    """Build a SEND_CLOSE_FROM_CLIENT operation."""
    return OpArgs(
        type=OpType.SEND_CLOSE_FROM_CLIENT,
        initial_metadata=None,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)

  @staticmethod
  def send_status_from_server(trailing_metadata, status_code, status_details):
    """Build a SEND_STATUS_FROM_SERVER operation.

    The code and details are wrapped into a Status stored in the `status`
    field; `trailing_metadata` is stored as given.
    """
    return OpArgs(
        type=OpType.SEND_STATUS_FROM_SERVER,
        initial_metadata=None,
        trailing_metadata=trailing_metadata,
        message=None,
        status=Status(status_code, status_details),
        write_flags=0)

  @staticmethod
  def recv_initial_metadata():
    """Build a RECV_INITIAL_METADATA operation."""
    return OpArgs(
        type=OpType.RECV_INITIAL_METADATA,
        initial_metadata=None,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)

  @staticmethod
  def recv_message():
    """Build a RECV_MESSAGE operation."""
    return OpArgs(
        type=OpType.RECV_MESSAGE,
        initial_metadata=None,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)

  @staticmethod
  def recv_status_on_client():
    """Build a RECV_STATUS_ON_CLIENT operation."""
    return OpArgs(
        type=OpType.RECV_STATUS_ON_CLIENT,
        initial_metadata=None,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)

  @staticmethod
  def recv_close_on_server():
    """Build a RECV_CLOSE_ON_SERVER operation."""
    return OpArgs(
        type=OpType.RECV_CLOSE_ON_SERVER,
        initial_metadata=None,
        trailing_metadata=None,
        message=None,
        status=None,
        write_flags=0)


class OpResult(collections.namedtuple(
    'OpResult', (
        'type',
        'initial_metadata',
        'trailing_metadata',
        'message',
        'status',
        'cancelled',
    ))):
  """Outcome of a single GRPC operation.

  Attributes:
    type (OpType): which operation this result belongs to.
    initial_metadata (sequence of 2-sequence of str): Only valid if type ==
      OpType.RECV_INITIAL_METADATA, else is None.
    trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
      OpType.RECV_STATUS_ON_CLIENT, else is None.
    message (bytes): Only valid if type == OpType.RECV_MESSAGE, else is None.
    status (Status): Only valid if type == OpType.RECV_STATUS_ON_CLIENT, else
      is None.
    cancelled (bool): Only valid if type == OpType.RECV_CLOSE_ON_SERVER, else
      is None.
  """


class Event(collections.namedtuple(
    'Event', (
        'type',
        'tag',
        'call',
        'call_details',
        'results',
        'success',
    ))):
  """A single event taken off a GRPC completion queue.

  Attributes:
    type (EventType): the kind of event.
    tag (object): the tag associated with the event.
    call (Call): The Call object associated with this event (if there is one,
      else None).
    call_details (CallDetails): The call details associated with the
      server-side call (if there is such information, else None).
    results (list of OpResult): the operation results carried by the event.
    success (bool): the success flag of the event.
  """


class CompletionQueue(six.with_metaclass(abc.ABCMeta)):
  """Abstract interface of a GRPC completion queue.

  Instances are their own iterators: iterating yields whatever next() returns
  when called with its default deadline.
  """

  @abc.abstractmethod
  def __init__(self):
    """Abstract initializer; concrete completion queues must override it."""

  def __iter__(self):
    """Return this completion queue as its own iterator.

    Iterating is the equivalent of calling next() repeatedly without a
    deadline.
    """
    return self

  def __next__(self):
    """Python 3 iterator hook; delegates to next() with its default deadline."""
    return self.next()

  @abc.abstractmethod
  def next(self, deadline=float('+inf')):
    """Get the next event on this completion queue.

    Args:
      deadline (float): absolute deadline in seconds from the Python epoch.
        The default, float('+inf'), is what iteration uses to wait without a
        deadline. None has also been documented as meaning no deadline.
        NOTE(review): confirm that implementations accept None.

    Returns:
      Event: the next event on this completion queue.
    """

  @abc.abstractmethod
  def shutdown(self):
    """Begin the shutdown process of this completion queue.

    Note that this does not immediately destroy the completion queue.
    Nevertheless, user code should not pass it around after invoking this.
    """


class Call(six.with_metaclass(abc.ABCMeta)):
  """Abstract interface of a single GRPC call."""

  @abc.abstractmethod
  def start_batch(self, ops, tag):
    """Start a batch of operations.

    Args:
      ops (sequence of OpArgs): the operations to start.
      tag (object): the tag to associate with the batch.

    Returns:
      CallError: the outcome of starting the batch. This abstract default
        returns CallError.ERROR.
    """
    return CallError.ERROR

  @abc.abstractmethod
  def cancel(self, code=None, details=None):
    """Cancel the call.

    Args:
      code (int): Status code to cancel with (on the server side). If
        specified, so must `details`.
      details (str): Status details to cancel with (on the server side). If
        specified, so must `code`.

    Returns:
      CallError: the outcome of the cancellation. This abstract default
        returns CallError.ERROR.
    """
    return CallError.ERROR

  @abc.abstractmethod
  def peer(self):
    """Get the peer of this call.

    Returns:
      str: the peer of this call.
    """

  def set_credentials(self, creds):
    """Set per-call credentials.

    This method is not abstract; the default implementation here does nothing
    and returns None.

    Args:
      creds (CallCredentials): Credentials to be set for this call.
    """


class Channel(six.with_metaclass(abc.ABCMeta)):
  """Abstract interface of a GRPC channel."""

  @abc.abstractmethod
  def __init__(self, target, args, credentials=None):
    """Initialize a Channel.

    Args:
      target (str): the target of the channel.
      args (sequence of 2-sequence of str, (str|integer)): channel arguments
        as (key, value) pairs.
      credentials (ChannelCredentials): If None, create an insecure channel,
        else create a secure channel using the client credentials.
    """

  @abc.abstractmethod
  def create_call(self, completion_queue, method, host, deadline=float('+inf')):
    """Create a call from this channel.

    Args:
      completion_queue (CompletionQueue): the completion queue for the call.
      method (str): the method of the call.
      host (str): the host of the call.
      deadline (float): absolute deadline in seconds from the Python epoch;
        defaults to float('+inf'). None has also been documented as meaning
        no deadline. NOTE(review): confirm that implementations accept None.

    Returns:
      Call: call object associated with this Channel and passed parameters.
    """

  @abc.abstractmethod
  def check_connectivity_state(self, try_to_connect):
    """Check and optionally repair the connectivity state of the channel.

    Args:
      try_to_connect (bool): whether or not to try to connect the channel if
        disconnected.

    Returns:
      ConnectivityState: state of the channel at the time of this invocation.
    """

  @abc.abstractmethod
  def watch_connectivity_state(self, last_observed_state, deadline,
                               completion_queue, tag):
    """Watch for connectivity state changes from the last_observed_state.

    Args:
      last_observed_state (ConnectivityState): the state to watch for changes
        from.
      deadline (float): the deadline of the watch.
      completion_queue (CompletionQueue): the completion queue for the watch.
      tag (object): the tag to associate with the watch.
    """

  @abc.abstractmethod
  def target(self):
    """Get the target of this channel.

    Returns:
      str: the target of this channel.
    """


class Server(six.with_metaclass(abc.ABCMeta)):
  """Abstract interface of a GRPC server.

  Every method is abstract and has no body beyond its docstring.
  """

  @abc.abstractmethod
  def __init__(self, completion_queue, args):
    """Initialize a server.

    Args:
      completion_queue (CompletionQueue): the completion queue of the server.
      args (sequence of 2-sequence of str, (str|integer)): server arguments
        as (key, value) pairs.
    """

  @abc.abstractmethod
  def add_http2_port(self, address, credentials=None):
    """Adds an HTTP/2 address+port to the server.

    Args:
      address (str): the address+port to add.
      credentials (ServerCredentials): If None, create an insecure port, else
        create a secure port using the server credentials.
    """

  @abc.abstractmethod
  def start(self):
    """Starts the server."""

  @abc.abstractmethod
  def shutdown(self, tag=None):
    """Shuts down the server. Does not immediately destroy the server.

    Args:
      tag (object): if not None, have the server place an event on its
        completion queue notifying it when this server has completely shut down.
    """

  @abc.abstractmethod
  def request_call(self, completion_queue, tag):
    """Requests a call from the server on the server's completion queue.

    Args:
      completion_queue (CompletionQueue): Completion queue for the call. May be
        the same as the server's completion queue.
      tag (object): the tag to associate with the request.
    """