diff options
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 8 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 4 | ||||
-rw-r--r-- | src/core/support/time_posix.c | 14 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_table.c | 12 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 12 | ||||
-rw-r--r-- | src/core/transport/stream_op.c | 6 | ||||
-rw-r--r-- | src/cpp/client/secure_credentials.cc | 15 | ||||
-rw-r--r-- | src/cpp/server/secure_server_credentials.cc | 24 | ||||
-rw-r--r-- | test/cpp/end2end/streaming_throughput_test.cc | 30 |
10 files changed, 73 insertions, 56 deletions
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index b609e83c11..b22eaa6288 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -211,12 +211,12 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* TODO(klempner): We might want to consider making err and pri * separate events */ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write = ep_ev[i].events & EPOLLOUT; - if (read || cancel) { + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (read_ev || cancel) { grpc_fd_become_readable(exec_ctx, fd); } - if (write || cancel) { + if (write_ev || cancel) { grpc_fd_become_writable(exec_ctx, fd); } } diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index a582f4a7c3..13bd67576f 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -478,8 +478,8 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } -int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { - return (index < s->nports) ? s->ports[index].fd : -1; +int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned port_index) { + return (port_index < s->nports) ? s->ports[port_index].fd : -1; } void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index d884359aa4..a8d611c3f2 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -399,8 +399,8 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { - return (index < s->nports) ? s->ports[index].fd : -1; +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { + return (port_index < s->nports) ? s->ports[port_index].fd : -1; } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index eedfd0a060..78f2c2bb77 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -52,11 +52,11 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) { #if _POSIX_TIMERS > 0 static gpr_timespec gpr_from_timespec(struct timespec ts, - gpr_clock_type clock) { + gpr_clock_type clock_type) { gpr_timespec rv; rv.tv_sec = ts.tv_sec; rv.tv_nsec = (int)ts.tv_nsec; - rv.clock_type = clock; + rv.clock_type = clock_type; return rv; } @@ -65,16 +65,16 @@ static clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, CLOCK_REALTIME}; void gpr_time_init(void) {} -gpr_timespec gpr_now(gpr_clock_type clock) { +gpr_timespec gpr_now(gpr_clock_type clock_type) { struct timespec now; - GPR_ASSERT(clock != GPR_TIMESPAN); - if (clock == GPR_CLOCK_PRECISE) { + GPR_ASSERT(clock_type != GPR_TIMESPAN); + if (clock_type == GPR_CLOCK_PRECISE) { gpr_timespec ret; gpr_precise_clock_now(&ret); return ret; } else { - clock_gettime(clockid_for_gpr_clock[clock], &now); - return gpr_from_timespec(now, clock); + clock_gettime(clockid_for_gpr_clock[clock_type], &now); + return gpr_from_timespec(now, clock_type); } } #else diff --git a/src/core/transport/chttp2/hpack_table.c b/src/core/transport/chttp2/hpack_table.c index d5cb752789..c442c2c341 100644 --- a/src/core/transport/chttp2/hpack_table.c +++ b/src/core/transport/chttp2/hpack_table.c @@ -193,15 +193,15 @@ void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) { } grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl, - gpr_uint32 index) { + gpr_uint32 tbl_index) { /* Static table comes first, just return an entry from it */ - if (index <= GRPC_CHTTP2_LAST_STATIC_ENTRY) { - return tbl->static_ents[index - 1]; + if (tbl_index <= GRPC_CHTTP2_LAST_STATIC_ENTRY) { + return tbl->static_ents[tbl_index - 1]; } /* Otherwise, find the value in the list of valid entries */ - index -= (GRPC_CHTTP2_LAST_STATIC_ENTRY + 1); - if (index < tbl->num_ents) { - gpr_uint32 offset = (tbl->num_ents - 1u - index + tbl->first_ent) % + tbl_index -= (GRPC_CHTTP2_LAST_STATIC_ENTRY + 1); + if (tbl_index < tbl->num_ents) { + gpr_uint32 offset = (tbl->num_ents - 1u - tbl_index + tbl->first_ent) % GRPC_CHTTP2_MAX_TABLE_COUNT; return tbl->ents[offset]; } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index b1f1db05d2..ec97af3d5d 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -274,10 +274,11 @@ static grpc_mdelem *add_elem(grpc_chttp2_hpack_compressor *c, return elem_to_unref; } -static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 index, +static void emit_indexed(grpc_chttp2_hpack_compressor *c, gpr_uint32 elem_index, framer_state *st) { - gpr_uint32 len = GRPC_CHTTP2_VARINT_LENGTH(index, 1); - GRPC_CHTTP2_WRITE_VARINT(index, 1, 0x80, add_tiny_header_data(st, len), len); + gpr_uint32 len = GRPC_CHTTP2_VARINT_LENGTH(elem_index, 1); + GRPC_CHTTP2_WRITE_VARINT(elem_index, 1, 0x80, add_tiny_header_data(st, len), + len); } static gpr_slice get_wire_value(grpc_mdelem *elem, gpr_uint8 *huffman_prefix) { @@ -363,9 +364,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, add_header_data(st, gpr_slice_ref(value_slice)); } -static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, gpr_uint32 index) { +static gpr_uint32 dynidx(grpc_chttp2_hpack_compressor *c, + gpr_uint32 elem_index) { return 1 + GRPC_CHTTP2_LAST_STATIC_ENTRY + c->tail_remote_index + - c->table_elems - index; + c->table_elems - elem_index; } /* encode an mdelem; returns metadata element to unref */ diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 038586d48e..1cb2bd7c59 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -274,14 +274,14 @@ void grpc_metadata_batch_link_tail(grpc_metadata_batch *batch, } void grpc_metadata_batch_merge(grpc_metadata_batch *target, - grpc_metadata_batch *add) { + grpc_metadata_batch *to_add) { grpc_linked_mdelem *l; grpc_linked_mdelem *next; - for (l = add->list.head; l; l = next) { + for (l = to_add->list.head; l; l = next) { next = l->next; link_tail(&target->list, l); } - for (l = add->garbage.head; l; l = next) { + for (l = to_add->garbage.head; l; l = next) { next = l->next; link_tail(&target->garbage, l); } diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 1693cf740b..8299ebeb8a 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -154,10 +154,10 @@ void MetadataCredentialsPluginWrapper::Destroy(void* wrapper) { void MetadataCredentialsPluginWrapper::GetMetadata( void* wrapper, const char* service_url, grpc_credentials_plugin_metadata_cb cb, void* user_data) { - GPR_ASSERT(wrapper != nullptr); + GPR_ASSERT(wrapper); MetadataCredentialsPluginWrapper* w = reinterpret_cast<MetadataCredentialsPluginWrapper*>(wrapper); - if (w->plugin_ == nullptr) { + if (!w->plugin_) { cb(user_data, NULL, 0, GRPC_STATUS_OK, NULL); return; } @@ -177,11 +177,12 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( Status status = plugin_->GetMetadata(service_url, &metadata); std::vector<grpc_metadata> md; for (auto it = metadata.begin(); it != metadata.end(); ++it) { - md.push_back({it->first.c_str(), - it->second.data(), - it->second.size(), - 0, - {{nullptr, nullptr, nullptr, nullptr}}}); + grpc_metadata md_entry; + md_entry.key = it->first.c_str(); + md_entry.value = it->second.data(); + md_entry.value_length = it->second.size(); + md_entry.flags = 0; + md.push_back(md_entry); } cb(user_data, md.empty() ? nullptr : &md[0], md.size(), static_cast<grpc_status_code>(status.error_code()), diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc index 90afebfd2e..7c828cb125 100644 --- a/src/cpp/server/secure_server_credentials.cc +++ b/src/cpp/server/secure_server_credentials.cc @@ -52,7 +52,7 @@ void AuthMetadataProcessorAyncWrapper::Process( void* wrapper, grpc_auth_context* context, const grpc_metadata* md, size_t num_md, grpc_process_auth_metadata_done_cb cb, void* user_data) { auto* w = reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(wrapper); - if (w->processor_ == nullptr) { + if (!w->processor_) { // Early exit. cb(user_data, nullptr, 0, nullptr, 0, GRPC_STATUS_OK, nullptr); return; @@ -86,20 +86,22 @@ void AuthMetadataProcessorAyncWrapper::InvokeProcessor( std::vector<grpc_metadata> consumed_md; for (auto it = consumed_metadata.begin(); it != consumed_metadata.end(); ++it) { - consumed_md.push_back({it->first.c_str(), - it->second.data(), - it->second.size(), - 0, - {{nullptr, nullptr, nullptr, nullptr}}}); + grpc_metadata md_entry; + md_entry.key = it->first.c_str(); + md_entry.value = it->second.data(); + md_entry.value_length = it->second.size(); + md_entry.flags = 0; + consumed_md.push_back(md_entry); } std::vector<grpc_metadata> response_md; for (auto it = response_metadata.begin(); it != response_metadata.end(); ++it) { - response_md.push_back({it->first.c_str(), - it->second.data(), - it->second.size(), - 0, - {{nullptr, nullptr, nullptr, nullptr}}}); + grpc_metadata md_entry; + md_entry.key = it->first.c_str(); + md_entry.value = it->second.data(); + md_entry.value_length = it->second.size(); + md_entry.flags = 0; + response_md.push_back(md_entry); } auto consumed_md_data = consumed_md.empty() ? nullptr : &consumed_md[0]; auto response_md_data = response_md.empty() ? nullptr : &response_md[0]; diff --git a/test/cpp/end2end/streaming_throughput_test.cc b/test/cpp/end2end/streaming_throughput_test.cc index c1355b38f0..344bf507ce 100644 --- a/test/cpp/end2end/streaming_throughput_test.cc +++ b/test/cpp/end2end/streaming_throughput_test.cc @@ -31,9 +31,9 @@ * */ -#include <atomic> #include <mutex> #include <thread> +#include <time.h> #include <grpc++/channel.h> #include <grpc++/client_context.h> @@ -44,6 +44,7 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/atm.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <gtest/gtest.h> @@ -99,12 +100,17 @@ namespace testing { class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { public: - static void BidiStream_Sender(ServerReaderWriter<EchoResponse, EchoRequest>* stream, std::atomic<bool>* should_exit) { + static void BidiStream_Sender(ServerReaderWriter<EchoResponse, EchoRequest>* stream, gpr_atm* should_exit) { EchoResponse response; response.set_message(kLargeString); - while (!should_exit->load()) { - // TODO(vpai): Decide if the below requires blocking annotation - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + while (gpr_atm_acq_load(should_exit) == static_cast<gpr_atm>(0)) { + struct timespec tv = {0, 1000000}; // 1 ms + struct timespec rem; + // TODO (vpai): Mark this blocking + while (nanosleep(&tv, &rem) != 0) { + tv = rem; + }; + stream->Write(response); } } @@ -114,14 +120,20 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { ServerReaderWriter<EchoResponse, EchoRequest>* stream) GRPC_OVERRIDE { EchoRequest request; - std::atomic<bool> should_exit(false); + gpr_atm should_exit; + gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(0)); + std::thread sender(std::bind(&TestServiceImpl::BidiStream_Sender, stream, &should_exit)); while (stream->Read(&request)) { - // TODO(vpai): Decide if the below requires blocking annotation - std::this_thread::sleep_for(std::chrono::milliseconds(3)); + struct timespec tv = {0, 3000000}; // 3 ms + struct timespec rem; + // TODO (vpai): Mark this blocking + while (nanosleep(&tv, &rem) != 0) { + tv = rem; + }; } - should_exit.store(true); + gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(1)); sender.join(); return Status::OK; } |