aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_client_posix.c
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-07 14:03:30 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-09 17:27:32 -0800
commit58393c20bc29cf4f9e6fe15516125661e1957db7 (patch)
treed1648196ca8201d778e73c7c9e59a8ac90613763 /src/core/iomgr/tcp_client_posix.c
parente4b409364e4c493a66d4b2a6fe897075aa5c174e (diff)
Remove libevent.
Fixed any exposed bugs across the stack. Add a poll() based implementation. Heavily leverages pollset infrastructure to allow small polls to be the norm. Exposes a mechanism to plug in epoll/kqueue for platforms where we have them. Simplify iomgr callbacks to return one bit of success or failure (instead of the multi valued result that was mostly unused previously). This will ease the burden on new implementations, and the previous system provided no real value anyway. Removed timeouts on endpoint read/write routines. This simplifies porting burden by providing a more orthogonal interface, and the functionality can always be replicated when desired by using an alarm combined with endpoint_shutdown. I'm fairly certain we ended up with this interface because it was convenient to do from libevent. Things that need attention still: - adding an fd to a pollset is O(n^2) - but this is probably ok given that we'll not use this for multipolling once platform specific implementations are added. - we rely on the backup poller too often - especially for SSL handshakes and for client connection establishment we should have a better mechanism ([] [] - Linux needs to use epoll for multiple fds, FreeBSD variants (including Darwin) need to use kqueue. ([] [] - Linux needs to use eventfd for poll kicking. ([] Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83461069
Diffstat (limited to 'src/core/iomgr/tcp_client_posix.c')
-rw-r--r--src/core/iomgr/tcp_client_posix.c76
1 files changed, 55 insertions, 21 deletions
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 88b599b582..d675c2dcec 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -38,7 +38,9 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/iomgr_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -49,8 +51,11 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
+ gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
+ grpc_alarm alarm;
+ int refs;
} async_connect;
static int prepare_socket(int fd) {
@@ -74,21 +79,42 @@ error:
return 0;
}
-static void on_writable(void *acp, grpc_iomgr_cb_status status) {
+static void on_alarm(void *acp, int success) {
+ int done;
+ async_connect *ac = acp;
+ gpr_mu_lock(&ac->mu);
+ if (ac->fd != NULL && success) {
+ grpc_fd_shutdown(ac->fd);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+}
+
+static void on_writable(void *acp, int success) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = grpc_fd_get(ac->fd);
+ int fd = ac->fd->fd;
+ int done;
+ grpc_endpoint *ep = NULL;
+ void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
+ void *cb_arg = ac->cb_arg;
+
+ grpc_alarm_cancel(&ac->alarm);
- if (status == GRPC_CALLBACK_SUCCESS) {
+ if (success) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
- goto error;
+ goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
@@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
return;
} else {
switch (so_error) {
@@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
- goto error;
+ goto finish;
}
} else {
- goto great_success;
+ ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ goto finish;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status);
- goto error;
+ gpr_log(GPR_ERROR, "on_writable failed during connect");
+ goto finish;
}
abort();
-error:
- ac->cb(ac->cb_arg, NULL);
- grpc_fd_destroy(ac->fd, NULL, NULL);
- gpr_free(ac);
- return;
-
-great_success:
- ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- gpr_free(ac);
+finish:
+ gpr_mu_lock(&ac->mu);
+ if (!ep) {
+ grpc_fd_orphan(ac->fd, NULL, NULL);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+ cb(cb_arg, ep);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
@@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
+ gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
@@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
- ac->deadline = deadline;
ac->fd = grpc_fd_create(fd);
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
+ gpr_mu_init(&ac->mu);
+ ac->refs = 2;
+
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
}