diff options
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi')
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi | 453 |
1 files changed, 453 insertions, 0 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi new file mode 100644 index 0000000000..f9a1b2856d --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi @@ -0,0 +1,453 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cimport cpython +from libc cimport string +from libc.stdlib cimport malloc, free +import errno +gevent_g = None +gevent_socket = None +gevent_hub = None +gevent_event = None +g_event = None +g_pool = None + +cdef grpc_error* grpc_error_none(): + return <grpc_error*>0 + +cdef grpc_error* socket_error(str syscall, str err): + error_str = "{} failed: {}".format(syscall, err) + error_bytes = str_to_bytes(error_str) + return grpc_socket_error(error_bytes) + +cdef resolved_addr_to_tuple(grpc_resolved_address* address): + cdef char* res_str + port = grpc_sockaddr_get_port(address) + str_len = grpc_sockaddr_to_string(&res_str, address, 0) + byte_str = _decode(<bytes>res_str[:str_len]) + if byte_str.endswith(':' + str(port)): + byte_str = byte_str[:(0 - len(str(port)) - 1)] + byte_str = byte_str.lstrip('[') + byte_str = byte_str.rstrip(']') + byte_str = '{}'.format(byte_str) + return byte_str, port + +cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(<void*>c_addr.addr, <void*> address, length) + c_addr.len = length + return resolved_addr_to_tuple(&c_addr) + +cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(<void*>c_addr.addr, <void*> address, length) + c_addr.len = length + return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' + +cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): + cdef grpc_resolved_addresses* addresses + tups_set = set((tup[4][0], tup[4][1]) for tup in tups) + addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses)) + addresses.naddrs = len(tups_set) + addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set)) + i = 0 + for tup in set(tups_set): + hostname = str_to_bytes(tup[0]) + grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) + i += 1 + return addresses + +def _spawn_greenlet(*args): + greenlet = g_pool.spawn(*args) + +############################### +### socket implementation ### +############################### + +cdef class SocketWrapper: + def __cinit__(self): + self.sockopts = [] + self.socket = None + self.c_socket = NULL + self.c_buffer = NULL + self.len = 0 + +cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil: + sw = SocketWrapper() + sw.c_socket = socket + sw.sockopts = [] + cpython.Py_INCREF(sw) + # Python doesn't support AF_UNSPEC sockets, so we defer creation until + # bind/connect when we know what type of socket we need + sw.socket = None + sw.closed = False + sw.accepting_socket = NULL + socket.impl = <void*>sw + return grpc_error_none() + +cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple): + try: + socket_wrapper.socket.connect(addr_tuple) + socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + socket_error("connect", str(io_error))) + g_event.set() + +def socket_connect_async(socket_wrapper, addr_tuple): + socket_connect_async_cython(socket_wrapper, addr_tuple) + +cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t addr_len, + grpc_custom_connect_callback cb) with gil: + py_socket = None + socket_wrapper = <SocketWrapper>socket.impl + socket_wrapper.connect_cb = cb + addr_tuple = sockaddr_to_tuple(addr, addr_len) + if sockaddr_is_ipv4(addr, addr_len): + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + else: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + socket_wrapper.socket = py_socket + _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple) + +cdef void socket_destroy(grpc_custom_socket* socket) with gil: + cpython.Py_DECREF(<SocketWrapper>socket.impl) + +cdef void socket_shutdown(grpc_custom_socket* socket) with gil: + try: + (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR) + except IOError as io_error: + if io_error.errno != errno.ENOTCONN: + raise io_error + +cdef void socket_close(grpc_custom_socket* socket, + grpc_custom_close_callback cb) with gil: + socket_wrapper = (<SocketWrapper>socket.impl) + if socket_wrapper.socket is not None: + socket_wrapper.socket.close() + socket_wrapper.closed = True + socket_wrapper.close_cb = cb + # Delay the close callback until the accept() call has picked it up + if socket_wrapper.accepting_socket != NULL: + return + socket_wrapper.close_cb(socket) + +def socket_sendmsg(socket, write_bytes): + try: + return socket.sendmsg(write_bytes) + except AttributeError: + # sendmsg not available on all Pythons/Platforms + return socket.send(b''.join(write_bytes)) + +cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes): + try: + while write_bytes: + sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes) + while sent_byte_count > 0: + if sent_byte_count < len(write_bytes[0]): + write_bytes[0] = write_bytes[0][sent_byte_count:] + sent_byte_count = 0 + else: + sent_byte_count -= len(write_bytes[0]) + write_bytes = write_bytes[1:] + socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + socket_error("send", str(io_error))) + g_event.set() + +def socket_write_async(socket_wrapper, write_bytes): + socket_write_async_cython(socket_wrapper, write_bytes) + +cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer, + grpc_custom_write_callback cb) with gil: + cdef char* start + sw = <SocketWrapper>socket.impl + sw.write_cb = cb + write_bytes = [] + for i in range(buffer.count): + start = grpc_slice_buffer_start(buffer, i) + length = grpc_slice_buffer_length(buffer, i) + write_bytes.append(<bytes>start[:length]) + _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes) + +cdef socket_read_async_cython(SocketWrapper socket_wrapper): + cdef char* buff_char_arr + try: + buff_str = socket_wrapper.socket.recv(socket_wrapper.len) + buff_char_arr = buff_str + string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str)) + socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + len(buff_str), grpc_error_none()) + except IOError as io_error: + socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + -1, socket_error("recv", str(io_error))) + g_event.set() + +def socket_read_async(socket_wrapper): + socket_read_async_cython(socket_wrapper) + +cdef void socket_read(grpc_custom_socket* socket, char* buffer, + size_t length, grpc_custom_read_callback cb) with gil: + sw = <SocketWrapper>socket.impl + sw.read_cb = cb + sw.c_buffer = buffer + sw.len = length + _spawn_greenlet(socket_read_async, sw) + +cdef grpc_error* socket_getpeername(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + peer = (<SocketWrapper>socket.impl).socket.getpeername() + + cdef grpc_resolved_address c_addr + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +cdef grpc_error* socket_getsockname(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + cdef grpc_resolved_address c_addr + if (<SocketWrapper>socket.impl).socket is None: + peer = ('0.0.0.0', 0) + else: + peer = (<SocketWrapper>socket.impl).socket.getsockname() + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +def applysockopts(s): + s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) + s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) + +cdef grpc_error* socket_bind(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + size_t len, int flags) with gil: + addr_tuple = sockaddr_to_tuple(addr, len) + try: + try: + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + except gevent_socket.gaierror as e: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + (<SocketWrapper>socket.impl).socket = py_socket + except IOError as io_error: + return socket_error("bind", str(io_error)) + else: + return grpc_error_none() + +cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil: + (<SocketWrapper>socket.impl).socket.listen(50) + return grpc_error_none() + +cdef void accept_callback_cython(SocketWrapper s): + try: + conn, address = s.socket.accept() + sw = SocketWrapper() + sw.closed = False + sw.c_socket = s.accepting_socket + sw.sockopts = [] + sw.socket = conn + sw.c_socket.impl = <void*>sw + sw.accepting_socket = NULL + cpython.Py_INCREF(sw) + s.accepting_socket = NULL + s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none()) + except IOError as io_error: + #TODO actual error + s.accepting_socket = NULL + s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket, + socket_error("accept", str(io_error))) + if s.closed: + s.close_cb(<grpc_custom_socket*>s.c_socket) + g_event.set() + +def socket_accept_async(s): + accept_callback_cython(s) + +cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb) with gil: + sw = <SocketWrapper>socket.impl + sw.accepting_socket = client + sw.accept_cb = cb + _spawn_greenlet(socket_accept_async, sw) + +##################################### +######Resolver implementation ####### +##################################### + +cdef class ResolveWrapper: + def __cinit__(self): + self.c_resolver = NULL + self.c_host = NULL + self.c_port = NULL + +cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): + try: + res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) + grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, + tuples_to_resolvaddr(res), grpc_error_none()) + except IOError as io_error: + grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, + <grpc_resolved_addresses*>0, + socket_error("getaddrinfo", str(io_error))) + g_event.set() + +def socket_resolve_async_python(resolve_wrapper): + socket_resolve_async_cython(resolve_wrapper) + +cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil: + rw = ResolveWrapper() + rw.c_resolver = r + rw.c_host = host + rw.c_port = port + _spawn_greenlet(socket_resolve_async_python, rw) + +cdef grpc_error* socket_resolve(char* host, char* port, + grpc_resolved_addresses** res) with gil: + try: + result = gevent_socket.getaddrinfo(host, port) + res[0] = tuples_to_resolvaddr(result) + return grpc_error_none() + except IOError as io_error: + return socket_error("getaddrinfo", str(io_error)) + +############################### +### timer implementation ###### +############################### + +cdef class TimerWrapper: + def __cinit__(self, deadline): + self.timer = gevent_hub.get_hub().loop.timer(deadline) + self.event = None + + def start(self): + self.event = gevent_event.Event() + self.timer.start(self.on_finish) + + def on_finish(self): + grpc_custom_timer_callback(self.c_timer, grpc_error_none()) + self.timer.stop() + g_event.set() + + def stop(self): + self.event.set() + self.timer.stop() + +cdef void timer_start(grpc_custom_timer* t) with gil: + timer = TimerWrapper(t.timeout_ms / 1000.0) + timer.c_timer = t + t.timer = <void*>timer + timer.start() + +cdef void timer_stop(grpc_custom_timer* t) with gil: + time_wrapper = <object>t.timer + time_wrapper.stop() + +############################### +### pollset implementation ### +############################### + +cdef void init_loop() with gil: + pass + +cdef void destroy_loop() with gil: + g_pool.join() + +cdef void kick_loop() with gil: + g_event.set() + +cdef void run_loop(size_t timeout_ms) with gil: + timeout = timeout_ms / 1000.0 + if timeout_ms > 0: + g_event.wait(timeout) + g_event.clear() + +############################### +### Initializer ############### +############################### + +cdef grpc_socket_vtable gevent_socket_vtable +cdef grpc_custom_resolver_vtable gevent_resolver_vtable +cdef grpc_custom_timer_vtable gevent_timer_vtable +cdef grpc_custom_poller_vtable gevent_pollset_vtable + +def init_grpc_gevent(): + # Lazily import gevent + global gevent_socket + global gevent_g + global gevent_hub + global gevent_event + global g_event + global g_pool + import gevent + gevent_g = gevent + import gevent.socket + gevent_socket = gevent.socket + import gevent.hub + gevent_hub = gevent.hub + import gevent.event + gevent_event = gevent.event + import gevent.pool + + g_event = gevent.event.Event() + g_pool = gevent.pool.Group() + + def cb_func(cb, args): + _spawn_greenlet(cb, *args) + set_async_callback_func(cb_func) + + gevent_resolver_vtable.resolve = socket_resolve + gevent_resolver_vtable.resolve_async = socket_resolve_async + + gevent_socket_vtable.init = socket_init + gevent_socket_vtable.connect = socket_connect + gevent_socket_vtable.destroy = socket_destroy + gevent_socket_vtable.shutdown = socket_shutdown + gevent_socket_vtable.close = socket_close + gevent_socket_vtable.write = socket_write + gevent_socket_vtable.read = socket_read + gevent_socket_vtable.getpeername = socket_getpeername + gevent_socket_vtable.getsockname = socket_getsockname + gevent_socket_vtable.bind = socket_bind + gevent_socket_vtable.listen = socket_listen + gevent_socket_vtable.accept = socket_accept + + gevent_timer_vtable.start = timer_start + gevent_timer_vtable.stop = timer_stop + + gevent_pollset_vtable.init = init_loop + gevent_pollset_vtable.poll = run_loop + gevent_pollset_vtable.kick = kick_loop + gevent_pollset_vtable.shutdown = destroy_loop + + grpc_custom_iomgr_init(&gevent_socket_vtable, + &gevent_resolver_vtable, + &gevent_timer_vtable, + &gevent_pollset_vtable) |