diff options
author | ZhouyihaiDing <ddyihai@google.com> | 2017-10-31 18:09:59 -0700 |
---|---|---|
committer | ZhouyihaiDing <ddyihai@google.com> | 2018-05-07 14:06:30 -0700 |
commit | 52b95cf1fb10c6b76ea3527b3b45c8446901d9ba (patch) | |
tree | 4abeb141721a920e31cc8adafb97dd6791d0423d /src/php/lib | |
parent | aef957950aa3390e3516531ca2d783de8fc9333b (diff) |
gRPC PHP Client Interceptor implementation and tests
move InterceptorChannel to Internal subdir; change year
Diffstat (limited to 'src/php/lib')
-rw-r--r-- | src/php/lib/Grpc/BaseStub.php | 389 | ||||
-rw-r--r-- | src/php/lib/Grpc/Interceptor.php | 86 | ||||
-rw-r--r-- | src/php/lib/Grpc/Internal/InterceptorChannel.php | 76 |
3 files changed, 461 insertions, 90 deletions
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 5f3a96feaa..7860233ca2 100644 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -38,12 +38,13 @@ class BaseStub * - 'update_metadata': (optional) a callback function which takes in a * metadata array, and returns an updated metadata array * - 'grpc.primary_user_agent': (optional) a user-agent string - * @param Channel $channel An already created Channel object (optional) + * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional) */ - public function __construct($hostname, $opts, Channel $channel = null) + public function __construct($hostname, $opts, $channel = null) { $ssl_roots = file_get_contents( - dirname(__FILE__).'/../../../../etc/roots.pem'); + dirname(__FILE__).'/../../../../etc/roots.pem' + ); ChannelCredentials::setDefaultRootsPem($ssl_roots); $this->hostname = $hostname; @@ -58,16 +59,20 @@ class BaseStub $this->hostname_override = $opts['grpc.ssl_target_name_override']; } if ($channel) { - if (!is_a($channel, 'Grpc\Channel')) { - throw new \Exception('The channel argument is not a'. - 'Channel object'); + if (!is_a($channel, 'Grpc\Channel') && + !is_a($channel, 'Grpc\InterceptorChannel')) { + throw new \Exception('The channel argument is not a Channel object '. + 'or an InterceptorChannel object created by '. + 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)'); } $this->channel = $channel; return; } $package_config = json_decode( - file_get_contents(dirname(__FILE__).'/../../composer.json'), true); + file_get_contents(dirname(__FILE__).'/../../composer.json'), + true + ); if (!empty($opts['grpc.primary_user_agent'])) { $opts['grpc.primary_user_agent'] .= ' '; } else { @@ -77,8 +82,8 @@ class BaseStub 'grpc-php/'.$package_config['version']; if (!array_key_exists('credentials', $opts)) { throw new \Exception("The opts['credentials'] key is now ". - 'required. Please see one of the '. - 'ChannelCredentials::create methods'); + 'required. Please see one of the '. + 'ChannelCredentials::create methods'); } $this->channel = new Channel($hostname, $opts); } @@ -169,7 +174,8 @@ class BaseStub $last_slash_idx = strrpos($method, '/'); if ($last_slash_idx === false) { throw new \InvalidArgumentException( - 'service name must have a slash'); + 'service name must have a slash' + ); } $service_name = substr($method, 0, $last_slash_idx); @@ -197,7 +203,8 @@ class BaseStub if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { throw new \InvalidArgumentException( 'Metadata keys must be nonempty strings containing only '. - 'alphanumeric characters, hyphens and underscores'); + 'alphanumeric characters, hyphens and underscores' + ); } $metadata_copy[strtolower($key)] = $value; } @@ -205,9 +212,255 @@ class BaseStub return $metadata_copy; } + /** + * Create a function which can be used to create UnaryCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _GrpcUnaryUnary($channel, $deserialize) + { + return function ($method, + $argument, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + $call = new UnaryCall( + $channel, + $method, + $deserialize, + $options + ); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $metadata = call_user_func( + $this->update_metadata, + $metadata, + $jwt_aud_uri + ); + } + $metadata = $this->_validate_and_normalize_metadata( + $metadata + ); + $call->start($argument, $metadata, $options); + return $call; + }; + } + + /** + * Create a function which can be used to create ServerStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _GrpcStreamUnary($channel, $deserialize) + { + return function ($method, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + $call = new ClientStreamingCall( + $channel, + $method, + $deserialize, + $options + ); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $metadata = call_user_func( + $this->update_metadata, + $metadata, + $jwt_aud_uri + ); + } + $metadata = $this->_validate_and_normalize_metadata( + $metadata + ); + $call->start($metadata); + return $call; + }; + } + + /** + * Create a function which can be used to create ClientStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _GrpcUnaryStream($channel, $deserialize) + { + return function ($method, + $argument, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + $call = new ServerStreamingCall( + $channel, + $method, + $deserialize, + $options + ); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $metadata = call_user_func( + $this->update_metadata, + $metadata, + $jwt_aud_uri + ); + } + $metadata = $this->_validate_and_normalize_metadata( + $metadata + ); + $call->start($argument, $metadata, $options); + return $call; + }; + } + + /** + * Create a function which can be used to create BidiStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _GrpcStreamStream($channel, $deserialize) + { + return function ($method, + array $metadata = [], + array $options = []) use ($channel ,$deserialize) { + $call = new BidiStreamingCall( + $channel, + $method, + $deserialize, + $options + ); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $metadata = call_user_func( + $this->update_metadata, + $metadata, + $jwt_aud_uri + ); + } + $metadata = $this->_validate_and_normalize_metadata( + $metadata + ); + $call->start($metadata); + + return $call; + }; + } + + /** + * Create a function which can be used to create UnaryCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _UnaryUnaryCallFactory($channel, $deserialize) + { + if (is_a($channel, 'Grpc\InterceptorChannel')) { + return function ($method, + $argument, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + return $channel->getInterceptor()->interceptUnaryUnary( + $method, + $argument, + $metadata, + $options, + $this->_UnaryUnaryCallFactory($channel->getNext(), $deserialize) + ); + }; + } + return $this->_GrpcUnaryUnary($channel, $deserialize); + } + + /** + * Create a function which can be used to create ServerStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _UnaryStreamCallFactory($channel, $deserialize) + { + if (is_a($channel, 'Grpc\InterceptorChannel')) { + return function ($method, + $argument, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + return $channel->getInterceptor()->interceptUnaryStream( + $method, + $argument, + $metadata, + $options, + $this->_UnaryStreamCallFactory($channel->getNext(), $deserialize) + ); + }; + } + return $this->_GrpcUnaryStream($channel, $deserialize); + } + + /** + * Create a function which can be used to create ClientStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _StreamUnaryCallFactory($channel, $deserialize) + { + if (is_a($channel, 'Grpc\InterceptorChannel')) { + return function ($method, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + return $channel->getInterceptor()->interceptStreamUnary( + $method, + $metadata, + $options, + $this->_StreamUnaryCallFactory($channel->getNext(), $deserialize) + ); + }; + } + return $this->_GrpcStreamUnary($channel, $deserialize); + } + + /** + * Create a function which can be used to create BidiStreamingCall + * + * @param Channel|InterceptorChannel $channel + * @param callable $deserialize A function that deserializes the response + * + * @return \Closure + */ + private function _StreamStreamCallFactory($channel, $deserialize) + { + if (is_a($channel, 'Grpc\InterceptorChannel')) { + return function ($method, + array $metadata = [], + array $options = []) use ($channel, $deserialize) { + return $channel->getInterceptor()->interceptStreamStream( + $method, + $metadata, + $options, + $this->_StreamStreamCallFactory($channel->getNext(), $deserialize) + ); + }; + } + return $this->_GrpcStreamStream($channel, $deserialize); + } + /* This class is intended to be subclassed by generated code, so * all functions begin with "_" to avoid name collisions. */ - /** * Call a remote method that takes a single argument and has a * single output. @@ -221,26 +474,15 @@ class BaseStub * * @return UnaryCall The active call object */ - protected function _simpleRequest($method, - $argument, - $deserialize, - array $metadata = [], - array $options = []) - { - $call = new UnaryCall($this->channel, - $method, - $deserialize, - $options); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $metadata = call_user_func($this->update_metadata, - $metadata, - $jwt_aud_uri); - } - $metadata = $this->_validate_and_normalize_metadata( - $metadata); - $call->start($argument, $metadata, $options); - + protected function _simpleRequest( + $method, + $argument, + $deserialize, + array $metadata = [], + array $options = [] + ) { + $call_factory = $this->_UnaryUnaryCallFactory($this->channel, $deserialize); + $call = $call_factory($method, $argument, $metadata, $options); return $call; } @@ -256,25 +498,14 @@ class BaseStub * * @return ClientStreamingCall The active call object */ - protected function _clientStreamRequest($method, - $deserialize, - array $metadata = [], - array $options = []) - { - $call = new ClientStreamingCall($this->channel, - $method, - $deserialize, - $options); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $metadata = call_user_func($this->update_metadata, - $metadata, - $jwt_aud_uri); - } - $metadata = $this->_validate_and_normalize_metadata( - $metadata); - $call->start($metadata); - + protected function _clientStreamRequest( + $method, + $deserialize, + array $metadata = [], + array $options = [] + ) { + $call_factory = $this->_StreamUnaryCallFactory($this->channel, $deserialize); + $call = $call_factory($method, $metadata, $options); return $call; } @@ -291,26 +522,15 @@ class BaseStub * * @return ServerStreamingCall The active call object */ - protected function _serverStreamRequest($method, - $argument, - $deserialize, - array $metadata = [], - array $options = []) - { - $call = new ServerStreamingCall($this->channel, - $method, - $deserialize, - $options); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $metadata = call_user_func($this->update_metadata, - $metadata, - $jwt_aud_uri); - } - $metadata = $this->_validate_and_normalize_metadata( - $metadata); - $call->start($argument, $metadata, $options); - + protected function _serverStreamRequest( + $method, + $argument, + $deserialize, + array $metadata = [], + array $options = [] + ) { + $call_factory = $this->_UnaryStreamCallFactory($this->channel, $deserialize); + $call = $call_factory($method, $argument, $metadata, $options); return $call; } @@ -325,25 +545,14 @@ class BaseStub * * @return BidiStreamingCall The active call object */ - protected function _bidiRequest($method, - $deserialize, - array $metadata = [], - array $options = []) - { - $call = new BidiStreamingCall($this->channel, - $method, - $deserialize, - $options); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $metadata = call_user_func($this->update_metadata, - $metadata, - $jwt_aud_uri); - } - $metadata = $this->_validate_and_normalize_metadata( - $metadata); - $call->start($metadata); - + protected function _bidiRequest( + $method, + $deserialize, + array $metadata = [], + array $options = [] + ) { + $call_factory = $this->_StreamStreamCallFactory($this->channel, $deserialize); + $call = $call_factory($method, $metadata, $options); return $call; } } diff --git a/src/php/lib/Grpc/Interceptor.php b/src/php/lib/Grpc/Interceptor.php new file mode 100644 index 0000000000..9c1b5616f2 --- /dev/null +++ b/src/php/lib/Grpc/Interceptor.php @@ -0,0 +1,86 @@ +<?php +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace Grpc; + +/** + * Represents an interceptor that intercept RPC invocations before call starts. + * This is an EXPERIMENTAL API. + */ +class Interceptor +{ + public function interceptUnaryUnary( + $method, + $argument, + array $metadata = [], + array $options = [], + $continuation + ) { + return $continuation($method, $argument, $metadata, $options); + } + + public function interceptStreamUnary( + $method, + array $metadata = [], + array $options = [], + $continuation + ) { + return $continuation($method, $metadata, $options); + } + + public function interceptUnaryStream( + $method, + $argument, + array $metadata = [], + array $options = [], + $continuation + ) { + return $continuation($method, $argument, $metadata, $options); + } + + public function interceptStreamStream( + $method, + array $metadata = [], + array $options = [], + $continuation + ) { + return $continuation($method, $metadata, $options); + } + + /** + * Intercept the methods with Channel + * + * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional) + * @param Interceptor|Interceptor[] $interceptors interceptors to be added + * + * @return InterceptorChannel + */ + public static function intercept($channel, $interceptors) + { + if (is_array($interceptors)) { + for ($i = count($interceptors) - 1; $i >= 0; $i--) { + $channel = new InterceptorChannel($channel, $interceptors[$i]); + } + } else { + $channel = new InterceptorChannel($channel, $interceptors); + } + return $channel; + } +} + diff --git a/src/php/lib/Grpc/Internal/InterceptorChannel.php b/src/php/lib/Grpc/Internal/InterceptorChannel.php new file mode 100644 index 0000000000..9ac05748f3 --- /dev/null +++ b/src/php/lib/Grpc/Internal/InterceptorChannel.php @@ -0,0 +1,76 @@ +<?php +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace Grpc; + +/** + * This is a PRIVATE API and can change without notice. + */ +class InterceptorChannel +{ + private $next = null; + private $interceptor; + + /** + * @param Channel|InterceptorChannel $channel An already created Channel + * or InterceptorChannel object (optional) + * @param Interceptor $interceptor + */ + public function __construct($channel, $interceptor) + { + if (!is_a($channel, 'Grpc\Channel') && + !is_a($channel, 'Grpc\InterceptorChannel')) { + throw new \Exception('The channel argument is not a Channel object '. + 'or an InterceptorChannel object created by '. + 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)'); + } + $this->interceptor = $interceptor; + $this->next = $channel; + } + + public function getNext() + { + return $this->next; + } + + public function getInterceptor() + { + return $this->interceptor; + } + + public function getTarget() + { + return $this->getNext()->getTarget(); + } + + public function watchConnectivityState($new_state, $deadline) + { + return $this->getNext()->watchConnectivityState($new_state, $deadline); + } + + public function getConnectivityState($try_to_connect = false) + { + return $this->getNext()->getConnectivityState($try_to_connect); + } + + public function close() + { + return $this->getNext()->close(); + } +} |