aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
blob: a2d765546a5deb3d17533d1a235521f2f994690c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

import threading
import time

# Longest time, in milliseconds, that _next blocks inside a single
# grpc_completion_queue_next call before it re-acquires the GIL and checks
# for pending Python signals.
cdef int _INTERRUPT_CHECK_PERIOD_MS = 200


# Waits for the next event on c_completion_queue.
#
# The wait is chopped into slices of at most _INTERRUPT_CHECK_PERIOD_MS so
# that pending Python signals are checked between slices instead of being
# held off for the whole wait.
#
# Args:
#   c_completion_queue: The core completion queue to poll.
#   deadline: None to wait indefinitely; otherwise a value accepted by
#     _timespec_from_time (presumably seconds since the epoch -- confirm
#     against that helper).
#
# Returns:
#   The first event that is not a GRPC_QUEUE_TIMEOUT, or the
#   GRPC_QUEUE_TIMEOUT event produced by the slice that ended exactly at the
#   caller's deadline.
#
# NOTE(review): this function is declared without an `except` clause, so
# whether an exception raised while checking signals (e.g. KeyboardInterrupt)
# reaches the caller depends on Cython's propagation rules for this
# signature -- confirm.
cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
  cdef gpr_timespec c_slice_length
  cdef gpr_timespec c_slice_end
  cdef gpr_timespec c_final_deadline
  cdef grpc_event c_event
  c_slice_length = gpr_time_from_millis(
      _INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
  if deadline is not None:
    c_final_deadline = _timespec_from_time(deadline)
  else:
    c_final_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)

  with nogil:
    while True:
      # End of this slice: one check period from now, clamped so that it
      # never runs past the caller's deadline.
      c_slice_end = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_slice_length)
      if gpr_time_cmp(c_slice_end, c_final_deadline) > 0:
        c_slice_end = c_final_deadline
      c_event = grpc_completion_queue_next(
          c_completion_queue, c_slice_end, NULL)

      # A real event ends the wait immediately.
      if c_event.type != GRPC_QUEUE_TIMEOUT:
        break
      # A timeout on the clamped (final) slice means the caller's deadline
      # has been reached.
      if gpr_time_cmp(c_slice_end, c_final_deadline) == 0:
        break

      # Only an intermediate slice timed out: handle any signals, then go
      # around again.
      with gil:
        cpython.PyErr_CheckSignals()
  return c_event


# Translates a core grpc_event into a (tag, event) pair of Python objects.
#
# Timeout and shutdown events carry no tag, so the first element is None and
# the second is a ConnectivityEvent holding the completion type. For any
# other event the tag stored in the event is returned together with whatever
# that tag's event() method produces for c_event.
cdef _interpret_event(grpc_event c_event):
  cdef _Tag completed_tag

  # NOTE(nathaniel): For now we coopt ConnectivityEvent for both of the
  # tag-less event kinds below.
  if c_event.type == GRPC_QUEUE_TIMEOUT:
    return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
  if c_event.type == GRPC_QUEUE_SHUTDOWN:
    return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None)

  completed_tag = <_Tag>c_event.tag
  # We receive event tags only after they've been inc-ref'd elsewhere in
  # the code, so release that extra reference now that the tag is held by a
  # local variable.
  cpython.Py_DECREF(completed_tag)
  event = completed_tag.event(c_event)
  return completed_tag, event


# Blocks in _next until an event arrives on c_completion_queue or the
# deadline passes, then returns that event converted by _interpret_event
# into a (tag, event) pair.
cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
  return _interpret_event(_next(c_completion_queue, deadline))


cdef class CompletionQueue:
  # Owns a core grpc_completion_queue and exposes polling and shutdown to
  # Python.
  #
  # State flags:
  #   is_shutting_down: set once shutdown() has been called on this object.
  #   is_shutdown: set once the GRPC_QUEUE_SHUTDOWN event has been observed
  #     through _interpret_event.

  def __cinit__(self, shutdown_cq=False):
    # Creates the underlying core queue.
    #
    # Args:
    #   shutdown_cq: When true, build the queue from explicit attributes
    #     (GRPC_CQ_NEXT completion, GRPC_CQ_NON_LISTENING polling); otherwise
    #     use the default "next"-style queue.
    cdef grpc_completion_queue_attributes c_attrs
    # Paired with the grpc_shutdown() call in __dealloc__.
    grpc_init()
    if shutdown_cq:
      c_attrs.version = 1
      c_attrs.cq_completion_type = GRPC_CQ_NEXT
      c_attrs.cq_polling_type = GRPC_CQ_NON_LISTENING
      self.c_completion_queue = grpc_completion_queue_create(
          grpc_completion_queue_factory_lookup(&c_attrs), &c_attrs, NULL)
    else:
      self.c_completion_queue = grpc_completion_queue_create_for_next(NULL)
    self.is_shutting_down = False
    self.is_shutdown = False

  cdef _interpret_event(self, grpc_event c_event):
    # Converts c_event with the module-level _interpret_event, records
    # whether the queue has finished shutting down, and returns only the
    # event object (the tag is discarded).
    unused_tag, event = _interpret_event(c_event)
    if event.completion_type == GRPC_QUEUE_SHUTDOWN:
      self.is_shutdown = True
    return event

  # We name this 'poll' to avoid problems with CPython's expectations for
  # 'special' methods (like next and __next__).
  def poll(self, deadline=None):
    # Waits for the next event on this queue and returns it.
    #
    # Args:
    #   deadline: Forwarded to _next; None waits indefinitely.
    return self._interpret_event(_next(self.c_completion_queue, deadline))

  def shutdown(self):
    # Begins shutdown of the underlying core queue. The core call runs
    # without the GIL held.
    with nogil:
      grpc_completion_queue_shutdown(self.c_completion_queue)
    self.is_shutting_down = True

  def clear(self):
    # Drains the queue until its shutdown event has been delivered.
    #
    # Raises:
    #   ValueError: if shutdown() has not been called on this queue.
    if not self.is_shutting_down:
      raise ValueError('queue must be shutting down to be cleared')
    # Events returned by poll() expose their kind as `completion_type`, the
    # same attribute _interpret_event reads above.
    while self.poll().completion_type != GRPC_QUEUE_SHUTDOWN:
      pass

  def __dealloc__(self):
    # Shuts down (if necessary), drains and destroys the core queue, then
    # releases the grpc_init() reference taken in __cinit__.
    cdef gpr_timespec c_deadline
    c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
    if self.c_completion_queue != NULL:
      # Ensure shutdown
      if not self.is_shutting_down:
        grpc_completion_queue_shutdown(self.c_completion_queue)
      # Pump the queue (All outstanding calls should have been cancelled).
      # Each wait has no deadline; the loop ends when _interpret_event sees
      # the GRPC_QUEUE_SHUTDOWN event and sets is_shutdown.
      while not self.is_shutdown:
        event = grpc_completion_queue_next(
            self.c_completion_queue, c_deadline, NULL)
        self._interpret_event(event)
      grpc_completion_queue_destroy(self.c_completion_queue)
    grpc_shutdown()