aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/php/lib
diff options
context:
space:
mode:
authorGravatar ZhouyihaiDing <ddyihai@google.com>2017-10-31 18:09:59 -0700
committerGravatar ZhouyihaiDing <ddyihai@google.com>2018-05-07 14:06:30 -0700
commit52b95cf1fb10c6b76ea3527b3b45c8446901d9ba (patch)
tree4abeb141721a920e31cc8adafb97dd6791d0423d /src/php/lib
parentaef957950aa3390e3516531ca2d783de8fc9333b (diff)
gRPC PHP Client Interceptor implementation and tests
move InterceptorChannel to Internal subdir; change year
Diffstat (limited to 'src/php/lib')
-rw-r--r--src/php/lib/Grpc/BaseStub.php389
-rw-r--r--src/php/lib/Grpc/Interceptor.php86
-rw-r--r--src/php/lib/Grpc/Internal/InterceptorChannel.php76
3 files changed, 461 insertions, 90 deletions
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 5f3a96feaa..7860233ca2 100644
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -38,12 +38,13 @@ class BaseStub
* - 'update_metadata': (optional) a callback function which takes in a
* metadata array, and returns an updated metadata array
* - 'grpc.primary_user_agent': (optional) a user-agent string
- * @param Channel $channel An already created Channel object (optional)
+ * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
*/
- public function __construct($hostname, $opts, Channel $channel = null)
+ public function __construct($hostname, $opts, $channel = null)
{
$ssl_roots = file_get_contents(
- dirname(__FILE__).'/../../../../etc/roots.pem');
+ dirname(__FILE__).'/../../../../etc/roots.pem'
+ );
ChannelCredentials::setDefaultRootsPem($ssl_roots);
$this->hostname = $hostname;
@@ -58,16 +59,20 @@ class BaseStub
$this->hostname_override = $opts['grpc.ssl_target_name_override'];
}
if ($channel) {
- if (!is_a($channel, 'Grpc\Channel')) {
- throw new \Exception('The channel argument is not a'.
- 'Channel object');
+ if (!is_a($channel, 'Grpc\Channel') &&
+ !is_a($channel, 'Grpc\InterceptorChannel')) {
+ throw new \Exception('The channel argument is not a Channel object '.
+ 'or an InterceptorChannel object created by '.
+ 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
}
$this->channel = $channel;
return;
}
$package_config = json_decode(
- file_get_contents(dirname(__FILE__).'/../../composer.json'), true);
+ file_get_contents(dirname(__FILE__).'/../../composer.json'),
+ true
+ );
if (!empty($opts['grpc.primary_user_agent'])) {
$opts['grpc.primary_user_agent'] .= ' ';
} else {
@@ -77,8 +82,8 @@ class BaseStub
'grpc-php/'.$package_config['version'];
if (!array_key_exists('credentials', $opts)) {
throw new \Exception("The opts['credentials'] key is now ".
- 'required. Please see one of the '.
- 'ChannelCredentials::create methods');
+ 'required. Please see one of the '.
+ 'ChannelCredentials::create methods');
}
$this->channel = new Channel($hostname, $opts);
}
@@ -169,7 +174,8 @@ class BaseStub
$last_slash_idx = strrpos($method, '/');
if ($last_slash_idx === false) {
throw new \InvalidArgumentException(
- 'service name must have a slash');
+ 'service name must have a slash'
+ );
}
$service_name = substr($method, 0, $last_slash_idx);
@@ -197,7 +203,8 @@ class BaseStub
if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
throw new \InvalidArgumentException(
'Metadata keys must be nonempty strings containing only '.
- 'alphanumeric characters, hyphens and underscores');
+ 'alphanumeric characters, hyphens and underscores'
+ );
}
$metadata_copy[strtolower($key)] = $value;
}
@@ -205,9 +212,255 @@ class BaseStub
return $metadata_copy;
}
+ /**
+ * Create a function which can be used to create UnaryCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcUnaryUnary($channel, $deserialize)
+ {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new UnaryCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($argument, $metadata, $options);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create ServerStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcStreamUnary($channel, $deserialize)
+ {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new ClientStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($metadata);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create ClientStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcUnaryStream($channel, $deserialize)
+ {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ $call = new ServerStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($argument, $metadata, $options);
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create BidiStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _GrpcStreamStream($channel, $deserialize)
+ {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel ,$deserialize) {
+ $call = new BidiStreamingCall(
+ $channel,
+ $method,
+ $deserialize,
+ $options
+ );
+ $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
+ if (is_callable($this->update_metadata)) {
+ $metadata = call_user_func(
+ $this->update_metadata,
+ $metadata,
+ $jwt_aud_uri
+ );
+ }
+ $metadata = $this->_validate_and_normalize_metadata(
+ $metadata
+ );
+ $call->start($metadata);
+
+ return $call;
+ };
+ }
+
+ /**
+ * Create a function which can be used to create UnaryCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _UnaryUnaryCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptUnaryUnary(
+ $method,
+ $argument,
+ $metadata,
+ $options,
+ $this->_UnaryUnaryCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcUnaryUnary($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create ServerStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _UnaryStreamCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ $argument,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptUnaryStream(
+ $method,
+ $argument,
+ $metadata,
+ $options,
+ $this->_UnaryStreamCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcUnaryStream($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create ClientStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _StreamUnaryCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptStreamUnary(
+ $method,
+ $metadata,
+ $options,
+ $this->_StreamUnaryCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcStreamUnary($channel, $deserialize);
+ }
+
+ /**
+ * Create a function which can be used to create BidiStreamingCall
+ *
+ * @param Channel|InterceptorChannel $channel
+ * @param callable $deserialize A function that deserializes the response
+ *
+ * @return \Closure
+ */
+ private function _StreamStreamCallFactory($channel, $deserialize)
+ {
+ if (is_a($channel, 'Grpc\InterceptorChannel')) {
+ return function ($method,
+ array $metadata = [],
+ array $options = []) use ($channel, $deserialize) {
+ return $channel->getInterceptor()->interceptStreamStream(
+ $method,
+ $metadata,
+ $options,
+ $this->_StreamStreamCallFactory($channel->getNext(), $deserialize)
+ );
+ };
+ }
+ return $this->_GrpcStreamStream($channel, $deserialize);
+ }
+
/* This class is intended to be subclassed by generated code, so
* all functions begin with "_" to avoid name collisions. */
-
/**
* Call a remote method that takes a single argument and has a
* single output.
@@ -221,26 +474,15 @@ class BaseStub
*
* @return UnaryCall The active call object
*/
- protected function _simpleRequest($method,
- $argument,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new UnaryCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($argument, $metadata, $options);
-
+ protected function _simpleRequest(
+ $method,
+ $argument,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_UnaryUnaryCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@@ -256,25 +498,14 @@ class BaseStub
*
* @return ClientStreamingCall The active call object
*/
- protected function _clientStreamRequest($method,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new ClientStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($metadata);
-
+ protected function _clientStreamRequest(
+ $method,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_StreamUnaryCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $metadata, $options);
return $call;
}
@@ -291,26 +522,15 @@ class BaseStub
*
* @return ServerStreamingCall The active call object
*/
- protected function _serverStreamRequest($method,
- $argument,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new ServerStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($argument, $metadata, $options);
-
+ protected function _serverStreamRequest(
+ $method,
+ $argument,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_UnaryStreamCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $argument, $metadata, $options);
return $call;
}
@@ -325,25 +545,14 @@ class BaseStub
*
* @return BidiStreamingCall The active call object
*/
- protected function _bidiRequest($method,
- $deserialize,
- array $metadata = [],
- array $options = [])
- {
- $call = new BidiStreamingCall($this->channel,
- $method,
- $deserialize,
- $options);
- $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
- if (is_callable($this->update_metadata)) {
- $metadata = call_user_func($this->update_metadata,
- $metadata,
- $jwt_aud_uri);
- }
- $metadata = $this->_validate_and_normalize_metadata(
- $metadata);
- $call->start($metadata);
-
+ protected function _bidiRequest(
+ $method,
+ $deserialize,
+ array $metadata = [],
+ array $options = []
+ ) {
+ $call_factory = $this->_StreamStreamCallFactory($this->channel, $deserialize);
+ $call = $call_factory($method, $metadata, $options);
return $call;
}
}
diff --git a/src/php/lib/Grpc/Interceptor.php b/src/php/lib/Grpc/Interceptor.php
new file mode 100644
index 0000000000..9c1b5616f2
--- /dev/null
+++ b/src/php/lib/Grpc/Interceptor.php
@@ -0,0 +1,86 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace Grpc;
+
+/**
+ * Represents an interceptor that intercept RPC invocations before call starts.
+ * This is an EXPERIMENTAL API.
+ */
+class Interceptor
+{
+ public function interceptUnaryUnary(
+ $method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $argument, $metadata, $options);
+ }
+
+ public function interceptStreamUnary(
+ $method,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $metadata, $options);
+ }
+
+ public function interceptUnaryStream(
+ $method,
+ $argument,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $argument, $metadata, $options);
+ }
+
+ public function interceptStreamStream(
+ $method,
+ array $metadata = [],
+ array $options = [],
+ $continuation
+ ) {
+ return $continuation($method, $metadata, $options);
+ }
+
+ /**
+ * Intercept the methods with Channel
+ *
+ * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
+ * @param Interceptor|Interceptor[] $interceptors interceptors to be added
+ *
+ * @return InterceptorChannel
+ */
+ public static function intercept($channel, $interceptors)
+ {
+ if (is_array($interceptors)) {
+ for ($i = count($interceptors) - 1; $i >= 0; $i--) {
+ $channel = new InterceptorChannel($channel, $interceptors[$i]);
+ }
+ } else {
+ $channel = new InterceptorChannel($channel, $interceptors);
+ }
+ return $channel;
+ }
+}
+
diff --git a/src/php/lib/Grpc/Internal/InterceptorChannel.php b/src/php/lib/Grpc/Internal/InterceptorChannel.php
new file mode 100644
index 0000000000..9ac05748f3
--- /dev/null
+++ b/src/php/lib/Grpc/Internal/InterceptorChannel.php
@@ -0,0 +1,76 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace Grpc;
+
+/**
+ * This is a PRIVATE API and can change without notice.
+ */
+class InterceptorChannel
+{
+ private $next = null;
+ private $interceptor;
+
+ /**
+ * @param Channel|InterceptorChannel $channel An already created Channel
+ * or InterceptorChannel object (optional)
+ * @param Interceptor $interceptor
+ */
+ public function __construct($channel, $interceptor)
+ {
+ if (!is_a($channel, 'Grpc\Channel') &&
+ !is_a($channel, 'Grpc\InterceptorChannel')) {
+ throw new \Exception('The channel argument is not a Channel object '.
+ 'or an InterceptorChannel object created by '.
+ 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
+ }
+ $this->interceptor = $interceptor;
+ $this->next = $channel;
+ }
+
+ public function getNext()
+ {
+ return $this->next;
+ }
+
+ public function getInterceptor()
+ {
+ return $this->interceptor;
+ }
+
+ public function getTarget()
+ {
+ return $this->getNext()->getTarget();
+ }
+
+ public function watchConnectivityState($new_state, $deadline)
+ {
+ return $this->getNext()->watchConnectivityState($new_state, $deadline);
+ }
+
+ public function getConnectivityState($try_to_connect = false)
+ {
+ return $this->getNext()->getConnectivityState($try_to_connect);
+ }
+
+ public function close()
+ {
+ return $this->getNext()->close();
+ }
+}