aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/client_channel.c25
-rw-r--r--src/core/client_config/subchannel.c17
-rw-r--r--src/core/client_config/subchannel.h22
-rw-r--r--src/core/tsi/ssl_transport_security.c13
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs2
-rwxr-xr-xsrc/php/lib/Grpc/BaseStub.php25
-rw-r--r--src/php/tests/generated_code/AbstractGeneratedCodeTest.php8
-rwxr-xr-xtools/run_tests/port_server.py21
-rwxr-xr-xtools/run_tests/run_tests.py53
9 files changed, 141 insertions, 45 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 08aa721a4d..ce8af96054 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -196,13 +196,12 @@ static int is_empty(void *p, int len) {
return 1;
}
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
- gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
@@ -230,10 +229,18 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
}
}
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ call_data *calld = arg;
+ gpr_mu_lock(&calld->mu_state);
+ started_call_locked(exec_ctx, arg, iomgr_success);
+}
+
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
+ grpc_subchannel_call_create_status call_creation_status;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@@ -248,11 +255,15 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
- gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
- &calld->subchannel_call,
- &calld->async_setup_task);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
+ &calld->async_setup_task);
+ if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+ started_call_locked(exec_ctx, calld, iomgr_success);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ }
}
}
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a2c521a20d..5e84dec0ca 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
+ grpc_subchannel_call_create_status call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
- w4c->target, w4c->notify);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
+ GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
+ w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify) {
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con);
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ return GRPC_SUBCHANNEL_CALL_CREATE_READY;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
} else {
gpr_mu_unlock(&c->mu);
}
+ return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 86b7fa5851..a26d08f02e 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify);
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_CREATE_READY,
+ GRPC_SUBCHANNEL_CALL_CREATE_PENDING
+} grpc_subchannel_call_create_status;
+
+/** construct a subchannel call (possibly asynchronously).
+ *
+ * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
+ * return immediately and \a target will point to a connected \a subchannel_call
+ * instance. Note that \a notify will \em not be invoked in this case.
+ * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
+ * subchannel call will be created asynchronously, invoking the \a notify
+ * callback upon completion. */
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 05789f07d4..22b57964cc 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -319,8 +319,9 @@ static tsi_result peer_from_x509(X509 *cert, int include_certificate_type,
/* TODO(jboeuf): Maybe add more properties. */
GENERAL_NAMES *subject_alt_names =
X509_get_ext_d2i(cert, NID_subject_alt_name, 0, 0);
- int subject_alt_name_count =
- (subject_alt_names != NULL) ? sk_GENERAL_NAME_num(subject_alt_names) : 0;
+ int subject_alt_name_count = (subject_alt_names != NULL)
+ ? (int)sk_GENERAL_NAME_num(subject_alt_names)
+ : 0;
size_t property_count;
tsi_result result;
GPR_ASSERT(subject_alt_name_count >= 0);
@@ -358,7 +359,7 @@ static void log_ssl_error_stack(void) {
unsigned long err;
while ((err = ERR_get_error()) != 0) {
char details[256];
- ERR_error_string_n(err, details, sizeof(details));
+ ERR_error_string_n((uint32_t)err, details, sizeof(details));
gpr_log(GPR_ERROR, "%s", details);
}
}
@@ -668,7 +669,7 @@ static tsi_result ssl_protector_protect(tsi_frame_protector *self,
tsi_result result = TSI_OK;
/* First see if we have some pending data in the SSL BIO. */
- int pending_in_ssl = BIO_pending(impl->from_ssl);
+ int pending_in_ssl = (int)BIO_pending(impl->from_ssl);
if (pending_in_ssl > 0) {
*unprotected_bytes_size = 0;
GPR_ASSERT(*protected_output_frames_size <= INT_MAX);
@@ -726,7 +727,7 @@ static tsi_result ssl_protector_protect_flush(
impl->buffer_offset = 0;
}
- pending = BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->from_ssl);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
if (*still_pending_size == 0) return TSI_OK;
@@ -739,7 +740,7 @@ static tsi_result ssl_protector_protect_flush(
return TSI_INTERNAL_ERROR;
}
*protected_output_frames_size = (size_t)read_from_ssl;
- pending = BIO_pending(impl->from_ssl);
+ pending = (int)BIO_pending(impl->from_ssl);
GPR_ASSERT(pending >= 0);
*still_pending_size = (size_t)pending;
return TSI_OK;
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 714c2f7494..073c502daf 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -60,7 +60,7 @@ namespace Grpc.Core.Tests
public void CompletionQueueCreateDestroyBenchmark()
{
BenchmarkUtil.RunBenchmark(
- 100000, 1000000,
+ 10, 10,
() =>
{
CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 381b114399..0a3e1f78bf 100755
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -114,7 +114,7 @@ class BaseStub {
return true;
}
if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
- throw new Exception('Failed to connect to server');
+ throw new \Exception('Failed to connect to server');
}
return false;
}
@@ -153,6 +153,25 @@ class BaseStub {
return array($metadata_copy, $timeout);
}
+ /**
+ * validate and normalize the metadata array
+ * @param $metadata The metadata map
+ * @return $metadata Validated and key-normalized metadata map
+ * @throw InvalidArgumentException if key contains invalid characters
+ */
+ private function _validate_and_normalize_metadata($metadata) {
+ $metadata_copy = array();
+ foreach ($metadata as $key => $value) {
+ if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
+ throw new \InvalidArgumentException(
+ 'Metadata keys must be nonempty strings containing only '.
+ 'alphanumeric characters, hyphens and underscores');
+ }
+ $metadata_copy[strtolower($key)] = $value;
+ }
+ return $metadata_copy;
+ }
+
/* This class is intended to be subclassed by generated code, so all functions
begin with "_" to avoid name collisions. */
@@ -178,6 +197,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}
@@ -204,6 +224,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($actual_metadata);
return $call;
}
@@ -231,6 +252,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($argument, $actual_metadata, $options);
return $call;
}
@@ -254,6 +276,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
+ $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata);
$call->start($actual_metadata);
return $call;
}
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 9cee188666..5cdba1e5a0 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -51,6 +51,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
$this->assertTrue(is_string(self::$client->getTarget()));
}
+ /**
+ * @expectedException InvalidArgumentException
+ */
+ public function testInvalidMetadata() {
+ $div_arg = new math\DivArgs();
+ $call = self::$client->Div($div_arg, array(' ' => 'abc123'));
+ }
+
public function testWriteFlags() {
$div_arg = new math\DivArgs();
$div_arg->setDividend(7);
diff --git a/tools/run_tests/port_server.py b/tools/run_tests/port_server.py
index b953df952c..3b85486ebf 100755
--- a/tools/run_tests/port_server.py
+++ b/tools/run_tests/port_server.py
@@ -42,7 +42,7 @@ import time
# increment this number whenever making a change to ensure that
# the changes are picked up by running CI servers
# note that all changes must be backwards compatible
-_MY_VERSION = 2
+_MY_VERSION = 5
if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
@@ -52,8 +52,16 @@ if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
argp = argparse.ArgumentParser(description='Server for httpcli_test')
argp.add_argument('-p', '--port', default=12345, type=int)
+argp.add_argument('-l', '--logfile', default=None, type=str)
args = argp.parse_args()
+if args.logfile is not None:
+ sys.stdin.close()
+ sys.stderr.close()
+ sys.stdout.close()
+ sys.stderr = open(args.logfile, 'w')
+ sys.stdout = sys.stderr
+
print 'port server running on port %d' % args.port
pool = []
@@ -119,9 +127,12 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
self.send_header('Content-Type', 'text/plain')
self.end_headers()
p = int(self.path[6:])
- del in_use[p]
- pool.append(p)
- self.log_message('drop port %d' % p)
+ if p in in_use:
+ del in_use[p]
+ pool.append(p)
+ self.log_message('drop known port %d' % p)
+ else:
+ self.log_message('drop unknown port %d' % p)
elif self.path == '/version_number':
# fetch a version string and the current process pid
self.send_response(200)
@@ -146,6 +157,6 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
httpd = BaseHTTPServer.HTTPServer(('', args.port), Handler)
while keep_running:
httpd.handle_request()
+ sys.stderr.flush()
print 'done'
-
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 048ab90798..e9ae9f4795 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -43,6 +43,8 @@ import re
import socket
import subprocess
import sys
+import tempfile
+import traceback
import time
import xml.etree.cElementTree as ET
import urllib2
@@ -577,7 +579,7 @@ run_configs = set(_CONFIGS[cfg]
build_configs = set(cfg.build_config for cfg in run_configs)
if args.travis:
- _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'}
+ _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'api'}
languages = set(_LANGUAGES[l]
for l in itertools.chain.from_iterable(
@@ -704,35 +706,62 @@ def _start_port_server(port_server_port):
urllib2.urlopen('http://localhost:%d/quitquitquit' % port_server_port).read()
time.sleep(1)
if not running:
- print 'starting port_server'
- port_log = open('portlog.txt', 'w')
- port_server = subprocess.Popen(
- [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port],
- stderr=subprocess.STDOUT,
- stdout=port_log)
+ fd, logfile = tempfile.mkstemp()
+ os.close(fd)
+ print 'starting port_server, with log file %s' % logfile
+ args = [sys.executable, 'tools/run_tests/port_server.py', '-p', '%d' % port_server_port, '-l', logfile]
+ env = dict(os.environ)
+ env['BUILD_ID'] = 'pleaseDontKillMeJenkins'
+ if platform.system() == 'Windows':
+ port_server = subprocess.Popen(
+ args,
+ env=env,
+ creationflags = 0x00000008, # detached process
+ close_fds=True)
+ else:
+ port_server = subprocess.Popen(
+ args,
+ env=env,
+ preexec_fn=os.setsid,
+ close_fds=True)
+ time.sleep(1)
# ensure port server is up
waits = 0
while True:
if waits > 10:
+ print 'killing port server due to excessive start up waits'
port_server.kill()
if port_server.poll() is not None:
print 'port_server failed to start'
- port_log = open('portlog.txt', 'r').read()
- print port_log
- sys.exit(1)
+ # try one final time: maybe another build managed to start one
+ time.sleep(1)
+ try:
+ urllib2.urlopen('http://localhost:%d/get' % port_server_port,
+ timeout=1).read()
+ print 'last ditch attempt to contact port server succeeded'
+ break
+ except:
+ traceback.print_exc();
+ port_log = open(logfile, 'r').read()
+ print port_log
+ sys.exit(1)
try:
urllib2.urlopen('http://localhost:%d/get' % port_server_port,
timeout=1).read()
+ print 'port server is up and ready'
break
except socket.timeout:
print 'waiting for port_server: timeout'
- time.sleep(0.5)
+ traceback.print_exc();
+ time.sleep(1)
waits += 1
except urllib2.URLError:
print 'waiting for port_server: urlerror'
- time.sleep(0.5)
+ traceback.print_exc();
+ time.sleep(1)
waits += 1
except:
+ traceback.print_exc();
port_server.kill()
raise