aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/tcp_posix.c')
-rw-r--r--src/core/iomgr/tcp_posix.c147
1 files changed, 80 insertions, 67 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 68f469c368..54ebad7dbc 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -78,6 +78,9 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
+ /* garbage after the last read */
+ gpr_slice_buffer last_read_buffer;
+
gpr_slice_buffer *incoming_buffer;
gpr_slice_buffer *outgoing_buffer;
/** slice within outgoing_buffer to write next */
@@ -85,39 +88,43 @@ typedef struct {
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
size_t outgoing_byte_idx;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure write_closure;
+ grpc_closure read_closure;
+ grpc_closure write_closure;
char *peer_string;
} grpc_tcp;
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ int success);
+static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ int success);
-static void tcp_shutdown(grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_fd_shutdown(tcp->em_fd);
+ grpc_fd_shutdown(exec_ctx, tcp->em_fd);
}
-static void tcp_free(grpc_tcp *tcp) {
- grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(cl, tcp, reason) \
+ tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -128,24 +135,24 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
-static void tcp_destroy(grpc_endpoint *ep) {
+static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- TCP_UNREF(tcp, "destroy");
+ TCP_UNREF(exec_ctx, tcp, "destroy");
}
-static void call_read_cb(grpc_tcp *tcp, int success) {
- grpc_iomgr_closure *cb = tcp->read_cb;
+static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
+ grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
@@ -160,11 +167,11 @@ static void call_read_cb(grpc_tcp *tcp, int success) {
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- cb->cb(cb->cb_arg, success);
+ cb->cb(exec_ctx, cb->cb_arg, success);
}
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_tcp *tcp) {
+static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -206,69 +213,71 @@ static void tcp_continue_read(grpc_tcp *tcp) {
tcp->iov_size /= 2;
}
/* We've consumed the edge, request a new one */
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
/* TODO(klempner): Log interesting errors */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(exec_ctx, tcp, 0);
+ TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(exec_ctx, tcp, 0);
+ TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
gpr_slice_buffer_trim_end(
tcp->incoming_buffer,
- tcp->incoming_buffer->length - (size_t)read_bytes);
+ tcp->incoming_buffer->length - (size_t)read_bytes,
+ &tcp->last_read_buffer);
} else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(tcp, 1);
- TCP_UNREF(tcp, "read");
+ call_read_cb(exec_ctx, tcp, 1);
+ TCP_UNREF(exec_ctx, tcp, "read");
}
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(exec_ctx, tcp, 0);
+ TCP_UNREF(exec_ctx, tcp, "read");
} else {
- tcp_continue_read(tcp);
+ tcp_continue_read(exec_ctx, tcp);
}
}
-static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
- gpr_slice_buffer *incoming_buffer,
- grpc_iomgr_closure *cb) {
+static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
gpr_slice_buffer_reset_and_unref(incoming_buffer);
+ gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
}
- /* TODO(ctiller): immediate return */
- return GRPC_ENDPOINT_PENDING;
}
+typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
+
#define MAX_WRITE_IOVEC 16
-static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
+static flush_result tcp_flush(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -318,10 +327,10 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
if (errno == EAGAIN) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
- return GRPC_ENDPOINT_PENDING;
+ return FLUSH_PENDING;
} else {
/* TODO(klempner): Log some of these */
- return GRPC_ENDPOINT_ERROR;
+ return FLUSH_ERROR;
}
}
@@ -342,42 +351,42 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
}
if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
- return GRPC_ENDPOINT_DONE;
+ return FLUSH_DONE;
}
};
}
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
+ int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_endpoint_op_status status;
- grpc_iomgr_closure *cb;
+ flush_result status;
+ grpc_closure *cb;
if (!success) {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, 0);
- TCP_UNREF(tcp, "write");
+ cb->cb(exec_ctx, cb->cb_arg, 0);
+ TCP_UNREF(exec_ctx, tcp, "write");
return;
}
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
- grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
+ if (status == FLUSH_PENDING) {
+ grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
- TCP_UNREF(tcp, "write");
+ cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
+ TCP_UNREF(exec_ctx, tcp, "write");
}
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
-static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
- gpr_slice_buffer *buf,
- grpc_iomgr_closure *cb) {
+static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ gpr_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_endpoint_op_status status;
+ flush_result status;
if (grpc_tcp_trace) {
size_t i;
@@ -395,32 +404,35 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
if (buf->length == 0) {
GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
- return GRPC_ENDPOINT_DONE;
+ grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
+ return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
+ if (status == FLUSH_PENDING) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
- grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
+ grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
}
GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
- return status;
}
-static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_add_fd(pollset, tcp->em_fd);
+ grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
}
-static void tcp_add_to_pollset_set(grpc_endpoint *ep,
+static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
+ grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
}
static char *tcp_get_peer(grpc_endpoint *ep) {
@@ -451,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+ gpr_slice_buffer_init(&tcp->last_read_buffer);
return &tcp->base;
}