aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Stanley Cheung <stanleycheung@google.com>2015-08-24 16:58:42 -0700
committerGravatar Stanley Cheung <stanleycheung@google.com>2015-09-02 13:34:40 -0700
commit3ab8e79b0111dc93696d72e815531b5ee3124230 (patch)
treebadc54d70b58bb289fda46dba0d0314d1d08a492
parentdea2648a6acb5c16b158335b4deb02caf4f02487 (diff)
php: support per message compression disable
-rw-r--r--src/php/ext/grpc/call.c27
-rw-r--r--src/php/lib/Grpc/AbstractCall.php18
-rwxr-xr-xsrc/php/lib/Grpc/BaseStub.php13
-rw-r--r--src/php/lib/Grpc/BidiStreamingCall.php11
-rw-r--r--src/php/lib/Grpc/ClientStreamingCall.php22
-rw-r--r--src/php/lib/Grpc/ServerStreamingCall.php15
-rw-r--r--src/php/lib/Grpc/UnaryCall.php13
-rw-r--r--src/php/tests/generated_code/AbstractGeneratedCodeTest.php14
-rwxr-xr-xsrc/php/tests/interop/interop_client.php7
-rwxr-xr-xsrc/php/tests/unit_tests/EndToEndTest.php49
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php51
11 files changed, 200 insertions, 40 deletions
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 252623d0c3..3b99de7538 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -265,6 +265,9 @@ PHP_METHOD(Call, startBatch) {
HashTable *array_hash;
HashPosition array_pointer;
HashTable *status_hash;
+ HashTable *message_hash;
+ zval **message_value;
+ zval **message_flags;
char *key;
uint key_len;
ulong index;
@@ -319,13 +322,33 @@ PHP_METHOD(Call, startBatch) {
metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
- if (Z_TYPE_PP(value) != IS_STRING) {
+ if (Z_TYPE_PP(value) != IS_ARRAY) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Expected an array for send message",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ message_hash = Z_ARRVAL_PP(value);
+ if (zend_hash_find(message_hash, "flags", sizeof("flags"),
+ (void **)&message_flags) == SUCCESS) {
+ if (Z_TYPE_PP(message_flags) != IS_LONG) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Expected an int for message flags",
+ 1 TSRMLS_CC);
+ }
+ ops[op_num].flags = Z_LVAL_PP(message_flags) & GRPC_WRITE_USED_MASK;
+ }
+ if (zend_hash_find(message_hash, "message", sizeof("message"),
+ (void **)&message_value) != SUCCESS ||
+ Z_TYPE_PP(message_value) != IS_STRING) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"Expected a string for send message",
1 TSRMLS_CC);
+ goto cleanup;
}
ops[op_num].data.send_message =
- string_to_byte_buffer(Z_STRVAL_PP(value), Z_STRLEN_PP(value));
+ string_to_byte_buffer(Z_STRVAL_PP(message_value),
+ Z_STRLEN_PP(message_value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php
index 35057224f8..3fdaf2e487 100644
--- a/src/php/lib/Grpc/AbstractCall.php
+++ b/src/php/lib/Grpc/AbstractCall.php
@@ -92,4 +92,20 @@ abstract class AbstractCall {
}
return call_user_func($this->deserialize, $value);
}
-} \ No newline at end of file
+
+ /**
+ * Get the list of Grpc Write Flags
+ * @param array $options an array of options
+ * @return The list of Grpc Write Flags contained in the input
+ */
+ protected static function getGrpcWriteFlags($options) {
+ $grpc_write_flags = [];
+ foreach ([WRITE_BUFFER_HINT,
+ WRITE_NO_COMPRESS] as $flag) {
+ if (in_array($flag, $options)) {
+ $grpc_write_flags[] = $flag;
+ }
+ }
+ return $grpc_write_flags;
+ }
+}
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 2e980c5eed..381b114399 100755
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -168,7 +168,8 @@ class BaseStub {
public function _simpleRequest($method,
$argument,
callable $deserialize,
- $metadata = array()) {
+ $metadata = array(),
+ $options = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
$call = new UnaryCall($this->channel, $method, $deserialize, $timeout);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
@@ -177,7 +178,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
- $call->start($argument, $actual_metadata);
+ $call->start($argument, $actual_metadata, $options);
return $call;
}
@@ -193,7 +194,6 @@ class BaseStub {
* @return ClientStreamingSurfaceActiveCall The active call object
*/
public function _clientStreamRequest($method,
- $arguments,
callable $deserialize,
$metadata = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
@@ -204,7 +204,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
- $call->start($arguments, $actual_metadata);
+ $call->start($actual_metadata);
return $call;
}
@@ -221,7 +221,8 @@ class BaseStub {
public function _serverStreamRequest($method,
$argument,
callable $deserialize,
- $metadata = array()) {
+ $metadata = array(),
+ $options = array()) {
list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata);
$call = new ServerStreamingCall($this->channel, $method, $deserialize, $timeout);
$jwt_aud_uri = $this->_get_jwt_aud_uri($method);
@@ -230,7 +231,7 @@ class BaseStub {
$actual_metadata,
$jwt_aud_uri);
}
- $call->start($argument, $actual_metadata);
+ $call->start($argument, $actual_metadata, $options);
return $call;
}
diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php
index 76c642bef4..80b7a66a76 100644
--- a/src/php/lib/Grpc/BidiStreamingCall.php
+++ b/src/php/lib/Grpc/BidiStreamingCall.php
@@ -66,9 +66,14 @@ class BidiStreamingCall extends AbstractCall {
* Write a single message to the server. This cannot be called after
* writesDone is called.
* @param ByteBuffer $data The data to write
+ * @param array $options an array of options
*/
- public function write($data) {
- $this->call->startBatch([OP_SEND_MESSAGE => $data->serialize()]);
+ public function write($data, $options = array()) {
+ $message_array = ['message' => $data->serialize()];
+ if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
+ $message_array['flags'] = $grpc_write_flags;
+ }
+ $this->call->startBatch([OP_SEND_MESSAGE => $message_array]);
}
/**
@@ -86,7 +91,7 @@ class BidiStreamingCall extends AbstractCall {
public function getStatus() {
$status_event = $this->call->startBatch([
OP_RECV_STATUS_ON_CLIENT => true
- ]);
+ ]);
return $status_event->status;
}
} \ No newline at end of file
diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php
index 61439d3f47..97c241a087 100644
--- a/src/php/lib/Grpc/ClientStreamingCall.php
+++ b/src/php/lib/Grpc/ClientStreamingCall.php
@@ -40,15 +40,24 @@ namespace Grpc;
class ClientStreamingCall extends AbstractCall {
/**
* Start the call.
- * @param Traversable $arg_iter The iterator of arguments to send
* @param array $metadata Metadata to send with the call, if applicable
*/
- public function start($arg_iter, $metadata = array()) {
- $event = $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]);
- foreach($arg_iter as $arg) {
- $this->call->startBatch([OP_SEND_MESSAGE => $arg->serialize()]);
+ public function start($metadata) {
+ $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]);
+ }
+
+ /**
+ * Write a single message to the server. This cannot be called after
+ * wait is called.
+ * @param ByteBuffer $data The data to write
+ * @param array $options an array of options
+ */
+ public function write($data, $options = array()) {
+ $message_array = ['message' => $data->serialize()];
+ if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
+ $message_array['flags'] = $grpc_write_flags;
}
- $this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]);
+ $this->call->startBatch([OP_SEND_MESSAGE => $message_array]);
}
/**
@@ -57,6 +66,7 @@ class ClientStreamingCall extends AbstractCall {
*/
public function wait() {
$event = $this->call->startBatch([
+ OP_SEND_CLOSE_FROM_CLIENT => true,
OP_RECV_INITIAL_METADATA => true,
OP_RECV_MESSAGE => true,
OP_RECV_STATUS_ON_CLIENT => true]);
diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php
index 631c863345..159561b43a 100644
--- a/src/php/lib/Grpc/ServerStreamingCall.php
+++ b/src/php/lib/Grpc/ServerStreamingCall.php
@@ -40,14 +40,19 @@ namespace Grpc;
class ServerStreamingCall extends AbstractCall {
/**
* Start the call
- * @param $arg The argument to send
+ * @param $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
+ * @param array $options an array of options
*/
- public function start($arg, $metadata = array()) {
+ public function start($data, $metadata = array(), $options = array()) {
+ $message_array = ['message' => $data->serialize()];
+ if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
+ $message_array['flags'] = $grpc_write_flags;
+ }
$event = $this->call->startBatch([
OP_SEND_INITIAL_METADATA => $metadata,
OP_RECV_INITIAL_METADATA => true,
- OP_SEND_MESSAGE => $arg->serialize(),
+ OP_SEND_MESSAGE => $message_array,
OP_SEND_CLOSE_FROM_CLIENT => true]);
$this->metadata = $event->metadata;
}
@@ -71,7 +76,7 @@ class ServerStreamingCall extends AbstractCall {
public function getStatus() {
$status_event = $this->call->startBatch([
OP_RECV_STATUS_ON_CLIENT => true
- ]);
+ ]);
return $status_event->status;
}
-} \ No newline at end of file
+}
diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php
index 97a10a40f4..5ca7d9ed65 100644
--- a/src/php/lib/Grpc/UnaryCall.php
+++ b/src/php/lib/Grpc/UnaryCall.php
@@ -40,14 +40,19 @@ namespace Grpc;
class UnaryCall extends AbstractCall {
/**
* Start the call
- * @param $arg The argument to send
+ * @param $data The data to send
* @param array $metadata Metadata to send with the call, if applicable
+ * @param array $options an array of options
*/
- public function start($arg, $metadata = array()) {
+ public function start($data, $metadata = array(), $options = array()) {
+ $message_array = ['message' => $data->serialize()];
+ if ($grpc_write_flags = self::getGrpcWriteFlags($options)) {
+ $message_array['flags'] = $grpc_write_flags;
+ }
$event = $this->call->startBatch([
OP_SEND_INITIAL_METADATA => $metadata,
OP_RECV_INITIAL_METADATA => true,
- OP_SEND_MESSAGE => $arg->serialize(),
+ OP_SEND_MESSAGE => $message_array,
OP_SEND_CLOSE_FROM_CLIENT => true]);
$this->metadata = $event->metadata;
}
@@ -62,4 +67,4 @@ class UnaryCall extends AbstractCall {
OP_RECV_STATUS_ON_CLIENT => true]);
return array($this->deserializeResponse($event->message), $event->status);
}
-} \ No newline at end of file
+}
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index a368dd4ee0..531e80aa61 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -79,15 +79,13 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
}
public function testClientStreaming() {
- $num_iter = function() {
- for ($i = 0; $i < 7; $i++) {
- $num = new math\Num();
- $num->setNum($i);
- yield $num;
- }
- };
- $call = self::$client->Sum($num_iter());
+ $call = self::$client->Sum();
$this->assertTrue(is_string($call->getPeer()));
+ for ($i = 0; $i < 7; $i++) {
+ $num = new math\Num();
+ $num->setNum($i);
+ $call->write($num);
+ }
list($response, $status) = $call->wait();
$this->assertSame(21, $response->getNum());
$this->assertSame(\Grpc\STATUS_OK, $status->code);
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index bd15ee4303..28782735c0 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -173,7 +173,11 @@ function clientStreaming($stub) {
return $request;
}, $request_lengths);
- list($result, $status) = $stub->StreamingInputCall($requests)->wait();
+ $call = $stub->StreamingInputCall();
+ foreach ($requests as $request) {
+ $call->write($request);
+ }
+ list($result, $status) = $call->wait();
hardAssert($status->code === Grpc\STATUS_OK, 'Call did not complete successfully');
hardAssert($result->getAggregatedPayloadSize() === 74922,
'aggregated_payload_size was incorrect');
@@ -374,5 +378,6 @@ switch ($args['test_case']) {
// messages are sent immediately after metadata is sent. There is
// currently no way to cancel before messages are sent.
default:
+ echo "Unsupported test case $args[test_case]\n";
exit(1);
}
diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php
index 4c0cf91d51..bd464f939f 100755
--- a/src/php/tests/unit_tests/EndToEndTest.php
+++ b/src/php/tests/unit_tests/EndToEndTest.php
@@ -91,6 +91,51 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
unset($server_call);
}
+ public function testMessageWriteFlags() {
+ $deadline = Grpc\Timeval::infFuture();
+ $req_text = 'message_write_flags_test';
+ $status_text = 'xyz';
+ $call = new Grpc\Call($this->channel,
+ 'dummy_method',
+ $deadline);
+
+ $event = $call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_MESSAGE => ['message' => $req_text,
+ 'flags' => Grpc\WRITE_NO_COMPRESS],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
+
+ $event = $this->server->requestCall();
+ $this->assertSame('dummy_method', $event->method);
+ $server_call = $event->call;
+
+ $event = $server_call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ ]);
+
+ $event = $call->startBatch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
+ $this->assertSame(Grpc\STATUS_OK, $status->code);
+ $this->assertSame($status_text, $status->details);
+
+ unset($call);
+ unset($server_call);
+ }
+
public function testClientServerFullRequestResponse() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'client_server_full_request_response';
@@ -104,7 +149,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
- Grpc\OP_SEND_MESSAGE => $req_text
+ Grpc\OP_SEND_MESSAGE => ['message' => $req_text],
]);
$this->assertTrue($event->send_metadata);
@@ -117,7 +162,7 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
- Grpc\OP_SEND_MESSAGE => $reply_text,
+ Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index 60341b983d..d7fca14a0d 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -107,6 +107,53 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
unset($server_call);
}
+ public function testMessageWriteFlags() {
+ $deadline = Grpc\Timeval::infFuture();
+ $req_text = 'message_write_flags_test';
+ $status_text = 'xyz';
+ $call = new Grpc\Call($this->channel,
+ 'dummy_method',
+ $deadline,
+ $this->host_override);
+
+ $event = $call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_MESSAGE => ['message' => $req_text,
+ 'flags' => Grpc\WRITE_NO_COMPRESS],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
+
+ $event = $this->server->requestCall();
+ $this->assertSame('dummy_method', $event->method);
+ $server_call = $event->call;
+
+ $event = $server_call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ ]);
+
+ $event = $call->startBatch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+
+ $this->assertSame([], $event->metadata);
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
+ $this->assertSame(Grpc\STATUS_OK, $status->code);
+ $this->assertSame($status_text, $status->details);
+
+ unset($call);
+ unset($server_call);
+ }
+
public function testClientServerFullRequestResponse() {
$deadline = Grpc\Timeval::infFuture();
$req_text = 'client_server_full_request_response';
@@ -121,7 +168,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
- Grpc\OP_SEND_MESSAGE => $req_text
+ Grpc\OP_SEND_MESSAGE => ['message' => $req_text]
]);
$this->assertTrue($event->send_metadata);
@@ -134,7 +181,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
- Grpc\OP_SEND_MESSAGE => $reply_text,
+ Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,