aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-05-08 14:51:25 -0700
committerGravatar Julien Boeuf <jboeuf@google.com>2015-05-08 14:51:25 -0700
commit554c79c7304866dc543c2f6a73203ebcd62f7c55 (patch)
tree553f291bd2094972b2da100baf6264141c64dfde /src/core
parent9f218ddd9db5049d0ba92334f1e0a329171343c9 (diff)
parentfe117723d0599ff990fb154acbc7856083d94c9e (diff)
Merging with master.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/endpoint_pair_windows.c2
-rw-r--r--src/core/iomgr/iocp_windows.c35
-rw-r--r--src/core/iomgr/socket_utils_common_posix.c13
-rw-r--r--src/core/iomgr/socket_utils_posix.h5
-rw-r--r--src/core/iomgr/socket_windows.c7
-rw-r--r--src/core/iomgr/socket_windows.h13
-rw-r--r--src/core/iomgr/tcp_client_posix.c3
-rw-r--r--src/core/iomgr/tcp_client_windows.c24
-rw-r--r--src/core/iomgr/tcp_posix.c8
-rw-r--r--src/core/iomgr/tcp_server_posix.c5
-rw-r--r--src/core/iomgr/tcp_server_windows.c3
-rw-r--r--src/core/iomgr/tcp_windows.c92
-rw-r--r--src/core/surface/call.c51
-rw-r--r--src/core/transport/chttp2_transport.c13
-rw-r--r--src/core/transport/metadata.c16
-rw-r--r--src/core/transport/metadata.h3
16 files changed, 158 insertions, 135 deletions
diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c
index 58960b6028..7c945ebad4 100644
--- a/src/core/iomgr/endpoint_pair_windows.c
+++ b/src/core/iomgr/endpoint_pair_windows.c
@@ -68,6 +68,8 @@ static void create_sockets(SOCKET sv[2]) {
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
+ grpc_tcp_prepare_socket(cli_sock);
+ grpc_tcp_prepare_socket(svr_sock);
sv[1] = cli_sock;
sv[0] = svr_sock;
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 1cdf3da0d6..8827bb99bc 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -53,6 +53,7 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
static gpr_atm g_orphans = 0;
+static gpr_atm g_custom_events = 0;
static HANDLE g_iocp;
@@ -62,20 +63,19 @@ static void do_iocp_work() {
DWORD flags = 0;
ULONG_PTR completion_key;
LPOVERLAPPED overlapped;
- gpr_timespec wait_time = gpr_inf_future;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
void(*f)(void *, int) = NULL;
void *opaque = NULL;
success = GetQueuedCompletionStatus(g_iocp, &bytes,
&completion_key, &overlapped,
- gpr_time_to_millis(wait_time));
- if (!success && !overlapped) {
- /* The deadline got attained. */
- return;
- }
+ INFINITE);
+ /* success = 0 and overlapped = NULL means the deadline got attained.
+ Which is impossible. since our wait time is +inf */
+ GPR_ASSERT(success || overlapped);
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
+ gpr_atm_full_fetch_add(&g_custom_events, -1);
if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
/* We were awoken from a kick. */
return;
@@ -93,13 +93,17 @@ static void do_iocp_work() {
gpr_log(GPR_ERROR, "Unknown IOCP operation");
abort();
}
- success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
- FALSE, &flags);
+ GPR_ASSERT(info->outstanding);
if (socket->orphan) {
- grpc_winsocket_destroy(socket);
- gpr_atm_full_fetch_add(&g_orphans, -1);
+ info->outstanding = 0;
+ if (!socket->read_info.outstanding && !socket->write_info.outstanding) {
+ grpc_winsocket_destroy(socket);
+ gpr_atm_full_fetch_add(&g_orphans, -1);
+ }
return;
}
+ success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
+ FALSE, &flags);
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
@@ -117,10 +121,14 @@ static void do_iocp_work() {
}
static void iocp_loop(void *p) {
- while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) {
+ void * eventshutdown = NULL;
+ while (gpr_atm_acq_load(&g_orphans) ||
+ gpr_atm_acq_load(&g_custom_events) ||
+ !gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
}
+ gpr_log(GPR_DEBUG, "iocp_loop is done");
gpr_event_set(&g_iocp_done, (void *)1);
}
@@ -128,8 +136,8 @@ static void iocp_loop(void *p) {
void grpc_iocp_init(void) {
gpr_thd_id id;
- g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
- (ULONG_PTR)NULL, 0);
+ g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
+ NULL, (ULONG_PTR)NULL, 0);
GPR_ASSERT(g_iocp);
gpr_event_init(&g_iocp_done);
@@ -140,6 +148,7 @@ void grpc_iocp_init(void) {
void grpc_iocp_kick(void) {
BOOL success;
+ gpr_atm_full_fetch_add(&g_custom_events, 1);
success = PostQueuedCompletionStatus(g_iocp, 0,
(ULONG_PTR) &g_iocp_kick_token,
&g_iocp_custom_overlap);
diff --git a/src/core/iomgr/socket_utils_common_posix.c b/src/core/iomgr/socket_utils_common_posix.c
index 3c8cafa315..a9af594700 100644
--- a/src/core/iomgr/socket_utils_common_posix.c
+++ b/src/core/iomgr/socket_utils_common_posix.c
@@ -76,6 +76,19 @@ int grpc_set_socket_nonblocking(int fd, int non_blocking) {
return 1;
}
+int grpc_set_socket_no_sigpipe_if_possible(int fd) {
+#ifdef GPR_HAVE_SO_NOSIGPIPE
+ int val = 1;
+ int newval;
+ socklen_t intlen = sizeof(newval);
+ return 0 == setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) &&
+ 0 == getsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen) &&
+ (newval != 0) == val;
+#else
+ return 1;
+#endif
+}
+
/* set a socket to close on exec */
int grpc_set_socket_cloexec(int fd, int close_on_exec) {
int oldflags = fcntl(fd, F_GETFD, 0);
diff --git a/src/core/iomgr/socket_utils_posix.h b/src/core/iomgr/socket_utils_posix.h
index c161082afc..d2a315b462 100644
--- a/src/core/iomgr/socket_utils_posix.h
+++ b/src/core/iomgr/socket_utils_posix.h
@@ -63,6 +63,11 @@ int grpc_set_socket_low_latency(int fd, int low_latency);
state to library users, we turn off IPv6 sockets. */
int grpc_ipv6_loopback_available(void);
+/* Tries to set SO_NOSIGPIPE if available on this platform.
+ Returns 1 on success, 0 on failure.
+ If SO_NO_SIGPIPE is not available, returns 1. */
+int grpc_set_socket_no_sigpipe_if_possible(int fd);
+
/* An enum to keep track of IPv4/IPv6 socket modes.
Currently, this information is only used when a socket is first created, but
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 9306310d43..35dbfa1587 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -75,15 +75,14 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
/* Abandons a socket. Either we're going to queue it up for garbage collecting
from the IO Completion Port thread, or destroy it immediately. Note that this
mechanisms assumes that we're either always waiting for an operation, or we
- explicitely know that we don't. If there is a future case where we can have
+ explicitly know that we don't. If there is a future case where we can have
an "idle" socket which is neither trying to read or write, we'd start leaking
both memory and sockets. */
void grpc_winsocket_orphan(grpc_winsocket *winsocket) {
SOCKET socket = winsocket->socket;
- if (!winsocket->closed_early) {
+ if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) {
grpc_iocp_socket_orphan(winsocket);
- }
- if (winsocket->closed_early) {
+ } else {
grpc_winsocket_destroy(winsocket);
}
closesocket(socket);
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 6e778a776a..8898def854 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -65,12 +65,14 @@ typedef struct grpc_winsocket_callback_info {
/* The results of the overlapped operation. */
DWORD bytes_transfered;
int wsa_error;
+ /* A boolean indicating that we started an operation. */
+ int outstanding;
} grpc_winsocket_callback_info;
/* This is a wrapper to a Windows socket. A socket can have one outstanding
read, and one outstanding write. Doing an asynchronous accept means waiting
for a read operation. Doing an asynchronous connect means waiting for a
- write operation. These are completely abitrary ties between the operation
+ write operation. These are completely arbitrary ties between the operation
and the kind of event, because we can have one overlapped per pending
operation, whichever its nature is. So we could have more dedicated pending
operation callbacks for connect and listen. But given the scope of listen
@@ -87,17 +89,10 @@ typedef struct grpc_winsocket {
/* You can't add the same socket twice to the same IO Completion Port.
This prevents that. */
int added_to_iocp;
- /* A boolean to indicate that the caller has abandonned that socket, but
+ /* A boolean to indicate that the caller has abandoned that socket, but
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
- /* A boolean to indicate that the socket was already closed somehow, and
- that no operation is going to be pending. Trying to abandon a socket in
- that state won't result in an orphan, but will instead be destroyed
- without further delay. We could avoid that boolean by adding one into
- grpc_winsocket_callback_info describing that the operation is pending,
- but that 1) waste memory more and 2) obfuscate the intent a bit more. */
- int closed_early;
} grpc_winsocket;
/* Create a wrapped windows handle. This takes ownership of it, meaning that
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index e20cc3d1b2..2401fe00e4 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -69,7 +69,8 @@ static int prepare_socket(const struct sockaddr *addr, int fd) {
}
if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
- (addr->sa_family != AF_UNIX && !grpc_set_socket_low_latency(fd, 1))) {
+ (addr->sa_family != AF_UNIX && !grpc_set_socket_low_latency(fd, 1)) ||
+ !grpc_set_socket_no_sigpipe_if_possible(fd)) {
gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
strerror(errno));
goto error;
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 653c0c65c5..d95346f87a 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -74,7 +74,7 @@ static void async_connect_cleanup(async_connect *ac) {
static void on_alarm(void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occor, it got cancelled. */
+ /* If the alarm didn't occur, it got cancelled. */
if (ac->socket != NULL && occured) {
grpc_winsocket_shutdown(ac->socket);
}
@@ -98,6 +98,7 @@ static void on_connect(void *acp, int from_iocp) {
if (from_iocp) {
DWORD transfered_bytes = 0;
DWORD flags;
+ info->outstanding = 0;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
&transfered_bytes, FALSE,
&flags);
@@ -106,10 +107,8 @@ static void on_connect(void *acp, int from_iocp) {
char *utf8_message = gpr_format_message(WSAGetLastError());
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
- goto finish;
- } else {
+ } else if (!aborted) {
ep = grpc_tcp_create(ac->socket);
- goto finish;
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
@@ -125,20 +124,12 @@ static void on_connect(void *acp, int from_iocp) {
return;
}
- abort();
+ ac->socket->write_info.outstanding = 0;
-finish:
/* If we don't have an endpoint, it means the connection failed,
so it doesn't matter if it aborted or failed. We need to orphan
that socket. */
- if (!ep || aborted) {
- /* If the connection failed, it means we won't get an IOCP notification,
- so let's flag it as already closed. But if the connection was aborted,
- while we still got an endpoint, we have to wait for the IOCP to collect
- that socket. So let's properly flag that. */
- ac->socket->closed_early = !ep;
- grpc_winsocket_orphan(ac->socket);
- }
+ if (!ep || aborted) grpc_winsocket_orphan(ac->socket);
async_connect_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
@@ -189,7 +180,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
&ioctl_num_bytes, NULL, NULL);
if (status != 0) {
- message = "Unable to retreive ConnectEx pointer: %s";
+ message = "Unable to retrieve ConnectEx pointer: %s";
goto failure;
}
@@ -204,6 +195,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
socket = grpc_winsocket_create(sock);
info = &socket->write_info;
+ info->outstanding = 1;
success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -225,6 +217,7 @@ void grpc_tcp_client_connect(void(*cb)(void *arg, grpc_endpoint *tcp),
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;
@@ -233,7 +226,6 @@ failure:
gpr_log(GPR_ERROR, message, utf8_message);
gpr_free(utf8_message);
if (socket) {
- socket->closed_early = 1;
grpc_winsocket_orphan(socket);
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 06725fbc89..f7dae5f86c 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -53,6 +53,12 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#ifdef GPR_HAVE_MSG_NOSIGNAL
+#define SENDMSG_FLAGS MSG_NOSIGNAL
+#else
+#define SENDMSG_FLAGS 0
+#endif
+
/* Holds a slice array and associated state. */
typedef struct grpc_tcp_slice_state {
gpr_slice *slices; /* Array of slices */
@@ -461,7 +467,7 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
do {
/* TODO(klempner): Cork if this is a partial write */
- sent_length = sendmsg(tcp->fd, &msg, 0);
+ sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0);
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 7e31f2d7a5..d1cd8a769c 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -235,7 +235,8 @@ static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) {
if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
(addr->sa_family != AF_UNIX && (!grpc_set_socket_low_latency(fd, 1) ||
- !grpc_set_socket_reuse_addr(fd, 1)))) {
+ !grpc_set_socket_reuse_addr(fd, 1))) ||
+ !grpc_set_socket_no_sigpipe_if_possible(fd)) {
gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
strerror(errno));
goto error;
@@ -296,6 +297,8 @@ static void on_read(void *arg, int success) {
}
}
+ grpc_set_socket_no_sigpipe_if_possible(fd);
+
sp->server->cb(
sp->server->cb_arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index c6137e1e1d..d22acc7453 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -123,7 +123,6 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
closed by the system. */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- sp->socket->closed_early = 1;
grpc_winsocket_orphan(sp->socket);
}
gpr_free(s->ports);
@@ -249,6 +248,7 @@ static void on_accept(void *arg, int from_iocp) {
if (sp->shutting_down) {
GPR_ASSERT(from_iocp);
sp->shutting_down = 0;
+ sp->socket->read_info.outstanding = 0;
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_cv_broadcast(&sp->server->cv);
@@ -420,6 +420,7 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
+ s->ports[i].socket->read_info.outstanding = 1;
start_accept(s->ports + i);
s->active_ports++;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index c8483bd891..f16b4c1268 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -86,12 +86,10 @@ typedef struct grpc_tcp {
grpc_endpoint_read_cb read_cb;
void *read_user_data;
gpr_slice read_slice;
- int outstanding_read;
grpc_endpoint_write_cb write_cb;
void *write_user_data;
gpr_slice_buffer write_slices;
- int outstanding_write;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
@@ -141,14 +139,13 @@ static void on_read(void *tcpp, int from_iocp) {
return;
}
- GPR_ASSERT(tcp->outstanding_read);
+ GPR_ASSERT(tcp->socket->read_info.outstanding);
if (socket->read_info.wsa_error != 0) {
char *utf8_message = gpr_format_message(info->wsa_error);
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
- socket->closed_early = 1;
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub(tcp->read_slice, 0, info->bytes_transfered);
@@ -161,7 +158,7 @@ static void on_read(void *tcpp, int from_iocp) {
}
}
- tcp->outstanding_read = 0;
+ tcp->socket->read_info.outstanding = 0;
tcp_unref(tcp);
cb(opaque, slice, nslices, status);
@@ -175,13 +172,15 @@ static void win_notify_on_read(grpc_endpoint *ep,
int status;
DWORD bytes_read = 0;
DWORD flags = 0;
- int error;
WSABUF buffer;
- GPR_ASSERT(!tcp->outstanding_read);
- GPR_ASSERT(!tcp->shutting_down);
+ GPR_ASSERT(!tcp->socket->read_info.outstanding);
+ if (tcp->shutting_down) {
+ cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ return;
+ }
tcp_ref(tcp);
- tcp->outstanding_read = 1;
+ tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
tcp->read_user_data = arg;
@@ -208,34 +207,13 @@ static void win_notify_on_read(grpc_endpoint *ep,
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
&info->overlapped, NULL);
- if (status == 0) {
- grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
- return;
- }
-
- error = WSAGetLastError();
-
- if (error != WSA_IO_PENDING) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "WSARecv error: %s - this means we're going to leak.",
- utf8_message);
- gpr_free(utf8_message);
- /* I'm pretty sure this is a very bad situation there. Hence the log.
- What will happen now is that the socket will neither wait for read
- or write, unless the caller retry, which is unlikely, but I am not
- sure if that's guaranteed. And there might also be a write pending.
- This means that the future orphanage of that socket will be in limbo,
- and we're going to leak it. I have no idea what could cause this
- specific case however, aside from a parameter error from our call.
- Normal read errors would actually happen during the overlapped
- operation, which is the supported way to go for that. */
- tcp->outstanding_read = 0;
- tcp_unref(tcp);
- cb(arg, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- /* Per the comment above, I'm going to treat that case as a hard failure
- for now, and leave the option to catch that and debug. */
- __debugbreak();
- return;
+ if (status != 0) {
+ int wsa_error = WSAGetLastError();
+ if (wsa_error != WSA_IO_PENDING) {
+ info->wsa_error = wsa_error;
+ on_read(tcp, 1);
+ return;
+ }
}
grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
@@ -260,7 +238,7 @@ static void on_write(void *tcpp, int from_iocp) {
}
gpr_mu_unlock(&tcp->mu);
- GPR_ASSERT(tcp->outstanding_write);
+ GPR_ASSERT(tcp->socket->write_info.outstanding);
if (do_abort) {
if (from_iocp) gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
@@ -274,13 +252,12 @@ static void on_write(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
status = GRPC_ENDPOINT_CB_ERROR;
- tcp->socket->closed_early = 1;
} else {
GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
}
gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
- tcp->outstanding_write = 0;
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
cb(opaque, status);
@@ -301,11 +278,13 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
WSABUF *allocated = NULL;
WSABUF *buffers = local_buffers;
- GPR_ASSERT(!tcp->outstanding_write);
- GPR_ASSERT(!tcp->shutting_down);
+ GPR_ASSERT(!tcp->socket->write_info.outstanding);
+ if (tcp->shutting_down) {
+ return GRPC_ENDPOINT_WRITE_ERROR;
+ }
tcp_ref(tcp);
- tcp->outstanding_write = 1;
+ tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
tcp->write_user_data = arg;
@@ -341,7 +320,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
}
if (allocated) gpr_free(allocated);
gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
- tcp->outstanding_write = 0;
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
return ret;
}
@@ -353,29 +332,12 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
- /* It is possible the operation completed then. But we'd still get an IOCP
- notification. So let's ignore it and wait for the IOCP. */
if (status != 0) {
- int error = WSAGetLastError();
- if (error != WSA_IO_PENDING) {
- char *utf8_message = gpr_format_message(WSAGetLastError());
- gpr_log(GPR_ERROR, "WSASend error: %s - this means we're going to leak.",
- utf8_message);
- gpr_free(utf8_message);
- /* I'm pretty sure this is a very bad situation there. Hence the log.
- What will happen now is that the socket will neither wait for read
- or write, unless the caller retry, which is unlikely, but I am not
- sure if that's guaranteed. And there might also be a read pending.
- This means that the future orphanage of that socket will be in limbo,
- and we're going to leak it. I have no idea what could cause this
- specific case however, aside from a parameter error from our call.
- Normal read errors would actually happen during the overlapped
- operation, which is the supported way to go for that. */
- tcp->outstanding_write = 0;
+ int wsa_error = WSAGetLastError();
+ if (wsa_error != WSA_IO_PENDING) {
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
+ tcp->socket->write_info.outstanding = 0;
tcp_unref(tcp);
- /* Per the comment above, I'm going to treat that case as a hard failure
- for now, and leave the option to catch that and debug. */
- __debugbreak();
return GRPC_ENDPOINT_WRITE_ERROR;
}
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 8aa0aae886..e117f270df 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -375,18 +375,10 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
- int flush;
-
call->status[source].is_set = 1;
call->status[source].code = status;
- if (call->is_client) {
- flush = status == GRPC_STATUS_CANCELLED;
- } else {
- flush = status != GRPC_STATUS_OK;
- }
-
- if (flush && !grpc_bbq_empty(&call->incoming_queue)) {
+ if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
}
}
@@ -711,6 +703,10 @@ static void call_on_done_recv(void *pc, int success) {
break;
}
}
+ if (!success) {
+ grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i],
+ call->recv_ops.nops - i);
+ }
if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
call->read_state = READ_STATE_READ_CLOSED;
@@ -739,14 +735,9 @@ static void call_on_done_recv(void *pc, int success) {
GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
-static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
- grpc_metadata *metadata) {
+static int prepare_application_metadata(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
size_t i;
- grpc_mdelem_list out;
- if (count == 0) {
- out.head = out.tail = NULL;
- return out;
- }
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
@@ -756,9 +747,27 @@ static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
md->value_length);
+ if (!grpc_mdstr_is_legal_header(l->md->key)) {
+ gpr_log(GPR_ERROR, "attempt to send invalid metadata key");
+ return 0;
+ } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) &&
+ !grpc_mdstr_is_legal_header(l->md->value)) {
+ gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
+ return 0;
+ }
l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
}
+ return 1;
+}
+
+static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
+ grpc_metadata *metadata) {
+ grpc_mdelem_list out;
+ if (count == 0) {
+ out.head = out.tail = NULL;
+ return out;
+ }
out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
return out;
@@ -954,8 +963,16 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
} else if (call->request_set[op] == REQSET_DONE) {
return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
}
- have_ops |= 1u << op;
data = reqs[i].data;
+ if (op == GRPC_IOREQ_SEND_INITIAL_METADATA ||
+ op == GRPC_IOREQ_SEND_TRAILING_METADATA) {
+ if (!prepare_application_metadata(call, data.send_metadata.count,
+ data.send_metadata.metadata)) {
+ return start_ioreq_error(call, have_ops,
+ GRPC_CALL_ERROR_INVALID_METADATA);
+ }
+ }
+ have_ops |= 1u << op;
call->request_data[op] = data;
call->request_set[op] = set;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 885838ec5d..a6f9f782a1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -824,12 +824,9 @@ static void unlock(transport *t) {
/* gather any callbacks that need to be made */
if (!t->calling_back) {
- perform_callbacks = prepare_callbacks(t);
- if (perform_callbacks) {
- t->calling_back = 1;
- }
+ t->calling_back = perform_callbacks = prepare_callbacks(t);
if (cb) {
- if (t->error_state == ERROR_STATE_SEEN && !t->writing && !t->calling_back) {
+ if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
@@ -1930,8 +1927,10 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
break;
case GRPC_ENDPOINT_CB_OK:
lock(t);
- for (i = 0; i < nslices && process_read(t, slices[i]); i++)
- ;
+ if (t->cb) {
+ for (i = 0; i < nslices && process_read(t, slices[i]); i++)
+ ;
+ }
unlock(t);
keep_reading = 1;
break;
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index 74e94b2c24..c80d67823f 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -569,3 +569,19 @@ void grpc_mdctx_locked_mdelem_unref(grpc_mdctx *ctx, grpc_mdelem *gmd) {
}
void grpc_mdctx_unlock(grpc_mdctx *ctx) { unlock(ctx); }
+
+int grpc_mdstr_is_legal_header(grpc_mdstr *s) {
+ /* TODO(ctiller): consider caching this, or computing it on construction */
+ const gpr_uint8 *p = GPR_SLICE_START_PTR(s->slice);
+ const gpr_uint8 *e = GPR_SLICE_END_PTR(s->slice);
+ for (; p != e; p++) {
+ if (*p < 32 || *p > 126) return 0;
+ }
+ return 1;
+}
+
+int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s) {
+ /* TODO(ctiller): consider caching this */
+ return grpc_is_binary_header((const char *)GPR_SLICE_START_PTR(s->slice),
+ GPR_SLICE_LENGTH(s->slice));
+}
diff --git a/src/core/transport/metadata.h b/src/core/transport/metadata.h
index 21b8ae2b78..e7508718f5 100644
--- a/src/core/transport/metadata.h
+++ b/src/core/transport/metadata.h
@@ -135,6 +135,9 @@ void grpc_mdelem_unref(grpc_mdelem *md);
Does not promise that the returned string has no embedded nulls however. */
const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
+int grpc_mdstr_is_legal_header(grpc_mdstr *s);
+int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s);
+
/* Batch mode metadata functions.
These API's have equivalents above, but allow taking the mdctx just once,
performing a bunch of work, and then leaving the mdctx. */