aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Zhouyihai Ding <ddyihai@google.com>2018-05-07 16:12:50 -0700
committerGravatar GitHub <noreply@github.com>2018-05-07 16:12:50 -0700
commit19145e71dca46cfce1b9ff89bc0ade686886b3b0 (patch)
tree3987e80a40553c647733173ff4b4ce67a49641bc
parentb845ac9c95d203d1d9acb6fc8f9270e45cc736ca (diff)
parent52b95cf1fb10c6b76ea3527b3b45c8446901d9ba (diff)
Merge pull request #13342 from ZhouyihaiDing/intercept3
gRPC PHP Client Interceptor
-rw-r--r--src/php/lib/Grpc/BaseStub.php389
-rw-r--r--src/php/lib/Grpc/Interceptor.php86
-rw-r--r--src/php/lib/Grpc/Internal/InterceptorChannel.php76
-rw-r--r--src/php/tests/unit_tests/InterceptorTest.php427
4 files changed, 888 insertions, 90 deletions
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 5f3a96feaa..7860233ca2 100644
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -38,12 +38,13 @@ class BaseStub
* - 'update_metadata': (optional) a callback function which takes in a
* metadata array, and returns an updated metadata array
* - 'grpc.primary_user_agent': (optional) a user-agent string
- * @param Channel $channel An already created Channel object (optional)
+ * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
*/
- public function __construct($hostname, $opts, Channel $channel = null)
+ public function __construct($hostname, $opts, $channel = null)
{
$ssl_roots = file_get_contents(
- dirname(__FILE__).'/../../../../etc/roots.pem');
+ dirname(__FILE__).'/../../../../etc/roots.pem'
+ );
ChannelCredentials::setDefaultRootsPem($ssl_roots);
$this->hostname = $hostname;
@@ -58,16 +59,20 @@ class BaseStub
$this->hostname_override = $opts['grpc.ssl_target_name_override'];
}
if ($channel) {
- if (!is_a($channel, 'Grpc\Channel')) {
- throw new \Exception('The channel argument is not a'.
- 'Channel object');
+ if (!is_a($channel, 'Grpc\Channel') &&
+ !is_a($channel, 'Grpc\InterceptorChannel')) {
+ throw new \Exception('The channel argument is not a Channel object '.
+ 'or an InterceptorChannel object created by '.
+ 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
}
$this->channel = $channel;
return;
}
$package_config = json_decode(
- file_get_contents(dirname(__FILE__).'/../../composer.json'), true);
+ file_get_contents(dirname(__FILE__).'/../../composer.json'),
+ true
+ );
if (!empty($opts['grpc.primary_user_agent'])) {
$opts['grpc.primary_user_agent'] .= ' ';
} else {
@@ -77,8 +82,8 @@ class BaseStub
'grpc-php/'.$package_config['version'];
if (!array_key_exists('credentials', $opts)) {
throw new \Exception("The opts['credentials'] key is now ".
- 'required. Please see one of the '.
- 'ChannelCredentials::create methods');
+ 'required. Please see one of the '.
+ 'ChannelCredentials::create methods');
}
$this->channel = new Channel($hostname, $opts);
}
@@ -169,7 +174,8 @@ class BaseStub
$last_slash_idx = strrpos($method, '/');
if ($last_slash_idx === false) {
throw new \InvalidArgumentException(
- 'service name must have a slash');
+ 'service name must have a slash'
+ );
}
$service_name = substr($method, 0, $last_slash_idx);
@@ -197,7 +203,8 @@ class BaseStub
if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
throw new \InvalidArgumentException(
'Metadata keys must be nonempty strings containing only '.
- 'alphanumeric characters, hyphens and underscores');
+ 'alphanumeric characters, hyphens and underscores'
+ );
}
$metadata_copy[strtolower($key)] = $value;
}
@@ -205,9 +212,255 @@ class BaseStub
return $metadata_copy;
}
+ /**
+ * Create a function which can be used to create UnaryCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcUnaryUnary($channel, $deserialize)
+ {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new UnaryCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($argument, $metadata, $options);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create ServerStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcStreamUnary($channel, $deserialize)
+ {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new ClientStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($metadata);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create ClientStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcUnaryStream($channel, $deserialize)
+ {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new ServerStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($argument, $metadata, $options);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create BidiStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcStreamStream($channel, $deserialize)
+ {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel ,$deserialize) {
+ $call = new BidiStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($metadata);
+
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create UnaryCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _UnaryUnaryCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptUnaryUnary(
+ $method,
+ $argument,
+ $metadata,
+ $options,
+ $this->_UnaryUnaryCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcUnaryUnary($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create ServerStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _UnaryStreamCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptUnaryStream(
+ $method,
+ $argument,
+ $metadata,
+ $options,
+ $this->_UnaryStreamCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcUnaryStream($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create ClientStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _StreamUnaryCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptStreamUnary(
+ $method,
+ $metadata,
+ $options,
+ $this->_StreamUnaryCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcStreamUnary($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create BidiStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _StreamStreamCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptStreamStream(
+ $method,
+ $metadata,
+ $options,
+ $this->_StreamStreamCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcStreamStream($channel, $deserialize);
+ }
+
/* This class is intended to be subclassed by generated code, so
* all functions begin with "_" to avoid name collisions. */
-
/**
* Call a remote method that takes a single argument and has a
* single output.
@@ -221,26 +474,15 @@ class BaseStub
*
* @return UnaryCall The active call object
*/
- protected function _simpleRequest($method,
- $argument,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new UnaryCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($argument, $metadata, $options);
-
+ protected function _simpleRequest(
+ $method,
+ $argument,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_UnaryUnaryCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@@ -256,25 +498,14 @@ class BaseStub
*
* @return ClientStreamingCall The active call object
*/
- protected function _clientStreamRequest($method,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new ClientStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($metadata);
-
+ protected function _clientStreamRequest(
+ $method,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_StreamUnaryCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $metadata, $options);
return $call;
}
@@ -291,26 +522,15 @@ class BaseStub
*
* @return ServerStreamingCall The active call object
*/
- protected function _serverStreamRequest($method,
- $argument,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new ServerStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($argument, $metadata, $options);
-
+ protected function _serverStreamRequest(
+ $method,
+ $argument,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_UnaryStreamCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@@ -325,25 +545,14 @@ class BaseStub
*
* @return BidiStreamingCall The active call object
*/
- protected function _bidiRequest($method,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new BidiStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($metadata);
-
+ protected function _bidiRequest(
+ $method,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_StreamStreamCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $metadata, $options);
return $call;
}
}
diff --git a/src/php/lib/Grpc/Interceptor.php b/src/php/lib/Grpc/Interceptor.php
new file mode 100644
index 0000000000..9c1b5616f2
--- /dev/null
+++ b/src/php/lib/Grpc/Interceptor.php
@@ -0,0 +1,86 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace Grpc;
+
+/**
+ * Represents an interceptor that intercept RPC invocations before call starts.
+ * This is an EXPERIMENTAL API.
+ */
+class Interceptor
+{
+ public function interceptUnaryUnary(
+ $method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $argument, $metadata, $options);
+ }
+
+ public function interceptStreamUnary(
+ $method,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $metadata, $options);
+ }
+
+ public function interceptUnaryStream(
+ $method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $argument, $metadata, $options);
+ }
+
+ public function interceptStreamStream(
+ $method,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $metadata, $options);
+ }
+
+ /**
+ * Intercept the methods with Channel
+ *
+ * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
+ * @param Interceptor|Interceptor[] $interceptors interceptors to be added
+ *
+ * @return InterceptorChannel
+ */
+ public static function intercept($channel, $interceptors)
+ {
+ if (is_array($interceptors)) {
+ for ($i = count($interceptors) - 1; $i >= 0; $i--) {
+ $channel = new InterceptorChannel($channel, $interceptors[$i]);
+ }
+ } else {
+ $channel = new InterceptorChannel($channel, $interceptors);
+ }
+ return $channel;
+ }
+}
+
diff --git a/src/php/lib/Grpc/Internal/InterceptorChannel.php b/src/php/lib/Grpc/Internal/InterceptorChannel.php
new file mode 100644
index 0000000000..9ac05748f3
--- /dev/null
+++ b/src/php/lib/Grpc/Internal/InterceptorChannel.php
@@ -0,0 +1,76 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace Grpc;
+
+/**
+ * This is a PRIVATE API and can change without notice.
+ */
+class InterceptorChannel
+{
+ private $next = null;
+ private $interceptor;
+
+ /**
+ * @param Channel|InterceptorChannel $channel An already created Channel
+ * or InterceptorChannel object (optional)
+ * @param Interceptor $interceptor
+ */
+ public function __construct($channel, $interceptor)
+ {
+ if (!is_a($channel, 'Grpc\Channel') &&
+ !is_a($channel, 'Grpc\InterceptorChannel')) {
+ throw new \Exception('The channel argument is not a Channel object '.
+ 'or an InterceptorChannel object created by '.
+ 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
+ }
+ $this->interceptor = $interceptor;
+ $this->next = $channel;
+ }
+
+ public function getNext()
+ {
+ return $this->next;
+ }
+
+ public function getInterceptor()
+ {
+ return $this->interceptor;
+ }
+
+ public function getTarget()
+ {
+ return $this->getNext()->getTarget();
+ }
+
+ public function watchConnectivityState($new_state, $deadline)
+ {
+ return $this->getNext()->watchConnectivityState($new_state, $deadline);
+ }
+
+ public function getConnectivityState($try_to_connect = false)
+ {
+ return $this->getNext()->getConnectivityState($try_to_connect);
+ }
+
+ public function close()
+ {
+ return $this->getNext()->close();
+ }
+}
diff --git a/src/php/tests/unit_tests/InterceptorTest.php b/src/php/tests/unit_tests/InterceptorTest.php
new file mode 100644
index 0000000000..08f5abbb21
--- /dev/null
+++ b/src/php/tests/unit_tests/InterceptorTest.php
@@ -0,0 +1,427 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Interface exported by the server.
+ */
+require_once(dirname(__FILE__).'/../../lib/Grpc/BaseStub.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
+
+class SimpleRequest
+{
+ private $data;
+ public function __construct($data)
+ {
+ $this->data = $data;
+ }
+ public function setData($data)
+ {
+ $this->data = $data;
+ }
+ public function serializeToString()
+ {
+ return $this->data;
+ }
+}
+
+class InterceptorClient extends Grpc\BaseStub
+{
+
+ /**
+ * @param string $hostname hostname
+ * @param array $opts channel options
+ * @param Channel|InterceptorChannel $channel (optional) re-use channel object
+ */
+ public function __construct($hostname, $opts, $channel = null)
+ {
+ parent::__construct($hostname, $opts, $channel);
+ }
+
+ /**
+ * A simple RPC.
+ * @param \Routeguide\Point $argument input argument
+ * @param array $metadata metadata
+ * @param array $options call options
+ */
+ public function UnaryCall(
+ SimpleRequest $argument,
+ $metadata = [],
+ $options = []
+ ) {
+ return $this->_simpleRequest(
+ '/dummy_method',
+ $argument,
+ [],
+ $metadata,
+ $options
+ );
+ }
+
+ /**
+ * A client-to-server streaming RPC.
+ * @param array $metadata metadata
+ * @param array $options call options
+ */
+ public function StreamCall(
+ $metadata = [],
+ $options = []
+ ) {
+ return $this->_clientStreamRequest('/dummy_method', [], $metadata, $options);
+ }
+}
+
+
+class ChangeMetadataInterceptor extends Grpc\Interceptor
+{
+ public function interceptUnaryUnary($method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ $metadata["foo"] = array('interceptor_from_unary_request');
+ return $continuation($method, $argument, $metadata, $options);
+ }
+ public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
+ {
+ $metadata["foo"] = array('interceptor_from_stream_request');
+ return $continuation($method, $metadata, $options);
+ }
+}
+
+class ChangeMetadataInterceptor2 extends Grpc\Interceptor
+{
+ public function interceptUnaryUnary($method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ if (array_key_exists('foo', $metadata)) {
+ $metadata['bar'] = array('ChangeMetadataInterceptor should be executed first');
+ } else {
+ $metadata["bar"] = array('interceptor_from_unary_request');
+ }
+ return $continuation($method, $argument, $metadata, $options);
+ }
+ public function interceptStreamUnary($method,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ if (array_key_exists('foo', $metadata)) {
+ $metadata['bar'] = array('ChangeMetadataInterceptor should be executed first');
+ } else {
+ $metadata["bar"] = array('interceptor_from_stream_request');
+ }
+ return $continuation($method, $metadata, $options);
+ }
+}
+
+class ChangeRequestCall
+{
+ private $call;
+
+ public function __construct($call)
+ {
+ $this->call = $call;
+ }
+ public function getCall()
+ {
+ return $this->call;
+ }
+
+ public function write($request)
+ {
+ $request->setData('intercepted_stream_request');
+ $this->getCall()->write($request);
+ }
+
+ public function wait()
+ {
+ return $this->getCall()->wait();
+ }
+}
+
+class ChangeRequestInterceptor extends Grpc\Interceptor
+{
+ public function interceptUnaryUnary($method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ $argument->setData('intercepted_unary_request');
+ return $continuation($method, $argument, $metadata, $options);
+ }
+ public function interceptStreamUnary($method, array $metadata = [], array $options = [], $continuation)
+ {
+ return new ChangeRequestCall(
+ $continuation($method, $metadata, $options)
+ );
+ }
+}
+
+class StopCallInterceptor extends Grpc\Interceptor
+{
+ public function interceptUnaryUnary($method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ $metadata["foo"] = array('interceptor_from_request_response');
+ }
+ public function interceptStreamUnary($method,
+ array $metadata = [],
+ array $options = [],
+ $continuation)
+ {
+ $metadata["foo"] = array('interceptor_from_request_response');
+ }
+}
+
+class InterceptorTest extends PHPUnit_Framework_TestCase
+{
+ public function setUp()
+ {
+ $this->server = new Grpc\Server([]);
+ $this->port = $this->server->addHttp2Port('0.0.0.0:0');
+ $this->channel = new Grpc\Channel('localhost:'.$this->port, ['credentials' => Grpc\ChannelCredentials::createInsecure()]);
+ $this->server->start();
+ }
+
+ public function tearDown()
+ {
+ $this->channel->close();
+ }
+
+
+ public function testClientChangeMetadataOneInterceptor()
+ {
+ $req_text = 'client_request';
+ $channel_matadata_interceptor = new ChangeMetadataInterceptor();
+ $intercept_channel = Grpc\Interceptor::intercept($this->channel, $channel_matadata_interceptor);
+ echo "create Client\n";
+ $client = new InterceptorClient('localhost:'.$this->port, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure(),
+ ], $intercept_channel);
+ echo "create Call\n";
+ $req = new SimpleRequest($req_text);
+ echo "Call created\n";
+ $unary_call = $client->UnaryCall($req);
+ echo "start call\n";
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
+
+ $stream_call = $client->StreamCall();
+ $stream_call->write($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
+
+ unset($unary_call);
+ unset($stream_call);
+ unset($server_call);
+ }
+
+ public function testClientChangeMetadataTwoInterceptor()
+ {
+ $req_text = 'client_request';
+ $channel_matadata_interceptor = new ChangeMetadataInterceptor();
+ $channel_matadata_intercepto2 = new ChangeMetadataInterceptor2();
+ // test intercept separately.
+ $intercept_channel1 = Grpc\Interceptor::intercept($this->channel, $channel_matadata_interceptor);
+ $intercept_channel2 = Grpc\Interceptor::intercept($intercept_channel1, $channel_matadata_intercepto2);
+ $client = new InterceptorClient('localhost:'.$this->port, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure(),
+ ], $intercept_channel2);
+
+ $req = new SimpleRequest($req_text);
+ $unary_call = $client->UnaryCall($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
+ $this->assertSame(['interceptor_from_unary_request'], $event->metadata['bar']);
+
+ $stream_call = $client->StreamCall();
+ $stream_call->write($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
+ $this->assertSame(['interceptor_from_stream_request'], $event->metadata['bar']);
+
+ unset($unary_call);
+ unset($stream_call);
+ unset($server_call);
+
+ // test intercept by array.
+ $intercept_channel3 = Grpc\Interceptor::intercept($this->channel,
+ [$channel_matadata_intercepto2, $channel_matadata_interceptor]);
+ $client = new InterceptorClient('localhost:'.$this->port, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure(),
+ ], $intercept_channel3);
+
+ $req = new SimpleRequest($req_text);
+ $unary_call = $client->UnaryCall($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_unary_request'], $event->metadata['foo']);
+ $this->assertSame(['interceptor_from_unary_request'], $event->metadata['bar']);
+
+ $stream_call = $client->StreamCall();
+ $stream_call->write($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $this->assertSame(['interceptor_from_stream_request'], $event->metadata['foo']);
+ $this->assertSame(['interceptor_from_stream_request'], $event->metadata['bar']);
+
+ unset($unary_call);
+ unset($stream_call);
+ unset($server_call);
+ }
+
+ public function testClientChangeRequestInterceptor()
+ {
+ $req_text = 'client_request';
+ $change_request_interceptor = new ChangeRequestInterceptor();
+ $intercept_channel = Grpc\Interceptor::intercept($this->channel,
+ $change_request_interceptor);
+ $client = new InterceptorClient('localhost:'.$this->port, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure(),
+ ], $intercept_channel);
+
+ $req = new SimpleRequest($req_text);
+ $unary_call = $client->UnaryCall($req);
+
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $server_call = $event->call;
+ $event = $server_call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => '',
+ ],
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true,
+ ]);
+ $this->assertSame('intercepted_unary_request', $event->message);
+
+ $stream_call = $client->StreamCall();
+ $stream_call->write($req);
+ $event = $this->server->requestCall();
+ $this->assertSame('/dummy_method', $event->method);
+ $server_call = $event->call;
+ $event = $server_call->startBatch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => '',
+ ],
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true,
+ ]);
+ $this->assertSame('intercepted_stream_request', $event->message);
+
+ unset($unary_call);
+ unset($stream_call);
+ unset($server_call);
+ }
+
+ public function testClientChangeStopCallInterceptor()
+ {
+ $req_text = 'client_request';
+ $channel_request_interceptor = new StopCallInterceptor();
+ $intercept_channel = Grpc\Interceptor::intercept($this->channel,
+ $channel_request_interceptor);
+ $client = new InterceptorClient('localhost:'.$this->port, [
+ 'credentials' => Grpc\ChannelCredentials::createInsecure(),
+ ], $intercept_channel);
+
+ $req = new SimpleRequest($req_text);
+ $unary_call = $client->UnaryCall($req);
+ $this->assertNull($unary_call);
+
+
+ $stream_call = $client->StreamCall();
+ $this->assertNull($stream_call);
+
+ unset($unary_call);
+ unset($stream_call);
+ unset($server_call);
+ }
+
+ public function testGetInterceptorChannelConnectivityState()
+ {
+ $channel = new Grpc\Channel(
+ 'localhost:0',
+ ['credentials' => Grpc\ChannelCredentials::createInsecure()]
+ );
+ $interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
+ $state = $interceptor_channel->getConnectivityState();
+ $this->assertEquals(0, $state);
+ $channel->close();
+ }
+
+ public function testInterceptorChannelWatchConnectivityState()
+ {
+ $channel = new Grpc\Channel(
+ 'localhost:0',
+ ['credentials' => Grpc\ChannelCredentials::createInsecure()]
+ );
+ $interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
+ $now = Grpc\Timeval::now();
+ $deadline = $now->add(new Grpc\Timeval(100*1000));
+ $state = $interceptor_channel->watchConnectivityState(1, $deadline);
+ $this->assertTrue($state);
+ unset($time);
+ unset($deadline);
+ $channel->close();
+ }
+
+ public function testInterceptorChannelClose()
+ {
+ $channel = new Grpc\Channel(
+ 'localhost:0',
+ ['credentials' => Grpc\ChannelCredentials::createInsecure()]
+ );
+ $interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
+ $this->assertNotNull($interceptor_channel);
+ $channel->close();
+ }
+
+ public function testInterceptorChannelGetTarget()
+ {
+ $channel = new Grpc\Channel(
+ 'localhost:8888',
+ ['credentials' => Grpc\ChannelCredentials::createInsecure()]
+ );
+ $interceptor_channel = Grpc\Interceptor::intercept($channel, new Grpc\Interceptor());
+ $target = $interceptor_channel->getTarget();
+ $this->assertTrue(is_string($target));
+ $channel->close();
+ }
+}