diff options
-rw-r--r-- | src/core/channel/client_channel.c | 25 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 17 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 22 | ||||
-rw-r--r-- | src/core/tsi/ssl_transport_security.c | 13 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/PInvokeTest.cs | 2 | ||||
-rwxr-xr-x | src/php/lib/Grpc/BaseStub.php | 25 | ||||
-rw-r--r-- | src/php/tests/generated_code/AbstractGeneratedCodeTest.php | 8 | ||||
-rwxr-xr-x | tools/run_tests/port_server.py | 21 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 53 |
9 files changed, 141 insertions, 45 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 08aa721a4d..ce8af96054 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -196,13 +196,12 @@ static int is_empty(void *p, int len) { return 1; } -static void started_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { +static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock(&calld->mu_state); if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; @@ -230,10 +229,18 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg, } } +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + call_data *calld = arg; + gpr_mu_lock(&calld->mu_state); + started_call_locked(exec_ctx, arg, iomgr_success); +} + static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; + grpc_subchannel_call_create_status call_creation_status; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -248,11 +255,15 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); calld->state = CALL_WAITING_FOR_CALL; pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, - &calld->subchannel_call, - &calld->async_setup_task); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, + &calld->async_setup_task); + if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { + started_call_locked(exec_ctx, calld, iomgr_success); + } else { + gpr_mu_unlock(&calld->mu_state); + } } } } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a2c521a20d..5e84dec0ca 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + grpc_subchannel_call_create_status call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset, - w4c->target, w4c->notify); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); + GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify) { +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, gpr_mu_unlock(&c->mu); *target = create_call(exec_ctx, con); - notify->cb(exec_ctx, notify->cb_arg, 1); + return GRPC_SUBCHANNEL_CALL_CREATE_READY; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, } else { gpr_mu_unlock(&c->mu); } + return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 86b7fa5851..a26d08f02e 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify); +typedef enum { + GRPC_SUBCHANNEL_CALL_CREATE_READY, + GRPC_SUBCHANNEL_CALL_CREATE_PENDING +} grpc_subchannel_call_create_status; + +/** construct a subchannel call (possibly asynchronously). + * + * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will + * return immediately and \a target will point to a connected \a subchannel_call + * instance. Note that \a notify will \em not be invoked in this case. + * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the + * subchannel call will be created asynchronously, invoking the \a notify + * callback upon completion. */ +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 05789f07d4..22b57964cc 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -319,8 +319,9 @@ static tsi_result peer_from_x509(X509 *cert, int include_certificate_type, /* TODO(jboeuf): Maybe add more properties. */ GENERAL_NAMES *subject_alt_names = X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0); - int subject_alt_name_count = - (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0; + int subject_alt_name_count = (subject_alt_names != NULL) + ? (int)sk_GENERAL_NAME_num(subject_alt_names) + : 0; size_t property_count; tsi_result result; GPR_ASSERT(subject_alt_name_count >= 0); @@ -358,7 +359,7 @@ static void log_ssl_error_stack(void) { unsigned long err; while ((err = ERR_get_error()) != 0) { char details[256]; - ERR_error_string_n(err, details, sizeof(details)); + ERR_error_string_n((uint32_t)err, details, sizeof(details)); gpr_log(GPR_ERROR, "%s", details); } } @@ -668,7 +669,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector *self, tsi_result result = TSI_OK; /* First see if we have some pending data in the SSL BIO. */ - int pending_in_ssl = BIO_pending(impl->from_ssl); + int pending_in_ssl = (int)BIO_pending(impl->from_ssl); if (pending_in_ssl > 0) { *unprotected_bytes_size = 0; GPR_ASSERT(*protected_output_frames_size <= INT_MAX); @@ -726,7 +727,7 @@ static tsi_result ssl_protector_protect_flush( impl->buffer_offset = 0; } - pending = BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->from_ssl); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; if (*still_pending_size == 0) return TSI_OK; @@ -739,7 +740,7 @@ static tsi_result ssl_protector_protect_flush( return TSI_INTERNAL_ERROR; } *protected_output_frames_size = (size_t)read_from_ssl; - pending = BIO_pending(impl->from_ssl); + pending = (int)BIO_pending(impl->from_ssl); GPR_ASSERT(pending >= 0); *still_pending_size = (size_t)pending; return TSI_OK; diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index 714c2f7494..073c502daf 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -60,7 +60,7 @@ namespace Grpc.Core.Tests public void CompletionQueueCreateDestroyBenchmark() { BenchmarkUtil.RunBenchmark( - 100000, 1000000, + 10, 10, () => { CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create(); diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 381b114399..0a3e1f78bf 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -114,7 +114,7 @@ class BaseStub { return true; } if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) { - throw new Exception('Failed to connect to server'); + throw new \Exception('Failed to connect to server'); } return false; } @@ -153,6 +153,25 @@ class BaseStub { return array($metadata_copy, $timeout); } + /** + * validate and normalize the metadata array + * @param $metadata The metadata map + * @return $metadata Validated and key-normalized metadata map + * @throw InvalidArgumentException if key contains invalid characters + */ + private function _validate_and_normalize_metadata($metadata) { + $metadata_copy = array(); + foreach ($metadata as $key => $value) { + if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { + throw new \InvalidArgumentException( + 'Metadata keys must be nonempty strings containing only '. + 'alphanumeric characters, hyphens and underscores'); + } + $metadata_copy[strtolower($key)] = $value; + } + return $metadata_copy; + } + /* This class is intended to be subclassed by generated code, so all functions begin with "_" to avoid name collisions. */ @@ -178,6 +197,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($argument, $actual_metadata, $options); return $call; } @@ -204,6 +224,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($actual_metadata); return $call; } @@ -231,6 +252,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($argument, $actual_metadata, $options); return $call; } @@ -254,6 +276,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } + $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); $call->start($actual_metadata); return $call; } diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 9cee188666..5cdba1e5a0 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -51,6 +51,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { $this->assertTrue(is_string(self::$client->getTarget())); } + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidMetadata() { + $div_arg = new math\DivArgs(); + $call = self::$client->Div($div_arg, array(' ' => 'abc123')); + } + public function testWriteFlags() { $div_arg = new math\DivArgs(); $div_arg->setDividend(7); diff --git a/tools/run_tests/port_server.py b/tools/run_tests/port_server.py index b953df952c..3b85486ebf 100755 --- a/tools/run_tests/port_server.py +++ b/tools/run_tests/port_server.py @@ -42,7 +42,7 @@ import time # increment this number whenever making a change to ensure that # the changes are picked up by running CI servers # note that all changes must be backwards compatible -_MY_VERSION = 2 +_MY_VERSION = 5 if len(sys.argv) == 2 and sys.argv[1] == 'dump_version': @@ -52,8 +52,16 @@ if len(sys.argv) == 2 and sys.argv[1] == 'dump_version': argp = argparse.ArgumentParser(description='Server for httpcli_test') argp.add_argument('-p', '--port', default=12345, type=int) +argp.add_argument('-l', '--logfile', default=None, type=str) args = argp.parse_args() +if args.logfile is not None: + sys.stdin.close() + sys.stderr.close() + sys.stdout.close() + sys.stderr = open(args.logfile, 'w') + sys.stdout = sys.stderr + print 'port server running on port %d' % args.port pool = [] @@ -119,9 +127,12 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler): self.send_header('Content-Type', 'text/plain') self.end_headers() p = int(self.path[6:]) - del in_use[p] - pool.append(p) - self.log_message('drop port %d' % p) + if p in in_use: + del in_use[p] + pool.append(p) + self.log_message('drop known port %d' % p) + else: + self.log_message('drop unknown port %d' % p) elif self.path == '/version_number': # fetch a version string and the current process pid self.send_response(200) @@ -146,6 +157,6 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler): httpd = BaseHTTPServer.HTTPServer(('', args.port), Handler) while keep_running: httpd.handle_request() + sys.stderr.flush() print 'done' - diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 048ab90798..e9ae9f4795 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -43,6 +43,8 @@ import re import socket import subprocess import sys +import tempfile +import traceback import time import xml.etree.cElementTree as ET import urllib2 @@ -577,7 +579,7 @@ run_configs = set(_CONFIGS[cfg] build_configs = set(cfg.build_config for cfg in run_configs) if args.travis: - _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'} + _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'} languages = set(_LANGUAGES[l] for l in itertools.chain.from_iterable( @@ -704,35 +706,62 @@ def _start_port_server(port_server_port): urllib2.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read() time.sleep(1) if not running: - print 'starting port_server' - port_log = open('portlog.txt', 'w') - port_server = subprocess.Popen( - [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port], - stderr=subprocess.STDOUT, - stdout=port_log) + fd, logfile = tempfile.mkstemp() + os.close(fd) + print 'starting port_server, with log file %s' % logfile + args = [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port, '-l', logfile] + env = dict(os.environ) + env['BUILD_ID'] = 'pleaseDontKillMeJenkins' + if platform.system() == 'Windows': + port_server = subprocess.Popen( + args, + env=env, + creationflags = 0x00000008, # detached process + close_fds=True) + else: + port_server = subprocess.Popen( + args, + env=env, + preexec_fn=os.setsid, + close_fds=True) + time.sleep(1) # ensure port server is up waits = 0 while True: if waits > 10: + print 'killing port server due to excessive start up waits' port_server.kill() if port_server.poll() is not None: print 'port_server failed to start' - port_log = open('portlog.txt', 'r').read() - print port_log - sys.exit(1) + # try one final time: maybe another build managed to start one + time.sleep(1) + try: + urllib2.urlopen('http://localhost:%d/get' % port_server_port, + timeout=1).read() + print 'last ditch attempt to contact port server succeeded' + break + except: + traceback.print_exc(); + port_log = open(logfile, 'r').read() + print port_log + sys.exit(1) try: urllib2.urlopen('http://localhost:%d/get' % port_server_port, timeout=1).read() + print 'port server is up and ready' break except socket.timeout: print 'waiting for port_server: timeout' - time.sleep(0.5) + traceback.print_exc(); + time.sleep(1) waits += 1 except urllib2.URLError: print 'waiting for port_server: urlerror' - time.sleep(0.5) + traceback.print_exc(); + time.sleep(1) waits += 1 except: + traceback.print_exc(); port_server.kill() raise |