aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-11-28 11:08:25 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2016-11-28 11:08:25 -0800
commitf4936517723ce63dcb1deca6c2c07dfab6fa1497 (patch)
treedd1ebc1acb98362438f9df9e307050c2ec050a3e /src/core
parent1edfb952d0aa12bdc154e85eab24e87621292366 (diff)
parentd960c8b166eb1160714d3532d567364722940137 (diff)
Merge branch 'master' of github.com:grpc/grpc into backoff
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c16
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c31
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c29
-rw-r--r--src/core/lib/iomgr/ev_posix.h1
-rw-r--r--src/core/lib/iomgr/tcp_windows.c2
5 files changed, 46 insertions, 33 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 127e1cdc13..4e3c7ff681 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -954,6 +954,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
+ md = md->next) {
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key),
+ grpc_mdstr_as_c_string(md->md->value));
+ }
+}
+
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -967,6 +977,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ }
}
grpc_closure *on_complete = op->on_complete;
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 91041a7c28..07fbfd849e 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -72,6 +72,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
+/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
+ * sure to wake up one polling thread (which can wake up other threads if
+ * needed) */
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Implements the function defined in grpc_posix.h. This function might be
* called before even calling grpc_init() to set either a different signal to
* use. If signum == -1, then the use of signals is disabled */
@@ -437,9 +442,8 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi,
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)",
- pi->epoll_fd,
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
- strerror(errno));
+ pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
+ errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
@@ -541,7 +545,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+ polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) {
@@ -843,11 +847,6 @@ static void polling_island_global_shutdown() {
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */
-/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
- * sure to wake up one polling thread (which can wake up other threads if
- * needed) */
-grpc_wakeup_fd grpc_global_wakeup_fd;
-
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
@@ -1163,11 +1162,11 @@ static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
poller_kick_init();
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
@@ -1274,7 +1273,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -1501,13 +1500,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &grpc_global_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ if (data_ptr == &global_wakeup_fd) {
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index e1d620cfff..21b28e5554 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -120,6 +120,8 @@ struct grpc_fd {
grpc_pollset *read_notifier_pollset;
};
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p,
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
/* main interface */
@@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
fd_count = 0;
pfd_count = 2;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
@@ -1001,8 +1003,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
- work_combine_error(
- &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
}
if (pfds[1].revents & POLLIN_CHECK) {
work_combine_error(
@@ -1343,6 +1345,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int res, idx;
gpr_cv *pollcv;
cv_node *cvn, *prev;
+ int skip_poll = 0;
nfds_t nsockfds = 0;
gpr_thd_id t_id;
gpr_thd_options opt;
@@ -1358,17 +1361,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
cvn->cv = pollcv;
cvn->next = g_cvfds.cvfds[idx].cvs;
g_cvfds.cvfds[idx].cvs = cvn;
- // We should return immediately if there are pending events,
- // but we still need to call poll() to check for socket events
+ // Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
- timeout = 0;
+ skip_poll = 1;
}
} else if (fds[i].fd >= 0) {
nsockfds++;
}
}
- if (nsockfds > 0) {
+ res = 0;
+ if (!skip_poll && nsockfds > 0) {
pargs = gpr_malloc(sizeof(struct poll_args));
// Both the main thread and calling thread get a reference
gpr_ref_init(&pargs->refcount, 2);
@@ -1398,16 +1401,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
res = pargs->retval;
errno = pargs->err;
} else {
- res = 0;
errno = 0;
gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
}
- } else {
+ } else if (!skip_poll) {
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
- res = 0;
}
idx = 0;
@@ -1431,7 +1432,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (fds[i].fd >= 0 &&
+ } else if (!skip_poll && fds[i].fd >= 0 &&
gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
fds[i].revents = pargs->fds[idx].revents;
idx++;
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 2fdef06838..cb5832539d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 62afbcef51..d4613b674e 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -423,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
- gpr_ref_init(&tcp->refcount, 2);
+ gpr_ref_init(&tcp->refcount, 1);
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);