aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2018-08-09 12:13:56 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2018-08-09 12:13:56 -0700
commite4961eb824d92805f8ea128d80cca1b22082bf43 (patch)
tree3a5c46bef9982d3e9b3034b7fe40412627539b20 /src/core/lib
parent427af942e6bc6e1c747e43b4da968dd7a2b61607 (diff)
parentcb87dd9fef97d1e1fb2559be68188c0792a3ecd8 (diff)
Merge branch 'master' into minor-timer-fix
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/connected_channel.cc4
-rw-r--r--src/core/lib/channel/connected_channel.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc3
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc15
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc21
-rw-r--r--src/core/lib/iomgr/ev_posix.cc6
-rw-r--r--src/core/lib/iomgr/ev_posix.h18
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc13
-rw-r--r--src/core/lib/iomgr/socket_windows.cc4
-rw-r--r--src/core/lib/iomgr/socket_windows.h2
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc4
-rw-r--r--src/core/lib/iomgr/tcp_windows.h2
-rw-r--r--src/core/lib/surface/channel.cc11
-rw-r--r--src/core/lib/surface/channel_init.h28
-rw-r--r--src/core/lib/surface/init.cc26
-rw-r--r--src/core/lib/surface/init_secure.cc11
-rw-r--r--src/core/lib/transport/transport.h2
18 files changed, 141 insertions, 36 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334ded..c78849a29b 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -228,8 +228,8 @@ static void bind_transport(grpc_channel_stack* channel_stack,
grpc_transport_stream_size(static_cast<grpc_transport*>(t));
}
-bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null) {
+bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null) {
GPR_ASSERT(arg_must_be_null == nullptr);
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
GPR_ASSERT(t != nullptr);
diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h
index faa1c73a21..280daf040d 100644
--- a/src/core/lib/channel/connected_channel.h
+++ b/src/core/lib/channel/connected_channel.h
@@ -25,8 +25,8 @@
extern const grpc_channel_filter grpc_connected_filter;
-bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null);
+bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null);
/* Debug helper to dig the transport stream out of a call element */
grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index ecb7eadf85..66e0f1fd6d 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1203,6 +1203,9 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
pollset_init,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 6ec8267799..96eae30345 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1615,6 +1615,9 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
pollset_init,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 28656b0666..5695ac795d 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -948,6 +948,12 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
fd->error_closure->NotifyOn(closure);
}
+static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
+
+static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1105,12 +1111,6 @@ static int poll_deadline_to_millis_timeout(grpc_millis millis) {
return static_cast<int>(delta);
}
-static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
-
-static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
-
-static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
-
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1647,6 +1647,9 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_become_readable,
+ fd_become_writable,
+ fd_has_errors,
fd_is_shutdown,
pollset_init,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index a4a83c4ad7..fb4c71ef71 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -538,6 +538,24 @@ static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
}
+static void fd_set_readable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->read_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_writable(grpc_fd* fd) {
+ gpr_mu_lock(&fd->mu);
+ set_ready_locked(fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_set_error(grpc_fd* fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ }
+}
+
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
grpc_pollset_worker* worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher* watcher) {
@@ -1700,6 +1718,9 @@ static const grpc_event_engine_vtable vtable = {
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
+ fd_set_readable,
+ fd_set_writable,
+ fd_set_error,
fd_is_shutdown,
pollset_init,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 1139b3273a..0e45fc42ca 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -239,6 +239,12 @@ void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_error(fd, closure);
}
+void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
+
+void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
+
+void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 393c3dd05e..8d0bcc0710 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -51,6 +51,9 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_set_readable)(grpc_fd* fd);
+ void (*fd_set_writable)(grpc_fd* fd);
+ void (*fd_set_error)(grpc_fd* fd);
bool (*fd_is_shutdown)(grpc_fd* fd);
void (*pollset_init)(grpc_pollset* pollset, gpr_mu** mu);
@@ -141,6 +144,21 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
* needs to have been set on grpc_fd_create */
void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
+/* Forcibly set the fd to be readable, resulting in the closure registered with
+ * grpc_fd_notify_on_read being invoked.
+ */
+void grpc_fd_set_readable(grpc_fd* fd);
+
+/* Forcibly set the fd to be writable, resulting in the closure registered with
+ * grpc_fd_notify_on_write being invoked.
+ */
+void grpc_fd_set_writable(grpc_fd* fd);
+
+/* Forcibly set the fd to have errored, resulting in the closure registered with
+ * grpc_fd_notify_on_error being invoked.
+ */
+void grpc_fd_set_error(grpc_fd* fd);
+
/* pollset_posix functions */
/* Add an fd to a pollset */
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index ce77231036..ad325fe215 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -89,10 +89,15 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
} else {
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
- info->bytes_transfered = bytes;
- info->wsa_error = success ? 0 : WSAGetLastError();
+ if (socket->shutdown_called) {
+ info->bytes_transfered = 0;
+ info->wsa_error = WSA_OPERATION_ABORTED;
+ } else {
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
+ info->bytes_transfered = bytes;
+ info->wsa_error = success ? 0 : WSAGetLastError();
+ }
GPR_ASSERT(overlapped == &info->overlapped);
grpc_socket_become_ready(socket, info);
return GRPC_IOCP_WORK_WORK;
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 4ad31cb35d..999c6646ad 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -52,6 +52,10 @@ grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
return r;
}
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket) {
+ return socket->socket;
+}
+
/* Schedule a shutdown of the socket operations. Will call the pending
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index b09b9da562..46d7d58356 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -92,6 +92,8 @@ typedef struct grpc_winsocket {
it will be responsible for closing it. */
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name);
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket);
+
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket* socket);
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5d316d477b..b3cb442f18 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -53,7 +53,7 @@
extern grpc_core::TraceFlag grpc_tcp_trace;
-static grpc_error* set_non_block(SOCKET sock) {
+grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
int status;
uint32_t param = 1;
DWORD ret;
@@ -90,7 +90,7 @@ static grpc_error* enable_loopback_fast_path(SOCKET sock) {
grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
grpc_error* err;
- err = set_non_block(sock);
+ err = grpc_tcp_set_non_block(sock);
if (err != GRPC_ERROR_NONE) return err;
err = set_dualstack(sock);
if (err != GRPC_ERROR_NONE) return err;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 161a545a2a..04ef8102b6 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -46,6 +46,8 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
+grpc_error* grpc_tcp_set_non_block(SOCKET sock);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 7cbd61adef..82635d3c21 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -281,6 +281,17 @@ void grpc_channel_get_info(grpc_channel* channel,
elem->filter->get_channel_info(elem, channel_info);
}
+void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
+ (channel));
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->reset_connect_backoff = true;
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index f01852473b..6543796b4c 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -21,11 +21,37 @@
#include <grpc/support/port_platform.h>
+#include <limits.h>
+
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
-#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
+// Priority for channel registration functions to be used in
+// grpc_channel_init_register_stage(). The priority dictates the
+// order in which the registration functions run.
+//
+// When used to register a filter, the filter can either be appended or
+// prepended, thus dictating whether the filter goes at the top or bottom of
+// the stack. Higher priority functions can get closer to the top or bottom
+// of the stack than lower priority functions.
+enum {
+ // Default level. Most of filters should use this level if their location in
+ // the stack does not matter.
+ GRPC_CHANNEL_INIT_PRIORITY_LOW = 0,
+ // For filters that should be added after the group of filters with default
+ // priority, such as auth filters.
+ GRPC_CHANNEL_INIT_PRIORITY_MED = 10000,
+ // For filters that need to be close to top or bottom, such as protocol-level
+ // filters (client_authority, http-client, http-server).
+ GRPC_CHANNEL_INIT_PRIORITY_HIGH = 20000,
+ // For filters that need to be very close to the wire or surface, such as
+ // stats filters (census).
+ GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH = 30000,
+ // For things that have to happen last, such as connected channel filter or
+ // surface server filter. Consider as reserved for gRPC internals.
+ GRPC_CHANNEL_INIT_PRIORITY_MAX = INT_MAX
+};
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 0ad82fed99..7807b261d4 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -70,11 +70,6 @@ static void do_basic_init(void) {
g_initializations = 0;
}
-static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
- return grpc_channel_stack_builder_append_filter(
- builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
-}
-
static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_prepend_filter(
builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
@@ -82,19 +77,20 @@ static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- append_filter, (void*)&grpc_lame_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
- (void*)&grpc_server_top_filter);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ prepend_filter, (void*)&grpc_lame_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, prepend_filter,
+ (void*)&grpc_server_top_filter);
}
typedef struct grpc_plugin {
diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc
index 28c6f7b121..8058aaa804 100644
--- a/src/core/lib/surface/init_secure.cc
+++ b/src/core/lib/surface/init_secure.cc
@@ -67,14 +67,17 @@ static bool maybe_prepend_server_auth_filter(
}
void grpc_register_security_filters(void) {
- // Register the auth client with a priority < INT_MAX to allow the authority
+ // Register the auth client with a medium priority to allow the authority
// filter -on which the auth filter depends- to be higher on the channel
// stack.
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX - 1,
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX - 1,
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_server_auth_filter, nullptr);
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9dfae9..9e784635c6 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -282,6 +282,8 @@ typedef struct grpc_transport_op {
/** Called when the ping ack is received */
grpc_closure* on_ack;
} send_ping;
+ // If true, will reset the channel's connection backoff.
+ bool reset_connect_backoff;
/***************************************************************************
* remaining fields are initialized and used at the discretion of the