diff options
author | Yang Gao <yangg@google.com> | 2015-03-06 10:03:25 -0800 |
---|---|---|
committer | Yang Gao <yangg@google.com> | 2015-03-06 10:03:25 -0800 |
commit | df6e45c52a40aca45485cc087f11134d4714954e (patch) | |
tree | 713eb3b31c87425bb79606bdc9ceddc4532e4361 | |
parent | 1c40233814db12cca53857241c7314b8ef14ea54 (diff) | |
parent | ee092fcc62bdbae5d5351ad67661c84ff92e33c9 (diff) |
Merge branch 'master' into untypedAPI
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 73 | ||||
-rw-r--r-- | src/core/surface/channel.c | 2 | ||||
-rw-r--r-- | src/core/surface/init.c | 16 | ||||
-rw-r--r-- | src/core/surface/init.h | 1 | ||||
-rw-r--r-- | src/core/surface/server.c | 4 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 7 | ||||
-rw-r--r-- | test/compiler/python_plugin_test.py | 50 | ||||
-rwxr-xr-x | tools/gce_setup/cloud_prod_runner.sh | 2 | ||||
-rwxr-xr-x | tools/gce_setup/interop_test_runner.sh | 5 | ||||
-rwxr-xr-x | tools/run_tests/python_tests.json | 64 | ||||
-rwxr-xr-x | tools/run_tests/run_python.sh | 2 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 11 |
12 files changed, 176 insertions, 61 deletions
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index c88f0726bb..bd6f8473cb 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -35,6 +35,7 @@ #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" +#include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_server.h" #include "src/core/security/security_context.h" @@ -43,8 +44,27 @@ #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include <grpc/support/useful.h> +typedef struct grpc_server_secure_state { + grpc_server *server; + grpc_tcp_server *tcp; + int is_shutdown; + gpr_mu mu; + gpr_refcount refcount; +} grpc_server_secure_state; + +static void state_ref(grpc_server_secure_state *state) { + gpr_ref(&state->refcount); +} + +static void state_unref(grpc_server_secure_state *state) { + if (gpr_unref(&state->refcount)) { + gpr_free(state); + } +} + static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { @@ -54,44 +74,62 @@ static grpc_transport_setup_result setup_transport(void *server, GPR_ARRAY_SIZE(extra_filters), mdctx); } -static void on_secure_transport_setup_done(void *server, +static void on_secure_transport_setup_done(void *statep, grpc_security_status status, grpc_endpoint *secure_endpoint) { + grpc_server_secure_state *state = statep; if (status == GRPC_SECURITY_OK) { - grpc_create_chttp2_transport( - setup_transport, server, grpc_server_get_channel_args(server), - secure_endpoint, NULL, 0, grpc_mdctx_create(), 0); + gpr_mu_lock(&state->mu); + if (!state->is_shutdown) { + grpc_create_chttp2_transport( + setup_transport, state->server, + grpc_server_get_channel_args(state->server), + secure_endpoint, NULL, 0, grpc_mdctx_create(), 0); + } else { + /* We need to consume this here, because the server may already have gone + * away. */ + grpc_endpoint_destroy(secure_endpoint); + } + gpr_mu_unlock(&state->mu); } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } + state_unref(state); } -static void on_accept(void *server, grpc_endpoint *tcp) { - const grpc_channel_args *args = grpc_server_get_channel_args(server); +static void on_accept(void *statep, grpc_endpoint *tcp) { + grpc_server_secure_state *state = statep; + const grpc_channel_args *args = grpc_server_get_channel_args(state->server); grpc_security_context *ctx = grpc_find_security_context_in_args(args); GPR_ASSERT(ctx); - grpc_setup_secure_transport(ctx, tcp, on_secure_transport_setup_done, server); + state_ref(state); + grpc_setup_secure_transport(ctx, tcp, on_secure_transport_setup_done, state); } /* Note: the following code is the same with server_chttp2.c */ /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, +static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollsets, pollset_count, on_accept, server); + grpc_server_secure_state *state = statep; + grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state); } /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *tcpp) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_destroy(tcp); +static void destroy(grpc_server *server, void *statep) { + grpc_server_secure_state *state = statep; + gpr_mu_lock(&state->mu); + state->is_shutdown = 1; + grpc_tcp_server_destroy(state->tcp); + gpr_mu_unlock(&state->mu); + state_unref(state); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; + grpc_server_secure_state *state = NULL; size_t i; unsigned count = 0; int port_num = -1; @@ -132,8 +170,15 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr) { } grpc_resolved_addresses_destroy(resolved); + state = gpr_malloc(sizeof(*state)); + state->server = server; + state->tcp = tcp; + state->is_shutdown = 0; + gpr_mu_init(&state->mu); + gpr_ref_init(&state->refcount, 1); + /* Register with the server only upon success */ - grpc_server_add_listener(server, tcp, start, destroy); + grpc_server_add_listener(server, state, start, destroy); return port_num; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e38734c6a4..e764a3b9af 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -39,6 +39,7 @@ #include "src/core/iomgr/iomgr.h" #include "src/core/surface/call.h" #include "src/core/surface/client.h" +#include "src/core/surface/init.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -63,6 +64,7 @@ grpc_channel *grpc_channel_create_from_filters( size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); + GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ gpr_ref_init(&channel->refs, 1 + is_client); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 4db66fb66e..e48c4202e5 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -40,17 +40,17 @@ #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" -static gpr_once g_init = GPR_ONCE_INIT; +static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; -static void do_init(void) { +static void do_basic_init(void) { gpr_mu_init(&g_init_mu); g_initializations = 0; } void grpc_init(void) { - gpr_once_init(&g_init, do_init); + gpr_once_init(&g_basic_init, do_basic_init); gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { @@ -73,3 +73,13 @@ void grpc_shutdown(void) { } gpr_mu_unlock(&g_init_mu); } + +int grpc_is_initialized(void) { + int r; + gpr_once_init(&g_basic_init, do_basic_init); + gpr_mu_lock(&g_init_mu); + r = g_initializations > 0; + gpr_mu_unlock(&g_init_mu); + return r; +} + diff --git a/src/core/surface/init.h b/src/core/surface/init.h index ab40bedf87..416874020d 100644 --- a/src/core/surface/init.h +++ b/src/core/surface/init.h @@ -35,5 +35,6 @@ #define GRPC_INTERNAL_CORE_SURFACE_INIT_H void grpc_security_pre_init(void); +int grpc_is_initialized(void); #endif /* GRPC_INTERNAL_CORE_SURFACE_INIT_H */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index c99a1b4cc9..424734c54c 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -44,6 +44,7 @@ #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" +#include "src/core/surface/init.h" #include "src/core/transport/metadata.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -612,6 +613,9 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, int census_enabled = grpc_channel_args_is_census_enabled(args); grpc_server *server = gpr_malloc(sizeof(grpc_server)); + + GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); + memset(server, 0, sizeof(grpc_server)); if (cq) addcq(server, cq); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index fd702593b8..27434b39e2 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -53,6 +53,13 @@ static grpc_transport_setup_result setup_transport(void *server, } static void new_transport(void *server, grpc_endpoint *tcp) { + /* + * Beware that the call to grpc_create_chttp2_transport() has to happen before + * grpc_tcp_server_destroy(). This is fine here, but similar code + * asynchronously doing a handshake instead of calling grpc_tcp_server_start() + * (as in server_secure_chttp2.c) needs to add synchronization to avoid this + * case. + */ grpc_create_chttp2_transport(setup_transport, server, grpc_server_get_channel_args(server), tcp, NULL, 0, grpc_mdctx_create(), 0); diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py index f16682862c..9cf3c624c0 100644 --- a/test/compiler/python_plugin_test.py +++ b/test/compiler/python_plugin_test.py @@ -32,8 +32,10 @@ import contextlib import errno import itertools import os +import shutil import subprocess import sys +import tempfile import time import unittest @@ -55,8 +57,8 @@ DOES_NOT_MATTER_DELAY = 0 NO_DELAY = 0 LONG_DELAY = 1 -# Assigned in __main__. -_build_mode = None +# Build mode environment variable set by tools/run_tests/run_tests.py. +_build_mode = os.environ['CONFIG'] class _ServicerMethods(object): @@ -227,24 +229,26 @@ class PythonPluginTest(unittest.TestCase): protoc_command = 'protoc' # Ensure that the output directory exists. - outdir = '../../gens/test/compiler/python' - try: - os.makedirs(outdir) - except OSError as exception: - if exception.errno != errno.EEXIST: - raise + self.outdir = tempfile.mkdtemp() # Invoke protoc with the plugin. cmd = [ protoc_command, '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, '-I %s' % os.path.dirname(test_proto_filename), - '--python_out=%s' % outdir, - '--python-grpc_out=%s' % outdir, + '--python_out=%s' % self.outdir, + '--python-grpc_out=%s' % self.outdir, os.path.basename(test_proto_filename), ] subprocess.call(' '.join(cmd), shell=True) - sys.path.append(outdir) + sys.path.append(self.outdir) + + def tearDown(self): + try: + shutil.rmtree(self.outdir) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise # TODO(atash): Figure out which of theses tests is hanging flakily with small # probability. @@ -296,6 +300,8 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(exceptions.ExpirationError): response_future.result() + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testUnaryCallAsyncCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = test_pb2.SimpleRequest(response_size=13) @@ -325,6 +331,8 @@ class PythonPluginTest(unittest.TestCase): expected_response, response = check self.assertEqual(expected_response, response) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testStreamingOutputCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request = StreamingOutputRequest(test_pb2) @@ -335,6 +343,8 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(exceptions.ExpirationError): list(responses) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testStreamingOutputCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top request = StreamingOutputRequest(test_pb2) @@ -359,6 +369,8 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(exceptions.ServicerError): next(responses) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testStreamingInputCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): @@ -426,6 +438,8 @@ class PythonPluginTest(unittest.TestCase): expected_response, response = check self.assertEqual(expected_response, response) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testFullDuplexCallExpired(self): import test_pb2 # pylint: disable=g-import-not-at-top request = FullDuplexRequest(test_pb2) @@ -436,6 +450,8 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(exceptions.ExpirationError): list(responses) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testFullDuplexCallCancelled(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server): @@ -459,6 +475,8 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(exceptions.ServicerError): next(responses) + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') def testHalfDuplexCall(self): import test_pb2 # pylint: disable=g-import-not-at-top with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as ( @@ -502,14 +520,4 @@ class PythonPluginTest(unittest.TestCase): if __name__ == '__main__': os.chdir(os.path.dirname(sys.argv[0])) - parser = argparse.ArgumentParser( - description='Run Python compiler plugin test.') - parser.add_argument( - '--build_mode', dest='build_mode', type=str, default='dbg', - help='The build mode of the targets to test, e.g. "dbg", "opt", "asan", ' - 'etc.') - parser.add_argument('--port', dest='port', type=int, default=0) - args, remainder = parser.parse_known_args() - _build_mode = args.build_mode - sys.argv[1:] = remainder unittest.main() diff --git a/tools/gce_setup/cloud_prod_runner.sh b/tools/gce_setup/cloud_prod_runner.sh index 3760ae4979..520dfcd998 100755 --- a/tools/gce_setup/cloud_prod_runner.sh +++ b/tools/gce_setup/cloud_prod_runner.sh @@ -34,7 +34,7 @@ main() { # temporarily remove ping_pong and cancel_after_first_response while investigating timeout test_cases=(large_unary empty_unary client_streaming server_streaming cancel_after_begin) auth_test_cases=(service_account_creds compute_engine_creds) - clients=(cxx java go ruby node) + clients=(cxx java go ruby node csharp_mono) for test_case in "${test_cases[@]}" do for client in "${clients[@]}" diff --git a/tools/gce_setup/interop_test_runner.sh b/tools/gce_setup/interop_test_runner.sh index 5f8c0e7064..ebc631c1fd 100755 --- a/tools/gce_setup/interop_test_runner.sh +++ b/tools/gce_setup/interop_test_runner.sh @@ -35,8 +35,9 @@ echo $result_file_name main() { source grpc_docker.sh - test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) - clients=(cxx java go ruby node) + # temporarily remove ping_pong and cancel_after_first_response while investigating timeout + test_cases=(large_unary empty_unary client_streaming server_streaming cancel_after_begin) + clients=(cxx java go ruby node csharp_mono) servers=(cxx java go ruby node python) for test_case in "${test_cases[@]}" do diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json index 9e5b1365e6..4b43ee8357 100755 --- a/tools/run_tests/python_tests.json +++ b/tools/run_tests/python_tests.json @@ -1,18 +1,50 @@ [ - "grpc._adapter._blocking_invocation_inline_service_test", - "grpc._adapter._c_test", - "grpc._adapter._event_invocation_synchronous_event_service_test", - "grpc._adapter._future_invocation_asynchronous_event_service_test", - "grpc._adapter._links_test", - "grpc._adapter._lonely_rear_link_test", - "grpc._adapter._low_test", - "grpc.early_adopter.implementations_test", - "grpc.framework.assembly.implementations_test", - "grpc.framework.base.packets.implementations_test", - "grpc.framework.face.blocking_invocation_inline_service_test", - "grpc.framework.face.event_invocation_synchronous_event_service_test", - "grpc.framework.face.future_invocation_asynchronous_event_service_test", - "grpc.framework.foundation._later_test", - "grpc.framework.foundation._logging_pool_test" + { + "file": "test/compiler/python_plugin_test.py" + }, + { + "module": "grpc._adapter._blocking_invocation_inline_service_test" + }, + { + "module": "grpc._adapter._c_test" + }, + { + "module": "grpc._adapter._event_invocation_synchronous_event_service_test" + }, + { + "module": "grpc._adapter._future_invocation_asynchronous_event_service_test" + }, + { + "module": "grpc._adapter._links_test" + }, + { + "module": "grpc._adapter._lonely_rear_link_test" + }, + { + "module": "grpc._adapter._low_test" + }, + { + "module": "grpc.early_adopter.implementations_test" + }, + { + "module": "grpc.framework.assembly.implementations_test" + }, + { + "module": "grpc.framework.base.packets.implementations_test" + }, + { + "module": "grpc.framework.face.blocking_invocation_inline_service_test" + }, + { + "module": "grpc.framework.face.event_invocation_synchronous_event_service_test" + }, + { + "module": "grpc.framework.face.future_invocation_asynchronous_event_service_test" + }, + { + "module": "grpc.framework.foundation._later_test" + }, + { + "module": "grpc.framework.foundation._logging_pool_test" + } ] - diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index 403862b0a0..fa1497aee4 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -36,4 +36,4 @@ cd $(dirname $0)/../.. root=`pwd` export LD_LIBRARY_PATH=$root/libs/opt source python2.7_virtual_environment/bin/activate -python2.7 -B -m $* +python2.7 -B $* diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 0f4544a5c6..12a45f3ad9 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -170,11 +170,16 @@ class PythonLanguage(object): self._tests = json.load(f) def test_specs(self, config, travis): - return [config.job_spec(['tools/run_tests/run_python.sh', test], None) - for test in self._tests] + modules = [config.job_spec(['tools/run_tests/run_python.sh', '-m', + test['module']], None) + for test in self._tests if 'module' in test] + files = [config.job_spec(['tools/run_tests/run_python.sh', + test['file']], None) + for test in self._tests if 'file' in test] + return files + modules def make_targets(self): - return ['static_c'] + return ['static_c', 'grpc_python_plugin'] def build_steps(self): return [['tools/run_tests/build_python.sh']] |