aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/client_config/subchannel.c14
-rw-r--r--src/core/iomgr/iocp_windows.c44
-rw-r--r--src/core/iomgr/iocp_windows.h10
-rw-r--r--src/core/iomgr/pollset_set_windows.c6
-rw-r--r--src/core/iomgr/pollset_windows.c15
-rw-r--r--src/core/iomgr/resolve_address_windows.c6
-rw-r--r--src/core/iomgr/sockaddr_utils.c3
-rw-r--r--src/core/iomgr/socket_windows.h4
-rw-r--r--src/core/iomgr/tcp_client_windows.c40
-rw-r--r--src/core/iomgr/tcp_server_windows.c49
-rw-r--r--src/core/iomgr/tcp_windows.c80
-rw-r--r--src/core/security/security_connector.c8
-rw-r--r--src/core/surface/server.c6
-rw-r--r--src/core/transport/chttp2/frame_data.c2
-rw-r--r--src/core/transport/chttp2/frame_goaway.c2
15 files changed, 148 insertions, 141 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c7dfba9a4b..d9dc5ae39d 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -258,7 +258,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
if (c->active != NULL) {
connection_destroy(exec_ctx, c->active);
}
- gpr_free(c->filters);
+ gpr_free((void*)c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
@@ -294,7 +294,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy(c->filters, args->filters,
+ memcpy((void*)c->filters, args->filters,
sizeof(grpc_channel_filter *) * c->num_filters);
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
@@ -531,8 +531,8 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
- memcpy(filters + c->num_filters, c->connecting_result.filters,
+ memcpy((void*)filters, c->filters, sizeof(*filters) * c->num_filters);
+ memcpy((void*)(filters + c->num_filters), c->connecting_result.filters,
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
@@ -545,7 +545,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
- gpr_free(c->connecting_result.filters);
+ gpr_free((void*)c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
@@ -559,7 +559,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
gpr_free(sw);
- gpr_free(filters);
+ gpr_free((void*)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
@@ -601,7 +601,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
w4c = next;
}
- gpr_free(filters);
+ gpr_free((void*)filters);
if (destroy_connection != NULL) {
connection_destroy(exec_ctx, destroy_connection);
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 54048910a2..0378cf0c76 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -56,7 +56,7 @@ static gpr_atm g_custom_events = 0;
static HANDLE g_iocp;
-static void do_iocp_work() {
+static void do_iocp_work(grpc_exec_ctx *exec_ctx) {
BOOL success;
DWORD bytes = 0;
DWORD flags = 0;
@@ -64,8 +64,6 @@ static void do_iocp_work() {
LPOVERLAPPED overlapped;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
- void (*f)(void *, int) = NULL;
- void *opaque = NULL;
success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key,
&overlapped, INFINITE);
/* success = 0 and overlapped = NULL means the deadline got attained.
@@ -98,22 +96,24 @@ static void do_iocp_work() {
GPR_ASSERT(overlapped == &info->overlapped);
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
- if (info->cb) {
- f = info->cb;
- opaque = info->opaque;
- info->cb = NULL;
+ if (info->closure) {
+ grpc_exec_ctx_enqueue(exec_ctx, info->closure, 1);
+ info->closure = NULL;
} else {
info->has_pending_iocp = 1;
}
gpr_mu_unlock(&socket->state_mu);
- if (f) f(opaque, 1);
}
static void iocp_loop(void *p) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
while (gpr_atm_acq_load(&g_custom_events) ||
!gpr_event_get(&g_shutdown_iocp)) {
- do_iocp_work();
+ do_iocp_work(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
}
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_event_set(&g_iocp_done, (void *)1);
}
@@ -168,31 +168,33 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
-) The IOCP already completed in the background, and we need to call
the callback now.
-) The IOCP hasn't completed yet, and we're queuing it for later. */
-static void socket_notify_on_iocp(grpc_winsocket *socket,
- void (*cb)(void *, int), void *opaque,
+static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *socket,
+ grpc_closure *closure,
grpc_winsocket_callback_info *info) {
int run_now = 0;
- GPR_ASSERT(!info->cb);
+ GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
run_now = 1;
info->has_pending_iocp = 0;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
} else {
- info->cb = cb;
- info->opaque = opaque;
+ info->closure = closure;
}
gpr_mu_unlock(&socket->state_mu);
- if (run_now) cb(opaque, 1);
}
-void grpc_socket_notify_on_write(grpc_winsocket *socket,
- void (*cb)(void *, int), void *opaque) {
- socket_notify_on_iocp(socket, cb, opaque, &socket->write_info);
+void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *socket,
+ grpc_closure *closure) {
+ socket_notify_on_iocp(exec_ctx, socket, closure, &socket->write_info);
}
-void grpc_socket_notify_on_read(grpc_winsocket *socket, void (*cb)(void *, int),
- void *opaque) {
- socket_notify_on_iocp(socket, cb, opaque, &socket->read_info);
+void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *socket,
+ grpc_closure *closure) {
+ socket_notify_on_iocp(exec_ctx, socket, closure, &socket->read_info);
}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index 7d2dc45176..35c418f3b4 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -43,10 +43,12 @@ void grpc_iocp_kick(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
-void grpc_socket_notify_on_write(grpc_winsocket *,
- void (*cb)(void *, int success), void *opaque);
+void grpc_socket_notify_on_write(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *winsocket,
+ grpc_closure *closure);
-void grpc_socket_notify_on_read(grpc_winsocket *,
- void (*cb)(void *, int success), void *opaque);
+void grpc_socket_notify_on_read(grpc_exec_ctx *exec_ctx,
+ grpc_winsocket *winsocket,
+ grpc_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOCP_WINDOWS_H */
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index c80dd592b0..75da6dbfa1 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -41,10 +41,12 @@ void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {}
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
-void grpc_pollset_set_add_pollset(grpc_pollset_set* pollset_set,
+void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
-void grpc_pollset_set_del_pollset(grpc_pollset_set* pollset_set,
+void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index e6791d4943..7707c3d502 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -85,29 +85,26 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->kicked_without_pollers = 0;
}
-void grpc_pollset_shutdown(grpc_pollset *pollset,
- void (*shutdown_done)(void *arg),
- void *shutdown_done_arg) {
+void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
gpr_mu_lock(&pollset->mu);
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
- shutdown_done(shutdown_done_arg);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
int added_worker = 0;
worker->next = worker->prev = NULL;
gpr_cv_init(&worker->cv);
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
- goto done;
- }
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
+ if (grpc_alarm_check(exec_ctx, now, &deadline)) {
goto done;
}
if (!pollset->kicked_without_pollers && !pollset->shutting_down) {
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index fb5fd0d4f6..fcd80b3912 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -128,6 +128,7 @@ done:
/* Thread function to asynch-ify grpc_blocking_resolve_address */
static void do_request(void *rp) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
@@ -137,7 +138,8 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
grpc_iomgr_unregister_object(&r->iomgr_object);
gpr_free(r);
- cb(arg, resolved);
+ cb(&exec_ctx, arg, resolved);
+ grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -149,7 +151,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
- const char *label;
+ char *label;
gpr_asprintf(&label, "resolve:%s", name);
grpc_iomgr_register_object(&r->iomgr_object, label);
gpr_free(label);
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 0e4bf24549..df74fe2fe0 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -158,8 +158,9 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
}
+ /* Windows inet_ntop wants a mutable ip pointer */
if (ip != NULL &&
- inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
+ inet_ntop(addr->sa_family, (void*)ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
ret = gpr_join_host_port(out, ntop_buf, port);
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 45bc657225..dfbfabe1f9 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -41,6 +41,7 @@
#include <grpc/support/atm.h>
#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/exec_ctx.h"
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
@@ -54,8 +55,7 @@ typedef struct grpc_winsocket_callback_info {
OVERLAPPED overlapped;
/* The callback information for the pending operation. May be empty if the
caller hasn't registered a callback yet. */
- void (*cb)(void *opaque, int success);
- void *opaque;
+ grpc_closure *closure;
/* A boolean to describe if the IO Completion Port got a notification for
that operation. This will happen if the operation completed before the
called had time to register a callback. We could avoid that behavior
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 28f1bc96c8..4dfab3b954 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -52,14 +52,15 @@
#include "src/core/iomgr/socket_windows.h"
typedef struct {
- void (*cb)(void *arg, grpc_endpoint *tcp);
- void *cb_arg;
+ grpc_closure *on_done;
gpr_mu mu;
grpc_winsocket *socket;
gpr_timespec deadline;
grpc_alarm alarm;
char *addr_name;
int refs;
+ grpc_closure on_connect;
+ grpc_endpoint **endpoint;
} async_connect;
static void async_connect_unlock_and_cleanup(async_connect *ac) {
@@ -73,7 +74,7 @@ static void async_connect_unlock_and_cleanup(async_connect *ac) {
}
}
-static void on_alarm(void *acp, int occured) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
/* If the alarm didn't occur, it got cancelled. */
@@ -83,15 +84,14 @@ static void on_alarm(void *acp, int occured) {
async_connect_unlock_and_cleanup(ac);
}
-static void on_connect(void *acp, int from_iocp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, int from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
- grpc_endpoint *ep = NULL;
+ grpc_endpoint **ep = ac->endpoint;
grpc_winsocket_callback_info *info = &ac->socket->write_info;
- void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
- void *cb_arg = ac->cb_arg;
+ grpc_closure *on_done = ac->on_done;
- grpc_alarm_cancel(&ac->alarm);
+ grpc_alarm_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
@@ -106,7 +106,7 @@ static void on_connect(void *acp, int from_iocp) {
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
} else {
- ep = grpc_tcp_create(ac->socket, ac->addr_name);
+ *ep = grpc_tcp_create(ac->socket, ac->addr_name);
ac->socket = NULL;
}
}
@@ -114,14 +114,15 @@ static void on_connect(void *acp, int from_iocp) {
async_connect_unlock_and_cleanup(ac);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- cb(cb_arg, ep);
+ on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL);
}
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
-void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
- void *arg, grpc_pollset_set *interested_parties,
- const struct sockaddr *addr, int addr_len,
+void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
+ grpc_endpoint **endpoint,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
@@ -137,6 +138,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
char *utf8_message;
grpc_winsocket_callback_info *info;
+ *endpoint = NULL;
+
/* Use dualstack sockets where available. */
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = (const struct sockaddr *)&addr6_v4mapped;
@@ -189,16 +192,17 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
}
ac = gpr_malloc(sizeof(async_connect));
- ac->cb = cb;
- ac->cb_arg = arg;
+ ac->on_done = on_done;
ac->socket = socket;
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
+ ac->endpoint = endpoint;
+ grpc_closure_init(&ac->on_connect, on_connect, ac);
- grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
+ grpc_alarm_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_socket_notify_on_write(socket, on_connect, ac);
+ grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect);
return;
failure:
@@ -210,7 +214,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- cb(arg, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, on_done, 0);
}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index be2c0055db..9881a41152 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -67,6 +67,8 @@ typedef struct server_port {
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
+ /* closure for socket notification of accept being ready */
+ grpc_closure on_accept;
} server_port;
/* the overall server */
@@ -86,8 +88,7 @@ struct grpc_tcp_server {
size_t port_capacity;
/* shutdown callback */
- void (*shutdown_complete)(void *);
- void *shutdown_complete_arg;
+ grpc_closure *shutdown_complete;
};
/* Public function. Allocates the proper data structures to hold a
@@ -107,10 +108,10 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
static void dont_care_about_shutdown_completion(void *arg) {}
-static void finish_shutdown(grpc_tcp_server *s) {
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
size_t i;
- s->shutdown_complete(s->shutdown_complete_arg);
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -124,17 +125,13 @@ static void finish_shutdown(grpc_tcp_server *s) {
}
/* Public function. Stops and destroys a grpc_tcp_server. */
-void grpc_tcp_server_destroy(grpc_tcp_server *s,
- void (*shutdown_complete)(void *shutdown_done_arg),
- void *shutdown_complete_arg) {
+void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
+ grpc_closure *shutdown_complete) {
size_t i;
int immediately_done = 0;
gpr_mu_lock(&s->mu);
- s->shutdown_complete = shutdown_complete
- ? shutdown_complete
- : dont_care_about_shutdown_completion;
- s->shutdown_complete_arg = shutdown_complete_arg;
+ s->shutdown_complete = shutdown_complete;
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
@@ -149,7 +146,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
if (immediately_done) {
- finish_shutdown(s);
+ finish_shutdown(exec_ctx, s);
}
}
@@ -201,7 +198,7 @@ error:
return -1;
}
-static void decrement_active_ports_and_notify(server_port *sp) {
+static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, server_port *sp) {
int notify = 0;
sp->shutting_down = 0;
gpr_mu_lock(&sp->server->mu);
@@ -212,16 +209,13 @@ static void decrement_active_ports_and_notify(server_port *sp) {
}
gpr_mu_unlock(&sp->server->mu);
if (notify) {
- finish_shutdown(sp->server);
+ finish_shutdown(exec_ctx, sp->server);
}
}
-/* start_accept will reference that for the IOCP notification request. */
-static void on_accept(void *arg, int from_iocp);
-
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection. */
-static void start_accept(server_port *port) {
+static void start_accept(grpc_exec_ctx *exec_ctx, server_port *port) {
SOCKET sock = INVALID_SOCKET;
char *message;
char *utf8_message;
@@ -260,7 +254,7 @@ static void start_accept(server_port *port) {
/* We're ready to do the accept. Calling grpc_socket_notify_on_read may
immediately process an accept that happened in the meantime. */
port->new_socket = sock;
- grpc_socket_notify_on_read(port->socket, on_accept, port);
+ grpc_socket_notify_on_read(exec_ctx, port->socket, &port->on_accept);
return;
failure:
@@ -270,7 +264,7 @@ failure:
change is not seen by on_accept and we proceed to trying new accept,
but we fail there because the listening port has been closed in the
meantime. */
- decrement_active_ports_and_notify(port);
+ decrement_active_ports_and_notify(exec_ctx, port);
return;
}
utf8_message = gpr_format_message(WSAGetLastError());
@@ -280,7 +274,7 @@ failure:
}
/* Event manager callback when reads are ready. */
-static void on_accept(void *arg, int from_iocp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
server_port *sp = arg;
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
@@ -310,7 +304,7 @@ static void on_accept(void *arg, int from_iocp) {
if (sp->shutting_down) {
/* During the shutdown case, we ARE expecting an error. So that's well,
and we can wake up the shutdown thread. */
- decrement_active_ports_and_notify(sp);
+ decrement_active_ports_and_notify(exec_ctx, sp);
return;
} else {
char *utf8_message = gpr_format_message(WSAGetLastError());
@@ -346,12 +340,12 @@ static void on_accept(void *arg, int from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
- if (ep) sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep);
+ if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep);
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
connection. */
- start_accept(sp);
+ start_accept(exec_ctx, sp);
}
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
@@ -402,7 +396,7 @@ static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
}
int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
- int addr_len) {
+ size_t addr_len) {
int allocated_port = -1;
unsigned i;
SOCKET sock;
@@ -464,7 +458,8 @@ grpc_tcp_server_get_socket(grpc_tcp_server *s, unsigned index) {
return (index < s->nports) ? s->ports[index].socket->socket : INVALID_SOCKET;
}
-void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
+void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
+ grpc_pollset **pollset,
size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
void *on_accept_cb_arg) {
@@ -476,7 +471,7 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset,
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
for (i = 0; i < s->nports; i++) {
- start_accept(s->ports + i);
+ start_accept(exec_ctx, s->ports + i);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index ab309180eb..4a531b8546 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,6 +82,9 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
+ grpc_closure on_read;
+ grpc_closure on_write;
+
grpc_closure *read_cb;
grpc_closure *write_cb;
gpr_slice read_slice;
@@ -135,7 +138,9 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static int on_read(grpc_tcp *tcp, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
+ grpc_tcp *tcp = tcpp;
+ grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
@@ -164,23 +169,17 @@ static int on_read(grpc_tcp *tcp, int success) {
}
}
- return success;
-}
-
-static void on_read_cb(void *tcpp, int from_iocp) {
- grpc_tcp *tcp = tcpp;
- grpc_closure *cb = tcp->read_cb;
- int success = on_read(tcp, from_iocp);
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
if (cb) {
- cb->cb(cb->cb_arg, success);
+ cb->cb(exec_ctx, cb->cb_arg, success);
}
}
-static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
- gpr_slice_buffer *read_slices,
- grpc_closure *cb) {
+static void win_read(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ gpr_slice_buffer *read_slices,
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -190,7 +189,8 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- return GRPC_ENDPOINT_ERROR;
+ grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ return;
}
tcp->read_cb = cb;
@@ -202,6 +202,8 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
+ TCP_REF(tcp, "read");
+
/* First let's try a synchronous, non-blocking read. */
status =
WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL);
@@ -209,14 +211,11 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- int ok;
info->bytes_transfered = bytes_read;
- ok = on_read(tcp, 1);
- return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 1);
+ return;
}
- TCP_REF(tcp, "read");
-
/* Otherwise, let's retry, by queuing a read. */
memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
@@ -225,19 +224,17 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- int ok;
info->wsa_error = wsa_error;
- ok = on_read(tcp, 1);
- return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 0);
+ return;
}
}
- grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
- return GRPC_ENDPOINT_PENDING;
+ grpc_socket_notify_on_read(exec_ctx, tcp->socket, &tcp->on_read);
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(void *tcpp, int success) {
+static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
@@ -263,13 +260,14 @@ static void on_write(void *tcpp, int success) {
}
TCP_UNREF(tcp, "write");
- cb->cb(cb->cb_arg, success);
+ cb->cb(exec_ctx, cb->cb_arg, success);
}
/* Initiates a write. */
-static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_closure *cb) {
+static void win_write(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
@@ -281,7 +279,8 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
WSABUF *buffers = local_buffers;
if (tcp->shutting_down) {
- return GRPC_ENDPOINT_ERROR;
+ grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ return;
}
tcp->write_cb = cb;
@@ -306,9 +305,9 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
+ int ok = 0;
if (status == 0) {
- ret = GRPC_ENDPOINT_DONE;
+ ok = 1;
GPR_ASSERT(bytes_sent == tcp->write_slices->length);
} else {
if (socket->read_info.wsa_error != WSAECONNRESET) {
@@ -318,7 +317,8 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
- return ret;
+ grpc_exec_ctx_enqueue(exec_ctx, cb, ok);
+ return;
}
TCP_REF(tcp, "write");
@@ -334,24 +334,24 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
- return GRPC_ENDPOINT_ERROR;
+ grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ return;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
- grpc_socket_notify_on_write(socket, on_write, tcp);
- return GRPC_ENDPOINT_PENDING;
+ grpc_socket_notify_on_write(exec_ctx, socket, &tcp->on_write);
}
-static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
+static void win_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *ps) {
grpc_tcp *tcp;
(void)ps;
tcp = (grpc_tcp *)ep;
grpc_iocp_add_socket(tcp->socket);
}
-static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
+static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pss) {
grpc_tcp *tcp;
(void)pss;
tcp = (grpc_tcp *)ep;
@@ -364,7 +364,7 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
-static void win_shutdown(grpc_endpoint *ep) {
+static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
@@ -374,7 +374,7 @@ static void win_shutdown(grpc_endpoint *ep) {
gpr_mu_unlock(&tcp->mu);
}
-static void win_destroy(grpc_endpoint *ep) {
+static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
TCP_UNREF(tcp, "destroy");
}
@@ -395,6 +395,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
+ grpc_closure_init(&tcp->on_read, on_read, tcp);
+ grpc_closure_init(&tcp->on_read, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base;
}
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index ec5b1c86e5..ea07d49acb 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -648,12 +648,12 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
goto error;
}
*sc = &c->base;
- gpr_free(alpn_protocol_strings);
+ gpr_free((void*)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
return GRPC_SECURITY_OK;
error:
- gpr_free(alpn_protocol_strings);
+ gpr_free((void*)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
return GRPC_SECURITY_ERROR;
}
@@ -703,12 +703,12 @@ grpc_security_status grpc_ssl_server_security_connector_create(
goto error;
}
*sc = &c->base;
- gpr_free(alpn_protocol_strings);
+ gpr_free((void*)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
return GRPC_SECURITY_OK;
error:
- gpr_free(alpn_protocol_strings);
+ gpr_free((void*)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);
return GRPC_SECURITY_ERROR;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a26b84c0d3..28d0e8938d 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -183,7 +183,7 @@ typedef struct {
struct grpc_server {
size_t channel_filter_count;
- const grpc_channel_filter **channel_filters;
+ grpc_channel_filter const**channel_filters;
grpc_channel_args *channel_args;
grpc_completion_queue **cqs;
@@ -356,7 +356,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
- gpr_free(server->channel_filters);
+ gpr_free((void*)server->channel_filters);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
request_matcher_destroy(&rm->request_matcher);
@@ -988,7 +988,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
chand->next->prev = chand->prev->next = chand;
gpr_mu_unlock(&s->mu_global);
- gpr_free(filters);
+ gpr_free((void*)filters);
GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
memset(&op, 0, sizeof(op));
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 99414523d5..acfa7c002e 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -162,7 +162,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_sopb_add_slice(
&p->incoming_sopb,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
- GPR_ASSERT(end - cur <= p->frame_size);
+ GPR_ASSERT((size_t)(end - cur) <= p->frame_size);
p->frame_size -= (gpr_uint32)(end - cur);
return GRPC_CHTTP2_PARSE_OK;
}
diff --git a/src/core/transport/chttp2/frame_goaway.c b/src/core/transport/chttp2/frame_goaway.c
index d2065ce79d..2ff1eda89b 100644
--- a/src/core/transport/chttp2/frame_goaway.c
+++ b/src/core/transport/chttp2/frame_goaway.c
@@ -138,7 +138,7 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse(
/* fallthrough */
case GRPC_CHTTP2_GOAWAY_DEBUG:
memcpy(p->debug_data + p->debug_pos, cur, (size_t)(end - cur));
- GPR_ASSERT(end - cur < GPR_UINT32_MAX - p->debug_pos);
+ GPR_ASSERT((size_t)(end - cur) < GPR_UINT32_MAX - p->debug_pos);
p->debug_pos += (gpr_uint32)(end - cur);
p->state = GRPC_CHTTP2_GOAWAY_DEBUG;
if (is_last) {