From 3ab8e79b0111dc93696d72e815531b5ee3124230 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Mon, 24 Aug 2015 16:58:42 -0700 Subject: php: support per message compression disable --- src/php/lib/Grpc/AbstractCall.php | 18 +++++++++++++++++- src/php/lib/Grpc/BaseStub.php | 13 +++++++------ src/php/lib/Grpc/BidiStreamingCall.php | 11 ++++++++--- src/php/lib/Grpc/ClientStreamingCall.php | 22 ++++++++++++++++------ src/php/lib/Grpc/ServerStreamingCall.php | 15 ++++++++++----- src/php/lib/Grpc/UnaryCall.php | 13 +++++++++---- 6 files changed, 67 insertions(+), 25 deletions(-) (limited to 'src/php/lib/Grpc') diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php index 35057224f8..3fdaf2e487 100644 --- a/src/php/lib/Grpc/AbstractCall.php +++ b/src/php/lib/Grpc/AbstractCall.php @@ -92,4 +92,20 @@ abstract class AbstractCall { } return call_user_func($this->deserialize, $value); } -} \ No newline at end of file + + /** + * Get the list of Grpc Write Flags + * @param array $options an array of options + * @return The list of Grpc Write Flags contained in the input + */ + protected static function getGrpcWriteFlags($options) { + $grpc_write_flags = []; + foreach ([WRITE_BUFFER_HINT, + WRITE_NO_COMPRESS] as $flag) { + if (in_array($flag, $options)) { + $grpc_write_flags[] = $flag; + } + } + return $grpc_write_flags; + } +} diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 2e980c5eed..381b114399 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -168,7 +168,8 @@ class BaseStub { public function _simpleRequest($method, $argument, callable $deserialize, - $metadata = array()) { + $metadata = array(), + $options = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); $call = new UnaryCall($this->channel, $method, $deserialize, $timeout); $jwt_aud_uri = $this->_get_jwt_aud_uri($method); @@ -177,7 +178,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($argument, $actual_metadata); + $call->start($argument, $actual_metadata, $options); return $call; } @@ -193,7 +194,6 @@ class BaseStub { * @return ClientStreamingSurfaceActiveCall The active call object */ public function _clientStreamRequest($method, - $arguments, callable $deserialize, $metadata = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); @@ -204,7 +204,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($arguments, $actual_metadata); + $call->start($actual_metadata); return $call; } @@ -221,7 +221,8 @@ class BaseStub { public function _serverStreamRequest($method, $argument, callable $deserialize, - $metadata = array()) { + $metadata = array(), + $options = array()) { list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); $call = new ServerStreamingCall($this->channel, $method, $deserialize, $timeout); $jwt_aud_uri = $this->_get_jwt_aud_uri($method); @@ -230,7 +231,7 @@ class BaseStub { $actual_metadata, $jwt_aud_uri); } - $call->start($argument, $actual_metadata); + $call->start($argument, $actual_metadata, $options); return $call; } diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php index 76c642bef4..80b7a66a76 100644 --- a/src/php/lib/Grpc/BidiStreamingCall.php +++ b/src/php/lib/Grpc/BidiStreamingCall.php @@ -66,9 +66,14 @@ class BidiStreamingCall extends AbstractCall { * Write a single message to the server. This cannot be called after * writesDone is called. * @param ByteBuffer $data The data to write + * @param array $options an array of options */ - public function write($data) { - $this->call->startBatch([OP_SEND_MESSAGE => $data->serialize()]); + public function write($data, $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } + $this->call->startBatch([OP_SEND_MESSAGE => $message_array]); } /** @@ -86,7 +91,7 @@ class BidiStreamingCall extends AbstractCall { public function getStatus() { $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true - ]); + ]); return $status_event->status; } } \ No newline at end of file diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php index 61439d3f47..97c241a087 100644 --- a/src/php/lib/Grpc/ClientStreamingCall.php +++ b/src/php/lib/Grpc/ClientStreamingCall.php @@ -40,15 +40,24 @@ namespace Grpc; class ClientStreamingCall extends AbstractCall { /** * Start the call. - * @param Traversable $arg_iter The iterator of arguments to send * @param array $metadata Metadata to send with the call, if applicable */ - public function start($arg_iter, $metadata = array()) { - $event = $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); - foreach($arg_iter as $arg) { - $this->call->startBatch([OP_SEND_MESSAGE => $arg->serialize()]); + public function start($metadata) { + $this->call->startBatch([OP_SEND_INITIAL_METADATA => $metadata]); + } + + /** + * Write a single message to the server. This cannot be called after + * wait is called. + * @param ByteBuffer $data The data to write + * @param array $options an array of options + */ + public function write($data, $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; } - $this->call->startBatch([OP_SEND_CLOSE_FROM_CLIENT => true]); + $this->call->startBatch([OP_SEND_MESSAGE => $message_array]); } /** @@ -57,6 +66,7 @@ class ClientStreamingCall extends AbstractCall { */ public function wait() { $event = $this->call->startBatch([ + OP_SEND_CLOSE_FROM_CLIENT => true, OP_RECV_INITIAL_METADATA => true, OP_RECV_MESSAGE => true, OP_RECV_STATUS_ON_CLIENT => true]); diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php index 631c863345..159561b43a 100644 --- a/src/php/lib/Grpc/ServerStreamingCall.php +++ b/src/php/lib/Grpc/ServerStreamingCall.php @@ -40,14 +40,19 @@ namespace Grpc; class ServerStreamingCall extends AbstractCall { /** * Start the call - * @param $arg The argument to send + * @param $data The data to send * @param array $metadata Metadata to send with the call, if applicable + * @param array $options an array of options */ - public function start($arg, $metadata = array()) { + public function start($data, $metadata = array(), $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, - OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_MESSAGE => $message_array, OP_SEND_CLOSE_FROM_CLIENT => true]); $this->metadata = $event->metadata; } @@ -71,7 +76,7 @@ class ServerStreamingCall extends AbstractCall { public function getStatus() { $status_event = $this->call->startBatch([ OP_RECV_STATUS_ON_CLIENT => true - ]); + ]); return $status_event->status; } -} \ No newline at end of file +} diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php index 97a10a40f4..5ca7d9ed65 100644 --- a/src/php/lib/Grpc/UnaryCall.php +++ b/src/php/lib/Grpc/UnaryCall.php @@ -40,14 +40,19 @@ namespace Grpc; class UnaryCall extends AbstractCall { /** * Start the call - * @param $arg The argument to send + * @param $data The data to send * @param array $metadata Metadata to send with the call, if applicable + * @param array $options an array of options */ - public function start($arg, $metadata = array()) { + public function start($data, $metadata = array(), $options = array()) { + $message_array = ['message' => $data->serialize()]; + if ($grpc_write_flags = self::getGrpcWriteFlags($options)) { + $message_array['flags'] = $grpc_write_flags; + } $event = $this->call->startBatch([ OP_SEND_INITIAL_METADATA => $metadata, OP_RECV_INITIAL_METADATA => true, - OP_SEND_MESSAGE => $arg->serialize(), + OP_SEND_MESSAGE => $message_array, OP_SEND_CLOSE_FROM_CLIENT => true]); $this->metadata = $event->metadata; } @@ -62,4 +67,4 @@ class UnaryCall extends AbstractCall { OP_RECV_STATUS_ON_CLIENT => true]); return array($this->deserializeResponse($event->message), $event->status); } -} \ No newline at end of file +} -- cgit v1.2.3