diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 10:42:19 -0700 |
commit | 45724b35e411fef7c5da66a74c78428c11d56843 (patch) | |
tree | 9264034aca675c89444e02f72ef58e67d7043604 /src/core/iomgr/tcp_posix.c | |
parent | 298751c1195523ef6228595043b583c3a6270e08 (diff) |
indent pass to get logical source lines on one physical line
Diffstat (limited to 'src/core/iomgr/tcp_posix.c')
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 582 |
1 files changed, 324 insertions, 258 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 942ab8b71d..da65a90eb4 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -69,12 +69,13 @@ typedef size_t msg_iovlen_type; int grpc_tcp_trace = 0; -typedef struct { +typedef struct +{ grpc_endpoint base; grpc_fd *em_fd; int fd; int finished_edge; - msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ + msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ size_t slice_size; gpr_refcount refcount; @@ -94,20 +95,24 @@ typedef struct { char *peer_string; } grpc_tcp; -static void tcp_handle_read(void *arg /* grpc_tcp */, int success, - grpc_closure_list *closure_list); -static void tcp_handle_write(void *arg /* grpc_tcp */, int success, - grpc_closure_list *closure_list); +static void tcp_handle_read (void *arg /* grpc_tcp */ , int success, + grpc_closure_list * closure_list); +static void tcp_handle_write (void *arg /* grpc_tcp */ , int success, + grpc_closure_list * closure_list); -static void tcp_shutdown(grpc_endpoint *ep, grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(tcp->em_fd, closure_list); +static void +tcp_shutdown (grpc_endpoint * ep, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_fd_shutdown (tcp->em_fd, closure_list); } -static void tcp_free(grpc_tcp *tcp, grpc_closure_list *closure_list) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan", closure_list); - gpr_free(tcp->peer_string); - gpr_free(tcp); +static void +tcp_free (grpc_tcp * tcp, grpc_closure_list * closure_list) +{ + grpc_fd_orphan (tcp->em_fd, NULL, "tcp_unref_orphan", closure_list); + gpr_free (tcp->peer_string); + gpr_free (tcp); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ @@ -115,78 +120,94 @@ static void tcp_free(grpc_tcp *tcp, grpc_closure_list *closure_list) { #define TCP_UNREF(tcp, reason, cl) \ tcp_unref((tcp), (cl), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, grpc_closure_list *closure_list, - const char *reason, const char *file, int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, - reason, tcp->refcount.count, tcp->refcount.count - 1); - if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp, closure_list); - } +static void +tcp_unref (grpc_tcp * tcp, grpc_closure_list * closure_list, const char *reason, const char *file, int line) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref (&tcp->refcount)) + { + tcp_free (tcp, closure_list); + } } -static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, - reason, tcp->refcount.count, tcp->refcount.count + 1); - gpr_ref(&tcp->refcount); +static void +tcp_ref (grpc_tcp * tcp, const char *reason, const char *file, int line) +{ + gpr_log (file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref (&tcp->refcount); } #else #define TCP_UNREF(tcp, reason, cl) tcp_unref((tcp), (cl)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp, grpc_closure_list *closure_list) { - if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp, closure_list); - } +static void +tcp_unref (grpc_tcp * tcp, grpc_closure_list * closure_list) +{ + if (gpr_unref (&tcp->refcount)) + { + tcp_free (tcp, closure_list); + } } -static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +static void +tcp_ref (grpc_tcp * tcp) +{ + gpr_ref (&tcp->refcount); +} #endif -static void tcp_destroy(grpc_endpoint *ep, grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; - TCP_UNREF(tcp, "destroy", closure_list); +static void +tcp_destroy (grpc_endpoint * ep, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + TCP_UNREF (tcp, "destroy", closure_list); } -static void call_read_cb(grpc_tcp *tcp, int success, - grpc_closure_list *closure_list) { +static void +call_read_cb (grpc_tcp * tcp, int success, grpc_closure_list * closure_list) +{ grpc_closure *cb = tcp->read_cb; - if (grpc_tcp_trace) { - size_t i; - gpr_log(GPR_DEBUG, "read: success=%d", success); - for (i = 0; i < tcp->incoming_buffer->count; i++) { - char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); - gpr_free(dump); + if (grpc_tcp_trace) + { + size_t i; + gpr_log (GPR_DEBUG, "read: success=%d", success); + for (i = 0; i < tcp->incoming_buffer->count; i++) + { + char *dump = gpr_dump_slice (tcp->incoming_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log (GPR_DEBUG, "READ %p: %s", tcp, dump); + gpr_free (dump); + } } - } tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(cb->cb_arg, success, closure_list); + cb->cb (cb->cb_arg, success, closure_list); } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_tcp *tcp, grpc_closure_list *closure_list) { +static void +tcp_continue_read (grpc_tcp * tcp, grpc_closure_list * closure_list) +{ struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; size_t i; - GPR_ASSERT(!tcp->finished_edge); - GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC); - GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); - GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); + GPR_ASSERT (!tcp->finished_edge); + GPR_ASSERT (tcp->iov_size <= MAX_READ_IOVEC); + GPR_ASSERT (tcp->incoming_buffer->count <= MAX_READ_IOVEC); + GRPC_TIMER_BEGIN (GRPC_PTAG_HANDLE_READ, 0); - while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - gpr_slice_buffer_add_indexed(tcp->incoming_buffer, - gpr_slice_malloc(tcp->slice_size)); - } - for (i = 0; i < tcp->incoming_buffer->count; i++) { - iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); - iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); - } + while (tcp->incoming_buffer->count < (size_t) tcp->iov_size) + { + gpr_slice_buffer_add_indexed (tcp->incoming_buffer, gpr_slice_malloc (tcp->slice_size)); + } + for (i = 0; i < tcp->incoming_buffer->count; i++) + { + iov[i].iov_base = GPR_SLICE_START_PTR (tcp->incoming_buffer->slices[i]); + iov[i].iov_len = GPR_SLICE_LENGTH (tcp->incoming_buffer->slices[i]); + } msg.msg_name = NULL; msg.msg_namelen = 0; @@ -196,83 +217,107 @@ static void tcp_continue_read(grpc_tcp *tcp, grpc_closure_list *closure_list) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0); - do { - read_bytes = recvmsg(tcp->fd, &msg, 0); - } while (read_bytes < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); - - if (read_bytes < 0) { - /* NB: After calling call_read_cb a parallel call of the read handler may - * be running. */ - if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } - /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, closure_list); - } else { - /* TODO(klempner): Log interesting errors */ - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, closure_list); - TCP_UNREF(tcp, "read", closure_list); + GRPC_TIMER_BEGIN (GRPC_PTAG_RECVMSG, 0); + do + { + read_bytes = recvmsg (tcp->fd, &msg, 0); + } + while (read_bytes < 0 && errno == EINTR); + GRPC_TIMER_END (GRPC_PTAG_RECVMSG, 0); + + if (read_bytes < 0) + { + /* NB: After calling call_read_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) + { + if (tcp->iov_size > 1) + { + tcp->iov_size /= 2; + } + /* We've consumed the edge, request a new one */ + grpc_fd_notify_on_read (tcp->em_fd, &tcp->read_closure, closure_list); + } + else + { + /* TODO(klempner): Log interesting errors */ + gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); + call_read_cb (tcp, 0, closure_list); + TCP_UNREF (tcp, "read", closure_list); + } + } + else if (read_bytes == 0) + { + /* 0 read size ==> end of stream */ + gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); + call_read_cb (tcp, 0, closure_list); + TCP_UNREF (tcp, "read", closure_list); } - } else if (read_bytes == 0) { - /* 0 read size ==> end of stream */ - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, closure_list); - TCP_UNREF(tcp, "read", closure_list); - } else { - GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); - if ((size_t)read_bytes < tcp->incoming_buffer->length) { - gpr_slice_buffer_trim_end( - tcp->incoming_buffer, - tcp->incoming_buffer->length - (size_t)read_bytes); - } else if (tcp->iov_size < MAX_READ_IOVEC) { - ++tcp->iov_size; + else + { + GPR_ASSERT ((size_t) read_bytes <= tcp->incoming_buffer->length); + if ((size_t) read_bytes < tcp->incoming_buffer->length) + { + gpr_slice_buffer_trim_end (tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t) read_bytes); + } + else if (tcp->iov_size < MAX_READ_IOVEC) + { + ++tcp->iov_size; + } + GPR_ASSERT ((size_t) read_bytes == tcp->incoming_buffer->length); + call_read_cb (tcp, 1, closure_list); + TCP_UNREF (tcp, "read", closure_list); } - GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(tcp, 1, closure_list); - TCP_UNREF(tcp, "read", closure_list); - } - GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); + GRPC_TIMER_END (GRPC_PTAG_HANDLE_READ, 0); } -static void tcp_handle_read(void *arg /* grpc_tcp */, int success, - grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)arg; - GPR_ASSERT(!tcp->finished_edge); - - if (!success) { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, closure_list); - TCP_UNREF(tcp, "read", closure_list); - } else { - tcp_continue_read(tcp, closure_list); - } +static void +tcp_handle_read (void *arg /* grpc_tcp */ , int success, + grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) arg; + GPR_ASSERT (!tcp->finished_edge); + + if (!success) + { + gpr_slice_buffer_reset_and_unref (tcp->incoming_buffer); + call_read_cb (tcp, 0, closure_list); + TCP_UNREF (tcp, "read", closure_list); + } + else + { + tcp_continue_read (tcp, closure_list); + } } -static void tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, - grpc_closure *cb, grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; - GPR_ASSERT(tcp->read_cb == NULL); +static void +tcp_read (grpc_endpoint * ep, gpr_slice_buffer * incoming_buffer, grpc_closure * cb, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + GPR_ASSERT (tcp->read_cb == NULL); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; - gpr_slice_buffer_reset_and_unref(incoming_buffer); - TCP_REF(tcp, "read"); - if (tcp->finished_edge) { - tcp->finished_edge = 0; - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, closure_list); - } else { - grpc_closure_list_add(closure_list, &tcp->read_closure, 1); - } + gpr_slice_buffer_reset_and_unref (incoming_buffer); + TCP_REF (tcp, "read"); + if (tcp->finished_edge) + { + tcp->finished_edge = 0; + grpc_fd_notify_on_read (tcp->em_fd, &tcp->read_closure, closure_list); + } + else + { + grpc_closure_list_add (closure_list, &tcp->read_closure, 1); + } } -typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; +typedef enum +{ FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; #define MAX_WRITE_IOVEC 16 -static flush_result tcp_flush(grpc_tcp *tcp) { +static flush_result +tcp_flush (grpc_tcp * tcp) +{ struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -282,169 +327,190 @@ static flush_result tcp_flush(grpc_tcp *tcp) { size_t unwind_slice_idx; size_t unwind_byte_idx; - for (;;) { - sending_length = 0; - unwind_slice_idx = tcp->outgoing_slice_idx; - unwind_byte_idx = tcp->outgoing_byte_idx; - for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && - iov_size != MAX_WRITE_IOVEC; - iov_size++) { - iov[iov_size].iov_base = - GPR_SLICE_START_PTR( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + - tcp->outgoing_byte_idx; - iov[iov_size].iov_len = - GPR_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - - tcp->outgoing_byte_idx; - sending_length += iov[iov_size].iov_len; - tcp->outgoing_slice_idx++; - tcp->outgoing_byte_idx = 0; - } - GPR_ASSERT(iov_size > 0); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iov_size; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0); - do { - /* TODO(klempner): Cork if this is a partial write */ - sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); - } while (sent_length < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0); - - if (sent_length < 0) { - if (errno == EAGAIN) { - tcp->outgoing_slice_idx = unwind_slice_idx; - tcp->outgoing_byte_idx = unwind_byte_idx; - return FLUSH_PENDING; - } else { - /* TODO(klempner): Log some of these */ - return FLUSH_ERROR; - } - } - - GPR_ASSERT(tcp->outgoing_byte_idx == 0); - trailing = sending_length - (size_t)sent_length; - while (trailing > 0) { - size_t slice_length; - - tcp->outgoing_slice_idx--; - slice_length = GPR_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); - if (slice_length > trailing) { - tcp->outgoing_byte_idx = slice_length - trailing; - break; - } else { - trailing -= slice_length; - } - } - - if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return FLUSH_DONE; - } - }; + for (;;) + { + sending_length = 0; + unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_byte_idx = tcp->outgoing_byte_idx; + for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && iov_size != MAX_WRITE_IOVEC; iov_size++) + { + iov[iov_size].iov_base = GPR_SLICE_START_PTR (tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + tcp->outgoing_byte_idx; + iov[iov_size].iov_len = GPR_SLICE_LENGTH (tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - tcp->outgoing_byte_idx; + sending_length += iov[iov_size].iov_len; + tcp->outgoing_slice_idx++; + tcp->outgoing_byte_idx = 0; + } + GPR_ASSERT (iov_size > 0); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + GRPC_TIMER_BEGIN (GRPC_PTAG_SENDMSG, 0); + do + { + /* TODO(klempner): Cork if this is a partial write */ + sent_length = sendmsg (tcp->fd, &msg, SENDMSG_FLAGS); + } + while (sent_length < 0 && errno == EINTR); + GRPC_TIMER_END (GRPC_PTAG_SENDMSG, 0); + + if (sent_length < 0) + { + if (errno == EAGAIN) + { + tcp->outgoing_slice_idx = unwind_slice_idx; + tcp->outgoing_byte_idx = unwind_byte_idx; + return FLUSH_PENDING; + } + else + { + /* TODO(klempner): Log some of these */ + return FLUSH_ERROR; + } + } + + GPR_ASSERT (tcp->outgoing_byte_idx == 0); + trailing = sending_length - (size_t) sent_length; + while (trailing > 0) + { + size_t slice_length; + + tcp->outgoing_slice_idx--; + slice_length = GPR_SLICE_LENGTH (tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + if (slice_length > trailing) + { + tcp->outgoing_byte_idx = slice_length - trailing; + break; + } + else + { + trailing -= slice_length; + } + } + + if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) + { + return FLUSH_DONE; + } + }; } -static void tcp_handle_write(void *arg /* grpc_tcp */, int success, - grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)arg; +static void +tcp_handle_write (void *arg /* grpc_tcp */ , int success, + grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) arg; flush_result status; grpc_closure *cb; - if (!success) { - cb = tcp->write_cb; - tcp->write_cb = NULL; - cb->cb(cb->cb_arg, 0, closure_list); - TCP_UNREF(tcp, "write", closure_list); - return; - } - - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, closure_list); - } else { - cb = tcp->write_cb; - tcp->write_cb = NULL; - cb->cb(cb->cb_arg, status == FLUSH_DONE, closure_list); - TCP_UNREF(tcp, "write", closure_list); - } - GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); + if (!success) + { + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb->cb (cb->cb_arg, 0, closure_list); + TCP_UNREF (tcp, "write", closure_list); + return; + } + + GRPC_TIMER_BEGIN (GRPC_PTAG_TCP_CB_WRITE, 0); + status = tcp_flush (tcp); + if (status == FLUSH_PENDING) + { + grpc_fd_notify_on_write (tcp->em_fd, &tcp->write_closure, closure_list); + } + else + { + cb = tcp->write_cb; + tcp->write_cb = NULL; + cb->cb (cb->cb_arg, status == FLUSH_DONE, closure_list); + TCP_UNREF (tcp, "write", closure_list); + } + GRPC_TIMER_END (GRPC_PTAG_TCP_CB_WRITE, 0); } -static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, - grpc_closure *cb, grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void +tcp_write (grpc_endpoint * ep, gpr_slice_buffer * buf, grpc_closure * cb, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; flush_result status; - if (grpc_tcp_trace) { - size_t i; + if (grpc_tcp_trace) + { + size_t i; - for (i = 0; i < buf->count; i++) { - char *data = - gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data); - gpr_free(data); + for (i = 0; i < buf->count; i++) + { + char *data = gpr_dump_slice (buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log (GPR_DEBUG, "WRITE %p: %s", tcp, data); + gpr_free (data); + } } - } - GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); - GPR_ASSERT(tcp->write_cb == NULL); + GRPC_TIMER_BEGIN (GRPC_PTAG_TCP_WRITE, 0); + GPR_ASSERT (tcp->write_cb == NULL); - if (buf->length == 0) { - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - grpc_closure_list_add(closure_list, cb, 1); - return; - } + if (buf->length == 0) + { + GRPC_TIMER_END (GRPC_PTAG_TCP_WRITE, 0); + grpc_closure_list_add (closure_list, cb, 1); + return; + } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { - TCP_REF(tcp, "write"); - tcp->write_cb = cb; - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, closure_list); - } else { - grpc_closure_list_add(closure_list, cb, status == FLUSH_DONE); - } + status = tcp_flush (tcp); + if (status == FLUSH_PENDING) + { + TCP_REF (tcp, "write"); + tcp->write_cb = cb; + grpc_fd_notify_on_write (tcp->em_fd, &tcp->write_closure, closure_list); + } + else + { + grpc_closure_list_add (closure_list, cb, status == FLUSH_DONE); + } - GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + GRPC_TIMER_END (GRPC_PTAG_TCP_WRITE, 0); } -static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, - grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_add_fd(pollset, tcp->em_fd, closure_list); +static void +tcp_add_to_pollset (grpc_endpoint * ep, grpc_pollset * pollset, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_pollset_add_fd (pollset, tcp->em_fd, closure_list); } -static void tcp_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set, - grpc_closure_list *closure_list) { - grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_set_add_fd(pollset_set, tcp->em_fd, closure_list); +static void +tcp_add_to_pollset_set (grpc_endpoint * ep, grpc_pollset_set * pollset_set, grpc_closure_list * closure_list) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + grpc_pollset_set_add_fd (pollset_set, tcp->em_fd, closure_list); } -static char *tcp_get_peer(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; - return gpr_strdup(tcp->peer_string); +static char * +tcp_get_peer (grpc_endpoint * ep) +{ + grpc_tcp *tcp = (grpc_tcp *) ep; + return gpr_strdup (tcp->peer_string); } static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_peer}; - -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, - const char *peer_string) { - grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_shutdown, tcp_destroy, tcp_get_peer +}; + +grpc_endpoint * +grpc_tcp_create (grpc_fd * em_fd, size_t slice_size, const char *peer_string) +{ + grpc_tcp *tcp = (grpc_tcp *) gpr_malloc (sizeof (grpc_tcp)); tcp->base.vtable = &vtable; - tcp->peer_string = gpr_strdup(peer_string); + tcp->peer_string = gpr_strdup (peer_string); tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; @@ -453,7 +519,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->iov_size = 1; tcp->finished_edge = 1; /* paired with unref in grpc_tcp_destroy */ - gpr_ref_init(&tcp->refcount, 1); + gpr_ref_init (&tcp->refcount, 1); tcp->em_fd = em_fd; tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; |