diff options
Diffstat (limited to 'src/php/lib/Grpc/BaseStub.php')
-rwxr-xr-x | src/php/lib/Grpc/BaseStub.php | 467 |
1 files changed, 261 insertions, 206 deletions
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 0a3e1f78bf..c26be607ff 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -31,253 +31,308 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ + namespace Grpc; /** * Base class for generated client stubs. Stub methods are expected to call * _simpleRequest or _streamRequest and return the result. */ -class BaseStub { +class BaseStub +{ + private $hostname; + private $channel; - private $hostname; - private $channel; + // a callback function + private $update_metadata; - // a callback function - private $update_metadata; + /** + * @param $hostname string + * @param $opts array + * - 'update_metadata': (optional) a callback function which takes in a + * metadata array, and returns an updated metadata array + */ + public function __construct($hostname, $opts) + { + $this->hostname = $hostname; + $this->update_metadata = null; + if (isset($opts['update_metadata'])) { + if (is_callable($opts['update_metadata'])) { + $this->update_metadata = $opts['update_metadata']; + } + unset($opts['update_metadata']); + } + $package_config = json_decode( + file_get_contents(dirname(__FILE__).'/../../composer.json'), true); + $opts['grpc.primary_user_agent'] = + 'grpc-php/'.$package_config['version']; + $this->channel = new Channel($hostname, $opts); + } - /** - * @param $hostname string - * @param $opts array - * - 'update_metadata': (optional) a callback function which takes in a - * metadata array, and returns an updated metadata array - */ - public function __construct($hostname, $opts) { - $this->hostname = $hostname; - $this->update_metadata = null; - if (isset($opts['update_metadata'])) { - if (is_callable($opts['update_metadata'])) { - $this->update_metadata = $opts['update_metadata']; - } - unset($opts['update_metadata']); + /** + * @return string The URI of the endpoint. + */ + public function getTarget() + { + return $this->channel->getTarget(); } - $package_config = json_decode( - file_get_contents(dirname(__FILE__) . '/../../composer.json'), true); - $opts['grpc.primary_user_agent'] = - 'grpc-php/' . $package_config['version']; - $this->channel = new Channel($hostname, $opts); - } - /** - * @return string The URI of the endpoint. - */ - public function getTarget() { - return $this->channel->getTarget(); - } + /** + * @param $try_to_connect bool + * + * @return int The grpc connectivity state + */ + public function getConnectivityState($try_to_connect = false) + { + return $this->channel->getConnectivityState($try_to_connect); + } - /** - * @param $try_to_connect bool - * @return int The grpc connectivity state - */ - public function getConnectivityState($try_to_connect = false) { - return $this->channel->getConnectivityState($try_to_connect); - } + /** + * @param $timeout in microseconds + * + * @return bool true if channel is ready + * @throw Exception if channel is in FATAL_ERROR state + */ + public function waitForReady($timeout) + { + $new_state = $this->getConnectivityState(true); + if ($this->_checkConnectivityState($new_state)) { + return true; + } - /** - * @param $timeout in microseconds - * @return bool true if channel is ready - * @throw Exception if channel is in FATAL_ERROR state - */ - public function waitForReady($timeout) { - $new_state = $this->getConnectivityState(true); - if ($this->_checkConnectivityState($new_state)) { - return true; - } + $now = Timeval::now(); + $delta = new Timeval($timeout); + $deadline = $now->add($delta); - $now = Timeval::now(); - $delta = new Timeval($timeout); - $deadline = $now->add($delta); + while ($this->channel->watchConnectivityState($new_state, $deadline)) { + // state has changed before deadline + $new_state = $this->getConnectivityState(); + if ($this->_checkConnectivityState($new_state)) { + return true; + } + } + // deadline has passed + $new_state = $this->getConnectivityState(); - while ($this->channel->watchConnectivityState($new_state, $deadline)) { - // state has changed before deadline - $new_state = $this->getConnectivityState(); - if ($this->_checkConnectivityState($new_state)) { - return true; - } + return $this->_checkConnectivityState($new_state); } - // deadline has passed - $new_state = $this->getConnectivityState(); - return $this->_checkConnectivityState($new_state); - } - private function _checkConnectivityState($new_state) { - if ($new_state == \Grpc\CHANNEL_READY) { - return true; + private function _checkConnectivityState($new_state) + { + if ($new_state == \Grpc\CHANNEL_READY) { + return true; + } + if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) { + throw new \Exception('Failed to connect to server'); + } + + return false; } - if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) { - throw new \Exception('Failed to connect to server'); + + /** + * Close the communication channel associated with this stub. + */ + public function close() + { + $this->channel->close(); } - return false; - } - /** - * Close the communication channel associated with this stub - */ - public function close() { - $channel->close(); - } + /** + * constructs the auth uri for the jwt. + */ + private function _get_jwt_aud_uri($method) + { + $last_slash_idx = strrpos($method, '/'); + if ($last_slash_idx === false) { + throw new \InvalidArgumentException( + 'service name must have a slash'); + } + $service_name = substr($method, 0, $last_slash_idx); - /** - * constructs the auth uri for the jwt - */ - private function _get_jwt_aud_uri($method) { - $last_slash_idx = strrpos($method, '/'); - if ($last_slash_idx === false) { - return false; + return 'https://'.$this->hostname.$service_name; } - $service_name = substr($method, 0, $last_slash_idx); - return "https://" . $this->hostname . $service_name; - } - /** - * extract $timeout from $metadata - * @param $metadata The metadata map - * @return list($metadata_copy, $timeout) - */ - private function _extract_timeout_from_metadata($metadata) { - $timeout = false; - $metadata_copy = $metadata; - if (isset($metadata['timeout'])) { - $timeout = $metadata['timeout']; - unset($metadata_copy['timeout']); + /** + * extract $timeout from $metadata. + * + * @param $metadata The metadata map + * + * @return list($metadata_copy, $timeout) + */ + private function _extract_timeout_from_metadata($metadata) + { + $timeout = false; + $metadata_copy = $metadata; + if (isset($metadata['timeout'])) { + $timeout = $metadata['timeout']; + unset($metadata_copy['timeout']); + } + + return [$metadata_copy, $timeout]; } - return array($metadata_copy, $timeout); - } - /** - * validate and normalize the metadata array - * @param $metadata The metadata map - * @return $metadata Validated and key-normalized metadata map - * @throw InvalidArgumentException if key contains invalid characters - */ - private function _validate_and_normalize_metadata($metadata) { - $metadata_copy = array(); - foreach ($metadata as $key => $value) { - if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { - throw new \InvalidArgumentException( - 'Metadata keys must be nonempty strings containing only '. - 'alphanumeric characters, hyphens and underscores'); - } - $metadata_copy[strtolower($key)] = $value; + /** + * validate and normalize the metadata array. + * + * @param $metadata The metadata map + * + * @return $metadata Validated and key-normalized metadata map + * @throw InvalidArgumentException if key contains invalid characters + */ + private function _validate_and_normalize_metadata($metadata) + { + $metadata_copy = []; + foreach ($metadata as $key => $value) { + if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) { + throw new \InvalidArgumentException( + 'Metadata keys must be nonempty strings containing only '. + 'alphanumeric characters, hyphens and underscores'); + } + $metadata_copy[strtolower($key)] = $value; + } + + return $metadata_copy; } - return $metadata_copy; - } - /* This class is intended to be subclassed by generated code, so all functions - begin with "_" to avoid name collisions. */ + /* This class is intended to be subclassed by generated code, so + * all functions begin with "_" to avoid name collisions. */ - /** - * Call a remote method that takes a single argument and has a single output - * - * @param string $method The name of the method to call - * @param $argument The argument to the method - * @param callable $deserialize A function that deserializes the response - * @param array $metadata A metadata map to send to the server - * @return SimpleSurfaceActiveCall The active call object - */ - public function _simpleRequest($method, - $argument, - callable $deserialize, - $metadata = array(), - $options = array()) { - list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); - $call = new UnaryCall($this->channel, $method, $deserialize, $timeout); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $actual_metadata = call_user_func($this->update_metadata, + /** + * Call a remote method that takes a single argument and has a + * single output. + * + * @param string $method The name of the method to call + * @param $argument The argument to the method + * @param callable $deserialize A function that deserializes the response + * @param array $metadata A metadata map to send to the server + * + * @return SimpleSurfaceActiveCall The active call object + */ + public function _simpleRequest($method, + $argument, + callable $deserialize, + $metadata = [], + $options = []) + { + list($actual_metadata, $timeout) = + $this->_extract_timeout_from_metadata($metadata); + $call = new UnaryCall($this->channel, + $method, + $deserialize, + $timeout); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $actual_metadata = call_user_func($this->update_metadata, $actual_metadata, $jwt_aud_uri); + } + $actual_metadata = $this->_validate_and_normalize_metadata( + $actual_metadata); + $call->start($argument, $actual_metadata, $options); + + return $call; } - $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); - $call->start($argument, $actual_metadata, $options); - return $call; - } - /** - * Call a remote method that takes a stream of arguments and has a single - * output - * - * @param string $method The name of the method to call - * @param $arguments An array or Traversable of arguments to stream to the - * server - * @param callable $deserialize A function that deserializes the response - * @param array $metadata A metadata map to send to the server - * @return ClientStreamingSurfaceActiveCall The active call object - */ - public function _clientStreamRequest($method, - callable $deserialize, - $metadata = array()) { - list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); - $call = new ClientStreamingCall($this->channel, $method, $deserialize, $timeout); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $actual_metadata = call_user_func($this->update_metadata, + /** + * Call a remote method that takes a stream of arguments and has a single + * output. + * + * @param string $method The name of the method to call + * @param $arguments An array or Traversable of arguments to stream to the + * server + * @param callable $deserialize A function that deserializes the response + * @param array $metadata A metadata map to send to the server + * + * @return ClientStreamingSurfaceActiveCall The active call object + */ + public function _clientStreamRequest($method, + callable $deserialize, + $metadata = []) + { + list($actual_metadata, $timeout) = + $this->_extract_timeout_from_metadata($metadata); + $call = new ClientStreamingCall($this->channel, + $method, + $deserialize, + $timeout); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $actual_metadata = call_user_func($this->update_metadata, $actual_metadata, $jwt_aud_uri); + } + $actual_metadata = $this->_validate_and_normalize_metadata( + $actual_metadata); + $call->start($actual_metadata); + + return $call; } - $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); - $call->start($actual_metadata); - return $call; - } - /** - * Call a remote method that takes a single argument and returns a stream of - * responses - * - * @param string $method The name of the method to call - * @param $argument The argument to the method - * @param callable $deserialize A function that deserializes the responses - * @param array $metadata A metadata map to send to the server - * @return ServerStreamingSurfaceActiveCall The active call object - */ - public function _serverStreamRequest($method, - $argument, - callable $deserialize, - $metadata = array(), - $options = array()) { - list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); - $call = new ServerStreamingCall($this->channel, $method, $deserialize, $timeout); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $actual_metadata = call_user_func($this->update_metadata, + /** + * Call a remote method that takes a single argument and returns a stream of + * responses. + * + * @param string $method The name of the method to call + * @param $argument The argument to the method + * @param callable $deserialize A function that deserializes the responses + * @param array $metadata A metadata map to send to the server + * + * @return ServerStreamingSurfaceActiveCall The active call object + */ + public function _serverStreamRequest($method, + $argument, + callable $deserialize, + $metadata = [], + $options = []) + { + list($actual_metadata, $timeout) = + $this->_extract_timeout_from_metadata($metadata); + $call = new ServerStreamingCall($this->channel, + $method, + $deserialize, + $timeout); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $actual_metadata = call_user_func($this->update_metadata, $actual_metadata, $jwt_aud_uri); + } + $actual_metadata = $this->_validate_and_normalize_metadata( + $actual_metadata); + $call->start($argument, $actual_metadata, $options); + + return $call; } - $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); - $call->start($argument, $actual_metadata, $options); - return $call; - } - /** - * Call a remote method with messages streaming in both directions - * - * @param string $method The name of the method to call - * @param callable $deserialize A function that deserializes the responses - * @param array $metadata A metadata map to send to the server - * @return BidiStreamingSurfaceActiveCall The active call object - */ - public function _bidiRequest($method, - callable $deserialize, - $metadata = array()) { - list($actual_metadata, $timeout) = $this->_extract_timeout_from_metadata($metadata); - $call = new BidiStreamingCall($this->channel, $method, $deserialize, $timeout); - $jwt_aud_uri = $this->_get_jwt_aud_uri($method); - if (is_callable($this->update_metadata)) { - $actual_metadata = call_user_func($this->update_metadata, + /** + * Call a remote method with messages streaming in both directions. + * + * @param string $method The name of the method to call + * @param callable $deserialize A function that deserializes the responses + * @param array $metadata A metadata map to send to the server + * + * @return BidiStreamingSurfaceActiveCall The active call object + */ + public function _bidiRequest($method, + callable $deserialize, + $metadata = []) + { + list($actual_metadata, $timeout) = + $this->_extract_timeout_from_metadata($metadata); + $call = new BidiStreamingCall($this->channel, + $method, + $deserialize, + $timeout); + $jwt_aud_uri = $this->_get_jwt_aud_uri($method); + if (is_callable($this->update_metadata)) { + $actual_metadata = call_user_func($this->update_metadata, $actual_metadata, $jwt_aud_uri); + } + $actual_metadata = $this->_validate_and_normalize_metadata( + $actual_metadata); + $call->start($actual_metadata); + + return $call; } - $actual_metadata = $this->_validate_and_normalize_metadata($actual_metadata); - $call->start($actual_metadata); - return $call; - } } |