diff options
Diffstat (limited to 'src/core/iomgr/tcp_client_posix.c')
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 76 |
1 files changed, 55 insertions, 21 deletions
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 88b599b582..d675c2dcec 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -38,7 +38,9 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -49,8 +51,11 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; + gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; + grpc_alarm alarm; + int refs; } async_connect; static int prepare_socket(int fd) { @@ -74,21 +79,42 @@ error: return 0; } -static void on_writable(void *acp, grpc_iomgr_cb_status status) { +static void on_alarm(void *acp, int success) { + int done; + async_connect *ac = acp; + gpr_mu_lock(&ac->mu); + if (ac->fd != NULL && success) { + grpc_fd_shutdown(ac->fd); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } +} + +static void on_writable(void *acp, int success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = grpc_fd_get(ac->fd); + int fd = ac->fd->fd; + int done; + grpc_endpoint *ep = NULL; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void *cb_arg = ac->cb_arg; + + grpc_alarm_cancel(&ac->alarm); - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { do { so_error_size = sizeof(so_error); err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); - goto error; + goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { /* We will get one of these errors if we have run out of @@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { opened too many network connections. The "easy" fix: don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); return; } else { switch (so_error) { @@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { gpr_log(GPR_ERROR, "socket error: %d", so_error); break; } - goto error; + goto finish; } } else { - goto great_success; + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + goto finish; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); - goto error; + gpr_log(GPR_ERROR, "on_writable failed during connect"); + goto finish; } abort(); -error: - ac->cb(ac->cb_arg, NULL); - grpc_fd_destroy(ac->fd, NULL, NULL); - gpr_free(ac); - return; - -great_success: - ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - gpr_free(ac); +finish: + gpr_mu_lock(&ac->mu); + if (!ep) { + grpc_fd_orphan(ac->fd, NULL, NULL); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } + cb(cb_arg, ep); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), @@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { + gpr_log(GPR_DEBUG, "instant connect"); cb(arg, grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; @@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; - ac->deadline = deadline; ac->fd = grpc_fd_create(fd); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); + gpr_mu_init(&ac->mu); + ac->refs = 2; + + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); } |