aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-03-06 10:03:25 -0800
committerGravatar Yang Gao <yangg@google.com>2015-03-06 10:03:25 -0800
commitdf6e45c52a40aca45485cc087f11134d4714954e (patch)
tree713eb3b31c87425bb79606bdc9ceddc4532e4361
parent1c40233814db12cca53857241c7314b8ef14ea54 (diff)
parentee092fcc62bdbae5d5351ad67661c84ff92e33c9 (diff)
Merge branch 'master' into untypedAPI
-rw-r--r--src/core/security/server_secure_chttp2.c73
-rw-r--r--src/core/surface/channel.c2
-rw-r--r--src/core/surface/init.c16
-rw-r--r--src/core/surface/init.h1
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/surface/server_chttp2.c7
-rw-r--r--test/compiler/python_plugin_test.py50
-rwxr-xr-xtools/gce_setup/cloud_prod_runner.sh2
-rwxr-xr-xtools/gce_setup/interop_test_runner.sh5
-rwxr-xr-xtools/run_tests/python_tests.json64
-rwxr-xr-xtools/run_tests/run_python.sh2
-rwxr-xr-xtools/run_tests/run_tests.py11
12 files changed, 176 insertions, 61 deletions
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index c88f0726bb..bd6f8473cb 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -35,6 +35,7 @@
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
+#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/iomgr/tcp_server.h"
#include "src/core/security/security_context.h"
@@ -43,8 +44,27 @@
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+typedef struct grpc_server_secure_state {
+ grpc_server *server;
+ grpc_tcp_server *tcp;
+ int is_shutdown;
+ gpr_mu mu;
+ gpr_refcount refcount;
+} grpc_server_secure_state;
+
+static void state_ref(grpc_server_secure_state *state) {
+ gpr_ref(&state->refcount);
+}
+
+static void state_unref(grpc_server_secure_state *state) {
+ if (gpr_unref(&state->refcount)) {
+ gpr_free(state);
+ }
+}
+
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
@@ -54,44 +74,62 @@ static grpc_transport_setup_result setup_transport(void *server,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
-static void on_secure_transport_setup_done(void *server,
+static void on_secure_transport_setup_done(void *statep,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
+ grpc_server_secure_state *state = statep;
if (status == GRPC_SECURITY_OK) {
- grpc_create_chttp2_transport(
- setup_transport, server, grpc_server_get_channel_args(server),
- secure_endpoint, NULL, 0, grpc_mdctx_create(), 0);
+ gpr_mu_lock(&state->mu);
+ if (!state->is_shutdown) {
+ grpc_create_chttp2_transport(
+ setup_transport, state->server,
+ grpc_server_get_channel_args(state->server),
+ secure_endpoint, NULL, 0, grpc_mdctx_create(), 0);
+ } else {
+ /* We need to consume this here, because the server may already have gone
+ * away. */
+ grpc_endpoint_destroy(secure_endpoint);
+ }
+ gpr_mu_unlock(&state->mu);
} else {
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
+ state_unref(state);
}
-static void on_accept(void *server, grpc_endpoint *tcp) {
- const grpc_channel_args *args = grpc_server_get_channel_args(server);
+static void on_accept(void *statep, grpc_endpoint *tcp) {
+ grpc_server_secure_state *state = statep;
+ const grpc_channel_args *args = grpc_server_get_channel_args(state->server);
grpc_security_context *ctx = grpc_find_security_context_in_args(args);
GPR_ASSERT(ctx);
- grpc_setup_secure_transport(ctx, tcp, on_secure_transport_setup_done, server);
+ state_ref(state);
+ grpc_setup_secure_transport(ctx, tcp, on_secure_transport_setup_done, state);
}
/* Note: the following code is the same with server_chttp2.c */
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets,
+static void start(grpc_server *server, void *statep, grpc_pollset **pollsets,
size_t pollset_count) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollsets, pollset_count, on_accept, server);
+ grpc_server_secure_state *state = statep;
+ grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state);
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
-static void destroy(grpc_server *server, void *tcpp) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(tcp);
+static void destroy(grpc_server *server, void *statep) {
+ grpc_server_secure_state *state = statep;
+ gpr_mu_lock(&state->mu);
+ state->is_shutdown = 1;
+ grpc_tcp_server_destroy(state->tcp);
+ gpr_mu_unlock(&state->mu);
+ state_unref(state);
}
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
+ grpc_server_secure_state *state = NULL;
size_t i;
unsigned count = 0;
int port_num = -1;
@@ -132,8 +170,15 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) {
}
grpc_resolved_addresses_destroy(resolved);
+ state = gpr_malloc(sizeof(*state));
+ state->server = server;
+ state->tcp = tcp;
+ state->is_shutdown = 0;
+ gpr_mu_init(&state->mu);
+ gpr_ref_init(&state->refcount, 1);
+
/* Register with the server only upon success */
- grpc_server_add_listener(server, tcp, start, destroy);
+ grpc_server_add_listener(server, state, start, destroy);
return port_num;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index e38734c6a4..e764a3b9af 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -39,6 +39,7 @@
#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
+#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -63,6 +64,7 @@ grpc_channel *grpc_channel_create_from_filters(
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
+ GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
/* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */
gpr_ref_init(&channel->refs, 1 + is_client);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 4db66fb66e..e48c4202e5 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -40,17 +40,17 @@
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
-static gpr_once g_init = GPR_ONCE_INIT;
+static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
-static void do_init(void) {
+static void do_basic_init(void) {
gpr_mu_init(&g_init_mu);
g_initializations = 0;
}
void grpc_init(void) {
- gpr_once_init(&g_init, do_init);
+ gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
@@ -73,3 +73,13 @@ void grpc_shutdown(void) {
}
gpr_mu_unlock(&g_init_mu);
}
+
+int grpc_is_initialized(void) {
+ int r;
+ gpr_once_init(&g_basic_init, do_basic_init);
+ gpr_mu_lock(&g_init_mu);
+ r = g_initializations > 0;
+ gpr_mu_unlock(&g_init_mu);
+ return r;
+}
+
diff --git a/src/core/surface/init.h b/src/core/surface/init.h
index ab40bedf87..416874020d 100644
--- a/src/core/surface/init.h
+++ b/src/core/surface/init.h
@@ -35,5 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_INIT_H
void grpc_security_pre_init(void);
+int grpc_is_initialized(void);
#endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c99a1b4cc9..424734c54c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -44,6 +44,7 @@
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
+#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -612,6 +613,9 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
int census_enabled = grpc_channel_args_is_census_enabled(args);
grpc_server *server = gpr_malloc(sizeof(grpc_server));
+
+ GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
+
memset(server, 0, sizeof(grpc_server));
if (cq) addcq(server, cq);
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index fd702593b8..27434b39e2 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -53,6 +53,13 @@ static grpc_transport_setup_result setup_transport(void *server,
}
static void new_transport(void *server, grpc_endpoint *tcp) {
+ /*
+ * Beware that the call to grpc_create_chttp2_transport() has to happen before
+ * grpc_tcp_server_destroy(). This is fine here, but similar code
+ * asynchronously doing a handshake instead of calling grpc_tcp_server_start()
+ * (as in server_secure_chttp2.c) needs to add synchronization to avoid this
+ * case.
+ */
grpc_create_chttp2_transport(setup_transport, server,
grpc_server_get_channel_args(server), tcp, NULL,
0, grpc_mdctx_create(), 0);
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
index f16682862c..9cf3c624c0 100644
--- a/test/compiler/python_plugin_test.py
+++ b/test/compiler/python_plugin_test.py
@@ -32,8 +32,10 @@ import contextlib
import errno
import itertools
import os
+import shutil
import subprocess
import sys
+import tempfile
import time
import unittest
@@ -55,8 +57,8 @@ DOES_NOT_MATTER_DELAY = 0
NO_DELAY = 0
LONG_DELAY = 1
-# Assigned in __main__.
-_build_mode = None
+# Build mode environment variable set by tools/run_tests/run_tests.py.
+_build_mode = os.environ['CONFIG']
class _ServicerMethods(object):
@@ -227,24 +229,26 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
# Ensure that the output directory exists.
- outdir = '../../gens/test/compiler/python'
- try:
- os.makedirs(outdir)
- except OSError as exception:
- if exception.errno != errno.EEXIST:
- raise
+ self.outdir = tempfile.mkdtemp()
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
'-I %s' % os.path.dirname(test_proto_filename),
- '--python_out=%s' % outdir,
- '--python-grpc_out=%s' % outdir,
+ '--python_out=%s' % self.outdir,
+ '--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
subprocess.call(' '.join(cmd), shell=True)
- sys.path.append(outdir)
+ sys.path.append(self.outdir)
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.outdir)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ raise
# TODO(atash): Figure out which of theses tests is hanging flakily with small
# probability.
@@ -296,6 +300,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
@@ -325,6 +331,8 @@ class PythonPluginTest(unittest.TestCase):
expected_response, response = check
self.assertEqual(expected_response, response)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
@@ -335,6 +343,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
list(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputRequest(test_pb2)
@@ -359,6 +369,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ServicerError):
next(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
@@ -426,6 +438,8 @@ class PythonPluginTest(unittest.TestCase):
expected_response, response = check
self.assertEqual(expected_response, response)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest(test_pb2)
@@ -436,6 +450,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ExpirationError):
list(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
@@ -459,6 +475,8 @@ class PythonPluginTest(unittest.TestCase):
with self.assertRaises(exceptions.ServicerError):
next(responses)
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
@@ -502,14 +520,4 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
os.chdir(os.path.dirname(sys.argv[0]))
- parser = argparse.ArgumentParser(
- description='Run Python compiler plugin test.')
- parser.add_argument(
- '--build_mode', dest='build_mode', type=str, default='dbg',
- help='The build mode of the targets to test, e.g. "dbg", "opt", "asan", '
- 'etc.')
- parser.add_argument('--port', dest='port', type=int, default=0)
- args, remainder = parser.parse_known_args()
- _build_mode = args.build_mode
- sys.argv[1:] = remainder
unittest.main()
diff --git a/tools/gce_setup/cloud_prod_runner.sh b/tools/gce_setup/cloud_prod_runner.sh
index 3760ae4979..520dfcd998 100755
--- a/tools/gce_setup/cloud_prod_runner.sh
+++ b/tools/gce_setup/cloud_prod_runner.sh
@@ -34,7 +34,7 @@ main() {
# temporarily remove ping_pong and cancel_after_first_response while investigating timeout
test_cases=(large_unary empty_unary client_streaming server_streaming cancel_after_begin)
auth_test_cases=(service_account_creds compute_engine_creds)
- clients=(cxx java go ruby node)
+ clients=(cxx java go ruby node csharp_mono)
for test_case in "${test_cases[@]}"
do
for client in "${clients[@]}"
diff --git a/tools/gce_setup/interop_test_runner.sh b/tools/gce_setup/interop_test_runner.sh
index 5f8c0e7064..ebc631c1fd 100755
--- a/tools/gce_setup/interop_test_runner.sh
+++ b/tools/gce_setup/interop_test_runner.sh
@@ -35,8 +35,9 @@ echo $result_file_name
main() {
source grpc_docker.sh
- test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response)
- clients=(cxx java go ruby node)
+ # temporarily remove ping_pong and cancel_after_first_response while investigating timeout
+ test_cases=(large_unary empty_unary client_streaming server_streaming cancel_after_begin)
+ clients=(cxx java go ruby node csharp_mono)
servers=(cxx java go ruby node python)
for test_case in "${test_cases[@]}"
do
diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json
index 9e5b1365e6..4b43ee8357 100755
--- a/tools/run_tests/python_tests.json
+++ b/tools/run_tests/python_tests.json
@@ -1,18 +1,50 @@
[
- "grpc._adapter._blocking_invocation_inline_service_test",
- "grpc._adapter._c_test",
- "grpc._adapter._event_invocation_synchronous_event_service_test",
- "grpc._adapter._future_invocation_asynchronous_event_service_test",
- "grpc._adapter._links_test",
- "grpc._adapter._lonely_rear_link_test",
- "grpc._adapter._low_test",
- "grpc.early_adopter.implementations_test",
- "grpc.framework.assembly.implementations_test",
- "grpc.framework.base.packets.implementations_test",
- "grpc.framework.face.blocking_invocation_inline_service_test",
- "grpc.framework.face.event_invocation_synchronous_event_service_test",
- "grpc.framework.face.future_invocation_asynchronous_event_service_test",
- "grpc.framework.foundation._later_test",
- "grpc.framework.foundation._logging_pool_test"
+ {
+ "file": "test/compiler/python_plugin_test.py"
+ },
+ {
+ "module": "grpc._adapter._blocking_invocation_inline_service_test"
+ },
+ {
+ "module": "grpc._adapter._c_test"
+ },
+ {
+ "module": "grpc._adapter._event_invocation_synchronous_event_service_test"
+ },
+ {
+ "module": "grpc._adapter._future_invocation_asynchronous_event_service_test"
+ },
+ {
+ "module": "grpc._adapter._links_test"
+ },
+ {
+ "module": "grpc._adapter._lonely_rear_link_test"
+ },
+ {
+ "module": "grpc._adapter._low_test"
+ },
+ {
+ "module": "grpc.early_adopter.implementations_test"
+ },
+ {
+ "module": "grpc.framework.assembly.implementations_test"
+ },
+ {
+ "module": "grpc.framework.base.packets.implementations_test"
+ },
+ {
+ "module": "grpc.framework.face.blocking_invocation_inline_service_test"
+ },
+ {
+ "module": "grpc.framework.face.event_invocation_synchronous_event_service_test"
+ },
+ {
+ "module": "grpc.framework.face.future_invocation_asynchronous_event_service_test"
+ },
+ {
+ "module": "grpc.framework.foundation._later_test"
+ },
+ {
+ "module": "grpc.framework.foundation._logging_pool_test"
+ }
]
-
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 403862b0a0..fa1497aee4 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -36,4 +36,4 @@ cd $(dirname $0)/../..
root=`pwd`
export LD_LIBRARY_PATH=$root/libs/opt
source python2.7_virtual_environment/bin/activate
-python2.7 -B -m $*
+python2.7 -B $*
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 0f4544a5c6..12a45f3ad9 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -170,11 +170,16 @@ class PythonLanguage(object):
self._tests = json.load(f)
def test_specs(self, config, travis):
- return [config.job_spec(['tools/run_tests/run_python.sh', test], None)
- for test in self._tests]
+ modules = [config.job_spec(['tools/run_tests/run_python.sh', '-m',
+ test['module']], None)
+ for test in self._tests if 'module' in test]
+ files = [config.job_spec(['tools/run_tests/run_python.sh',
+ test['file']], None)
+ for test in self._tests if 'file' in test]
+ return files + modules
def make_targets(self):
- return ['static_c']
+ return ['static_c', 'grpc_python_plugin']
def build_steps(self):
return [['tools/run_tests/build_python.sh']]