# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++

# NOTE(review): the explicit casts in this file assume that
# `grpc_custom_socket.impl` and `grpc_custom_timer.timer` are declared as
# `void*`, and that the wrapper classes' C attributes (c_socket, *_cb,
# c_buffer, len, accepting_socket, c_resolver, c_host, c_port, c_timer) are
# declared in an accompanying .pxd that is not visible here -- confirm.

cimport cpython
from libc cimport string
from libc.stdlib cimport malloc, free

import errno

# gevent is imported lazily by init_grpc_gevent(); until then these are None.
gevent_g = None
gevent_socket = None
gevent_hub = None
gevent_event = None
# Event that the pollset waits on; set whenever an async operation finishes.
g_event = None
# Greenlet group that owns every greenlet spawned by this module.
g_pool = None


cdef grpc_error* grpc_error_none():
    """Return the null grpc_error pointer used to signal success."""
    return <grpc_error*>0


cdef grpc_error* socket_error(str syscall, str err):
    """Build a grpc_error describing a failed socket operation.

    The message has the form "<syscall> failed: <err>".
    """
    error_str = "{} failed: {}".format(syscall, err)
    error_bytes = str_to_bytes(error_str)
    return grpc_socket_error(error_bytes)


cdef resolved_addr_to_tuple(grpc_resolved_address* address):
    """Convert a resolved address into a Python (host, port) tuple.

    A trailing ":<port>" and any surrounding square brackets are removed
    from the string form of the address to obtain the host.
    """
    cdef char* res_str
    port = grpc_sockaddr_get_port(address)
    # NOTE(review): res_str is filled in by grpc_sockaddr_to_string and is not
    # released in this function -- confirm who owns that buffer.
    str_len = grpc_sockaddr_to_string(&res_str, address, 0)
    byte_str = _decode(<bytes>res_str[:str_len])
    port_suffix = ':' + str(port)
    if byte_str.endswith(port_suffix):
        byte_str = byte_str[:-len(port_suffix)]
    byte_str = byte_str.lstrip('[')
    byte_str = byte_str.rstrip(']')
    # Kept from the original; presumably normalises the value to a native str.
    byte_str = '{}'.format(byte_str)
    return byte_str, port


cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
    """Convert a raw sockaddr of `length` bytes into a (host, port) tuple."""
    cdef grpc_resolved_address c_addr
    string.memcpy(<void*>c_addr.addr, <void*>address, length)
    c_addr.len = length
    return resolved_addr_to_tuple(&c_addr)


cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
    """Return True when the URI scheme of the sockaddr is b'ipv4'."""
    cdef grpc_resolved_address c_addr
    string.memcpy(<void*>c_addr.addr, <void*>address, length)
    c_addr.len = length
    return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'


cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
    """Convert getaddrinfo()-style result tuples into grpc_resolved_addresses.

    Only element 4 (the sockaddr) of each tuple is used, and duplicate
    (host, port) pairs are collapsed. The returned structure and its address
    array are allocated with malloc(); they are not freed here.
    """
    cdef grpc_resolved_addresses* addresses
    unique_addrs = set((tup[4][0], tup[4][1]) for tup in tups)
    addresses = <grpc_resolved_addresses*>malloc(
        sizeof(grpc_resolved_addresses))
    addresses.naddrs = len(unique_addrs)
    addresses.addrs = <grpc_resolved_address*>malloc(
        sizeof(grpc_resolved_address) * len(unique_addrs))
    for i, (host, port) in enumerate(unique_addrs):
        hostname = str_to_bytes(host)
        grpc_string_to_sockaddr(&addresses.addrs[i], hostname, port)
    return addresses


def _spawn_greenlet(*args):
    """Spawn a greenlet in the module-wide group; `args` go to spawn()."""
    g_pool.spawn(*args)


###############################
### socket implementation ###
###############################

cdef class SocketWrapper:
    """Python-side state attached to a grpc_custom_socket."""

    def __cinit__(self):
        self.sockopts = []
        self.socket = None
        self.c_socket = NULL
        self.c_buffer = NULL
        self.len = 0


cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil:
    """Attach a fresh SocketWrapper to `socket`; `domain` is not used."""
    cdef SocketWrapper sw = SocketWrapper()
    sw.c_socket = socket
    sw.sockopts = []
    # The C struct holds a borrowed pointer to the wrapper, so take an extra
    # reference here; socket_destroy() drops it.
    cpython.Py_INCREF(sw)
    # Python doesn't support AF_UNSPEC sockets, so we defer creation until
    # bind/connect when we know what type of socket we need
    sw.socket = None
    sw.closed = False
    sw.accepting_socket = NULL
    socket.impl = <void*>sw
    return grpc_error_none()


cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
    """Connect to `addr_tuple`, then report the outcome via connect_cb."""
    try:
        socket_wrapper.socket.connect(addr_tuple)
        socket_wrapper.connect_cb(socket_wrapper.c_socket, grpc_error_none())
    except IOError as io_error:
        socket_wrapper.connect_cb(socket_wrapper.c_socket,
                                  socket_error("connect", str(io_error)))
    # Wake up the poller.
    g_event.set()


def socket_connect_async(socket_wrapper, addr_tuple):
    """Python-callable trampoline so the connect can run in a greenlet."""
    socket_connect_async_cython(socket_wrapper, addr_tuple)


cdef void socket_connect(grpc_custom_socket* socket,
                         const grpc_sockaddr* addr,
                         size_t addr_len,
                         grpc_custom_connect_callback cb) with gil:
    """Create a gevent socket of the right family and connect it in a greenlet.

    `cb` is stored on the wrapper and invoked once the connect completes.
    """
    cdef SocketWrapper socket_wrapper = <SocketWrapper>socket.impl
    socket_wrapper.connect_cb = cb
    addr_tuple = sockaddr_to_tuple(addr, addr_len)
    if sockaddr_is_ipv4(addr, addr_len):
        py_socket = gevent_socket.socket(gevent_socket.AF_INET)
    else:
        py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
    applysockopts(py_socket)
    socket_wrapper.socket = py_socket
    _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)


cdef void socket_destroy(grpc_custom_socket* socket) with gil:
    """Drop the reference to the wrapper that was taken when it was attached."""
    cpython.Py_DECREF(<SocketWrapper>socket.impl)


cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
    """Shut down both directions of the socket; ENOTCONN is ignored."""
    try:
        (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
    except IOError as io_error:
        if io_error.errno != errno.ENOTCONN:
            raise


cdef void socket_close(grpc_custom_socket* socket,
                       grpc_custom_close_callback cb) with gil:
    """Close the Python socket (if any) and deliver the close callback.

    When an accept() is still outstanding the callback is deferred;
    accept_callback_cython() invokes it after the accept fails.
    """
    cdef SocketWrapper socket_wrapper = <SocketWrapper>socket.impl
    if socket_wrapper.socket is not None:
        socket_wrapper.socket.close()
    # Record the callback unconditionally so it is always defined before it
    # is invoked, even if no Python socket was ever created.
    socket_wrapper.closed = True
    socket_wrapper.close_cb = cb
    # Delay the close callback until the accept() call has picked it up
    if socket_wrapper.accepting_socket != NULL:
        return
    socket_wrapper.close_cb(socket)


def socket_sendmsg(socket, write_bytes):
    """Send a list of byte strings; return the number of bytes sent."""
    try:
        return socket.sendmsg(write_bytes)
    except AttributeError:
        # sendmsg not available on all Pythons/Platforms
        return socket.send(b''.join(write_bytes))


cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
    """Send every chunk in `write_bytes`, then report via write_cb."""
    try:
        while write_bytes:
            sent_byte_count = socket_sendmsg(socket_wrapper.socket,
                                             write_bytes)
            # Drop what was sent: whole chunks first, then trim the chunk
            # that was only partially written.
            while sent_byte_count > 0:
                if sent_byte_count < len(write_bytes[0]):
                    write_bytes[0] = write_bytes[0][sent_byte_count:]
                    sent_byte_count = 0
                else:
                    sent_byte_count -= len(write_bytes[0])
                    write_bytes = write_bytes[1:]
        socket_wrapper.write_cb(socket_wrapper.c_socket, grpc_error_none())
    except IOError as io_error:
        socket_wrapper.write_cb(socket_wrapper.c_socket,
                                socket_error("send", str(io_error)))
    g_event.set()


def socket_write_async(socket_wrapper, write_bytes):
    """Python-callable trampoline so the write can run in a greenlet."""
    socket_write_async_cython(socket_wrapper, write_bytes)


cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
                       grpc_custom_write_callback cb) with gil:
    """Copy every slice of `buffer` into bytes and send them in a greenlet."""
    cdef char* start
    cdef SocketWrapper sw = <SocketWrapper>socket.impl
    sw.write_cb = cb
    write_bytes = []
    for i in range(buffer.count):
        start = grpc_slice_buffer_start(buffer, i)
        length = grpc_slice_buffer_length(buffer, i)
        # Slicing the char* copies the data into a Python bytes object.
        write_bytes.append(<bytes>start[:length])
    _spawn_greenlet(socket_write_async, sw, write_bytes)


