aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2015-11-30 14:25:01 -0800
committerGravatar yang-g <yangg@google.com>2015-11-30 14:25:01 -0800
commitdc2159309a59a00c3a8c0be295db7af5fcc91566 (patch)
tree9a8fbafd3cd90bdaf95148cf56f199df15ce68a3 /src
parent945836eade7d8f12f6eb84bc209da13ae7c89b38 (diff)
Release fd api
Diffstat (limited to 'src')
-rw-r--r--src/core/iomgr/fd_posix.c15
-rw-r--r--src/core/iomgr/fd_posix.h3
-rw-r--r--src/core/iomgr/tcp_client_posix.c4
-rw-r--r--src/core/iomgr/tcp_posix.c20
-rw-r--r--src/core/iomgr/tcp_posix.h5
-rw-r--r--src/core/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/iomgr/udp_server.c2
-rw-r--r--src/core/iomgr/workqueue_posix.c2
8 files changed, 42 insertions, 11 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 7ff80e6cf8..d7c682a91a 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -207,14 +207,19 @@ static int has_watchers(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- const char *reason) {
+ int release_fd, const char *reason) {
fd->on_done_closure = on_done;
- shutdown(fd->fd, SHUT_RDWR);
+ fd->released = release_fd;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ }
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
fd->closed = 1;
- close(fd->fd);
+ if (!fd->released) {
+ close(fd->fd);
+ }
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
} else {
wake_all_watchers_locked(fd);
@@ -406,7 +411,9 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
}
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
fd->closed = 1;
- close(fd->fd);
+ if (!fd->released) {
+ close(fd->fd);
+ }
grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
}
gpr_mu_unlock(&fd->mu);
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index dc917ebbc0..73443f7152 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -62,6 +62,7 @@ struct grpc_fd {
gpr_mu mu;
int shutdown;
int closed;
+ int released;
/* The watcher list.
@@ -111,7 +112,7 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- const char *reason);
+ int release_fd, const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index abd6315ca1..01a88214c7 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -196,7 +196,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- grpc_fd_orphan(exec_ctx, fd, NULL, "tcp_client_orphan");
+ grpc_fd_orphan(exec_ctx, fd, NULL, 0, "tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@@ -265,7 +265,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
- grpc_fd_orphan(exec_ctx, fdobj, NULL, "tcp_client_connect_error");
+ grpc_fd_orphan(exec_ctx, fdobj, NULL, 0, "tcp_client_connect_error");
grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
goto done;
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 915553d509..f8d62cf60b 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -74,6 +74,7 @@ typedef struct {
grpc_fd *em_fd;
int fd;
int finished_edge;
+ int fd_released;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
@@ -90,6 +91,7 @@ typedef struct {
grpc_closure *read_cb;
grpc_closure *write_cb;
+ grpc_closure *release_fd_cb;
grpc_closure read_closure;
grpc_closure write_closure;
@@ -108,7 +110,8 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
}
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
- grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
+ grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->fd_released,
+ "tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string);
gpr_free(tcp);
@@ -452,10 +455,12 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
+ tcp->release_fd_cb = NULL;
tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
+ tcp->fd_released = 0;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
@@ -468,4 +473,17 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
return &tcp->base;
}
+int grpc_tcp_get_fd(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return tcp->fd;
+}
+
+void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_closure *done) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ tcp->fd_released = 1;
+ tcp->release_fd_cb = done;
+ TCP_UNREF(exec_ctx, tcp, "destroy");
+}
+
#endif
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index 40b3ae2679..3819905c53 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -56,4 +56,9 @@ extern int grpc_tcp_trace;
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
const char *peer_string);
+int grpc_tcp_get_fd(grpc_endpoint *ep);
+
+void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_closure *done);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 0ece77c4e8..8362cfa4b4 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -193,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure,
+ grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, 0,
"tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 9903e970e6..b568078687 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -179,7 +179,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure,
+ grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, 0,
"udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index c087b887b8..583e63beef 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -115,7 +115,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
- grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, "destroy");
+ grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, 0, "destroy");
gpr_free(workqueue);
} else {
gpr_mu_lock(&workqueue->mu);