aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2018-08-09 12:13:41 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2018-08-09 12:13:41 -0700
commitf63b51be86142138337d2f3166df6954db18c454 (patch)
tree4ae6cde0cc0b2452478f7f3520c1325e9cf039b7 /src/core/lib
parent82f0275998ee7b18f806514d4bad48b191d0ca53 (diff)
parentcb87dd9fef97d1e1fb2559be68188c0792a3ecd8 (diff)
Merge branch 'master' into rq-threads-2
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/connected_channel.cc4
-rw-r--r--src/core/lib/channel/connected_channel.h4
-rw-r--r--src/core/lib/iomgr/iocp_windows.cc13
-rw-r--r--src/core/lib/iomgr/socket_windows.cc4
-rw-r--r--src/core/lib/iomgr/socket_windows.h2
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc4
-rw-r--r--src/core/lib/iomgr/tcp_windows.h2
-rw-r--r--src/core/lib/surface/channel.cc11
-rw-r--r--src/core/lib/surface/channel_init.h28
-rw-r--r--src/core/lib/surface/init.cc26
-rw-r--r--src/core/lib/surface/init_secure.cc11
-rw-r--r--src/core/lib/transport/transport.h2
12 files changed, 81 insertions, 30 deletions
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334ded..c78849a29b 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -228,8 +228,8 @@ static void bind_transport(grpc_channel_stack* channel_stack,
grpc_transport_stream_size(static_cast<grpc_transport*>(t));
}
-bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null) {
+bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null) {
GPR_ASSERT(arg_must_be_null == nullptr);
grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
GPR_ASSERT(t != nullptr);
diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h
index faa1c73a21..280daf040d 100644
--- a/src/core/lib/channel/connected_channel.h
+++ b/src/core/lib/channel/connected_channel.h
@@ -25,8 +25,8 @@
extern const grpc_channel_filter grpc_connected_filter;
-bool grpc_add_connected_filter(grpc_channel_stack_builder* builder,
- void* arg_must_be_null);
+bool grpc_append_connected_filter(grpc_channel_stack_builder* builder,
+ void* arg_must_be_null);
/* Debug helper to dig the transport stream out of a call element */
grpc_stream* grpc_connected_channel_get_stream(grpc_call_element* elem);
diff --git a/src/core/lib/iomgr/iocp_windows.cc b/src/core/lib/iomgr/iocp_windows.cc
index ce77231036..ad325fe215 100644
--- a/src/core/lib/iomgr/iocp_windows.cc
+++ b/src/core/lib/iomgr/iocp_windows.cc
@@ -89,10 +89,15 @@ grpc_iocp_work_status grpc_iocp_work(grpc_millis deadline) {
} else {
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
- info->bytes_transfered = bytes;
- info->wsa_error = success ? 0 : WSAGetLastError();
+ if (socket->shutdown_called) {
+ info->bytes_transfered = 0;
+ info->wsa_error = WSA_OPERATION_ABORTED;
+ } else {
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
+ info->bytes_transfered = bytes;
+ info->wsa_error = success ? 0 : WSAGetLastError();
+ }
GPR_ASSERT(overlapped == &info->overlapped);
grpc_socket_become_ready(socket, info);
return GRPC_IOCP_WORK_WORK;
diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc
index 4ad31cb35d..999c6646ad 100644
--- a/src/core/lib/iomgr/socket_windows.cc
+++ b/src/core/lib/iomgr/socket_windows.cc
@@ -52,6 +52,10 @@ grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name) {
return r;
}
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket) {
+ return socket->socket;
+}
+
/* Schedule a shutdown of the socket operations. Will call the pending
operations to abort them. We need to do that this way because of the
various callsites of that function, which happens to be in various
diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h
index b09b9da562..46d7d58356 100644
--- a/src/core/lib/iomgr/socket_windows.h
+++ b/src/core/lib/iomgr/socket_windows.h
@@ -92,6 +92,8 @@ typedef struct grpc_winsocket {
it will be responsible for closing it. */
grpc_winsocket* grpc_winsocket_create(SOCKET socket, const char* name);
+SOCKET grpc_winsocket_wrapped_socket(grpc_winsocket* socket);
+
/* Initiate an asynchronous shutdown of the socket. Will call off any pending
operation to cancel them. */
void grpc_winsocket_shutdown(grpc_winsocket* socket);
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5d316d477b..b3cb442f18 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -53,7 +53,7 @@
extern grpc_core::TraceFlag grpc_tcp_trace;
-static grpc_error* set_non_block(SOCKET sock) {
+grpc_error* grpc_tcp_set_non_block(SOCKET sock) {
int status;
uint32_t param = 1;
DWORD ret;
@@ -90,7 +90,7 @@ static grpc_error* enable_loopback_fast_path(SOCKET sock) {
grpc_error* grpc_tcp_prepare_socket(SOCKET sock) {
grpc_error* err;
- err = set_non_block(sock);
+ err = grpc_tcp_set_non_block(sock);
if (err != GRPC_ERROR_NONE) return err;
err = set_dualstack(sock);
if (err != GRPC_ERROR_NONE) return err;
diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h
index 161a545a2a..04ef8102b6 100644
--- a/src/core/lib/iomgr/tcp_windows.h
+++ b/src/core/lib/iomgr/tcp_windows.h
@@ -46,6 +46,8 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket,
grpc_error* grpc_tcp_prepare_socket(SOCKET sock);
+grpc_error* grpc_tcp_set_non_block(SOCKET sock);
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 7cbd61adef..82635d3c21 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -281,6 +281,17 @@ void grpc_channel_get_info(grpc_channel* channel,
elem->filter->get_channel_info(elem, channel_info);
}
+void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
+ (channel));
+ grpc_transport_op* op = grpc_make_transport_op(nullptr);
+ op->reset_connect_backoff = true;
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
+ elem->filter->start_transport_op(elem, op);
+}
+
static grpc_call* grpc_channel_create_call_internal(
grpc_channel* channel, grpc_call* parent_call, uint32_t propagation_mask,
grpc_completion_queue* cq, grpc_pollset_set* pollset_set_alternative,
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index f01852473b..6543796b4c 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -21,11 +21,37 @@
#include <grpc/support/port_platform.h>
+#include <limits.h>
+
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/transport.h"
-#define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000
+// Priority for channel registration functions to be used in
+// grpc_channel_init_register_stage(). The priority dictates the
+// order in which the registration functions run.
+//
+// When used to register a filter, the filter can either be appended or
+// prepended, thus dictating whether the filter goes at the top or bottom of
+// the stack. Higher priority functions can get closer to the top or bottom
+// of the stack than lower priority functions.
+enum {
+ // Default level. Most of filters should use this level if their location in
+ // the stack does not matter.
+ GRPC_CHANNEL_INIT_PRIORITY_LOW = 0,
+ // For filters that should be added after the group of filters with default
+ // priority, such as auth filters.
+ GRPC_CHANNEL_INIT_PRIORITY_MED = 10000,
+ // For filters that need to be close to top or bottom, such as protocol-level
+ // filters (client_authority, http-client, http-server).
+ GRPC_CHANNEL_INIT_PRIORITY_HIGH = 20000,
+ // For filters that need to be very close to the wire or surface, such as
+ // stats filters (census).
+ GRPC_CHANNEL_INIT_PRIORITY_VERY_HIGH = 30000,
+ // For things that have to happen last, such as connected channel filter or
+ // surface server filter. Consider as reserved for gRPC internals.
+ GRPC_CHANNEL_INIT_PRIORITY_MAX = INT_MAX
+};
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 0ad82fed99..7807b261d4 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -70,11 +70,6 @@ static void do_basic_init(void) {
g_initializations = 0;
}
-static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
- return grpc_channel_stack_builder_append_filter(
- builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
-}
-
static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
return grpc_channel_stack_builder_prepend_filter(
builder, static_cast<const grpc_channel_filter*>(arg), nullptr, nullptr);
@@ -82,19 +77,20 @@ static bool prepend_filter(grpc_channel_stack_builder* builder, void* arg) {
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- grpc_add_connected_filter, nullptr);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_append_connected_filter, nullptr);
grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
- GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- append_filter, (void*)&grpc_lame_filter);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
- (void*)&grpc_server_top_filter);
+ GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ prepend_filter, (void*)&grpc_lame_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, prepend_filter,
+ (void*)&grpc_server_top_filter);
}
typedef struct grpc_plugin {
diff --git a/src/core/lib/surface/init_secure.cc b/src/core/lib/surface/init_secure.cc
index 28c6f7b121..8058aaa804 100644
--- a/src/core/lib/surface/init_secure.cc
+++ b/src/core/lib/surface/init_secure.cc
@@ -67,14 +67,17 @@ static bool maybe_prepend_server_auth_filter(
}
void grpc_register_security_filters(void) {
- // Register the auth client with a priority < INT_MAX to allow the authority
+ // Register the auth client with a medium priority to allow the authority
// filter -on which the auth filter depends- to be higher on the channel
// stack.
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX - 1,
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX - 1,
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_client_auth_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
+ GRPC_CHANNEL_INIT_PRIORITY_MED,
maybe_prepend_server_auth_filter, nullptr);
}
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9dfae9..9e784635c6 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -282,6 +282,8 @@ typedef struct grpc_transport_op {
/** Called when the ping ack is received */
grpc_closure* on_ack;
} send_ping;
+ // If true, will reset the channel's connection backoff.
+ bool reset_connect_backoff;
/***************************************************************************
* remaining fields are initialized and used at the discretion of the