diff options
-rw-r--r-- | src/php/ext/grpc/call.c | 9 | ||||
-rw-r--r-- | src/php/ext/grpc/call_credentials.c | 11 | ||||
-rw-r--r-- | src/php/ext/grpc/channel_credentials.c | 6 | ||||
-rw-r--r-- | src/proto/grpc/testing/control.proto | 3 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 38 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 1 |
6 files changed, 58 insertions, 10 deletions
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 9c0f3f8a91..ff55c3cbfa 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -99,6 +99,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array 1 TSRMLS_CC); efree(str_key); efree(str_val); + PHP_GRPC_FREE_STD_ZVAL(array); return NULL; } php_grpc_add_next_index_stringl(data, str_val, @@ -127,10 +128,12 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { HashTable *inner_array_hash; zval *value; zval *inner_array; + grpc_metadata_array_init(metadata); + metadata->count = 0; + metadata->metadata = NULL; if (Z_TYPE_P(array) != IS_ARRAY) { return false; } - grpc_metadata_array_init(metadata); array_hash = Z_ARRVAL_P(array); char *key; @@ -538,7 +541,9 @@ cleanup: */ PHP_METHOD(Call, getPeer) { wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); - PHP_GRPC_RETURN_STRING(grpc_call_get_peer(call->wrapped), 1); + char *peer = grpc_call_get_peer(call->wrapped); + PHP_GRPC_RETVAL_STRING(peer, 1); + gpr_free(peer); } /** diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index a395d53614..41c488a79c 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -120,6 +120,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) { fci->params, fci->param_count) == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, "createFromPlugin expects 1 callback", 1 TSRMLS_CC); + free(fci); + free(fci_cache); return; } @@ -183,15 +185,17 @@ int plugin_get_metadata( *status = GRPC_STATUS_OK; *error_details = NULL; + bool should_return = false; grpc_metadata_array metadata; if (retval == NULL || Z_TYPE_P(retval) != IS_ARRAY) { *status = GRPC_STATUS_INVALID_ARGUMENT; - return true; // Synchronous return. + should_return = true; // Synchronous return. } if (!create_metadata_array(retval, &metadata)) { *status = GRPC_STATUS_INVALID_ARGUMENT; - return true; // Synchronous return. + should_return = true; // Synchronous return. + grpc_php_metadata_array_destroy_including_entries(&metadata); } if (retval != NULL) { @@ -204,6 +208,9 @@ int plugin_get_metadata( PHP_GRPC_FREE_STD_ZVAL(retval); #endif } + if (should_return) { + return true; + } if (metadata.count > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) { *status = GRPC_STATUS_INTERNAL; diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index ed74b991bd..624d7cc75c 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -57,6 +57,10 @@ static grpc_ssl_roots_override_result get_ssl_roots_override( /* Frees and destroys an instance of wrapped_grpc_channel_credentials */ PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel_credentials) + if (p->hashstr != NULL) { + free(p->hashstr); + p->hashstr = NULL; + } if (p->wrapped != NULL) { grpc_channel_credentials_release(p->wrapped); p->wrapped = NULL; @@ -153,7 +157,7 @@ PHP_METHOD(ChannelCredentials, createSsl) { } php_grpc_int hashkey_len = root_certs_length + cert_chain_length; - char *hashkey = emalloc(hashkey_len); + char *hashkey = emalloc(hashkey_len + 1); if (root_certs_length > 0) { strcpy(hashkey, pem_root_certs); } diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 2ff2e4e8a2..57592662c4 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -108,6 +108,9 @@ message ClientConfig { // Number of messages on a stream before it gets finished/restarted int32 messages_per_stream = 18; + + // Use coalescing API when possible. + bool use_coalesce_api = 19; } message ClientStatus { ClientStats stats = 1; } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 7cf9d3ea7e..e3fba36a7a 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -82,6 +82,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextUnaryImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -349,10 +350,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { next_state_(State::INVALID), callback_(on_done), next_issue_(next_issue), - prepare_req_(prepare_req) {} + prepare_req_(prepare_req), + coalesce_(false) {} ~ClientRpcContextStreamingPingPongImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { - StartInternal(cq, config.messages_per_stream()); + StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api()); } bool RunNextState(bool ok, HistogramEntry* entry) override { while (true) { @@ -375,7 +377,12 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { } start_ = UsageTimer::Now(); next_state_ = State::WRITE_DONE; - stream_->Write(req_, ClientRpcContext::tag(this)); + if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) { + stream_->WriteLast(req_, WriteOptions(), + ClientRpcContext::tag(this)); + } else { + stream_->Write(req_, ClientRpcContext::tag(this)); + } return true; case State::WRITE_DONE: if (!ok) { @@ -391,6 +398,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { if ((messages_per_stream_ != 0) && (++messages_issued_ >= messages_per_stream_)) { next_state_ = State::WRITES_DONE_DONE; + if (coalesce_) { + // WritesDone should have been called on the last Write. + // loop around to call Finish. + break; + } stream_->WritesDone(ClientRpcContext::tag(this)); return true; } @@ -413,7 +425,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { void StartNewClone(CompletionQueue* cq) override { auto* clone = new ClientRpcContextStreamingPingPongImpl( stub_, req_, next_issue_, prepare_req_, callback_); - clone->StartInternal(cq, messages_per_stream_); + clone->StartInternal(cq, messages_per_stream_, coalesce_); } void TryCancel() override { context_.TryCancel(); } @@ -449,14 +461,27 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext { // Allow a limit on number of messages in a stream int messages_per_stream_; int messages_issued_; + // Whether to use coalescing API. + bool coalesce_; - void StartInternal(CompletionQueue* cq, int messages_per_stream) { + void StartInternal(CompletionQueue* cq, int messages_per_stream, + bool coalesce) { cq_ = cq; messages_per_stream_ = messages_per_stream; messages_issued_ = 0; + coalesce_ = coalesce; + if (coalesce_) { + GPR_ASSERT(messages_per_stream_ != 0); + context_.set_initial_metadata_corked(true); + } stream_ = prepare_req_(stub_, &context_, cq); next_state_ = State::STREAM_IDLE; stream_->StartCall(ClientRpcContext::tag(this)); + if (coalesce_) { + // When the intial metadata is corked, the tag will not come back and we + // need to manually drive the state machine. + RunNextState(true, nullptr); + } } }; @@ -512,6 +537,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromClientImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -641,6 +667,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextStreamingFromServerImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported StartInternal(cq); } bool RunNextState(bool ok, HistogramEntry* entry) override { @@ -753,6 +780,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { prepare_req_(prepare_req) {} ~ClientRpcContextGenericStreamingImpl() override {} void Start(CompletionQueue* cq, const ClientConfig& config) override { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. StartInternal(cq, config.messages_per_stream()); } bool RunNextState(bool ok, HistogramEntry* entry) override { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 82a3f0042d..a2ddbeb508 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -402,6 +402,7 @@ class SynchronousStreamingBothWaysClient final }; std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) { + GPR_ASSERT(!config.use_coalesce_api()); // not supported yet. switch (config.rpc_type()) { case UNARY: return std::unique_ptr<Client>(new SynchronousUnaryClient(config)); |