aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c8
-rw-r--r--src/core/iomgr/tcp_server_posix.c4
-rw-r--r--src/core/iomgr/udp_server.c4
-rw-r--r--src/core/support/time_posix.c14
-rw-r--r--src/core/transport/chttp2/hpack_table.c12
-rw-r--r--src/core/transport/chttp2/stream_encoder.c12
-rw-r--r--src/core/transport/stream_op.c6
-rw-r--r--src/cpp/client/secure_credentials.cc15
-rw-r--r--src/cpp/server/secure_server_credentials.cc24
-rw-r--r--test/cpp/end2end/streaming_throughput_test.cc30
10 files changed, 73 insertions, 56 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index b609e83c11..b22eaa6288 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -211,12 +211,12 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
/* TODO(klempner): We might want to consider making err and pri
* separate events */
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (read_ev || cancel) {
grpc_fd_become_readable(exec_ctx, fd);
}
- if (write || cancel) {
+ if (write_ev || cancel) {
grpc_fd_become_writable(exec_ctx, fd);
}
}
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index a582f4a7c3..13bd67576f 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -478,8 +478,8 @@ done:
return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
}
-int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) {
- return (index < s->nports) ? s->ports[index].fd : -1;
+int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) {
+ return (port_index < s->nports) ? s->ports[port_index].fd : -1;
}
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index d884359aa4..a8d611c3f2 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -399,8 +399,8 @@ done:
return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
}
-int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
- return (index < s->nports) ? s->ports[index].fd : -1;
+int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
+ return (port_index < s->nports) ? s->ports[port_index].fd : -1;
}
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c
index eedfd0a060..78f2c2bb77 100644
--- a/src/core/support/time_posix.c
+++ b/src/core/support/time_posix.c
@@ -52,11 +52,11 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) {
#if _POSIX_TIMERS > 0
static gpr_timespec gpr_from_timespec(struct timespec ts,
- gpr_clock_type clock) {
+ gpr_clock_type clock_type) {
gpr_timespec rv;
rv.tv_sec = ts.tv_sec;
rv.tv_nsec = (int)ts.tv_nsec;
- rv.clock_type = clock;
+ rv.clock_type = clock_type;
return rv;
}
@@ -65,16 +65,16 @@ static clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, CLOCK_REALTIME};
void gpr_time_init(void) {}
-gpr_timespec gpr_now(gpr_clock_type clock) {
+gpr_timespec gpr_now(gpr_clock_type clock_type) {
struct timespec now;
- GPR_ASSERT(clock != GPR_TIMESPAN);
- if (clock == GPR_CLOCK_PRECISE) {
+ GPR_ASSERT(clock_type != GPR_TIMESPAN);
+ if (clock_type == GPR_CLOCK_PRECISE) {
gpr_timespec ret;
gpr_precise_clock_now(&ret);
return ret;
} else {
- clock_gettime(clockid_for_gpr_clock[clock], &now);
- return gpr_from_timespec(now, clock);
+ clock_gettime(clockid_for_gpr_clock[clock_type], &now);
+ return gpr_from_timespec(now, clock_type);
}
}
#else
diff --git a/src/core/transport/chttp2/hpack_table.c b/src/core/transport/chttp2/hpack_table.c
index d5cb752789..c442c2c341 100644
--- a/src/core/transport/chttp2/hpack_table.c
+++ b/src/core/transport/chttp2/hpack_table.c
@@ -193,15 +193,15 @@ void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) {
}
grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
- gpr_uint32 index) {
+ gpr_uint32 tbl_index) {
/* Static table comes first, just return an entry from it */
- if (index <= GRPC_CHTTP2_LAST_STATIC_ENTRY) {
- return tbl->static_ents[index - 1];
+ if (tbl_index <= GRPC_CHTTP2_LAST_STATIC_ENTRY) {
+ return tbl->static_ents[tbl_index - 1];
}
/* Otherwise, find the value in the list of valid entries */
- index -= (GRPC_CHTTP2_LAST_STATIC_ENTRY + 1);
- if (index < tbl->num_ents) {
- gpr_uint32 offset = (tbl->num_ents - 1u - index + tbl->first_ent) %
+ tbl_index -= (GRPC_CHTTP2_LAST_STATIC_ENTRY + 1);
+ if (tbl_index < tbl->num_ents) {
+ gpr_uint32 offset = (tbl->num_ents - 1u - tbl_index + tbl->first_ent) %
GRPC_CHTTP2_MAX_TABLE_COUNT;
return tbl->ents[offset];
}
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index b1f1db05d2..ec97af3d5d 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -274,10 +274,11 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c,
return elem_to_unref;
}
-static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index,
+static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 elem_index,
framer_state *st) {
- gpr_uint32 len = GRPC_CHTTP2_VARINT_LENGTH(index, 1);
- GRPC_CHTTP2_WRITE_VARINT(index, 1, 0x80, add_tiny_header_data(st, len), len);
+ gpr_uint32 len = GRPC_CHTTP2_VARINT_LENGTH(elem_index, 1);
+ GRPC_CHTTP2_WRITE_VARINT(elem_index, 1, 0x80, add_tiny_header_data(st, len),
+ len);
}
static gpr_slice get_wire_value(grpc_mdelem *elem, gpr_uint8 *huffman_prefix) {
@@ -363,9 +364,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
add_header_data(st, gpr_slice_ref(value_slice));
}
-static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) {
+static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c,
+ gpr_uint32 elem_index) {
return 1 + GRPC_CHTTP2_LAST_STATIC_ENTRY + c->tail_remote_index +
- c->table_elems - index;
+ c->table_elems - elem_index;
}
/* encode an mdelem; returns metadata element to unref */
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 038586d48e..1cb2bd7c59 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -274,14 +274,14 @@ void grpc_metadata_batch_link_tail(grpc_metadata_batch *batch,
}
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
- grpc_metadata_batch *add) {
+ grpc_metadata_batch *to_add) {
grpc_linked_mdelem *l;
grpc_linked_mdelem *next;
- for (l = add->list.head; l; l = next) {
+ for (l = to_add->list.head; l; l = next) {
next = l->next;
link_tail(&target->list, l);
}
- for (l = add->garbage.head; l; l = next) {
+ for (l = to_add->garbage.head; l; l = next) {
next = l->next;
link_tail(&target->garbage, l);
}
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 1693cf740b..8299ebeb8a 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -154,10 +154,10 @@ void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) {
void MetadataCredentialsPluginWrapper::GetMetadata(
void* wrapper, const char* service_url,
grpc_credentials_plugin_metadata_cb cb, void* user_data) {
- GPR_ASSERT(wrapper != nullptr);
+ GPR_ASSERT(wrapper);
MetadataCredentialsPluginWrapper* w =
reinterpret_cast<MetadataCredentialsPluginWrapper*>(wrapper);
- if (w->plugin_ == nullptr) {
+ if (!w->plugin_) {
cb(user_data, NULL, 0, GRPC_STATUS_OK, NULL);
return;
}
@@ -177,11 +177,12 @@ void MetadataCredentialsPluginWrapper::InvokePlugin(
Status status = plugin_->GetMetadata(service_url, &metadata);
std::vector<grpc_metadata> md;
for (auto it = metadata.begin(); it != metadata.end(); ++it) {
- md.push_back({it->first.c_str(),
- it->second.data(),
- it->second.size(),
- 0,
- {{nullptr, nullptr, nullptr, nullptr}}});
+ grpc_metadata md_entry;
+ md_entry.key = it->first.c_str();
+ md_entry.value = it->second.data();
+ md_entry.value_length = it->second.size();
+ md_entry.flags = 0;
+ md.push_back(md_entry);
}
cb(user_data, md.empty() ? nullptr : &md[0], md.size(),
static_cast<grpc_status_code>(status.error_code()),
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 90afebfd2e..7c828cb125 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -52,7 +52,7 @@ void AuthMetadataProcessorAyncWrapper::Process(
void* wrapper, grpc_auth_context* context, const grpc_metadata* md,
size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) {
auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper);
- if (w->processor_ == nullptr) {
+ if (!w->processor_) {
// Early exit.
cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr);
return;
@@ -86,20 +86,22 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor(
std::vector<grpc_metadata> consumed_md;
for (auto it = consumed_metadata.begin(); it != consumed_metadata.end();
++it) {
- consumed_md.push_back({it->first.c_str(),
- it->second.data(),
- it->second.size(),
- 0,
- {{nullptr, nullptr, nullptr, nullptr}}});
+ grpc_metadata md_entry;
+ md_entry.key = it->first.c_str();
+ md_entry.value = it->second.data();
+ md_entry.value_length = it->second.size();
+ md_entry.flags = 0;
+ consumed_md.push_back(md_entry);
}
std::vector<grpc_metadata> response_md;
for (auto it = response_metadata.begin(); it != response_metadata.end();
++it) {
- response_md.push_back({it->first.c_str(),
- it->second.data(),
- it->second.size(),
- 0,
- {{nullptr, nullptr, nullptr, nullptr}}});
+ grpc_metadata md_entry;
+ md_entry.key = it->first.c_str();
+ md_entry.value = it->second.data();
+ md_entry.value_length = it->second.size();
+ md_entry.flags = 0;
+ response_md.push_back(md_entry);
}
auto consumed_md_data = consumed_md.empty() ? nullptr : &consumed_md[0];
auto response_md_data = response_md.empty() ? nullptr : &response_md[0];
diff --git a/test/cpp/end2end/streaming_throughput_test.cc b/test/cpp/end2end/streaming_throughput_test.cc
index c1355b38f0..344bf507ce 100644
--- a/test/cpp/end2end/streaming_throughput_test.cc
+++ b/test/cpp/end2end/streaming_throughput_test.cc
@@ -31,9 +31,9 @@
*
*/
-#include <atomic>
#include <mutex>
#include <thread>
+#include <time.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
@@ -44,6 +44,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/atm.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <gtest/gtest.h>
@@ -99,12 +100,17 @@ namespace testing {
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
- static void BidiStream_Sender(ServerReaderWriter<EchoResponse, EchoRequest>* stream, std::atomic<bool>* should_exit) {
+ static void BidiStream_Sender(ServerReaderWriter<EchoResponse, EchoRequest>* stream, gpr_atm* should_exit) {
EchoResponse response;
response.set_message(kLargeString);
- while (!should_exit->load()) {
- // TODO(vpai): Decide if the below requires blocking annotation
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ while (gpr_atm_acq_load(should_exit) == static_cast<gpr_atm>(0)) {
+ struct timespec tv = {0, 1000000}; // 1 ms
+ struct timespec rem;
+ // TODO (vpai): Mark this blocking
+ while (nanosleep(&tv, &rem) != 0) {
+ tv = rem;
+ };
+
stream->Write(response);
}
}
@@ -114,14 +120,20 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
ServerReaderWriter<EchoResponse, EchoRequest>* stream)
GRPC_OVERRIDE {
EchoRequest request;
- std::atomic<bool> should_exit(false);
+ gpr_atm should_exit;
+ gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(0));
+
std::thread sender(std::bind(&TestServiceImpl::BidiStream_Sender, stream, &should_exit));
while (stream->Read(&request)) {
- // TODO(vpai): Decide if the below requires blocking annotation
- std::this_thread::sleep_for(std::chrono::milliseconds(3));
+ struct timespec tv = {0, 3000000}; // 3 ms
+ struct timespec rem;
+ // TODO (vpai): Mark this blocking
+ while (nanosleep(&tv, &rem) != 0) {
+ tv = rem;
+ };
}
- should_exit.store(true);
+ gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(1));
sender.join();
return Status::OK;
}