cdef socket_read_async_cython(SocketWrapper socket_wrapper):
    """recv() up to `len` bytes into c_buffer, then report via read_cb.

    On IOError the callback receives -1 as the byte count.
    """
    cdef char* buff_char_arr
    try:
        buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
        buff_char_arr = buff_str
        string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr,
                      len(buff_str))
        socket_wrapper.read_cb(socket_wrapper.c_socket, len(buff_str),
                               grpc_error_none())
    except IOError as io_error:
        socket_wrapper.read_cb(socket_wrapper.c_socket, -1,
                               socket_error("recv", str(io_error)))
    g_event.set()


def socket_read_async(socket_wrapper):
    """Python-callable trampoline so the read can run in a greenlet."""
    socket_read_async_cython(socket_wrapper)


cdef void socket_read(grpc_custom_socket* socket, char* buffer,
                      size_t length, grpc_custom_read_callback cb) with gil:
    """Record the destination buffer and callback, then read in a greenlet."""
    cdef SocketWrapper sw = <SocketWrapper>socket.impl
    sw.read_cb = cb
    sw.c_buffer = buffer
    sw.len = length
    _spawn_greenlet(socket_read_async, sw)


cdef grpc_error* socket_getpeername(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
    """Write the peer address into `addr` and its size into `length[0]`."""
    cdef grpc_resolved_address c_addr
    peer = (<SocketWrapper>socket.impl).socket.getpeername()
    hostname = str_to_bytes(peer[0])
    grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
    # `addr` is an output parameter despite its const qualifier.
    string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
    length[0] = c_addr.len
    return grpc_error_none()


cdef grpc_error* socket_getsockname(grpc_custom_socket* socket,
                                    const grpc_sockaddr* addr,
                                    int* length) with gil:
    """Write the local address into `addr` and its size into `length[0]`.

    If no Python socket exists yet, ('0.0.0.0', 0) is reported.
    """
    cdef grpc_resolved_address c_addr
    cdef SocketWrapper sw = <SocketWrapper>socket.impl
    if sw.socket is None:
        peer = ('0.0.0.0', 0)
    else:
        peer = sw.socket.getsockname()
    hostname = str_to_bytes(peer[0])
    grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
    # `addr` is an output parameter despite its const qualifier.
    string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
    length[0] = c_addr.len
    return grpc_error_none()


def applysockopts(s):
    """Set SO_REUSEADDR and TCP_NODELAY on socket `s`."""
    s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
    s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)


cdef grpc_error* socket_bind(grpc_custom_socket* socket,
                             const grpc_sockaddr* addr,
                             size_t len, int flags) with gil:
    """Bind a new gevent socket to `addr`; `flags` is not used.

    An AF_INET socket is tried first; if that raises gaierror, an AF_INET6
    socket is used instead. Any IOError is returned as a grpc_error.
    """
    addr_tuple = sockaddr_to_tuple(addr, len)
    py_socket = None
    try:
        try:
            py_socket = gevent_socket.socket(gevent_socket.AF_INET)
            applysockopts(py_socket)
            py_socket.bind(addr_tuple)
        except gevent_socket.gaierror:
            # Release the unusable IPv4 socket before falling back to IPv6.
            if py_socket is not None:
                py_socket.close()
            py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
            applysockopts(py_socket)
            py_socket.bind(addr_tuple)
        (<SocketWrapper>socket.impl).socket = py_socket
    except IOError as io_error:
        return socket_error("bind", str(io_error))
    else:
        return grpc_error_none()


cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil:
    """Start listening on the bound socket with a backlog of 50."""
    (<SocketWrapper>socket.impl).socket.listen(50)
    return grpc_error_none()


cdef void accept_callback_cython(SocketWrapper s) except *:
    """Accept one connection on `s` and report it via accept_cb.

    On success a new SocketWrapper is attached to the pending client
    grpc_custom_socket. On IOError the error is reported, and a close
    callback deferred by socket_close() is delivered if the listener has
    been closed in the meantime.
    """
    cdef SocketWrapper sw
    try:
        conn, _ = s.socket.accept()
        sw = SocketWrapper()
        sw.closed = False
        sw.c_socket = s.accepting_socket
        sw.sockopts = []
        sw.socket = conn
        sw.c_socket.impl = <void*>sw
        sw.accepting_socket = NULL
        # The C struct holds a borrowed pointer to the wrapper; keep it alive.
        cpython.Py_INCREF(sw)
        s.accepting_socket = NULL
        s.accept_cb(s.c_socket, sw.c_socket, grpc_error_none())
    except IOError as io_error:
        #TODO actual error
        s.accepting_socket = NULL
        s.accept_cb(s.c_socket, s.accepting_socket,
                    socket_error("accept", str(io_error)))
        if s.closed:
            s.close_cb(s.c_socket)
    g_event.set()


def socket_accept_async(s):
    """Python-callable trampoline so the accept can run in a greenlet."""
    accept_callback_cython(s)


cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
                        grpc_custom_accept_callback cb) with gil:
    """Record the pending client socket and callback, then accept in a greenlet."""
    cdef SocketWrapper sw = <SocketWrapper>socket.impl
    sw.accepting_socket = client
    sw.accept_cb = cb
    _spawn_greenlet(socket_accept_async, sw)


#####################################
######Resolver implementation #######
#####################################

cdef class ResolveWrapper:
    """Carries the arguments of one asynchronous resolve request."""

    def __cinit__(self):
        self.c_resolver = NULL
        self.c_host = NULL
        self.c_port = NULL


cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
    """Run getaddrinfo() and hand the result to grpc_custom_resolve_callback."""
    try:
        res = gevent_socket.getaddrinfo(resolve_wrapper.c_host,
                                        resolve_wrapper.c_port)
        grpc_custom_resolve_callback(resolve_wrapper.c_resolver,
                                     tuples_to_resolvaddr(res),
                                     grpc_error_none())
    except IOError as io_error:
        grpc_custom_resolve_callback(resolve_wrapper.c_resolver,
                                     <grpc_resolved_addresses*>NULL,
                                     socket_error("getaddrinfo",
                                                  str(io_error)))
    g_event.set()


def socket_resolve_async_python(resolve_wrapper):
    """Python-callable trampoline so the resolve can run in a greenlet."""
    socket_resolve_async_cython(resolve_wrapper)


cdef void socket_resolve_async(grpc_custom_resolver* r, char* host,
                               char* port) with gil:
    """Resolve `host`/`port` in a greenlet and report through `r`."""
    cdef ResolveWrapper rw = ResolveWrapper()
    rw.c_resolver = r
    # NOTE(review): the host/port pointers are stored without copying --
    # confirm the caller keeps them alive until the greenlet has run.
    rw.c_host = host
    rw.c_port = port
    _spawn_greenlet(socket_resolve_async_python, rw)


cdef grpc_error* socket_resolve(char* host, char* port,
                                grpc_resolved_addresses** res) with gil:
    """Resolve `host`/`port` synchronously, storing the result in `res[0]`."""
    try:
        result = gevent_socket.getaddrinfo(host, port)
        res[0] = tuples_to_resolvaddr(result)
        return grpc_error_none()
    except IOError as io_error:
        return socket_error("getaddrinfo", str(io_error))


###############################
### timer implementation ######
###############################

