aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--PYTHON-MANIFEST.in1
-rw-r--r--README.md2
-rw-r--r--build.yaml1
-rwxr-xr-xgrpc.gemspec1
-rw-r--r--include/grpc/census.h4
-rw-r--r--package.json1
-rw-r--r--setup.py5
-rw-r--r--src/core/census/tag_set.c2
-rw-r--r--src/core/iomgr/tcp_server.h61
-rw-r--r--src/core/iomgr/tcp_server_posix.c148
-rw-r--r--src/core/iomgr/tcp_server_windows.c141
-rw-r--r--src/core/security/server_secure_chttp2.c29
-rw-r--r--src/core/surface/server_chttp2.c16
-rw-r--r--src/cpp/server/server_context.cc3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py15
-rw-r--r--test/core/census/tag_set_test.c3
-rw-r--r--test/core/client_config/set_initial_connect_string_test.c5
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c241
-rw-r--r--test/core/util/reconnect_server.c5
-rw-r--r--test/core/util/test_tcp_server.c26
-rw-r--r--test/core/util/test_tcp_server.h3
-rwxr-xr-xtools/run_tests/build_artifacts.py21
-rwxr-xr-xtools/run_tests/post_tests_ruby.sh2
-rwxr-xr-xtools/run_tests/pre_build_csharp.sh7
-rw-r--r--tools/run_tests/tests.json1
-rw-r--r--vsprojects/vcxproj/test/tag_set_test/tag_set_test.vcxproj2
26 files changed, 521 insertions, 225 deletions
diff --git a/PYTHON-MANIFEST.in b/PYTHON-MANIFEST.in
index 6e54e1b3c6..2f5700d11a 100644
--- a/PYTHON-MANIFEST.in
+++ b/PYTHON-MANIFEST.in
@@ -3,6 +3,7 @@ graft src/python/grpcio/tests
graft src/core
graft include/grpc
graft third_party/boringssl
+graft third_party/zlib
include src/python/grpcio/commands.py
include src/python/grpcio/grpc_core_dependencies.py
include src/python/grpcio/support.py
diff --git a/README.md b/README.md
index e0553ecc86..f894def470 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@
[gRPC - An RPC library and framework](http://github.com/grpc/grpc)
===================================
-Copyright 2015 Google Inc.
+Copyright 2015-2016 Google Inc.
#Documentation
diff --git a/build.yaml b/build.yaml
index c05dde812e..500ec855ae 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2636,3 +2636,4 @@ python_dependencies:
- grpc
- gpr
- boringssl
+ - z
diff --git a/grpc.gemspec b/grpc.gemspec
index 4f3a3f2dd9..47b66ae535 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -412,5 +412,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/census/context.c )
s.files += %w( src/core/census/initialize.c )
s.files += %w( src/core/census/operation.c )
+ s.files += %w( src/core/census/tag_set.c )
s.files += %w( src/core/census/tracing.c )
end
diff --git a/include/grpc/census.h b/include/grpc/census.h
index ab0e0e4802..f4130c7e6a 100644
--- a/include/grpc/census.h
+++ b/include/grpc/census.h
@@ -506,8 +506,8 @@ extern census_aggregation_ops census_agg_window;
construction via census_define_view(). */
typedef struct {
const census_aggregation_ops *ops;
- const void
- *create_arg; /* Argument to be used for aggregation initialization. */
+ const void *
+ create_arg; /* Argument to be used for aggregation initialization. */
} census_aggregation;
/** A census view type. Opaque. */
diff --git a/package.json b/package.json
index e0bbb7d80f..0b87f6a59d 100644
--- a/package.json
+++ b/package.json
@@ -363,6 +363,7 @@
"src/core/census/context.c",
"src/core/census/initialize.c",
"src/core/census/operation.c",
+ "src/core/census/tag_set.c",
"src/core/census/tracing.c",
"include/grpc/support/alloc.h",
"include/grpc/support/atm.h",
diff --git a/setup.py b/setup.py
index 113e49ecfe..52d3d2241e 100644
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,7 @@ egg_info.manifest_maker.template = 'PYTHON-MANIFEST.in'
PYTHON_STEM = './src/python/grpcio'
CORE_INCLUDE = ('./include', '.',)
BORINGSSL_INCLUDE = ('./third_party/boringssl/include',)
+ZLIB_INCLUDE = ('./third_party/zlib',)
# Ensure we're in the proper directory whether or not we're being used by pip.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
@@ -75,9 +76,9 @@ CYTHON_EXTENSION_PACKAGE_NAMES = ()
CYTHON_EXTENSION_MODULE_NAMES = ('grpc._cython.cygrpc',)
EXTENSION_INCLUDE_DIRECTORIES = (
- (PYTHON_STEM,) + CORE_INCLUDE + BORINGSSL_INCLUDE)
+ (PYTHON_STEM,) + CORE_INCLUDE + BORINGSSL_INCLUDE + ZLIB_INCLUDE)
-EXTENSION_LIBRARIES = ('z', 'm',)
+EXTENSION_LIBRARIES = ('m',)
if not "darwin" in sys.platform:
EXTENSION_LIBRARIES += ('rt',)
diff --git a/src/core/census/tag_set.c b/src/core/census/tag_set.c
index 8908a2d5f3..9b65a1dfa1 100644
--- a/src/core/census/tag_set.c
+++ b/src/core/census/tag_set.c
@@ -253,7 +253,7 @@ static void tag_set_flatten(struct tag_set *tags) {
if (tags->ntags == tags->ntags_alloc) return;
bool found_deleted = false; // found a deleted tag.
char *kvp = tags->kvm;
- char *dbase; // record location of deleted tag
+ char *dbase = NULL; // record location of deleted tag
for (int i = 0; i < tags->ntags_alloc; i++) {
struct raw_tag tag;
char *next_kvp = decode_tag(&tag, kvp, 0);
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 3294e13797..8f3184ff1e 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,28 +34,39 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H
#define GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H
+#include "src/core/iomgr/closure.h"
#include "src/core/iomgr/endpoint.h"
/* Forward decl of grpc_tcp_server */
typedef struct grpc_tcp_server grpc_tcp_server;
-/* Forward decl of grpc_tcp_listener */
-typedef struct grpc_tcp_listener grpc_tcp_listener;
+typedef struct grpc_tcp_server_acceptor grpc_tcp_server_acceptor;
+struct grpc_tcp_server_acceptor {
+ /* grpc_tcp_server_cb functions share a ref on from_server that is valid
+ until the function returns. */
+ grpc_tcp_server *from_server;
+ /* Indices that may be passed to grpc_tcp_server_port_fd(). */
+ unsigned port_index;
+ unsigned fd_index;
+};
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_endpoint *ep);
+ grpc_endpoint *ep,
+ grpc_tcp_server_acceptor *acceptor);
-/* Create a server, initially not bound to any ports */
-grpc_tcp_server *grpc_tcp_server_create(void);
+/* Create a server, initially not bound to any ports. The caller owns one ref.
+ If shutdown_complete is not NULL, it will be used by
+ grpc_tcp_server_unref(). */
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_pollset **pollsets, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb, void *cb_arg);
-/* Add a port to the server, returning the newly created listener on success,
- or a null pointer on failure.
+/* Add a port to the server, returning the newly allocated port on success, or
+ -1 on failure.
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
both IPv4 and IPv6 connections, but :: is the preferred style. This usually
@@ -63,21 +74,31 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len);
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len);
-/* Returns the file descriptor of the Nth listening socket on this server,
- or -1 if the index is out of bounds.
+/* Number of fds at the given port_index, or 0 if port_index is out of
+ bounds. */
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index);
- The file descriptor remains owned by the server, and will be cleaned
- up when grpc_tcp_server_destroy is called. */
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
+/* Returns the file descriptor of the Mth (fd_index) listening socket of the Nth
+ (port_index) call to add_port() on this server, or -1 if the indices are out
+ of bounds. The file descriptor remains owned by the server, and will be
+ cleaned up when grpc_tcp_server_destroy is called. */
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index);
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
- grpc_closure *closure);
+/* Ref s and return s. */
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s);
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener);
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener);
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener);
+/* shutdown_starting is called when ref count has reached zero and the server is
+ about to be destroyed. The server will be deleted after it returns. Calling
+ grpc_tcp_server_ref() from it has no effect. */
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting);
+
+/* If the recount drops to zero, delete s, and call (exec_ctx==NULL) or enqueue
+ a call (exec_ctx!=NULL) to shutdown_complete. */
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 49e83cf6ae..adf14aeb59 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -73,6 +73,7 @@ static gpr_once s_init_max_accept_queue_size;
static int s_max_accept_queue_size;
/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
int fd;
grpc_fd *emfd;
@@ -84,9 +85,10 @@ struct grpc_tcp_listener {
} addr;
size_t addr_len;
int port;
+ unsigned port_index;
+ unsigned fd_index;
grpc_closure read_closure;
grpc_closure destroyed_closure;
- gpr_refcount refs;
struct grpc_tcp_listener *next;
/* When we add a listener, more than one can be created, mainly because of
IPv6. A sibling will still be in the normal list, but will be flagged
@@ -106,6 +108,7 @@ static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
/* the overall server */
struct grpc_tcp_server {
+ gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg;
@@ -122,8 +125,12 @@ struct grpc_tcp_server {
/* linked list of server ports */
grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
unsigned nports;
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
/* shutdown callback */
grpc_closure *shutdown_complete;
@@ -133,28 +140,35 @@ struct grpc_tcp_server {
size_t pollset_count;
};
-grpc_tcp_server *grpc_tcp_server_create(void) {
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->head = NULL;
+ s->tail = NULL;
s->nports = 0;
return s;
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ }
gpr_mu_destroy(&s->mu);
while (s->head) {
grpc_tcp_listener *sp = s->head;
s->head = sp->next;
- grpc_tcp_listener_unref(sp);
+ gpr_free(sp);
}
gpr_free(s);
@@ -203,15 +217,12 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
- grpc_closure *closure) {
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = 1;
- s->shutdown_complete = closure;
-
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
@@ -308,6 +319,8 @@ error:
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
grpc_tcp_listener *sp = arg;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
+ sp->fd_index};
grpc_fd *fdobj;
size_t i;
@@ -355,7 +368,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
}
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ &acceptor);
gpr_free(name);
gpr_free(addr_str);
@@ -375,7 +389,9 @@ error:
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
const struct sockaddr *addr,
- size_t addr_len) {
+ size_t addr_len,
+ unsigned port_index,
+ unsigned fd_index) {
grpc_tcp_listener *sp = NULL;
int port;
char *addr_str;
@@ -389,17 +405,23 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
s->nports++;
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = s->head;
- s->head = sp;
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
sp->server = s;
sp->fd = fd;
sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->port = port;
+ sp->port_index = port_index;
+ sp->fd_index = fd_index;
sp->is_sibling = 0;
sp->sibling = NULL;
- gpr_ref_init(&sp->refs, 1);
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(addr_str);
@@ -409,8 +431,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, int fd,
return sp;
}
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len) {
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len) {
grpc_tcp_listener *sp;
grpc_tcp_listener *sp2 = NULL;
int fd;
@@ -423,7 +445,11 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
int port;
-
+ unsigned port_index = 0;
+ unsigned fd_index = 0;
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(addr);
}
@@ -462,11 +488,13 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
- sp = add_socket_to_server(s, fd, addr, addr_len);
+ sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
-
+ if (sp != NULL) {
+ ++fd_index;
+ }
/* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
if (port == 0 && sp != NULL) {
grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port);
@@ -485,20 +513,46 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- sp = add_socket_to_server(s, fd, addr, addr_len);
- if (sp != NULL) sp->sibling = sp2;
- if (sp2 != NULL) sp2->is_sibling = 1;
+ sp = add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index);
+ if (sp2 != NULL && sp != NULL) {
+ sp2->sibling = sp;
+ sp->is_sibling = 1;
+ }
done:
gpr_free(allocated_addr);
- return sp;
+ if (sp != NULL) {
+ return sp->port;
+ } else {
+ return -1;
+ }
}
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
+ unsigned port_index) {
+ unsigned num_fds = 0;
grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+ for (sp = s->head; sp && port_index != 0; sp = sp->next) {
+ if (!sp->is_sibling) {
+ --port_index;
+ }
+ }
+ for (; sp; sp = sp->sibling, ++num_fds)
+ ;
+ return num_fds;
+}
+
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index) {
+ grpc_tcp_listener *sp;
+ for (sp = s->head; sp && port_index != 0; sp = sp->next) {
+ if (!sp->is_sibling) {
+ --port_index;
+ }
+ }
+ for (; sp && fd_index != 0; sp = sp->sibling, --fd_index)
;
- if (port_index == 0 && sp) {
+ if (sp) {
return sp->fd;
} else {
return -1;
@@ -531,31 +585,33 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
- if (listener != NULL) {
- grpc_tcp_listener *sp = listener;
- return sp->port;
- } else {
- return 0;
- }
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
}
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- gpr_ref(&sp->refs);
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ gpr_mu_lock(&s->mu);
+ grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- if (sp->is_sibling) return;
- if (gpr_unref(&sp->refs)) {
- grpc_tcp_listener *sibling = sp->sibling;
- while (sibling) {
- sp = sibling;
- sibling = sp->sibling;
- gpr_free(sp);
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ gpr_mu_unlock(&s->mu);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
}
- gpr_free(listener);
}
}
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index d38fd8860a..8ee8149f25 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -55,6 +55,7 @@
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
/* one listening port */
+typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
/* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
@@ -65,19 +66,20 @@ struct grpc_tcp_listener {
grpc_winsocket *socket;
/* The actual TCP port number. */
int port;
+ unsigned port_index;
grpc_tcp_server *server;
/* The cached AcceptEx for that port. */
LPFN_ACCEPTEX AcceptEx;
int shutting_down;
/* closure for socket notification of accept being ready */
grpc_closure on_accept;
- gpr_refcount refs;
/* linked list */
struct grpc_tcp_listener *next;
};
/* the overall server */
struct grpc_tcp_server {
+ gpr_refcount refs;
/* Called whenever accept() succeeds on a server port. */
grpc_tcp_server_cb on_accept_cb;
void *on_accept_cb_arg;
@@ -89,6 +91,10 @@ struct grpc_tcp_server {
/* linked list of server ports */
grpc_tcp_listener *head;
+ grpc_tcp_listener *tail;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
/* shutdown callback */
grpc_closure *shutdown_complete;
@@ -96,21 +102,25 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_tcp_server *grpc_tcp_server_create(void) {
+grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ gpr_ref_init(&s->refs, 1);
gpr_mu_init(&s->mu);
s->active_ports = 0;
s->on_accept_cb = NULL;
s->on_accept_cb_arg = NULL;
s->head = NULL;
- s->shutdown_complete = NULL;
+ s->tail = NULL;
+ s->shutdown_starting.head = NULL;
+ s->shutdown_starting.tail = NULL;
+ s->shutdown_complete = shutdown_complete;
return s;
}
-static void dont_care_about_shutdown_completion(void *arg) {}
-
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ if (s->shutdown_complete != NULL) {
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ }
/* Now that the accepts have been aborted, we can destroy the sockets.
The IOCP won't get notified on these, so we can flag them as already
@@ -120,20 +130,28 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
s->head = sp->next;
sp->next = NULL;
grpc_winsocket_destroy(sp->socket);
- grpc_tcp_listener_unref(sp);
+ gpr_free(sp);
}
gpr_free(s);
}
-/* Public function. Stops and destroys a grpc_tcp_server. */
-void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
- grpc_closure *shutdown_complete) {
+grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ gpr_ref(&s->refs);
+ return s;
+}
+
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
+ grpc_closure *shutdown_starting) {
+ gpr_mu_lock(&s->mu);
+ grpc_closure_list_add(&s->shutdown_starting, shutdown_starting, 1);
+ gpr_mu_unlock(&s->mu);
+}
+
+static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
int immediately_done = 0;
grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
- s->shutdown_complete = shutdown_complete;
-
/* First, shutdown all fd's. This will queue abortion calls for all
of the pending accepts due to the normal operation mechanism. */
if (s->active_ports == 0) {
@@ -150,6 +168,24 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
}
}
+void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ gpr_mu_unlock(&s->mu);
+ if (exec_ctx == NULL) {
+ grpc_exec_ctx_flush(&local_exec_ctx);
+ tcp_server_destroy(&local_exec_ctx, s);
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ } else {
+ grpc_exec_ctx_finish(&local_exec_ctx);
+ tcp_server_destroy(exec_ctx, s);
+ }
+ }
+}
+
/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
size_t addr_len) {
@@ -277,6 +313,7 @@ failure:
/* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
grpc_tcp_listener *sp = arg;
+ grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
@@ -343,7 +380,9 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
/* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */
- if (ep) sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep);
+ if (ep)
+ sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep,
+ &acceptor);
/* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned
to the new connection. We need to create a new one for the next
@@ -353,7 +392,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
const struct sockaddr *addr,
- size_t addr_len) {
+ size_t addr_len,
+ unsigned port_index) {
grpc_tcp_listener *sp = NULL;
int port;
int status;
@@ -382,15 +422,20 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
sp = gpr_malloc(sizeof(grpc_tcp_listener));
- sp->next = s->head;
- s->head = sp;
+ sp->next = NULL;
+ if (s->head == NULL) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
sp->server = s;
sp->socket = grpc_winsocket_create(sock, "listener");
sp->shutting_down = 0;
sp->AcceptEx = AcceptEx;
sp->new_socket = INVALID_SOCKET;
sp->port = port;
- gpr_ref_init(&sp->refs, 1);
+ sp->port_index = port_index;
grpc_closure_init(&sp->on_accept, on_accept, sp);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
@@ -399,8 +444,8 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return sp;
}
-grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
- const void *addr, size_t addr_len) {
+int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
+ size_t addr_len) {
grpc_tcp_listener *sp;
SOCKET sock;
struct sockaddr_in6 addr6_v4mapped;
@@ -409,6 +454,10 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
int port;
+ unsigned port_index = 0;
+ if (s->tail != NULL) {
+ port_index = s->tail->port_index + 1;
+ }
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
@@ -450,17 +499,38 @@ grpc_tcp_listener *grpc_tcp_server_add_port(grpc_tcp_server *s,
gpr_free(utf8_message);
}
- sp = add_socket_to_server(s, sock, addr, addr_len);
+ sp = add_socket_to_server(s, sock, addr, addr_len, port_index);
gpr_free(allocated_addr);
- return sp;
+ if (sp) {
+ return sp->port;
+ } else {
+ return -1;
+ }
}
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
+ unsigned port_index) {
grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, port_index--)
+ for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
;
- if (port_index == 0 && sp) {
+ if (sp) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
+ unsigned fd_index) {
+ grpc_tcp_listener *sp;
+ if (fd_index != 0) {
+ /* Windows implementation has only one fd per port_index. */
+ return -1;
+ }
+ for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
+ ;
+ if (sp) {
return _open_osfhandle(sp->socket->socket, 0);
} else {
return -1;
@@ -485,25 +555,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}
-int grpc_tcp_listener_get_port(grpc_tcp_listener *listener) {
- if (listener != NULL) {
- grpc_tcp_listener *sp = listener;
- return sp->port;
- } else {
- return 0;
- }
-}
-
-void grpc_tcp_listener_ref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- gpr_ref(&sp->refs);
-}
-
-void grpc_tcp_listener_unref(grpc_tcp_listener *listener) {
- grpc_tcp_listener *sp = listener;
- if (gpr_unref(&sp->refs)) {
- gpr_free(listener);
- }
-}
-
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index d7fad33854..08713fceaf 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -126,8 +126,8 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
state_unref(state);
}
-static void on_accept(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_endpoint *tcp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
grpc_server_secure_state *state = statep;
state_ref(state);
grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,
@@ -144,8 +144,10 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
grpc_server_secure_state *state = statep;
- state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
- success);
+ if (state->destroy_callback != NULL) {
+ state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
+ success);
+ }
grpc_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
}
@@ -161,8 +163,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
state->destroy_callback = callback;
tcp = state->tcp;
gpr_mu_unlock(&state->mu);
- grpc_closure_init(&state->destroy_closure, destroy_done, state);
- grpc_tcp_server_destroy(exec_ctx, tcp, &state->destroy_closure);
+ grpc_tcp_server_unref(exec_ctx, tcp);
}
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
@@ -199,18 +200,18 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
if (!resolved) {
goto error;
}
-
- tcp = grpc_tcp_server_create();
+ state = gpr_malloc(sizeof(*state));
+ memset(state, 0, sizeof(*state));
+ grpc_closure_init(&state->destroy_closure, destroy_done, state);
+ tcp = grpc_tcp_server_create(&state->destroy_closure);
if (!tcp) {
goto error;
}
for (i = 0; i < resolved->naddrs; i++) {
- grpc_tcp_listener *listener;
- listener = grpc_tcp_server_add_port(
+ port_temp = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
- port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) {
if (port_num == -1) {
port_num = port_temp;
@@ -232,8 +233,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
grpc_resolved_addresses_destroy(resolved);
- state = gpr_malloc(sizeof(*state));
- memset(state, 0, sizeof(*state));
state->server = server;
state->tcp = tcp;
state->sc = sc;
@@ -258,7 +257,7 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(&exec_ctx, tcp, NULL);
+ grpc_tcp_server_unref(&exec_ctx, tcp);
}
if (state) {
gpr_free(state);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 5ce7c1955b..6e21d2dcd7 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,7 +53,8 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
}
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp) {
+ grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -80,7 +81,8 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(exec_ctx, tcp, destroy_done);
+ grpc_tcp_server_unref(exec_ctx, tcp);
+ grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
@@ -100,15 +102,13 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- tcp = grpc_tcp_server_create();
+ tcp = grpc_tcp_server_create(NULL);
GPR_ASSERT(tcp);
for (i = 0; i < resolved->naddrs; i++) {
- grpc_tcp_listener *listener;
- listener = grpc_tcp_server_add_port(
+ port_temp = grpc_tcp_server_add_port(
tcp, (struct sockaddr *)&resolved->addrs[i].addr,
resolved->addrs[i].len);
- port_temp = grpc_tcp_listener_get_port(listener);
if (port_temp > 0) {
if (port_num == -1) {
port_num = port_temp;
@@ -139,7 +139,7 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(&exec_ctx, tcp, NULL);
+ grpc_tcp_server_unref(&exec_ctx, tcp);
}
port_num = 0;
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 8193e70660..b3a74c7fce 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -122,6 +122,7 @@ ServerContext::ServerContext()
: completion_op_(nullptr),
has_notify_when_done_tag_(false),
async_notify_when_done_tag_(nullptr),
+ deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
call_(nullptr),
cq_(nullptr),
sent_initial_metadata_(false) {}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 66a55ef3e5..98ab1ff7f0 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -517,4 +517,19 @@ CORE_SOURCE_FILES = [
'third_party/boringssl/ssl/t1_enc.c',
'third_party/boringssl/ssl/t1_lib.c',
'third_party/boringssl/ssl/tls_record.c',
+ 'third_party/zlib/adler32.c',
+ 'third_party/zlib/compress.c',
+ 'third_party/zlib/crc32.c',
+ 'third_party/zlib/deflate.c',
+ 'third_party/zlib/gzclose.c',
+ 'third_party/zlib/gzlib.c',
+ 'third_party/zlib/gzread.c',
+ 'third_party/zlib/gzwrite.c',
+ 'third_party/zlib/infback.c',
+ 'third_party/zlib/inffast.c',
+ 'third_party/zlib/inflate.c',
+ 'third_party/zlib/inftrees.c',
+ 'third_party/zlib/trees.c',
+ 'third_party/zlib/uncompr.c',
+ 'third_party/zlib/zutil.c',
]
diff --git a/test/core/census/tag_set_test.c b/test/core/census/tag_set_test.c
index 8e09e6c1c6..1056e98d55 100644
--- a/test/core/census/tag_set_test.c
+++ b/test/core/census/tag_set_test.c
@@ -319,9 +319,10 @@ static void replace_add_delete_test(void) {
census_tag_set_destroy(cts2);
}
+#define BUF_SIZE 200
+
// test encode/decode.
static void encode_decode_test(void) {
- const size_t BUF_SIZE = 200;
char buffer[BUF_SIZE];
struct census_tag_set *cts =
census_tag_set_create(NULL, basic_tags, BASIC_TAG_COUNT, NULL);
diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c
index ceca56c833..33cab715b2 100644
--- a/test/core/client_config/set_initial_connect_string_test.c
+++ b/test/core/client_config/set_initial_connect_string_test.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -78,7 +78,8 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
}
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
test_tcp_server *server = arg;
grpc_closure_init(&on_read, handle_read, NULL);
gpr_slice_buffer_init(&state.incoming_buffer);
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index 530381e37f..f7097ac904 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,11 +33,15 @@
#include "src/core/iomgr/tcp_server.h"
#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/sockaddr_utils.h"
+#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
@@ -48,11 +52,69 @@
static grpc_pollset g_pollset;
static int g_nconnects = 0;
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) {
+typedef struct on_connect_result {
+ /* Owns a ref to server. */
+ grpc_tcp_server *server;
+ unsigned port_index;
+ unsigned fd_index;
+ int server_fd;
+} on_connect_result;
+
+typedef struct server_weak_ref {
+ grpc_tcp_server *server;
+
+ /* arg is this server_weak_ref. */
+ grpc_closure server_shutdown;
+} server_weak_ref;
+
+static on_connect_result g_result = {NULL, 0, 0, -1};
+
+static void on_connect_result_init(on_connect_result *result) {
+ result->server = NULL;
+ result->port_index = 0;
+ result->fd_index = 0;
+ result->server_fd = -1;
+}
+
+static void on_connect_result_set(on_connect_result *result,
+ const grpc_tcp_server_acceptor *acceptor) {
+ result->server = grpc_tcp_server_ref(acceptor->from_server);
+ result->port_index = acceptor->port_index;
+ result->fd_index = acceptor->fd_index;
+ result->server_fd = grpc_tcp_server_port_fd(
+ result->server, acceptor->port_index, acceptor->fd_index);
+}
+
+static void server_weak_ref_shutdown(grpc_exec_ctx *exec_ctx, void *arg,
+ int success) {
+ server_weak_ref *weak_ref = arg;
+ weak_ref->server = NULL;
+}
+
+static void server_weak_ref_init(server_weak_ref *weak_ref) {
+ weak_ref->server = NULL;
+ grpc_closure_init(&weak_ref->server_shutdown, server_weak_ref_shutdown,
+ weak_ref);
+}
+
+/* Make weak_ref->server_shutdown a shutdown_starting cb on server.
+ grpc_tcp_server promises that the server object will live until
+ weak_ref->server_shutdown has returned. A strong ref on grpc_tcp_server
+ should be held until server_weak_ref_set() returns to avoid a race where the
+ server is deleted before the shutdown_starting cb is added. */
+static void server_weak_ref_set(server_weak_ref *weak_ref,
+ grpc_tcp_server *server) {
+ grpc_tcp_server_shutdown_starting_add(server, &weak_ref->server_shutdown);
+ weak_ref->server = server;
+}
+
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ on_connect_result_set(&g_result, acceptor);
g_nconnects++;
grpc_pollset_kick(&g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -60,107 +122,184 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) {
static void test_no_op(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_tcp_server *s = grpc_tcp_server_create();
- grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_tcp_server *s = grpc_tcp_server_create(NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_tcp_server *s = grpc_tcp_server_create();
+ grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_start");
grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
- grpc_tcp_server *s = grpc_tcp_server_create();
+ grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_port");
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
GPR_ASSERT(
- grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
+ grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0);
- grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port_and_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
- grpc_tcp_server *s = grpc_tcp_server_create();
+ grpc_tcp_server *s = grpc_tcp_server_create(NULL);
LOG_TEST("test_no_op_with_port_and_start");
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
GPR_ASSERT(
- grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
+ grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)) > 0);
grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_tcp_server_unref(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
}
-static void test_connect(int n) {
+static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote,
+ socklen_t remote_len, on_connect_result *result) {
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
+ int clifd = socket(remote->sa_family, SOCK_STREAM, 0);
+ int nconnects_before;
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ nconnects_before = g_nconnects;
+ on_connect_result_init(&g_result);
+ GPR_ASSERT(clifd >= 0);
+ gpr_log(GPR_DEBUG, "start connect");
+ GPR_ASSERT(connect(clifd, remote, remote_len) == 0);
+ gpr_log(GPR_DEBUG, "wait");
+ while (g_nconnects == nconnects_before &&
+ gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
+ grpc_pollset_worker worker;
+ grpc_pollset_work(exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ }
+ gpr_log(GPR_DEBUG, "wait done");
+ GPR_ASSERT(g_nconnects == nconnects_before + 1);
+ close(clifd);
+ *result = g_result;
+
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+}
+
+/* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for
+ the same port should be tested. */
+static void test_connect(unsigned n) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_storage addr;
+ struct sockaddr_storage addr1;
socklen_t addr_len = sizeof(addr);
- int svrfd, clifd;
- grpc_tcp_server *s = grpc_tcp_server_create();
- int nconnects_before;
- gpr_timespec deadline;
+ unsigned svr_fd_count;
+ int svr_port;
+ unsigned svr1_fd_count;
+ int svr1_port;
+ grpc_tcp_server *s = grpc_tcp_server_create(NULL);
grpc_pollset *pollsets[1];
- int i;
+ unsigned i;
+ server_weak_ref weak_ref;
+ server_weak_ref_init(&weak_ref);
LOG_TEST("test_connect");
gpr_log(GPR_INFO, "clients=%d", n);
-
memset(&addr, 0, sizeof(addr));
- addr.ss_family = AF_INET;
- GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len));
+ memset(&addr1, 0, sizeof(addr1));
+ addr.ss_family = addr1.ss_family = AF_INET;
+ svr_port = grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len);
+ GPR_ASSERT(svr_port > 0);
+ /* Cannot use wildcard (port==0), because add_port() will try to reuse the
+ same port as a previous add_port(). */
+ svr1_port = grpc_pick_unused_port_or_die();
+ grpc_sockaddr_set_port((struct sockaddr *)&addr1, svr1_port);
+ GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr1, addr_len) ==
+ svr1_port);
+
+ /* Bad port_index. */
+ GPR_ASSERT(grpc_tcp_server_port_fd_count(s, 2) == 0);
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, 2, 0) < 0);
+
+ /* Bad fd_index. */
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 100) < 0);
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, 1, 100) < 0);
+
+ /* Got at least one fd per port. */
+ svr_fd_count = grpc_tcp_server_port_fd_count(s, 0);
+ GPR_ASSERT(svr_fd_count >= 1);
+ svr1_fd_count = grpc_tcp_server_port_fd_count(s, 1);
+ GPR_ASSERT(svr1_fd_count >= 1);
- svrfd = grpc_tcp_server_get_fd(s, 0);
- GPR_ASSERT(svrfd >= 0);
- GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
- GPR_ASSERT(addr_len <= sizeof(addr));
+ for (i = 0; i < svr_fd_count; ++i) {
+ int fd = grpc_tcp_server_port_fd(s, 0, i);
+ GPR_ASSERT(fd >= 0);
+ if (i == 0) {
+ GPR_ASSERT(getsockname(fd, (struct sockaddr *)&addr, &addr_len) == 0);
+ GPR_ASSERT(addr_len <= sizeof(addr));
+ }
+ }
+ for (i = 0; i < svr1_fd_count; ++i) {
+ int fd = grpc_tcp_server_port_fd(s, 1, i);
+ GPR_ASSERT(fd >= 0);
+ if (i == 0) {
+ GPR_ASSERT(getsockname(fd, (struct sockaddr *)&addr1, &addr_len) == 0);
+ GPR_ASSERT(addr_len <= sizeof(addr1));
+ }
+ }
pollsets[0] = &g_pollset;
grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL);
- gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-
for (i = 0; i < n; i++) {
- deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
-
- nconnects_before = g_nconnects;
- clifd = socket(addr.ss_family, SOCK_STREAM, 0);
- GPR_ASSERT(clifd >= 0);
- gpr_log(GPR_DEBUG, "start connect");
- GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, addr_len) == 0);
-
- gpr_log(GPR_DEBUG, "wait");
- while (g_nconnects == nconnects_before &&
- gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
- grpc_pollset_worker worker;
- grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC), deadline);
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ on_connect_result result;
+ int svr_fd;
+ on_connect_result_init(&result);
+ tcp_connect(&exec_ctx, (struct sockaddr *)&addr, addr_len, &result);
+ GPR_ASSERT(result.server_fd >= 0);
+ svr_fd = result.server_fd;
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) ==
+ result.server_fd);
+ GPR_ASSERT(result.port_index == 0);
+ GPR_ASSERT(result.fd_index < svr_fd_count);
+ GPR_ASSERT(result.server == s);
+ if (weak_ref.server == NULL) {
+ server_weak_ref_set(&weak_ref, result.server);
}
- gpr_log(GPR_DEBUG, "wait done");
+ grpc_tcp_server_unref(&exec_ctx, result.server);
- GPR_ASSERT(g_nconnects == nconnects_before + 1);
- close(clifd);
+ on_connect_result_init(&result);
+ tcp_connect(&exec_ctx, (struct sockaddr *)&addr1, addr_len, &result);
+ GPR_ASSERT(result.server_fd >= 0);
+ GPR_ASSERT(result.server_fd != svr_fd);
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, result.port_index, result.fd_index) ==
+ result.server_fd);
+ GPR_ASSERT(result.port_index == 1);
+ GPR_ASSERT(result.fd_index < svr_fd_count);
+ GPR_ASSERT(result.server == s);
+ grpc_tcp_server_unref(&exec_ctx, result.server);
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ /* Weak ref to server valid until final unref. */
+ GPR_ASSERT(weak_ref.server != NULL);
+ GPR_ASSERT(grpc_tcp_server_port_fd(s, 0, 0) >= 0);
+
+ grpc_tcp_server_unref(&exec_ctx, s);
+
+ /* Weak ref lost. */
+ GPR_ASSERT(weak_ref.server == NULL);
- grpc_tcp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -172,7 +311,7 @@ int main(int argc, char **argv) {
grpc_closure destroyed;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
- grpc_iomgr_init();
+ grpc_init();
grpc_pollset_init(&g_pollset);
test_no_op();
@@ -185,6 +324,6 @@ int main(int argc, char **argv) {
grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
grpc_exec_ctx_finish(&exec_ctx);
- grpc_iomgr_shutdown();
+ grpc_shutdown();
return 0;
}
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index 28e521221b..57225aa8a3 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -66,7 +66,8 @@ static void pretty_print_backoffs(reconnect_server *server) {
}
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_tcp_server_acceptor *acceptor) {
char *peer;
char *last_colon;
reconnect_server *server = (reconnect_server *)arg;
diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c
index 53b574d285..aaba7be356 100644
--- a/test/core/util/test_tcp_server.c
+++ b/test/core/util/test_tcp_server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,10 +45,17 @@
#include "src/core/iomgr/tcp_server.h"
#include "test/core/util/port.h"
+static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data,
+ int success) {
+ test_tcp_server *server = data;
+ server->shutdown = 1;
+}
+
void test_tcp_server_init(test_tcp_server *server,
grpc_tcp_server_cb on_connect, void *user_data) {
grpc_init();
server->tcp_server = NULL;
+ grpc_closure_init(&server->shutdown_complete, on_server_destroyed, server);
server->shutdown = 0;
grpc_pollset_init(&server->pollset);
server->pollsets[0] = &server->pollset;
@@ -58,7 +65,6 @@ void test_tcp_server_init(test_tcp_server *server,
void test_tcp_server_start(test_tcp_server *server, int port) {
struct sockaddr_in addr;
- grpc_tcp_listener *listener;
int port_added;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -66,9 +72,9 @@ void test_tcp_server_start(test_tcp_server *server, int port) {
addr.sin_port = htons((uint16_t)port);
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
- server->tcp_server = grpc_tcp_server_create();
- listener = grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
- port_added = grpc_tcp_listener_get_port(listener);
+ server->tcp_server = grpc_tcp_server_create(&server->shutdown_complete);
+ port_added =
+ grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr));
GPR_ASSERT(port_added == port);
grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1,
@@ -91,22 +97,14 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void on_server_destroyed(grpc_exec_ctx *exec_ctx, void *data,
- int success) {
- test_tcp_server *server = data;
- server->shutdown = 1;
-}
-
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
void test_tcp_server_destroy(test_tcp_server *server) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_timespec shutdown_deadline;
- grpc_closure server_shutdown_cb;
grpc_closure do_nothing_cb;
- grpc_closure_init(&server_shutdown_cb, on_server_destroyed, server);
+ grpc_tcp_server_unref(&exec_ctx, server->tcp_server);
grpc_closure_init(&do_nothing_cb, do_nothing, NULL);
- grpc_tcp_server_destroy(&exec_ctx, server->tcp_server, &server_shutdown_cb);
shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(5, GPR_TIMESPAN));
while (!server->shutdown &&
diff --git a/test/core/util/test_tcp_server.h b/test/core/util/test_tcp_server.h
index deb65eef11..51119cf6c8 100644
--- a/test/core/util/test_tcp_server.h
+++ b/test/core/util/test_tcp_server.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
typedef struct test_tcp_server {
grpc_tcp_server *tcp_server;
+ grpc_closure shutdown_complete;
int shutdown;
grpc_pollset pollset;
grpc_pollset *pollsets[1];
diff --git a/tools/run_tests/build_artifacts.py b/tools/run_tests/build_artifacts.py
index 0fd5bc63f3..ff9dd4735a 100755
--- a/tools/run_tests/build_artifacts.py
+++ b/tools/run_tests/build_artifacts.py
@@ -129,18 +129,19 @@ class CSharpExtArtifact:
'/p:PlatformToolset=v120',
'/p:Platform=%s' % msbuild_platform],
shell=True)
- if self.platform == 'linux':
- environ = {'CONFIG': 'opt'}
- return create_docker_jobspec(self.name,
- 'tools/jenkins/grpc_artifact_linux_%s' % self.arch,
- 'tools/run_tests/build_artifact_csharp.sh')
else:
- environ = {'CONFIG': 'opt'}
- if self.platform == 'macos':
+ environ = {'CONFIG': 'opt',
+ 'EMBED_OPENSSL': 'true',
+ 'EMBED_ZLIB': 'true'}
+ if self.platform == 'linux':
+ return create_docker_jobspec(self.name,
+ 'tools/jenkins/grpc_artifact_linux_%s' % self.arch,
+ 'tools/run_tests/build_artifact_csharp.sh')
+ else:
environ.update(macos_arch_env(self.arch))
- return create_jobspec(self.name,
- ['tools/run_tests/build_artifact_csharp.sh'],
- environ=environ)
+ return create_jobspec(self.name,
+ ['tools/run_tests/build_artifact_csharp.sh'],
+ environ=environ)
def __str__(self):
return self.name
diff --git a/tools/run_tests/post_tests_ruby.sh b/tools/run_tests/post_tests_ruby.sh
index 66a9fbc534..0877e44805 100755
--- a/tools/run_tests/post_tests_ruby.sh
+++ b/tools/run_tests/post_tests_ruby.sh
@@ -43,4 +43,4 @@ genhtml $tmp2 --output-directory $out
rm $tmp2
rm $tmp1
-cp -rv $root/src/ruby/coverage $root/reports/ruby
+cp -rv $root/coverage $root/reports/ruby
diff --git a/tools/run_tests/pre_build_csharp.sh b/tools/run_tests/pre_build_csharp.sh
index 42ff60bea2..4341c0256f 100755
--- a/tools/run_tests/pre_build_csharp.sh
+++ b/tools/run_tests/pre_build_csharp.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,11 @@ cd $(dirname $0)/../../src/csharp
root=`pwd`
+if [ -x "$(command -v nuget)" ]
+then
+ nuget restore Grpc.sln
+fi
+
if [ -n "$NUGET" ]
then
$NUGET restore Grpc.sln
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 5124becfbe..8c76b3f134 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -1407,6 +1407,7 @@
"posix",
"windows"
],
+ "cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
"language": "c",
diff --git a/vsprojects/vcxproj/test/tag_set_test/tag_set_test.vcxproj b/vsprojects/vcxproj/test/tag_set_test/tag_set_test.vcxproj
index 3894902672..9a468af3ad 100644
--- a/vsprojects/vcxproj/test/tag_set_test/tag_set_test.vcxproj
+++ b/vsprojects/vcxproj/test/tag_set_test/tag_set_test.vcxproj
@@ -63,12 +63,14 @@
<TargetName>tag_set_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
<Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'">
<TargetName>tag_set_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
<Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">