aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/php/ext/grpc/call.c9
-rw-r--r--src/php/ext/grpc/call_credentials.c11
-rw-r--r--src/php/ext/grpc/channel_credentials.c6
-rw-r--r--src/proto/grpc/testing/control.proto3
-rw-r--r--test/cpp/qps/client_async.cc38
-rw-r--r--test/cpp/qps/client_sync.cc1
6 files changed, 58 insertions, 10 deletions
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 9c0f3f8a91..ff55c3cbfa 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -99,6 +99,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array
1 TSRMLS_CC);
efree(str_key);
efree(str_val);
+ PHP_GRPC_FREE_STD_ZVAL(array);
return NULL;
}
php_grpc_add_next_index_stringl(data, str_val,
@@ -127,10 +128,12 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) {
HashTable *inner_array_hash;
zval *value;
zval *inner_array;
+ grpc_metadata_array_init(metadata);
+ metadata->count = 0;
+ metadata->metadata = NULL;
if (Z_TYPE_P(array) != IS_ARRAY) {
return false;
}
- grpc_metadata_array_init(metadata);
array_hash = Z_ARRVAL_P(array);
char *key;
@@ -538,7 +541,9 @@ cleanup:
*/
PHP_METHOD(Call, getPeer) {
wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis());
- PHP_GRPC_RETURN_STRING(grpc_call_get_peer(call->wrapped), 1);
+ char *peer = grpc_call_get_peer(call->wrapped);
+ PHP_GRPC_RETVAL_STRING(peer, 1);
+ gpr_free(peer);
}
/**
diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c
index a395d53614..41c488a79c 100644
--- a/src/php/ext/grpc/call_credentials.c
+++ b/src/php/ext/grpc/call_credentials.c
@@ -120,6 +120,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) {
fci->params, fci->param_count) == FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
"createFromPlugin expects 1 callback", 1 TSRMLS_CC);
+ free(fci);
+ free(fci_cache);
return;
}
@@ -183,15 +185,17 @@ int plugin_get_metadata(
*status = GRPC_STATUS_OK;
*error_details = NULL;
+ bool should_return = false;
grpc_metadata_array metadata;
if (retval == NULL || Z_TYPE_P(retval) != IS_ARRAY) {
*status = GRPC_STATUS_INVALID_ARGUMENT;
- return true; // Synchronous return.
+ should_return = true; // Synchronous return.
}
if (!create_metadata_array(retval, &metadata)) {
*status = GRPC_STATUS_INVALID_ARGUMENT;
- return true; // Synchronous return.
+ should_return = true; // Synchronous return.
+ grpc_php_metadata_array_destroy_including_entries(&metadata);
}
if (retval != NULL) {
@@ -204,6 +208,9 @@ int plugin_get_metadata(
PHP_GRPC_FREE_STD_ZVAL(retval);
#endif
}
+ if (should_return) {
+ return true;
+ }
if (metadata.count > GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX) {
*status = GRPC_STATUS_INTERNAL;
diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c
index ed74b991bd..624d7cc75c 100644
--- a/src/php/ext/grpc/channel_credentials.c
+++ b/src/php/ext/grpc/channel_credentials.c
@@ -57,6 +57,10 @@ static grpc_ssl_roots_override_result get_ssl_roots_override(
/* Frees and destroys an instance of wrapped_grpc_channel_credentials */
PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_channel_credentials)
+ if (p->hashstr != NULL) {
+ free(p->hashstr);
+ p->hashstr = NULL;
+ }
if (p->wrapped != NULL) {
grpc_channel_credentials_release(p->wrapped);
p->wrapped = NULL;
@@ -153,7 +157,7 @@ PHP_METHOD(ChannelCredentials, createSsl) {
}
php_grpc_int hashkey_len = root_certs_length + cert_chain_length;
- char *hashkey = emalloc(hashkey_len);
+ char *hashkey = emalloc(hashkey_len + 1);
if (root_certs_length > 0) {
strcpy(hashkey, pem_root_certs);
}
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 2ff2e4e8a2..57592662c4 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -108,6 +108,9 @@ message ClientConfig {
// Number of messages on a stream before it gets finished/restarted
int32 messages_per_stream = 18;
+
+ // Use coalescing API when possible.
+ bool use_coalesce_api = 19;
}
message ClientStatus { ClientStats stats = 1; }
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 7cf9d3ea7e..e3fba36a7a 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -82,6 +82,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
prepare_req_(prepare_req) {}
~ClientRpcContextUnaryImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ GPR_ASSERT(!config.use_coalesce_api()); // not supported.
StartInternal(cq);
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -349,10 +350,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
next_state_(State::INVALID),
callback_(on_done),
next_issue_(next_issue),
- prepare_req_(prepare_req) {}
+ prepare_req_(prepare_req),
+ coalesce_(false) {}
~ClientRpcContextStreamingPingPongImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
- StartInternal(cq, config.messages_per_stream());
+ StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
while (true) {
@@ -375,7 +377,12 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
}
start_ = UsageTimer::Now();
next_state_ = State::WRITE_DONE;
- stream_->Write(req_, ClientRpcContext::tag(this));
+ if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
+ stream_->WriteLast(req_, WriteOptions(),
+ ClientRpcContext::tag(this));
+ } else {
+ stream_->Write(req_, ClientRpcContext::tag(this));
+ }
return true;
case State::WRITE_DONE:
if (!ok) {
@@ -391,6 +398,11 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
if ((messages_per_stream_ != 0) &&
(++messages_issued_ >= messages_per_stream_)) {
next_state_ = State::WRITES_DONE_DONE;
+ if (coalesce_) {
+ // WritesDone should have been called on the last Write.
+ // loop around to call Finish.
+ break;
+ }
stream_->WritesDone(ClientRpcContext::tag(this));
return true;
}
@@ -413,7 +425,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
stub_, req_, next_issue_, prepare_req_, callback_);
- clone->StartInternal(cq, messages_per_stream_);
+ clone->StartInternal(cq, messages_per_stream_, coalesce_);
}
void TryCancel() override { context_.TryCancel(); }
@@ -449,14 +461,27 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
// Allow a limit on number of messages in a stream
int messages_per_stream_;
int messages_issued_;
+ // Whether to use coalescing API.
+ bool coalesce_;
- void StartInternal(CompletionQueue* cq, int messages_per_stream) {
+ void StartInternal(CompletionQueue* cq, int messages_per_stream,
+ bool coalesce) {
cq_ = cq;
messages_per_stream_ = messages_per_stream;
messages_issued_ = 0;
+ coalesce_ = coalesce;
+ if (coalesce_) {
+ GPR_ASSERT(messages_per_stream_ != 0);
+ context_.set_initial_metadata_corked(true);
+ }
stream_ = prepare_req_(stub_, &context_, cq);
next_state_ = State::STREAM_IDLE;
stream_->StartCall(ClientRpcContext::tag(this));
+ if (coalesce_) {
+ // When the intial metadata is corked, the tag will not come back and we
+ // need to manually drive the state machine.
+ RunNextState(true, nullptr);
+ }
}
};
@@ -512,6 +537,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromClientImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
StartInternal(cq);
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -641,6 +667,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromServerImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ GPR_ASSERT(!config.use_coalesce_api()); // not supported
StartInternal(cq);
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
@@ -753,6 +780,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
prepare_req_(prepare_req) {}
~ClientRpcContextGenericStreamingImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
StartInternal(cq, config.messages_per_stream());
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 82a3f0042d..a2ddbeb508 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -402,6 +402,7 @@ class SynchronousStreamingBothWaysClient final
};
std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
+ GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
switch (config.rpc_type()) {
case UNARY:
return std::unique_ptr<Client>(new SynchronousUnaryClient(config));