aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-01-28 11:38:21 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-01-28 11:38:21 -0800
commit8c50db09b9fc937a16ace5b0783980b4a8ffc0ec (patch)
tree8fddf70188f7617afa2c54c5d187f8f56aa22ff5 /src/core
parentfdb603f5985b3ae604e1799b5bdbf52cda6bfd0a (diff)
parentddd91dbb42f6f52655738c6520548d04ea5f8468 (diff)
Merge github.com:grpc/grpc into sceq
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/initialize.c4
-rw-r--r--src/core/channel/channel_args.c5
-rw-r--r--src/core/client_config/lb_policies/pick_first.c4
-rw-r--r--src/core/client_config/subchannel.c21
-rw-r--r--src/core/iomgr/iocp_windows.c6
-rw-r--r--src/core/iomgr/pollset_windows.c11
-rw-r--r--src/core/iomgr/sockaddr_win32.h7
-rw-r--r--src/core/iomgr/tcp_server.h61
-rw-r--r--src/core/iomgr/tcp_server_posix.c148
-rw-r--r--src/core/iomgr/tcp_server_windows.c143
-rw-r--r--src/core/iomgr/tcp_windows.c6
-rw-r--r--src/core/iomgr/udp_server.c12
-rw-r--r--src/core/iomgr/udp_server.h10
-rw-r--r--src/core/security/server_secure_chttp2.c29
-rw-r--r--src/core/support/env_win32.c22
-rw-r--r--src/core/support/log_win32.c6
-rw-r--r--src/core/support/string_win32.c10
-rw-r--r--src/core/support/sync_win32.c8
-rw-r--r--src/core/support/time_win32.c21
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/server.c6
-rw-r--r--src/core/surface/server_chttp2.c16
-rw-r--r--src/core/surface/server_create.c5
-rw-r--r--src/core/surface/version.c4
24 files changed, 343 insertions, 228 deletions
diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c
index b7af714e0b..ce7ec09b89 100644
--- a/src/core/census/initialize.c
+++ b/src/core/census/initialize.c
@@ -37,9 +37,7 @@ static int features_enabled = CENSUS_FEATURE_NONE;
int census_initialize(int features) {
if (features_enabled != CENSUS_FEATURE_NONE) {
- return 1;
- }
- if (features == CENSUS_FEATURE_NONE) {
+ // Must have been a previous call to census_initialize; return error
return 1;
}
features_enabled = features;
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 63e440f97b..055b020652 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -35,6 +35,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/support/string.h"
+#include <grpc/census.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -168,10 +169,10 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
if (a == NULL) return 0;
for (i = 0; i < a->num_args; i++) {
if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
- return a->args[i].value.integer != 0;
+ return a->args[i].value.integer != 0 && census_enabled();
}
}
- return 0;
+ return census_enabled();
}
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a11f..5b10600ab5 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@ typedef struct {
} pick_first_lb_policy;
#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -268,10 +268,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(selected != NULL);
- gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
+ gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
grpc_exec_ctx_enqueue(exec_ctx,
grpc_closure_create(destroy_subchannels, p), 1);
/* update any calls that were waiting for a pick */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 145e146862..ef58fc0b24 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -317,9 +317,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
- c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy((void *)c->filters, args->filters,
- sizeof(grpc_channel_filter *) * c->num_filters);
+ if (c->num_filters > 0) {
+ c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
+ memcpy((void *)c->filters, args->filters,
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ } else {
+ c->filters = NULL;
+ }
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
grpc_pollset_set_init(&c->pollset_set);
@@ -517,7 +521,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ if (c->num_filters > 0) {
+ memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ }
memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
@@ -553,7 +559,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
/* publish */
- GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index d3868ce62c..6cbe7d2fd4 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -67,7 +67,7 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline,
return 0;
}
timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(gpr_time_add(
+ return (DWORD)gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
@@ -179,11 +179,9 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
- int run_now = 0;
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
- run_now = 1;
info->has_pending_iocp = 0;
grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
} else {
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index deb661548d..bfd9e69a16 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,15 +81,6 @@ static grpc_pollset_worker *pop_front_worker(
}
}
-static void push_back_worker(grpc_pollset_worker *root,
- grpc_pollset_worker_link_type type,
- grpc_pollset_worker *worker) {
- worker->links[type].next = root;
- worker->links[type].prev = worker->links[type].next->links[type].prev;
- worker->links[type].prev->links[type].next =
- worker->links[type].next->links[type].prev = worker;
-}
-
static void push_front_worker(grpc_pollset_worker *root,
grpc_pollset_worker_link_type type,
grpc_pollset_worker *worker) {
diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h
index fe2be99145..8e3946a7d8 100644
--- a/src/core/iomgr/sockaddr_win32.h
+++ b/src/core/iomgr/sockaddr_win32.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,9 +38,4 @@
#include <ws2tcpip.h>
#include <mswsock.h>
-#ifdef __MINGW32__
-/* mingw seems to be missing that definition. */
-const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
-#endif
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 3294e13797..8f3184ff1e 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,28 +34,39 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H
#define GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H
+#include "src/core/iomgr/closure.h"
#include "src/core/iomgr/endpoint.h"
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
-/* Forward decl of grpc_tcp_listener */
-typedef struct grpc_tcp_listener grpc_tcp_listener;
+typedef struct grpc_tcp_server_acceptor grpc_tcp_server_acceptor;
+struct grpc_tcp_server_acceptor {
+ /* grpc_tcp_server_cb functions share a ref on from_server that is valid
+ until the function returns. */
+ grpc_tcp_server *from_server;
+ /* Indices that may be passed to grpc_tcp_server_port_fd(). */
+ unsigned port_index;
+ unsigned fd_index;
+};
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_endpoint *ep);
+ grpc_endpoint *ep,
+ grpc_tcp_server_acceptor *acceptor);
-/* Create a server, initially not bound to any ports */
-grpc_tcp_server *grpc_tcp_server_create(void);
+/* Create a server, initially not bound to any ports. The caller owns one ref.
+ If shutdown_complete is not NULL, it will be used by
+ grpc_tcp_server_unref(). */
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg);
-/* Add a port to the server, returning the newly created listener on success,
- or a null pointer on failure.
+/* Add a port to the server, returning the newly allocated port on success, or
+ -1 on failure.
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually
@@ -63,21 +74,31 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len);
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len);
-/* Returns the file descriptor of the Nth listening socket on this server,
- or -1 if the index is out of bounds.
+/* Number of fds at the given port_index, or 0 if port_index is out of
+ bounds. */
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index);
- The file descriptor remains owned by the server, and will be cleaned
- up when grpc_tcp_server_destroy is called. */
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
+/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
+ (port_index) call to add_port() on this server, or -1 if the indices are out
+ of bounds. The file descriptor remains owned by the server, and will be
+ cleaned up when grpc_tcp_server_destroy is called. */
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index);
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
- grpc_closure *closure);
+/* Ref s and return s. */
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
+/* shutdown_starting is called when ref count has reached zero and the server is
+ about to be destroyed. The server will be deleted after it returns. Calling
+ grpc_tcp_server_ref() from it has no effect. */
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting);
+
+/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
+ a call (exec_ctx!=NULL) to shutdown_complete. */
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 49e83cf6ae..adf14aeb59 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -73,6 +73,7 @@ static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size;
/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
int fd;
grpc_fd *emfd;
@@ -84,9 +85,10 @@ struct grpc_tcp_listener {
} addr;
size_t addr_len;
int port;
+ unsigned port_index;
+ unsigned fd_index;
grpc_closure read_closure;
grpc_closure destroyed_closure;
- gpr_refcount refs;
struct grpc_tcp_listener *next;
/* When we add a listener, more than one can be created, mainly because of
IPv6. A sibling will still be in the normal list, but will be flagged
@@ -106,6 +108,7 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
/* the overall server */
struct grpc_tcp_server {
+ gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg;
@@ -122,8 +125,12 @@ struct grpc_tcp_server {
/* linked list of server ports */
grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
unsigned nports;
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
/* shutdown callback */
grpc_closure *shutdown_complete;
@@ -133,28 +140,35 @@ struct grpc_tcp_server {
size_t pollset_count;
};
-grpc_tcp_server *grpc_tcp_server_create(void) {
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->head = NULL;
+ s->tail = NULL;
s->nports = 0;
return s;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ }
gpr_mu_destroy(&s->mu);
while (s->head) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
- grpc_tcp_listener_unref(sp);
+ gpr_free(sp);
}
gpr_free(s);
@@ -203,15 +217,12 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
- grpc_closure *closure) {
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = 1;
- s->shutdown_complete = closure;
-
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
@@ -308,6 +319,8 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_tcp_listener *sp = arg;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
+ sp->fd_index};
grpc_fd *fdobj;
size_t i;
@@ -355,7 +368,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
}
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ &acceptor);
gpr_free(name);
gpr_free(addr_str);
@@ -375,7 +389,9 @@ error:
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr,
- size_t addr_len) {
+ size_t addr_len,
+ unsigned port_index,
+ unsigned fd_index) {
grpc_tcp_listener *sp = NULL;
int port;
char *addr_str;
@@ -389,17 +405,23 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = s->head;
- s->head = sp;
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->port = port;
+ sp->port_index = port_index;
+ sp->fd_index = fd_index;
sp->is_sibling = 0;
sp->sibling = NULL;
- gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(addr_str);
@@ -409,8 +431,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
return sp;
}
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len) {
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
@@ -423,7 +445,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
int port;
-
+ unsigned port_index = 0;
+ unsigned fd_index = 0;
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(addr);
}
@@ -462,11 +488,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- sp = add_socket_to_server(s, fd, addr, addr_len);
+ sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
-
+ if (sp != NULL) {
+ ++fd_index;
+ }
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && sp != NULL) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
@@ -485,20 +513,46 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- sp = add_socket_to_server(s, fd, addr, addr_len);
- if (sp != NULL) sp->sibling = sp2;
- if (sp2 != NULL) sp2->is_sibling = 1;
+ sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
+ if (sp2 != NULL && sp != NULL) {
+ sp2->sibling = sp;
+ sp->is_sibling = 1;
+ }
done:
gpr_free(allocated_addr);
- return sp;
+ if (sp != NULL) {
+ return sp->port;
+ } else {
+ return -1;
+ }
}
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
+ unsigned port_index) {
+ unsigned num_fds = 0;
grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+ for (sp = s->head; sp && port_index != 0; sp = sp->next) {
+ if (!sp->is_sibling) {
+ --port_index;
+ }
+ }
+ for (; sp; sp = sp->sibling, ++num_fds)
+ ;
+ return num_fds;
+}
+
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index) {
+ grpc_tcp_listener *sp;
+ for (sp = s->head; sp && port_index != 0; sp = sp->next) {
+ if (!sp->is_sibling) {
+ --port_index;
+ }
+ }
+ for (; sp && fd_index != 0; sp = sp->sibling, --fd_index)
;
- if (port_index == 0 && sp) {
+ if (sp) {
return sp->fd;
} else {
return -1;
@@ -531,31 +585,33 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
- if (listener != NULL) {
- grpc_tcp_listener *sp = listener;
- return sp->port;
- } else {
- return 0;
- }
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
}
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- gpr_ref(&sp->refs);
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ gpr_mu_lock(&s->mu);
+ grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- if (sp->is_sibling) return;
- if (gpr_unref(&sp->refs)) {
- grpc_tcp_listener *sibling = sp->sibling;
- while (sibling) {
- sp = sibling;
- sibling = sp->sibling;
- gpr_free(sp);
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ gpr_mu_unlock(&s->mu);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
}
- gpr_free(listener);
}
}
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d38fd8860a..00d381f264 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -55,6 +55,7 @@
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
/* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
@@ -65,19 +66,20 @@ struct grpc_tcp_listener {
grpc_winsocket *socket;
/* The actual TCP port number. */
int port;
+ unsigned port_index;
grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
- gpr_refcount refs;
/* linked list */
struct grpc_tcp_listener *next;
};
/* the overall server */
struct grpc_tcp_server {
+ gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg;
@@ -89,6 +91,10 @@ struct grpc_tcp_server {
/* linked list of server ports */
grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
/* shutdown callback */
grpc_closure *shutdown_complete;
@@ -96,21 +102,25 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_tcp_server *grpc_tcp_server_create(void) {
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->head = NULL;
- s->shutdown_complete = NULL;
+ s->tail = NULL;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
return s;
}
-static void dont_care_about_shutdown_completion(void *arg) {}
-
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ }
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -120,20 +130,28 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
s->head = sp->next;
sp->next = NULL;
grpc_winsocket_destroy(sp->socket);
- grpc_tcp_listener_unref(sp);
+ gpr_free(sp);
}
gpr_free(s);
}
-/* Public function. Stops and destroys a grpc_tcp_server. */
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
- grpc_closure *shutdown_complete) {
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
+}
+
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ gpr_mu_lock(&s->mu);
+ grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ gpr_mu_unlock(&s->mu);
+}
+
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
int immediately_done = 0;
grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
- s->shutdown_complete = shutdown_complete;
-
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
@@ -150,6 +168,24 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
}
}
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ gpr_mu_unlock(&s->mu);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
+ }
+ }
+}
+
/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
size_t addr_len) {
@@ -277,6 +313,7 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
grpc_tcp_listener *sp = arg;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@@ -343,7 +380,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
- if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep);
+ if (ep)
+ sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
+ &acceptor);
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
@@ -353,7 +392,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr,
- size_t addr_len) {
+ size_t addr_len,
+ unsigned port_index) {
grpc_tcp_listener *sp = NULL;
int port;
int status;
@@ -382,15 +422,20 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = s->head;
- s->head = sp;
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
- gpr_ref_init(&sp->refs, 1);
+ sp->port_index = port_index;
grpc_closure_init(&sp->on_accept, on_accept, sp);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
@@ -399,8 +444,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return sp;
}
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len) {
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len) {
grpc_tcp_listener *sp;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
@@ -409,6 +454,10 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
int port;
+ unsigned port_index = 0;
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
@@ -450,18 +499,39 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
gpr_free(utf8_message);
}
- sp = add_socket_to_server(s, sock, addr, addr_len);
+ sp = add_socket_to_server(s, sock, addr, addr_len, port_index);
gpr_free(allocated_addr);
- return sp;
+ if (sp) {
+ return sp->port;
+ } else {
+ return -1;
+ }
}
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
+ unsigned port_index) {
grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+ for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
;
- if (port_index == 0 && sp) {
- return _open_osfhandle(sp->socket->socket, 0);
+ if (sp) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index) {
+ grpc_tcp_listener *sp;
+ if (fd_index != 0) {
+ /* Windows implementation has only one fd per port_index. */
+ return -1;
+ }
+ for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
+ ;
+ if (sp) {
+ return _open_osfhandle((intptr_t)sp->socket->socket, 0);
} else {
return -1;
}
@@ -485,25 +555,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
- if (listener != NULL) {
- grpc_tcp_listener *sp = listener;
- return sp->port;
- } else {
- return 0;
- }
-}
-
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- gpr_ref(&sp->refs);
-}
-
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- if (gpr_unref(&sp->refs)) {
- gpr_free(listener);
- }
-}
-
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index cc7f7ff8d2..d3f080cbf9 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -143,10 +143,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
- gpr_slice *slice = NULL;
- size_t nslices = 0;
grpc_winsocket_callback_info *info = &socket->read_info;
- int do_abort = 0;
if (success) {
if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) {
@@ -238,7 +235,6 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_closure *cb;
- int do_abort = 0;
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index a1a6b04cad..fe006c603c 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -425,15 +425,5 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
gpr_mu_unlock(&s->mu);
}
-/* TODO(rjshade): Add a test for this method. */
-void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
- const struct sockaddr *peer_address) {
- ssize_t rc;
- rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address));
- if (rc < 0) {
- gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
- }
-}
-
#endif
#endif
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index de5736c426..73a21c80ab 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -72,12 +72,4 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
-/* Write the contents of buffer to the underlying UDP socket. */
-/*
-void grpc_udp_server_write(grpc_udp_server *s,
- const char *buffer,
- int buf_len,
- const struct sockaddr* to);
- */
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index d7fad33854..08713fceaf 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -126,8 +126,8 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
state_unref(state);
}
-static void on_accept(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_endpoint *tcp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
grpc_server_secure_state *state = statep;
state_ref(state);
grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,
@@ -144,8 +144,10 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
grpc_server_secure_state *state = statep;
- state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
- success);
+ if (state->destroy_callback != NULL) {
+ state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
+ success);
+ }
grpc_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
}
@@ -161,8 +163,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
state->destroy_callback = callback;
tcp = state->tcp;
gpr_mu_unlock(&state->mu);
- grpc_closure_init(&state->destroy_closure, destroy_done, state);
- grpc_tcp_server_destroy(exec_ctx, tcp, &state->destroy_closure);
+ grpc_tcp_server_unref(exec_ctx, tcp);
}
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
@@ -199,18 +200,18 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
if (!resolved) {
goto error;
}
-
- tcp = grpc_tcp_server_create();
+ state = gpr_malloc(sizeof(*state));
+ memset(state, 0, sizeof(*state));
+ grpc_closure_init(&state->destroy_closure, destroy_done, state);
+ tcp = grpc_tcp_server_create(&state->destroy_closure);
if (!tcp) {
goto error;
}
for (i = 0; i < resolved->naddrs; i++) {
- grpc_tcp_listener *listener;
- listener = grpc_tcp_server_add_port(
+ port_temp = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
- port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) {
if (port_num == -1) {
port_num = port_temp;
@@ -232,8 +233,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
grpc_resolved_addresses_destroy(resolved);
- state = gpr_malloc(sizeof(*state));
- memset(state, 0, sizeof(*state));
state->server = server;
state->tcp = tcp;
state->sc = sc;
@@ -258,7 +257,7 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(&exec_ctx, tcp, NULL);
+ grpc_tcp_server_unref(&exec_ctx, tcp);
}
if (state) {
gpr_free(state);
diff --git a/src/core/support/env_win32.c b/src/core/support/env_win32.c
index 6b1ff102b0..10258283ba 100644
--- a/src/core/support/env_win32.c
+++ b/src/core/support/env_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,7 +38,12 @@
#include "src/core/support/env.h"
#include "src/core/support/string.h"
+#ifdef __MINGW32__
+errno_t getenv_s(size_t *size_needed, char *buffer, size_t size,
+ const char *varname);
+#else
#include <stdlib.h>
+#endif
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -47,14 +52,17 @@
char *gpr_getenv(const char *name) {
size_t size;
char *result = NULL;
- char *duplicated;
errno_t err;
- err = _dupenv_s(&result, &size, name);
- if (err) return NULL;
- duplicated = gpr_strdup(result);
- free(result);
- return duplicated;
+ err = getenv_s(&size, NULL, 0, name);
+ if (err || (size == 0)) return NULL;
+ result = gpr_malloc(size);
+ err = getenv_s(&size, result, size, name);
+ if (err) {
+ gpr_free(result);
+ return NULL;
+ }
+ return result;
}
void gpr_setenv(const char *name, const char *value) {
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index 40adcd1b50..e18e667fe5 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -109,13 +109,13 @@ void gpr_default_log(gpr_log_func_args *args) {
fflush(stderr);
}
-char *gpr_format_message(DWORD messageid) {
+char *gpr_format_message(int messageid) {
LPTSTR tmessage;
char *message;
DWORD status = FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c
index 914ba8771c..3b1f702cf1 100644
--- a/src/core/support/string_win32.c
+++ b/src/core/support/string_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -85,8 +85,8 @@ LPTSTR
gpr_char_to_tchar(LPCSTR input) {
LPTSTR ret;
int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
- if (needed == 0) return NULL;
- ret = gpr_malloc(needed * sizeof(TCHAR));
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
return ret;
}
@@ -95,8 +95,8 @@ LPSTR
gpr_tchar_to_char(LPCTSTR input) {
LPSTR ret;
int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
- if (needed == 0) return NULL;
- ret = gpr_malloc(needed);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed);
WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
return ret;
}
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 51a082b29e..84d412a75f 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -94,7 +94,11 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
if (now_ms >= deadline_ms) {
timeout = 1;
} else {
- timeout_max_ms = (DWORD)min(deadline_ms - now_ms, INFINITE - 1);
+ if ((deadline_ms - now_ms) >= INFINITE) {
+ timeout_max_ms = INFINITE - 1;
+ } else {
+ timeout_max_ms = (DWORD)(deadline_ms - now_ms);
+ }
timeout = (SleepConditionVariableCS(cv, &mu->cs, timeout_max_ms) == 0 &&
GetLastError() == ERROR_TIMEOUT);
}
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index 2bed0f6a9c..8af957e6f4 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,9 +37,12 @@
#ifdef GPR_WIN32
+#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <src/core/support/time_precise.h>
#include <sys/timeb.h>
+#include <process.h>
+#include <limits.h>
#include "src/core/support/block_annotate.h"
@@ -50,11 +53,12 @@ void gpr_time_init(void) {
LARGE_INTEGER frequency;
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&g_start_time);
- g_time_scale = 1.0 / frequency.QuadPart;
+ g_time_scale = 1.0 / (double)frequency.QuadPart;
}
gpr_timespec gpr_now(gpr_clock_type clock) {
gpr_timespec now_tv;
+ LONGLONG diff;
struct _timeb now_tb;
LARGE_INTEGER timestamp;
double now_dbl;
@@ -68,10 +72,14 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
case GPR_CLOCK_MONOTONIC:
case GPR_CLOCK_PRECISE:
QueryPerformanceCounter(&timestamp);
- now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale;
+ diff = timestamp.QuadPart - g_start_time.QuadPart;
+ now_dbl = (double)diff * g_time_scale;
now_tv.tv_sec = (int64_t)now_dbl;
now_tv.tv_nsec = (int32_t)((now_dbl - (double)now_tv.tv_sec) * 1e9);
break;
+ case GPR_TIMESPAN:
+ abort();
+ break;
}
return now_tv;
}
@@ -79,7 +87,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
- DWORD sleep_millis;
+ int64_t sleep_millis;
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
@@ -91,9 +99,10 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
sleep_millis =
- (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX));
GRPC_SCHEDULING_START_BLOCKING_REGION;
- Sleep(sleep_millis);
+ Sleep((DWORD)sleep_millis);
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 66c5a522a4..a4a53d3ec1 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -119,8 +119,10 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
- /* Only initialize census if noone else has. */
- if (census_enabled() == CENSUS_FEATURE_NONE) {
+ /* Only initialize census if no one else has and some features are
+ * available. */
+ if (census_enabled() == CENSUS_FEATURE_NONE &&
+ census_supported() != CENSUS_FEATURE_NONE) {
if (census_initialize(census_supported())) { /* enable all features. */
gpr_log(GPR_ERROR, "Could not initialize census.");
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 79db13810a..0928f1e045 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -779,9 +779,7 @@ grpc_server *grpc_server_create_from_filters(
const grpc_channel_filter **filters, size_t filter_count,
const grpc_channel_args *args) {
size_t i;
- /* TODO(census): restore this once we finalize census filter etc.
- int census_enabled = grpc_channel_args_is_census_enabled(args); */
- int census_enabled = 0;
+ int census_enabled = grpc_channel_args_is_census_enabled(args);
grpc_server *server = gpr_malloc(sizeof(grpc_server));
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 5ce7c1955b..6e21d2dcd7 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,7 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
}
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp) {
+ grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -80,7 +81,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(exec_ctx, tcp, destroy_done);
+ grpc_tcp_server_unref(exec_ctx, tcp);
+ grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@@ -100,15 +102,13 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- tcp = grpc_tcp_server_create();
+ tcp = grpc_tcp_server_create(NULL);
GPR_ASSERT(tcp);
for (i = 0; i < resolved->naddrs; i++) {
- grpc_tcp_listener *listener;
- listener = grpc_tcp_server_add_port(
+ port_temp = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
- port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) {
if (port_num == -1) {
port_num = port_temp;
@@ -139,7 +139,7 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(&exec_ctx, tcp, NULL);
+ grpc_tcp_server_unref(&exec_ctx, tcp);
}
port_num = 0;
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index f30093e06b..5e37e80948 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,9 +43,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[3];
size_t num_filters = 0;
filters[num_filters++] = &grpc_compress_filter;
- if (grpc_channel_args_is_census_enabled(args)) {
- filters[num_filters++] = &grpc_server_census_filter;
- }
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
return grpc_server_create_from_filters(filters, num_filters, args);
}
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index 962a72112a..262a13f184 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.12.0.0"; }
+const char *grpc_version_string(void) { return "0.13.0.0"; }