aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_client_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/tcp_client_posix.c')
-rw-r--r--src/core/iomgr/tcp_client_posix.c76
1 files changed, 55 insertions, 21 deletions
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 88b599b582..d675c2dcec 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -38,7 +38,9 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/iomgr_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -49,8 +51,11 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
+ gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
+ grpc_alarm alarm;
+ int refs;
} async_connect;
static int prepare_socket(int fd) {
@@ -74,21 +79,42 @@ error:
return 0;
}
-static void on_writable(void *acp, grpc_iomgr_cb_status status) {
+static void on_alarm(void *acp, int success) {
+ int done;
+ async_connect *ac = acp;
+ gpr_mu_lock(&ac->mu);
+ if (ac->fd != NULL && success) {
+ grpc_fd_shutdown(ac->fd);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+}
+
+static void on_writable(void *acp, int success) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = grpc_fd_get(ac->fd);
+ int fd = ac->fd->fd;
+ int done;
+ grpc_endpoint *ep = NULL;
+ void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
+ void *cb_arg = ac->cb_arg;
+
+ grpc_alarm_cancel(&ac->alarm);
- if (status == GRPC_CALLBACK_SUCCESS) {
+ if (success) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
- goto error;
+ goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
@@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
return;
} else {
switch (so_error) {
@@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
- goto error;
+ goto finish;
}
} else {
- goto great_success;
+ ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ goto finish;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status);
- goto error;
+ gpr_log(GPR_ERROR, "on_writable failed during connect");
+ goto finish;
}
abort();
-error:
- ac->cb(ac->cb_arg, NULL);
- grpc_fd_destroy(ac->fd, NULL, NULL);
- gpr_free(ac);
- return;
-
-great_success:
- ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- gpr_free(ac);
+finish:
+ gpr_mu_lock(&ac->mu);
+ if (!ep) {
+ grpc_fd_orphan(ac->fd, NULL, NULL);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+ cb(cb_arg, ep);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
@@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
+ gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
@@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
- ac->deadline = deadline;
ac->fd = grpc_fd_create(fd);
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
+ gpr_mu_init(&ac->mu);
+ ac->refs = 2;
+
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
}