cdef class TimerWrapper:
    """Wraps a gevent hub loop timer that fires grpc_custom_timer_callback."""

    def __cinit__(self, deadline):
        # `deadline` is passed straight to the hub loop's timer(); timer_start
        # supplies it as timeout_ms / 1000.0.
        self.timer = gevent_hub.get_hub().loop.timer(deadline)
        self.event = None

    def start(self):
        """Create the event and arm the timer with on_finish as callback."""
        self.event = gevent_event.Event()
        self.timer.start(self.on_finish)

    def on_finish(self):
        """Timer callback: notify gRPC, stop the timer, wake the poller."""
        grpc_custom_timer_callback(self.c_timer, grpc_error_none())
        self.timer.stop()
        g_event.set()

    def stop(self):
        """Set the event and stop the underlying timer."""
        self.event.set()
        self.timer.stop()


cdef void timer_start(grpc_custom_timer* t) with gil:
    """Create and start a TimerWrapper for `t`, storing it in `t.timer`."""
    cdef TimerWrapper timer = TimerWrapper(t.timeout_ms / 1000.0)
    timer.c_timer = t
    t.timer = <void*>timer
    timer.start()


cdef void timer_stop(grpc_custom_timer* t) with gil:
    """Stop the TimerWrapper previously stored in `t.timer`."""
    (<TimerWrapper>t.timer).stop()


###############################
### pollset implementation ###
###############################

cdef void init_loop() with gil:
    """No per-loop initialisation is needed."""
    pass


cdef void destroy_loop() with gil:
    """Wait for every greenlet in the module-wide group to finish."""
    g_pool.join()


cdef void kick_loop() with gil:
    """Wake up a poller blocked in run_loop()."""
    g_event.set()


cdef void run_loop(size_t timeout_ms) with gil:
    """Wait up to `timeout_ms` milliseconds for activity, then reset the event.

    A timeout of 0 returns immediately without touching the event.
    """
    timeout = timeout_ms / 1000.0
    if timeout_ms > 0:
        g_event.wait(timeout)
        g_event.clear()


###############################
### Initializer ###############
###############################

cdef grpc_socket_vtable gevent_socket_vtable
cdef grpc_custom_resolver_vtable gevent_resolver_vtable
cdef grpc_custom_timer_vtable gevent_timer_vtable
cdef grpc_custom_poller_vtable gevent_pollset_vtable


def init_grpc_gevent():
    """Import gevent, create the shared event/pool and install the vtables."""
    # Lazily import gevent
    global gevent_socket
    global gevent_g
    global gevent_hub
    global gevent_event
    global g_event
    global g_pool
    import gevent
    gevent_g = gevent
    import gevent.socket
    gevent_socket = gevent.socket
    import gevent.hub
    gevent_hub = gevent.hub
    import gevent.event
    gevent_event = gevent.event
    import gevent.pool

    g_event = gevent.event.Event()
    g_pool = gevent.pool.Group()

    def cb_func(cb, args):
        # Run asynchronous callbacks inside the module-wide greenlet group.
        _spawn_greenlet(cb, *args)
    set_async_callback_func(cb_func)

    gevent_resolver_vtable.resolve = socket_resolve
    gevent_resolver_vtable.resolve_async = socket_resolve_async

    gevent_socket_vtable.init = socket_init
    gevent_socket_vtable.connect = socket_connect
    gevent_socket_vtable.destroy = socket_destroy
    gevent_socket_vtable.shutdown = socket_shutdown
    gevent_socket_vtable.close = socket_close
    gevent_socket_vtable.write = socket_write
    gevent_socket_vtable.read = socket_read
    gevent_socket_vtable.getpeername = socket_getpeername
    gevent_socket_vtable.getsockname = socket_getsockname
    gevent_socket_vtable.bind = socket_bind
    gevent_socket_vtable.listen = socket_listen
    gevent_socket_vtable.accept = socket_accept

    gevent_timer_vtable.start = timer_start
    gevent_timer_vtable.stop = timer_stop

    gevent_pollset_vtable.init = init_loop
    gevent_pollset_vtable.poll = run_loop
    gevent_pollset_vtable.kick = kick_loop
    gevent_pollset_vtable.shutdown = destroy_loop

    grpc_custom_iomgr_init(&gevent_socket_vtable,
                           &gevent_resolver_vtable,
                           &gevent_timer_vtable,
                           &gevent_pollset_vtable)