diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 202 |
1 files changed, 107 insertions, 95 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b6b7231ee1..d61092c4f2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -65,7 +65,7 @@ grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); typedef struct { grpc_endpoint base; - grpc_fd *em_fd; + grpc_fd* em_fd; int fd; bool finished_edge; double target_length; @@ -79,44 +79,44 @@ typedef struct { /* garbage after the last read */ grpc_slice_buffer last_read_buffer; - grpc_slice_buffer *incoming_buffer; - grpc_slice_buffer *outgoing_buffer; + grpc_slice_buffer* incoming_buffer; + grpc_slice_buffer* outgoing_buffer; /** slice within outgoing_buffer to write next */ size_t outgoing_slice_idx; /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ size_t outgoing_byte_idx; - grpc_closure *read_cb; - grpc_closure *write_cb; - grpc_closure *release_fd_cb; - int *release_fd; + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_closure* release_fd_cb; + int* release_fd; grpc_closure read_done_closure; grpc_closure write_done_closure; - char *peer_string; + char* peer_string; - grpc_resource_user *resource_user; + grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; typedef struct backup_poller { - gpr_mu *pollset_mu; + gpr_mu* pollset_mu; grpc_closure run_poller; } backup_poller; -#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1)) +#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) static gpr_atm g_uncovered_notifications_pending; static gpr_atm g_backup_poller; /* backup_poller* */ -static void tcp_handle_read(void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_handle_write(void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_drop_uncovered_then_handle_write(void *arg /* grpc_tcp */, - grpc_error *error); +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */, + grpc_error* error); -static void done_poller(void *bp, grpc_error *error_ignored) { - backup_poller *p = (backup_poller *)bp; +static void done_poller(void* bp, grpc_error* error_ignored) { + backup_poller* p = (backup_poller*)bp; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); } @@ -124,8 +124,8 @@ static void done_poller(void *bp, grpc_error *error_ignored) { gpr_free(p); } -static void run_poller(void *bp, grpc_error *error_ignored) { - backup_poller *p = (backup_poller *)bp; +static void run_poller(void* bp, grpc_error* error_ignored) { + backup_poller* p = (backup_poller*)bp; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } @@ -160,8 +160,8 @@ static void run_poller(void *bp, grpc_error *error_ignored) { } } -static void drop_uncovered(grpc_tcp *tcp) { - backup_poller *p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller); +static void drop_uncovered(grpc_tcp* tcp) { + backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller); gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); if (GRPC_TRACER_ON(grpc_tcp_trace)) { @@ -171,8 +171,8 @@ static void drop_uncovered(grpc_tcp *tcp) { GPR_ASSERT(old_count != 1); } -static void cover_self(grpc_tcp *tcp) { - backup_poller *p; +static void cover_self(grpc_tcp* tcp) { + backup_poller* p; gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); if (GRPC_TRACER_ON(grpc_tcp_trace)) { @@ -181,7 +181,7 @@ static void cover_self(grpc_tcp *tcp) { } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(); - p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + p = (backup_poller*)gpr_malloc(sizeof(*p) + grpc_pollset_size()); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } @@ -192,7 +192,7 @@ static void cover_self(grpc_tcp *tcp) { grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), GRPC_ERROR_NONE); } else { - while ((p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller)) == NULL) { + while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) == NULL) { // spin waiting for backup poller } } @@ -205,7 +205,7 @@ static void cover_self(grpc_tcp *tcp) { } } -static void notify_on_read(grpc_tcp *tcp) { +static void notify_on_read(grpc_tcp* tcp) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); } @@ -214,7 +214,7 @@ static void notify_on_read(grpc_tcp *tcp) { grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure); } -static void notify_on_write(grpc_tcp *tcp) { +static void notify_on_write(grpc_tcp* tcp) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); } @@ -225,19 +225,19 @@ static void notify_on_write(grpc_tcp *tcp) { grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } -static void tcp_drop_uncovered_then_handle_write(void *arg, grpc_error *error) { +static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); } - drop_uncovered((grpc_tcp *)arg); + drop_uncovered((grpc_tcp*)arg); tcp_handle_write(arg, error); } -static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { +static void add_to_estimate(grpc_tcp* tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } -static void finish_estimate(grpc_tcp *tcp) { +static void finish_estimate(grpc_tcp* tcp) { /* If we read >80% of the target buffer in one read loop, increase the size of the target buffer to either the amount read, or twice its previous value */ @@ -251,8 +251,8 @@ static void finish_estimate(grpc_tcp *tcp) { tcp->bytes_read_this_round = 0; } -static size_t get_target_read_size(grpc_tcp *tcp) { - grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user); +static size_t get_target_read_size(grpc_tcp* tcp) { + grpc_resource_quota* rq = grpc_resource_user_quota(tcp->resource_user); double pressure = grpc_resource_quota_get_memory_pressure(rq); double target = tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); @@ -269,23 +269,23 @@ static size_t get_target_read_size(grpc_tcp *tcp) { return sz; } -static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { +static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(tcp->peer_string)); } -static void tcp_handle_read(void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_handle_write(void *arg /* grpc_tcp */, grpc_error *error); +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); -static void tcp_shutdown(grpc_endpoint *ep, grpc_error *why) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { + grpc_tcp* tcp = (grpc_tcp*)ep; grpc_fd_shutdown(tcp->em_fd, why); grpc_resource_user_shutdown(tcp->resource_user); } -static void tcp_free(grpc_tcp *tcp) { +static void tcp_free(grpc_tcp* tcp) { grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, false /* already_closed */, "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); @@ -297,7 +297,7 @@ static void tcp_free(grpc_tcp *tcp) { #ifndef NDEBUG #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, +static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file, int line) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); @@ -310,7 +310,7 @@ static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, } } -static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, +static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, int line) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); @@ -323,33 +323,33 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, #else #define TCP_UNREF(tcp, reason) tcp_unref((tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_tcp* tcp) { if (gpr_unref(&tcp->refcount)) { tcp_free(tcp); } } -static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_destroy(grpc_endpoint *ep) { +static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); - grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_tcp* tcp = (grpc_tcp*)ep; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_tcp *tcp, grpc_error *error) { - grpc_closure *cb = tcp->read_cb; +static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { + grpc_closure* cb = tcp->read_cb; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; - const char *str = grpc_error_string(error); + const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); for (i = 0; i < tcp->incoming_buffer->count; i++) { - char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], + char* dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); @@ -362,7 +362,7 @@ static void call_read_cb(grpc_tcp *tcp, grpc_error *error) { } #define MAX_READ_IOVEC 4 -static void tcp_do_read(grpc_tcp *tcp) { +static void tcp_do_read(grpc_tcp* tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -433,8 +433,8 @@ static void tcp_do_read(grpc_tcp *tcp) { GPR_TIMER_END("tcp_continue_read", 0); } -static void tcp_read_allocation_done(void *tcpp, grpc_error *error) { - grpc_tcp *tcp = (grpc_tcp *)tcpp; +static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { + grpc_tcp* tcp = (grpc_tcp*)tcpp; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, grpc_error_string(error)); @@ -449,7 +449,7 @@ static void tcp_read_allocation_done(void *tcpp, grpc_error *error) { } } -static void tcp_continue_read(grpc_tcp *tcp) { +static void tcp_continue_read(grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { @@ -466,8 +466,8 @@ static void tcp_continue_read(grpc_tcp *tcp) { } } -static void tcp_handle_read(void *arg /* grpc_tcp */, grpc_error *error) { - grpc_tcp *tcp = (grpc_tcp *)arg; +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { + grpc_tcp* tcp = (grpc_tcp*)arg; GPR_ASSERT(!tcp->finished_edge); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); @@ -483,9 +483,9 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, grpc_error *error) { } } -static void tcp_read(grpc_endpoint *ep, grpc_slice_buffer *incoming_buffer, - grpc_closure *cb) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, + grpc_closure* cb) { + grpc_tcp* tcp = (grpc_tcp*)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; @@ -502,7 +502,7 @@ static void tcp_read(grpc_endpoint *ep, grpc_slice_buffer *incoming_buffer, /* returns true if done, false if pending; if returning true, *error is set */ #define MAX_WRITE_IOVEC 1000 -static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { +static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -591,9 +591,9 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { }; } -static void tcp_handle_write(void *arg /* grpc_tcp */, grpc_error *error) { - grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_closure *cb; +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { + grpc_tcp* tcp = (grpc_tcp*)arg; + grpc_closure* cb; if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; @@ -612,7 +612,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, grpc_error *error) { cb = tcp->write_cb; tcp->write_cb = NULL; if (GRPC_TRACER_ON(grpc_tcp_trace)) { - const char *str = grpc_error_string(error); + const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } @@ -621,16 +621,16 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, grpc_error *error) { } } -static void tcp_write(grpc_endpoint *ep, grpc_slice_buffer *buf, - grpc_closure *cb) { - grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_error *error = GRPC_ERROR_NONE; +static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, + grpc_closure* cb) { + grpc_tcp* tcp = (grpc_tcp*)ep; + grpc_error* error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t i; for (i = 0; i < buf->count; i++) { - char *data = + char* data = grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_free(data); @@ -662,7 +662,7 @@ static void tcp_write(grpc_endpoint *ep, grpc_slice_buffer *buf, notify_on_write(tcp); } else { if (GRPC_TRACER_ON(grpc_tcp_trace)) { - const char *str = grpc_error_string(error); + const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } GRPC_CLOSURE_SCHED(cb, error); @@ -671,46 +671,58 @@ static void tcp_write(grpc_endpoint *ep, grpc_slice_buffer *buf, GPR_TIMER_END("tcp_write", 0); } -static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { + grpc_tcp* tcp = (grpc_tcp*)ep; grpc_pollset_add_fd(pollset, tcp->em_fd); } -static void tcp_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void tcp_add_to_pollset_set(grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + grpc_tcp* tcp = (grpc_tcp*)ep; grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); } -static char *tcp_get_peer(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static void tcp_delete_from_pollset_set(grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + grpc_tcp* tcp = (grpc_tcp*)ep; + grpc_pollset_set_del_fd(pollset_set, tcp->em_fd); +} + +static char* tcp_get_peer(grpc_endpoint* ep) { + grpc_tcp* tcp = (grpc_tcp*)ep; return gpr_strdup(tcp->peer_string); } -static int tcp_get_fd(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static int tcp_get_fd(grpc_endpoint* ep) { + grpc_tcp* tcp = (grpc_tcp*)ep; return tcp->fd; } -static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; +static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) { + grpc_tcp* tcp = (grpc_tcp*)ep; return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_delete_from_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_resource_user, + tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, - const grpc_channel_args *channel_args, - const char *peer_string) { +grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, + const grpc_channel_args* channel_args, + const char* peer_string) { int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; int tcp_max_read_chunk_size = 4 * 1024 * 1024; int tcp_min_read_chunk_size = 256; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == @@ -735,7 +747,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(resource_quota); resource_quota = grpc_resource_quota_ref_internal( - (grpc_resource_quota *)channel_args->args[i].value.pointer.p); + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); } } } @@ -746,7 +758,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size, tcp_max_read_chunk_size); - grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); tcp->fd = grpc_fd_wrapped_fd(em_fd); @@ -775,16 +787,16 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, return &tcp->base; } -int grpc_tcp_fd(grpc_endpoint *ep) { - grpc_tcp *tcp = (grpc_tcp *)ep; +int grpc_tcp_fd(grpc_endpoint* ep) { + grpc_tcp* tcp = (grpc_tcp*)ep; GPR_ASSERT(ep->vtable == &vtable); return grpc_fd_wrapped_fd(tcp->em_fd); } -void grpc_tcp_destroy_and_release_fd(grpc_endpoint *ep, int *fd, - grpc_closure *done) { +void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, + grpc_closure* done) { grpc_network_status_unregister_endpoint(ep); - grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_tcp* tcp = (grpc_tcp*)ep; GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; |