diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/block_annotate.h | 64 | ||||
-rw-r--r-- | src/core/lib/iomgr/call_combiner.cc (renamed from src/core/lib/iomgr/call_combiner.c) | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/call_combiner.h | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.cc (renamed from src/core/lib/iomgr/closure.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.cc (renamed from src/core/lib/iomgr/combiner.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.h | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.cc (renamed from src/core/lib/iomgr/endpoint.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_posix.cc (renamed from src/core/lib/iomgr/endpoint_pair_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_uv.cc (renamed from src/core/lib/iomgr/endpoint_pair_uv.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint_pair_windows.cc (renamed from src/core/lib/iomgr/endpoint_pair_windows.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.cc (renamed from src/core/lib/iomgr/error.c) | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/error_internal.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.cc (renamed from src/core/lib/iomgr/ev_epoll1_linux.c) | 57 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc (renamed from src/core/lib/iomgr/ev_epollex_linux.c) | 82 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.cc (renamed from src/core/lib/iomgr/ev_epollsig_linux.c) | 45 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.h | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc (renamed from src/core/lib/iomgr/ev_poll_posix.c) | 43 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.cc (renamed from src/core/lib/iomgr/ev_posix.c) | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 14 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_windows.cc (renamed from src/core/lib/iomgr/ev_windows.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.cc (renamed from src/core/lib/iomgr/exec_ctx.c) | 62 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.cc (renamed from src/core/lib/iomgr/executor.c) | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/gethostname.h | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/gethostname_fallback.cc (renamed from src/core/lib/iomgr/gethostname_fallback.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/gethostname_host_name_max.cc (renamed from src/core/lib/iomgr/gethostname_host_name_max.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/gethostname_sysconf.cc (renamed from src/core/lib/iomgr/gethostname_sysconf.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/iocp_windows.cc (renamed from src/core/lib/iomgr/iocp_windows.c) | 29 | ||||
-rw-r--r-- | src/core/lib/iomgr/iocp_windows.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.cc (renamed from src/core/lib/iomgr/iomgr.c) | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_internal.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_posix.cc (renamed from src/core/lib/iomgr/iomgr_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.cc (renamed from src/core/lib/iomgr/iomgr_uv.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.h | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_windows.cc (renamed from src/core/lib/iomgr/iomgr_windows.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/is_epollexclusive_available.cc (renamed from src/core/lib/iomgr/is_epollexclusive_available.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/is_epollexclusive_available.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/load_file.cc (renamed from src/core/lib/iomgr/load_file.c) | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/lockfree_event.cc (renamed from src/core/lib/iomgr/lockfree_event.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/lockfree_event.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/network_status_tracker.cc (renamed from src/core/lib/iomgr/network_status_tracker.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/network_status_tracker.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/polling_entity.cc (renamed from src/core/lib/iomgr/polling_entity.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/polling_entity.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_set.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_set_uv.cc (renamed from src/core/lib/iomgr/pollset_set_uv.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_set_windows.cc (renamed from src/core/lib/iomgr/pollset_set_windows.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.cc (renamed from src/core/lib/iomgr/pollset_uv.c) | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_windows.cc (renamed from src/core/lib/iomgr/pollset_windows.c) | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_windows.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.cc (renamed from src/core/lib/iomgr/resolve_address_posix.c) | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.cc (renamed from src/core/lib/iomgr/resolve_address_uv.c) | 25 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.cc (renamed from src/core/lib/iomgr/resolve_address_windows.c) | 16 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc (renamed from src/core/lib/iomgr/resource_quota.c) | 35 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/sockaddr_utils.cc (renamed from src/core/lib/iomgr/sockaddr_utils.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/sockaddr_utils.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_factory_posix.cc (renamed from src/core/lib/iomgr/socket_factory_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_mutator.cc (renamed from src/core/lib/iomgr/socket_mutator.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_common_posix.cc (renamed from src/core/lib/iomgr/socket_utils_common_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_linux.cc (renamed from src/core/lib/iomgr/socket_utils_linux.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_posix.cc (renamed from src/core/lib/iomgr/socket_utils_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_uv.cc (renamed from src/core/lib/iomgr/socket_utils_uv.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_utils_windows.cc (renamed from src/core/lib/iomgr/socket_utils_windows.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_windows.cc (renamed from src/core/lib/iomgr/socket_windows.c) | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_windows.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.cc (renamed from src/core/lib/iomgr/tcp_client_posix.c) | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_uv.cc (renamed from src/core/lib/iomgr/tcp_client_uv.c) | 24 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_windows.cc (renamed from src/core/lib/iomgr/tcp_client_windows.c) | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc (renamed from src/core/lib/iomgr/tcp_posix.c) | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.cc (renamed from src/core/lib/iomgr/tcp_server_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix_common.cc (renamed from src/core/lib/iomgr/tcp_server_utils_posix_common.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc (renamed from src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc (renamed from src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_uv.cc (renamed from src/core/lib/iomgr/tcp_server_uv.c) | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_windows.cc (renamed from src/core/lib/iomgr/tcp_server_windows.c) | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.cc (renamed from src/core/lib/iomgr/tcp_uv.c) | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.cc (renamed from src/core/lib/iomgr/tcp_windows.c) | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/time_averaged_stats.cc (renamed from src/core/lib/iomgr/time_averaged_stats.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/time_averaged_stats.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.cc (renamed from src/core/lib/iomgr/timer_generic.c) | 128 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_heap.cc (renamed from src/core/lib/iomgr/timer_heap.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_heap.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.cc (renamed from src/core/lib/iomgr/timer_manager.c) | 54 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.cc (renamed from src/core/lib/iomgr/timer_uv.c) | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc (renamed from src/core/lib/iomgr/udp_server.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/unix_sockets_posix.cc (renamed from src/core/lib/iomgr/unix_sockets_posix.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/unix_sockets_posix.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/unix_sockets_posix_noop.cc (renamed from src/core/lib/iomgr/unix_sockets_posix_noop.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.cc (renamed from src/core/lib/iomgr/wakeup_fd_cv.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_eventfd.cc (renamed from src/core/lib/iomgr/wakeup_fd_eventfd.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_nospecial.cc (renamed from src/core/lib/iomgr/wakeup_fd_nospecial.c) | 0 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_pipe.cc (renamed from src/core/lib/iomgr/wakeup_fd_pipe.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_pipe.h | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_posix.cc (renamed from src/core/lib/iomgr/wakeup_fd_posix.c) | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_posix.h | 8 |
120 files changed, 906 insertions, 421 deletions
diff --git a/src/core/lib/iomgr/block_annotate.h b/src/core/lib/iomgr/block_annotate.h new file mode 100644 index 0000000000..fcbfe9eb1a --- /dev/null +++ b/src/core/lib/iomgr/block_annotate.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H +#define GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H + +#ifdef __cplusplus +extern "C" { +#endif + +void gpr_thd_start_blocking_region(); +void gpr_thd_end_blocking_region(); + +#ifdef __cplusplus +} +#endif + +/* These annotations identify the beginning and end of regions where + the code may block for reasons other than synchronization functions. + These include poll, epoll, and getaddrinfo. */ + +#ifdef GRPC_SCHEDULING_MARK_BLOCKING_REGION +#define GRPC_SCHEDULING_START_BLOCKING_REGION \ + do { \ + gpr_thd_start_blocking_region(); \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + gpr_thd_end_blocking_region(); \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + gpr_thd_end_blocking_region(); \ + grpc_exec_ctx_invalidate_now((ec)); \ + } while (0) +#else +#define GRPC_SCHEDULING_START_BLOCKING_REGION \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX \ + do { \ + } while (0) +#define GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(ec) \ + do { \ + grpc_exec_ctx_invalidate_now((ec)); \ + } while (0) +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_BLOCK_ANNOTATE_H */ diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.cc index 48d8eaec18..bab3df021a 100644 --- a/src/core/lib/iomgr/call_combiner.c +++ b/src/core/lib/iomgr/call_combiner.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/call_combiner.h" +#include <inttypes.h> + #include <grpc/support/log.h> grpc_tracer_flag grpc_call_combiner_trace = diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 5cfb3f0c07..527f84fce0 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -27,6 +27,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/mpscq.h" +#ifdef __cplusplus +extern "C" { +#endif + // A simple, lock-free mechanism for serializing activity related to a // single call. This is similar to a combiner but is more lightweight. // @@ -118,4 +122,8 @@ void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_error* error); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.cc index 00edefc6ae..00edefc6ae 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.cc diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.cc index f899b25f10..0e707ef839 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.cc @@ -19,6 +19,7 @@ #include "src/core/lib/iomgr/combiner.h" #include <assert.h> +#include <inttypes.h> #include <string.h> #include <grpc/support/alloc.h> diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 8e0434369d..10e5fb480d 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -26,6 +26,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/support/mpscq.h" +#ifdef __cplusplus +extern "C" { +#endif + // Provides serialized access to some resource. // Each action queued on a combiner is executed serially in a borrowed thread. // The actual thread executing actions may change over time (but there will only @@ -63,4 +67,8 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx); extern grpc_tracer_flag grpc_combiner_trace; +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.cc index 37cce335ca..37cce335ca 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.cc diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 8f0523a981..16ff0ab733 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -26,6 +26,10 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resource_quota.h" +#ifdef __cplusplus +extern "C" { +#endif + /* An endpoint caps a streaming channel between two communicating processes. Examples may be: a tcp socket, <stdin+stdout>, or some shared memory. */ @@ -95,4 +99,8 @@ struct grpc_endpoint { const grpc_endpoint_vtable *vtable; }; -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index b60e62fc3e..f8830022f4 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -21,6 +21,10 @@ #include "src/core/lib/iomgr/endpoint.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { grpc_endpoint *client; grpc_endpoint *server; @@ -29,4 +33,8 @@ typedef struct { grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, grpc_channel_args *args); -#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.cc index 3ade2148ba..3ade2148ba 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.cc index ff72fe0492..ff72fe0492 100644 --- a/src/core/lib/iomgr/endpoint_pair_uv.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.cc diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.cc index 782fa2fd69..782fa2fd69 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.cc diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.cc index aa05501537..2ea6cf1301 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.cc @@ -15,9 +15,11 @@ * limitations under the License. * */ +#include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/error.h" +#include <inttypes.h> #include <string.h> #include <grpc/status.h> diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index b362948691..b36330a7ab 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -19,8 +19,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_ERROR_H #define GRPC_CORE_LIB_IOMGR_ERROR_H +#include <inttypes.h> #include <stdbool.h> -#include <stdint.h> #include <grpc/slice.h> #include <grpc/status.h> diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index 7507484557..f718e06d4e 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -24,6 +24,10 @@ #include <grpc/support/sync.h> +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_linked_error grpc_linked_error; struct grpc_linked_error { @@ -57,4 +61,8 @@ struct grpc_error { bool grpc_error_is_special(grpc_error *err); -#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.cc index 3ac12ab56f..6126e2771c 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -20,11 +20,11 @@ /* This polling engine is only relevant on linux kernels supporting epoll() */ #ifdef GRPC_LINUX_EPOLL - #include "src/core/lib/iomgr/ev_epoll1_linux.h" #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -40,12 +40,12 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; @@ -562,25 +562,17 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_END("pollset_shutdown", 0); } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) { + return INT_MAX; + } else if (delta < 0) { return 0; + } else { + return (int)delta; } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; } /* Process the epoll events found by do_epoll_wait() function. @@ -637,11 +629,11 @@ static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx, (i.e the designated poller thread) will be calling this function. So there is no need for any synchronization when accesing fields in g_epoll_set */ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("do_epoll_wait", 0); int r; - int timeout = poll_deadline_to_millis_timeout(deadline, now); + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (timeout != 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; } @@ -651,7 +643,7 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -669,9 +661,10 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { GPR_TIMER_BEGIN("begin_worker", 0); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -756,14 +749,15 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->shutting_down); } - if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && + if (gpr_cv_wait(&worker->cv, &pollset->mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); } } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -942,7 +936,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; @@ -953,7 +947,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, return GRPC_ERROR_NONE; } - if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, ps, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!ps->shutting_down); @@ -976,8 +970,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, designated poller */ if (gpr_atm_acq_load(&g_epoll_set.cursor) == gpr_atm_acq_load(&g_epoll_set.num_events)) { - append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline), - err_desc); + append_error(&error, do_epoll_wait(exec_ctx, ps, deadline), err_desc); } append_error(&error, process_epoll_events(exec_ctx, ps), err_desc); @@ -1257,7 +1250,7 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { #else /* defined(GRPC_LINUX_EPOLL) */ #if defined(GRPC_POSIX_SOCKET) -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/ev_epoll1_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { diff --git a/src/core/lib/iomgr/ev_epoll1_linux.h b/src/core/lib/iomgr/ev_epoll1_linux.h index 0696e0df40..66fd826b49 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.h +++ b/src/core/lib/iomgr/ev_epoll1_linux.h @@ -22,8 +22,16 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" +#ifdef __cplusplus +extern "C" { +#endif + // a polling engine that utilizes a singleton epoll set and turnstile polling const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request); -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL1_LINUX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.cc index 8eb4de44d9..59dd8fd2fe 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -25,6 +25,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <string.h> @@ -38,7 +39,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" #include "src/core/lib/iomgr/lockfree_event.h" @@ -46,19 +47,18 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" /******************************************************************************* * Polling object */ - typedef enum { PO_POLLING_GROUP, PO_POLLSET_SET, PO_POLLSET, - PO_FD, /* ordering is important: we always want to lock pollsets before fds: - this guarantees that using an fd as a pollable is safe */ + PO_FD, + /* ordering is important: we always want to lock pollsets before fds: + this guarantees that using an fd as a pollable is safe */ PO_EMPTY_POLLABLE, PO_COUNT } polling_obj_type; @@ -690,32 +690,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &pollset->pollable_obj.po.mu; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, now) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - - static const gpr_timespec round_up = { - 0, /* tv_sec */ - GPR_NS_PER_MS - 1, /* tv_nsec */ - GPR_TIMESPAN /* clock_type */ - }; - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up)); - return millis >= 1 ? millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -810,9 +794,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, gpr_timespec now, - gpr_timespec deadline) { - int timeout = poll_deadline_to_millis_timeout(deadline, now); + pollable *p, grpc_millis deadline) { + int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { char *desc = pollable_desc(p); @@ -829,7 +812,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); } if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); @@ -884,9 +867,10 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root, } /* Return true if this thread should poll */ -static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, gpr_timespec *now, - gpr_timespec deadline) { +static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + grpc_pollset_worker **worker_hdl, + grpc_millis deadline) { bool do_poll = true; if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; @@ -910,10 +894,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, worker->pollable_obj, worker, - poll_deadline_to_millis_timeout(deadline, *now)); + poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, worker->pollable_obj, worker); @@ -936,7 +921,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_mu_lock(&pollset->pollable_obj.po.mu); gpr_mu_lock(&worker->pollable_obj->po.mu); } - *now = gpr_now(now->clock_type); + grpc_exec_ctx_invalidate_now(exec_ctx); } return do_poll && pollset->shutdown_closure == NULL && @@ -967,14 +952,13 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (0 && GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64 - ".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p", - pollset, worker_hdl, &worker, now.tv_sec, now.tv_nsec, - deadline.tv_sec, deadline.tv_nsec, pollset->kicked_without_poller, - pollset->root_worker); + gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR + " deadline=%" PRIdPTR " kwp=%d root_worker=%p", + pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline, + pollset->kicked_without_poller, pollset->root_worker); } grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; @@ -985,7 +969,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->current_pollable_obj != &pollset->pollable_obj) { gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } - if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { + if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); @@ -996,7 +980,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, - now, deadline), + deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), @@ -1449,7 +1433,7 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( #else /* defined(GRPC_LINUX_EPOLL) */ #if defined(GRPC_POSIX_SOCKET) -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/ev_epollex_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable *grpc_init_epollex_linux( diff --git a/src/core/lib/iomgr/ev_epollex_linux.h b/src/core/lib/iomgr/ev_epollex_linux.h index cff9b43c02..58cc5a24f8 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.h +++ b/src/core/lib/iomgr/ev_epollex_linux.h @@ -22,7 +22,15 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" +#ifdef __cplusplus +extern "C" { +#endif + const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested); -#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLEX_LINUX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.cc index 4d8bdf1401..035bdc4cb5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/port.h" +#include <grpc/grpc_posix.h> + /* This polling engine is only relevant on linux kernels supporting epoll() */ #ifdef GRPC_LINUX_EPOLL @@ -25,6 +27,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <pthread.h> #include <signal.h> @@ -40,13 +43,13 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -1089,30 +1092,16 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutdown_done = NULL; } -/* Convert a timespec to milliseconds: - - Very small or negative poll times are clamped to zero to do a non-blocking - poll (which becomes spin polling) - - Other small values are rounded up to one millisecond - - Longer than a millisecond polls are rounded up to the next nearest - millisecond to avoid spinning - - Infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis millis) { + if (millis == GRPC_MILLIS_INF_FUTURE) return -1; + grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); + if (delta > INT_MAX) + return INT_MAX; + else if (delta < 0) return 0; - } - timeout = gpr_time_sub(deadline, now); - int millis = gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); - return millis >= 1 ? millis : 1; + else + return (int)delta; } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, @@ -1243,7 +1232,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (ep_rv < 0) { if (errno != EINTR) { gpr_asprintf(&err_msg, @@ -1310,10 +1299,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { GPR_TIMER_BEGIN("pollset_work", 0); grpc_error *error = GRPC_ERROR_NONE; - int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + int timeout_ms = poll_deadline_to_millis_timeout(exec_ctx, deadline); sigset_t new_mask; @@ -1756,7 +1745,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux( #else /* defined(GRPC_LINUX_EPOLL) */ #if defined(GRPC_POSIX_SOCKET) -#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/ev_epollsig_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable *grpc_init_epollsig_linux( diff --git a/src/core/lib/iomgr/ev_epollsig_linux.h b/src/core/lib/iomgr/ev_epollsig_linux.h index 88328682b3..c04ff27400 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.h +++ b/src/core/lib/iomgr/ev_epollsig_linux.h @@ -22,6 +22,10 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/port.h" +#ifdef __cplusplus +extern "C" { +#endif + const grpc_event_engine_vtable *grpc_init_epollsig_linux(bool explicit_request); #ifdef GRPC_LINUX_EPOLL @@ -30,4 +34,8 @@ void *grpc_pollset_get_polling_island(grpc_pollset *ps); bool grpc_are_polling_islands_equal(void *p, void *q); #endif /* defined(GRPC_LINUX_EPOLL) */ +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLLSIG_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.cc index e170702dca..036a35690c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -24,6 +24,7 @@ #include <assert.h> #include <errno.h> +#include <limits.h> #include <poll.h> #include <string.h> #include <sys/socket.h> @@ -37,12 +38,11 @@ #include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_cv.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/murmur_hash.h" #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) @@ -50,7 +50,6 @@ /******************************************************************************* * FD declarations */ - typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; @@ -200,8 +199,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -876,7 +875,7 @@ static void work_combine_error(grpc_error **composite, grpc_error *error) { static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; @@ -945,7 +944,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd_watcher *watchers; struct pollfd *pfds; - timeout = poll_deadline_to_millis_timeout(deadline, now); + timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -991,7 +990,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GRPC_SCHEDULING_START_BLOCKING_REGION; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); @@ -1068,13 +1067,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ - deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + deadline = 0; } keep_polling = 1; } - if (keep_polling) { - now = gpr_now(now.clock_type); - } } gpr_tls_set(&g_current_thread_poller, 0); if (added_worker) { @@ -1126,21 +1122,14 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static int poll_deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { - gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { - return -1; - } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; + if (deadline == 0) return 0; + grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + if (n < 0) return 0; + if (n > INT_MAX) return -1; + return (int)n; } /******************************************************************************* diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h index d444e60944..84b68155b5 100644 --- a/src/core/lib/iomgr/ev_poll_posix.h +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -21,7 +21,15 @@ #include "src/core/lib/iomgr/ev_posix.h" +#ifdef __cplusplus +extern "C" { +#endif + const grpc_event_engine_vtable *grpc_init_poll_posix(bool explicit_request); const grpc_event_engine_vtable *grpc_init_poll_cv_posix(bool explicit_request); -#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.cc index 4d3ae2228e..e4033fab1d 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.cc @@ -205,9 +205,9 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); + grpc_pollset_worker **worker, + grpc_millis deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, deadline); } grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1ff2ff1413..955326c5f7 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -27,6 +27,10 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#ifdef __cplusplus +extern "C" { +#endif + extern grpc_tracer_flag grpc_polling_trace; /* Disabled by default */ typedef struct grpc_fd grpc_fd; @@ -52,8 +56,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_destroy)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); + grpc_pollset_worker **worker, + grpc_millis deadline); grpc_error *(*pollset_kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -158,4 +162,8 @@ extern grpc_poll_function_type grpc_poll_function; void grpc_set_event_engine_test_only(const grpc_event_engine_vtable *); const grpc_event_engine_vtable *grpc_get_event_engine_test_only(); -#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/ev_windows.c b/src/core/lib/iomgr/ev_windows.cc index c24dfaeaf7..c24dfaeaf7 100644 --- a/src/core/lib/iomgr/ev_windows.c +++ b/src/core/lib/iomgr/ev_windows.cc diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.cc index 41c69add17..3d17afcb8f 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -104,9 +104,69 @@ static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } -void grpc_exec_ctx_global_init(void) {} +static gpr_timespec + g_start_time[GPR_TIMESPAN + 1]; // assumes GPR_TIMESPAN is the + // last enum value in + // gpr_clock_type + +void grpc_exec_ctx_global_init(void) { + for (int i = 0; i < GPR_TIMESPAN; i++) { + g_start_time[i] = gpr_now((gpr_clock_type)i); + } + // allows uniform treatment in conversion functions + g_start_time[GPR_TIMESPAN] = gpr_time_0(GPR_TIMESPAN); +} + void grpc_exec_ctx_global_shutdown(void) {} +static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = + GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { + ts = gpr_time_sub(ts, g_start_time[ts.clock_type]); + double x = GPR_MS_PER_SEC * (double)ts.tv_sec + + (double)ts.tv_nsec / GPR_NS_PER_MS + + (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; + if (x < 0) return 0; + if (x > GPR_ATM_MAX) return GPR_ATM_MAX; + return (gpr_atm)x; +} + +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->now_is_valid) { + exec_ctx->now = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + exec_ctx->now_is_valid = true; + } + return exec_ctx->now; +} + +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { + exec_ctx->now_is_valid = false; +} + +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, + gpr_clock_type clock_type) { + if (clock_type == GPR_TIMESPAN) { + return gpr_time_from_millis(millis, GPR_TIMESPAN); + } + return gpr_time_add(g_start_time[clock_type], + gpr_time_from_millis(millis, GPR_TIMESPAN)); +} + +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { + return timespec_to_atm_round_down(ts); +} + +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { + return timespec_to_atm_round_up(ts); +} + static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = { exec_ctx_run, exec_ctx_sched, "exec_ctx"}; static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable}; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index c89792c8c4..44b9be7aa9 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -19,10 +19,19 @@ #ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H #define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H +#include <grpc/support/atm.h> #include <grpc/support/cpu.h> + #include "src/core/lib/iomgr/closure.h" -/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ +#ifdef __cplusplus +extern "C" { +#endif + +typedef gpr_atm grpc_millis; + +#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX +#define GRPC_MILLIS_INF_PAST GPR_ATM_MIN /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ @@ -66,6 +75,9 @@ struct grpc_exec_ctx { unsigned starting_cpu; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); + + bool now_is_valid; + grpc_millis now; }; /* initializer for grpc_exec_ctx: @@ -73,7 +85,7 @@ struct grpc_exec_ctx { #define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \ { \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \ - finish_check_arg, finish_check \ + finish_check_arg, finish_check, false, 0 \ } /* initialize an execution context at the top level of an API call into grpc @@ -106,4 +118,14 @@ void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); +grpc_millis grpc_exec_ctx_now(grpc_exec_ctx *exec_ctx); +void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx); +gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock); +grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec); +grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec); + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.cc index 892385d7d7..92c3e70301 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.cc @@ -100,7 +100,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); - g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + g_thread_state[i].elems = GRPC_CLOSURE_LIST_INIT; } gpr_thd_options opt = gpr_thd_options_default(); @@ -172,12 +172,13 @@ static void executor_thread(void *arg) { } GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx); grpc_closure_list exec = ts->elems; - ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; + ts->elems = GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); if (GRPC_TRACER_ON(executor_trace)) { gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); } + grpc_exec_ctx_invalidate_now(&exec_ctx); subtract_depth = run_closures(&exec_ctx, exec); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 0412c02790..ab3fc901de 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,10 @@ #include "src/core/lib/iomgr/closure.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef enum { GRPC_EXECUTOR_SHORT, GRPC_EXECUTOR_LONG @@ -45,4 +49,8 @@ bool grpc_executor_is_threaded(); grpc_executor_shutdown */ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool enable); -#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/gethostname.h b/src/core/lib/iomgr/gethostname.h index 9c6b9d8d42..f335fea586 100644 --- a/src/core/lib/iomgr/gethostname.h +++ b/src/core/lib/iomgr/gethostname.h @@ -19,8 +19,16 @@ #ifndef GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H #define GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H +#ifdef __cplusplus +extern "C" { +#endif + // Returns the hostname of the local machine. // Caller takes ownership of result. char *grpc_gethostname(); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H */ diff --git a/src/core/lib/iomgr/gethostname_fallback.c b/src/core/lib/iomgr/gethostname_fallback.cc index 6229461568..e6f4c2f760 100644 --- a/src/core/lib/iomgr/gethostname_fallback.c +++ b/src/core/lib/iomgr/gethostname_fallback.cc @@ -16,6 +16,7 @@ * */ +#include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_GETHOSTNAME_FALLBACK diff --git a/src/core/lib/iomgr/gethostname_host_name_max.c b/src/core/lib/iomgr/gethostname_host_name_max.cc index 4d0511412e..cdaf097c3e 100644 --- a/src/core/lib/iomgr/gethostname_host_name_max.c +++ b/src/core/lib/iomgr/gethostname_host_name_max.cc @@ -16,6 +16,7 @@ * */ +#include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_HOST_NAME_MAX diff --git a/src/core/lib/iomgr/gethostname_sysconf.c b/src/core/lib/iomgr/gethostname_sysconf.cc index 51bac5d69d..8441e0615e 100644 --- a/src/core/lib/iomgr/gethostname_sysconf.c +++ b/src/core/lib/iomgr/gethostname_sysconf.cc @@ -16,6 +16,7 @@ * */ +#include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SYSCONF diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.cc index c082179c0b..336cc86c75 100644 --- a/src/core/lib/iomgr/iocp_windows.c +++ b/src/core/lib/iomgr/iocp_windows.cc @@ -26,6 +26,7 @@ #include <grpc/support/log.h> #include <grpc/support/log_windows.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iocp_windows.h" @@ -40,25 +41,17 @@ static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static DWORD deadline_to_millis_timeout(gpr_timespec deadline, - gpr_timespec now) { +static DWORD deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, + grpc_millis deadline) { gpr_timespec timeout; - static const int64_t max_spin_polling_us = 10; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + if (deadline == GRPC_MILLIS_INF_FUTURE) { return INFINITE; } - if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( - max_spin_polling_us, - GPR_TIMESPAN))) <= 0) { - return 0; - } - timeout = gpr_time_sub(deadline, now); - return (DWORD)gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); + return (DWORD)GPR_MAX(0, deadline - grpc_exec_ctx_now(exec_ctx)); } grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline) { + grpc_millis deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -67,9 +60,9 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket; grpc_winsocket_callback_info *info; GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - success = GetQueuedCompletionStatus( - g_iocp, &bytes, &completion_key, &overlapped, - deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + success = + GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, &overlapped, + deadline_to_millis_timeout(exec_ctx, deadline)); if (success == 0 && overlapped == NULL) { return GRPC_IOCP_WORK_TIMEOUT; } @@ -121,7 +114,7 @@ void grpc_iocp_flush(void) { grpc_iocp_work_status work_status; do { - work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + work_status = grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_PAST); } while (work_status == GRPC_IOCP_WORK_KICK || grpc_exec_ctx_flush(&exec_ctx)); } @@ -129,7 +122,7 @@ void grpc_iocp_flush(void) { void grpc_iocp_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (gpr_atm_acq_load(&g_custom_events)) { - grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_iocp_work(&exec_ctx, GRPC_MILLIS_INF_FUTURE); grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/iocp_windows.h b/src/core/lib/iomgr/iocp_windows.h index 9c89e868c5..aefe7a294a 100644 --- a/src/core/lib/iomgr/iocp_windows.h +++ b/src/core/lib/iomgr/iocp_windows.h @@ -23,6 +23,10 @@ #include "src/core/lib/iomgr/socket_windows.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef enum { GRPC_IOCP_WORK_WORK, GRPC_IOCP_WORK_TIMEOUT, @@ -30,11 +34,15 @@ typedef enum { } grpc_iocp_work_status; grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, - gpr_timespec deadline); + grpc_millis deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); -#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_IOCP_WINDOWS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.cc index f63f190155..d6a5b4a76c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.cc @@ -16,8 +16,11 @@ * */ +#include <grpc/support/port_platform.h> + #include "src/core/lib/iomgr/iomgr.h" +#include <inttypes.h> #include <stdlib.h> #include <string.h> @@ -48,7 +51,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) { gpr_cv_init(&g_rcv); grpc_exec_ctx_global_init(); grpc_executor_init(exec_ctx); - grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_list_init(exec_ctx); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = (char *)"root"; grpc_network_status_init(); @@ -95,8 +98,9 @@ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL) == - GRPC_TIMERS_FIRED) { + exec_ctx->now_is_valid = true; + exec_ctx->now = GRPC_MILLIS_INF_FUTURE; + if (grpc_timer_check(exec_ctx, NULL) == GRPC_TIMERS_FIRED) { gpr_mu_unlock(&g_mu); grpc_exec_ctx_flush(exec_ctx); grpc_iomgr_platform_flush(); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index e3cd6ebe79..fea08496fe 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -22,6 +22,10 @@ #include <grpc/impl/codegen/exec_ctx_fwd.h> #include "src/core/lib/iomgr/port.h" +#ifdef __cplusplus +extern "C" { +#endif + /** Initializes the iomgr. */ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx); @@ -32,4 +36,8 @@ void grpc_iomgr_start(grpc_exec_ctx *exec_ctx); * exec_ctx. */ void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx); -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index 836d82515d..005abbed13 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -23,6 +23,10 @@ #include "src/core/lib/iomgr/iomgr.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_iomgr_object { char *name; struct grpc_iomgr_object *next; @@ -40,4 +44,8 @@ void grpc_iomgr_platform_shutdown(void); bool grpc_iomgr_abort_on_leaks(void); -#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.cc index f5875a247e..f5875a247e 100644 --- a/src/core/lib/iomgr/iomgr_posix.c +++ b/src/core/lib/iomgr/iomgr_posix.cc diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.cc index df5d23af3b..df5d23af3b 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.cc diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h index 3b4daaa73b..bc42ca8c1c 100644 --- a/src/core/lib/iomgr/iomgr_uv.h +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -23,10 +23,18 @@ #include <grpc/support/thd.h> +#ifdef __cplusplus +extern "C" { +#endif + /* The thread ID of the thread on which grpc was initialized. Used to verify * that all calls into libuv are made on that same thread */ extern gpr_thd_id g_init_thread; +#ifdef __cplusplus +} +#endif + #ifdef GRPC_UV_THREAD_CHECK #define GRPC_UV_ASSERT_SAME_THREAD() \ GPR_ASSERT(gpr_thd_currentid() == g_init_thread) diff --git a/src/core/lib/iomgr/iomgr_windows.c b/src/core/lib/iomgr/iomgr_windows.cc index 630370166d..630370166d 100644 --- a/src/core/lib/iomgr/iomgr_windows.c +++ b/src/core/lib/iomgr/iomgr_windows.cc diff --git a/src/core/lib/iomgr/is_epollexclusive_available.c b/src/core/lib/iomgr/is_epollexclusive_available.cc index d08844c0df..d08844c0df 100644 --- a/src/core/lib/iomgr/is_epollexclusive_available.c +++ b/src/core/lib/iomgr/is_epollexclusive_available.cc diff --git a/src/core/lib/iomgr/is_epollexclusive_available.h b/src/core/lib/iomgr/is_epollexclusive_available.h index 1d2e133a3b..5c3e483065 100644 --- a/src/core/lib/iomgr/is_epollexclusive_available.h +++ b/src/core/lib/iomgr/is_epollexclusive_available.h @@ -21,6 +21,14 @@ #include <stdbool.h> +#ifdef __cplusplus +extern "C" { +#endif + bool grpc_is_epollexclusive_available(void); -#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_IS_EPOLLEXCLUSIVE_AVAILABLE_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.cc index 0b4d41ea4b..5cb4099ea4 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.cc @@ -25,7 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/support/string.h" grpc_error *grpc_load_file(const char *filename, int add_null_terminator, @@ -73,6 +73,6 @@ end: GRPC_ERROR_UNREF(error); error = error_out; } - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; return error; } diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.cc index f967b22ba9..f967b22ba9 100644 --- a/src/core/lib/iomgr/lockfree_event.c +++ b/src/core/lib/iomgr/lockfree_event.cc diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index 6a14a0f3b2..925f004945 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -25,6 +25,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" +#ifdef __cplusplus +extern "C" { +#endif + void grpc_lfev_init(gpr_atm *state); void grpc_lfev_destroy(gpr_atm *state); bool grpc_lfev_is_shutdown(gpr_atm *state); @@ -37,4 +41,8 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, const char *variable); -#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.cc index 4e5c1d5408..57a7faa9f1 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.cc @@ -16,6 +16,7 @@ * */ +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/endpoint.h" void grpc_network_status_shutdown(void) {} diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index c0295c1f74..af50d51257 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -20,6 +20,10 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +#ifdef __cplusplus +extern "C" { +#endif + void grpc_network_status_init(void); void grpc_network_status_shutdown(void); @@ -27,4 +31,8 @@ void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); -#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/polling_entity.c b/src/core/lib/iomgr/polling_entity.cc index 8591a5518e..8591a5518e 100644 --- a/src/core/lib/iomgr/polling_entity.c +++ b/src/core/lib/iomgr/polling_entity.cc diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index a161e1fea6..4a37acf212 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -22,6 +22,10 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef enum grpc_pollset_tag { GRPC_POLLS_NONE, GRPC_POLLS_POLLSET, @@ -64,4 +68,8 @@ void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx, void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx, grpc_polling_entity *pollent, grpc_pollset_set *pss_dst); -#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index a0f6b3a9d3..799fae154c 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -25,6 +25,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" +#ifdef __cplusplus +extern "C" { +#endif + #ifndef NDEBUG extern grpc_tracer_flag grpc_trace_fd_refcount; #endif @@ -71,8 +75,8 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); pollset lock */ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) GRPC_MUST_USE_RESULT; + grpc_pollset_worker **worker, + grpc_millis deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is non-NULL, then kick that worker. */ @@ -80,4 +84,8 @@ grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) GRPC_MUST_USE_RESULT; +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h index 29c0f03561..17df86542d 100644 --- a/src/core/lib/iomgr/pollset_set.h +++ b/src/core/lib/iomgr/pollset_set.h @@ -21,6 +21,10 @@ #include "src/core/lib/iomgr/pollset.h" +#ifdef __cplusplus +extern "C" { +#endif + /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any fd's (etc) that have been registered with the set_set to that pollset. @@ -44,4 +48,8 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, grpc_pollset_set *bag, grpc_pollset_set *item); -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.cc index 90186edbb7..90186edbb7 100644 --- a/src/core/lib/iomgr/pollset_set_uv.c +++ b/src/core/lib/iomgr/pollset_set_uv.cc diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.cc index 2105a47ad4..2105a47ad4 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.cc diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.cc index 2651325e25..b9901bf8ef 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -65,7 +65,7 @@ void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); } void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); - dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t)); + dummy_uv_handle = (uv_timer_t *)gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), dummy_uv_handle); grpc_pollset_work_run_loop = 1; } @@ -116,13 +116,14 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { uint64_t timeout; GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { - if (gpr_time_cmp(deadline, now) >= 0) { - timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline >= now) { + timeout = deadline - now; } else { timeout = 0; } diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h index 566c110ca6..d8f72ff867 100644 --- a/src/core/lib/iomgr/pollset_uv.h +++ b/src/core/lib/iomgr/pollset_uv.h @@ -19,9 +19,17 @@ #ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H #define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H +#ifdef __cplusplus +extern "C" { +#endif + extern int grpc_pollset_work_run_loop; void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.cc index eb295d3eeb..bb4df83fc1 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -110,7 +110,7 @@ void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {} grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, - gpr_timespec now, gpr_timespec deadline) { + grpc_millis deadline) { grpc_pollset_worker worker; if (worker_hdl) *worker_hdl = &worker; @@ -159,7 +159,8 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &worker); added_worker = 1; while (!worker.kicked) { - if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, + grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { break; } } diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h index 71878c3d30..7733d26471 100644 --- a/src/core/lib/iomgr/pollset_windows.h +++ b/src/core/lib/iomgr/pollset_windows.h @@ -23,6 +23,10 @@ #include "src/core/lib/iomgr/socket_windows.h" +#ifdef __cplusplus +extern "C" { +#endif + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. A Windows "pollset" is merely a mutex used to synchronize with the IOCP, and workers are condition variables @@ -60,4 +64,8 @@ struct grpc_pollset { void grpc_pollset_global_init(void); void grpc_pollset_global_shutdown(void); -#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index fe1dd78576..4a6df2cf26 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -25,6 +25,10 @@ #define GRPC_MAX_SOCKADDR_SIZE 128 +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { char addr[GRPC_MAX_SOCKADDR_SIZE]; size_t len; @@ -52,4 +56,8 @@ extern grpc_error *(*grpc_blocking_resolve_address)( const char *name, const char *default_port, grpc_resolved_addresses **addresses); -#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.cc index 60cfeebd47..1b783495df 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -33,10 +33,10 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" static grpc_error *blocking_resolve_address_impl( @@ -81,7 +81,7 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { /* Retry if well-known service name is recognized */ @@ -90,7 +90,7 @@ static grpc_error *blocking_resolve_address_impl( if (strcmp(port, svc[i][0]) == 0) { GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, svc[i][1], &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; break; } } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.cc index 2d438e8b48..4f7f234877 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.cc @@ -49,11 +49,12 @@ static int retry_named_port_failure(int status, request *r, uv_getaddrinfo_cb getaddrinfo_cb) { if (status != 0) { // This loop is copied from resolve_address_posix.c - char *svc[][2] = {{"http", "80"}, {"https", "443"}}; + const char *svc[][2] = {{"http", "80"}, {"https", "443"}}; for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) { if (strcmp(r->port, svc[i][0]) == 0) { int retry_status; - uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); + uv_getaddrinfo_t *req = + (uv_getaddrinfo_t *)gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; r->port = gpr_strdup(svc[i][1]); retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, @@ -85,13 +86,14 @@ static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result, grpc_slice_from_static_string(uv_strerror(status))); return error; } - (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses)); + (*addresses) = + (grpc_resolved_addresses *)gpr_malloc(sizeof(grpc_resolved_addresses)); (*addresses)->naddrs = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { (*addresses)->naddrs++; } - (*addresses)->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); + (*addresses)->addrs = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address) * (*addresses)->naddrs); i = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); @@ -174,6 +176,7 @@ static grpc_error *blocking_resolve_address_impl( int s; grpc_error *err; int retry_status; + request r; GRPC_UV_ASSERT_SAME_THREAD(); @@ -191,8 +194,10 @@ static grpc_error *blocking_resolve_address_impl( hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints); - request r = { - .addresses = addresses, .hints = &hints, .host = host, .port = port}; + r.addresses = addresses; + r.hints = &hints; + r.host = host; + r.port = port; retry_status = retry_named_port_failure(s, &r, NULL); if (retry_status <= 0) { s = retry_status; @@ -239,16 +244,16 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, gpr_free(port); return; } - r = gpr_malloc(sizeof(request)); + r = (request *)gpr_malloc(sizeof(request)); r->on_done = on_done; r->addresses = addrs; r->host = host; r->port = port; - req = gpr_malloc(sizeof(uv_getaddrinfo_t)); + req = (uv_getaddrinfo_t *)gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; /* Call getaddrinfo */ - hints = gpr_malloc(sizeof(struct addrinfo)); + hints = (addrinfo *)gpr_malloc(sizeof(struct addrinfo)); memset(hints, 0, sizeof(struct addrinfo)); hints->ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ hints->ai_socktype = SOCK_STREAM; /* stream socket */ diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.cc index 0cb0029f4e..451f01a701 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -23,6 +23,7 @@ #include "src/core/lib/iomgr/resolve_address.h" +#include <inttypes.h> #include <string.h> #include <sys/types.h> @@ -33,10 +34,10 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" typedef struct { @@ -86,20 +87,21 @@ static grpc_error *blocking_resolve_address_impl( GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result); - GRPC_SCHEDULING_END_BLOCKING_REGION; + GRPC_SCHEDULING_END_BLOCKING_REGION_NO_EXEC_CTX; if (s != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getaddrinfo"); goto done; } /* Success path: set addrs non-NULL, fill it in */ - (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses)); + (*addresses) = + (grpc_resolved_addresses *)gpr_malloc(sizeof(grpc_resolved_addresses)); (*addresses)->naddrs = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { (*addresses)->naddrs++; } - (*addresses)->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); + (*addresses)->addrs = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address) * (*addresses)->naddrs); i = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); @@ -132,7 +134,7 @@ grpc_error *(*grpc_blocking_resolve_address)( * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { - request *r = rp; + request *r = (request *)rp; if (error == GRPC_ERROR_NONE) { error = grpc_blocking_resolve_address(r->name, r->default_port, r->addresses); @@ -157,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addresses) { - request *r = gpr_malloc(sizeof(request)); + request *r = (request *)gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.cc index 4d69986fbc..ecb5747da8 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.cc @@ -18,6 +18,7 @@ #include "src/core/lib/iomgr/resource_quota.h" +#include <inttypes.h> #include <limits.h> #include <stdint.h> #include <string.h> @@ -88,6 +89,8 @@ struct grpc_resource_user { grpc_closure_list on_allocated; /* True if we are currently trying to allocate from the quota, false if not */ bool allocating; + /* How many bytes of allocations are outstanding */ + int64_t outstanding_allocations; /* True if we are currently trying to add ourselves to the non-free quota list, false otherwise */ bool added_to_free_pool; @@ -152,6 +155,9 @@ struct grpc_resource_quota { char *name; }; +static void ru_unref_by(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, gpr_atm amount); + /******************************************************************************* * list management */ @@ -288,6 +294,25 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RQ: check allocation for user %p shutdown=%" PRIdPTR + " free_pool=%" PRId64, + resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown), + resource_user->free_pool); + } + if (gpr_atm_no_barrier_load(&resource_user->shutdown)) { + resource_user->allocating = false; + grpc_closure_list_fail_all( + &resource_user->on_allocated, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); + int64_t aborted_allocations = resource_user->outstanding_allocations; + resource_user->outstanding_allocations = 0; + resource_user->free_pool += aborted_allocations; + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); + gpr_mu_unlock(&resource_user->mu); + ru_unref_by(exec_ctx, resource_user, (gpr_atm)aborted_allocations); + continue; + } if (resource_user->free_pool < 0 && -resource_user->free_pool <= resource_quota->free_pool) { int64_t amt = -resource_user->free_pool; @@ -307,6 +332,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; + resource_user->outstanding_allocations = 0; GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { @@ -487,6 +513,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, } static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { + gpr_log(GPR_DEBUG, "RU shutdown %p", ru); + } grpc_resource_user *resource_user = (grpc_resource_user *)ru; GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); @@ -496,6 +525,9 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { resource_user->reclaimers[1] = NULL; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); + if (resource_user->allocating) { + rq_step_sched(exec_ctx, resource_user->resource_quota); + } } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { @@ -717,6 +749,7 @@ grpc_resource_user *grpc_resource_user_create( resource_user->reclaimers[1] = NULL; resource_user->new_reclaimers[0] = NULL; resource_user->new_reclaimers[1] = NULL; + resource_user->outstanding_allocations = 0; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } @@ -777,6 +810,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&resource_user->mu); ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; + resource_user->outstanding_allocations += (int64_t)size; if (GRPC_TRACER_ON(grpc_resource_quota_trace)) { gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, @@ -791,6 +825,7 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } } else { + resource_user->outstanding_allocations -= (int64_t)size; GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index d66f9ae774..3afb525434 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -24,6 +24,10 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/exec_ctx.h" +#ifdef __cplusplus +extern "C" { +#endif + /** \file Tracks resource usage against a pool. The current implementation tracks only memory usage, but in the future @@ -150,4 +154,8 @@ grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size); -#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.cc index 3f4145d104..8a2e6ed89b 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.cc @@ -19,6 +19,7 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include <errno.h> +#include <inttypes.h> #include <string.h> #include <grpc/support/alloc.h> diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index a589a19705..129bb54fc9 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -21,6 +21,10 @@ #include "src/core/lib/iomgr/resolve_address.h" +#ifdef __cplusplus +extern "C" { +#endif + /* Returns true if addr is an IPv4-mapped IPv6 address within the ::ffff:0.0.0.0/96 range, or false otherwise. @@ -77,4 +81,8 @@ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); -#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.cc index 8e907703ae..8e907703ae 100644 --- a/src/core/lib/iomgr/socket_factory_posix.c +++ b/src/core/lib/iomgr/socket_factory_posix.cc diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.cc index b0435d5a07..b0435d5a07 100644 --- a/src/core/lib/iomgr/socket_mutator.c +++ b/src/core/lib/iomgr/socket_mutator.cc diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h index 03fe46e5e9..f319e931b6 100644 --- a/src/core/lib/iomgr/socket_utils.h +++ b/src/core/lib/iomgr/socket_utils.h @@ -21,7 +21,15 @@ #include <stddef.h> +#ifdef __cplusplus +extern "C" { +#endif + /* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */ const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size); -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.cc index b8e2a0cdfd..b8e2a0cdfd 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc diff --git a/src/core/lib/iomgr/socket_utils_linux.c b/src/core/lib/iomgr/socket_utils_linux.cc index e7b094d216..e7b094d216 100644 --- a/src/core/lib/iomgr/socket_utils_linux.c +++ b/src/core/lib/iomgr/socket_utils_linux.cc diff --git a/src/core/lib/iomgr/socket_utils_posix.c b/src/core/lib/iomgr/socket_utils_posix.cc index dfd1ffd1e3..dfd1ffd1e3 100644 --- a/src/core/lib/iomgr/socket_utils_posix.c +++ b/src/core/lib/iomgr/socket_utils_posix.cc diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index eef80b439e..623b83f08b 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -29,6 +29,10 @@ #include "src/core/lib/iomgr/socket_factory_posix.h" #include "src/core/lib/iomgr/socket_mutator.h" +#ifdef __cplusplus +extern "C" { +#endif + /* a wrapper for accept or accept4 */ int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, int cloexec); @@ -129,4 +133,8 @@ grpc_error *grpc_create_dualstack_socket_using_factory( grpc_socket_factory *factory, const grpc_resolved_address *addr, int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd); -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/socket_utils_uv.c b/src/core/lib/iomgr/socket_utils_uv.cc index 0f7de4dfad..0f7de4dfad 100644 --- a/src/core/lib/iomgr/socket_utils_uv.c +++ b/src/core/lib/iomgr/socket_utils_uv.cc diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.cc index 6e85e4b61f..6e85e4b61f 100644 --- a/src/core/lib/iomgr/socket_utils_windows.c +++ b/src/core/lib/iomgr/socket_utils_windows.cc diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.cc index a0d731b942..8c7f7cf683 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.cc @@ -38,7 +38,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { char *final_name; - grpc_winsocket *r = gpr_malloc(sizeof(grpc_winsocket)); + grpc_winsocket *r = (grpc_winsocket *)gpr_malloc(sizeof(grpc_winsocket)); memset(r, 0, sizeof(grpc_winsocket)); r->socket = socket; gpr_mu_init(&r->state_mu); diff --git a/src/core/lib/iomgr/socket_windows.h b/src/core/lib/iomgr/socket_windows.h index 67dc4ca53e..a00a7615a3 100644 --- a/src/core/lib/iomgr/socket_windows.h +++ b/src/core/lib/iomgr/socket_windows.h @@ -28,6 +28,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#ifdef __cplusplus +extern "C" { +#endif + /* This holds the data for an outstanding read or write on a socket. The mutex to protect the concurrent access to that data is the one inside the winsocket wrapper. */ @@ -107,4 +111,8 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *winsocket, grpc_winsocket_callback_info *ci); -#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_WINDOWS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 6c9e51ae84..1b102b5784 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -25,6 +25,10 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" +#ifdef __cplusplus +extern "C" { +#endif + /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). @@ -35,6 +39,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline); + grpc_millis deadline); + +#ifdef __cplusplus +} +#endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.cc index 39dbb506e2..5611dd9062 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -48,7 +48,6 @@ extern grpc_tracer_flag grpc_tcp_trace; typedef struct { gpr_mu mu; grpc_fd *fd; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; int refs; @@ -244,7 +243,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -325,9 +324,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&ac->mu); GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); @@ -337,18 +334,20 @@ done: } // overridden by api_fuzzer.c +extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; +} void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_posix.h b/src/core/lib/iomgr/tcp_client_posix.h index b5a3814799..0b9775504c 100644 --- a/src/core/lib/iomgr/tcp_client_posix.h +++ b/src/core/lib/iomgr/tcp_client_posix.h @@ -23,8 +23,16 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/tcp_client.h" +#ifdef __cplusplus +extern "C" { +#endif + grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str); -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.cc index 786c456b73..f3e9366299 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.cc @@ -58,7 +58,7 @@ static void tcp_close_callback(uv_handle_t *handle) { gpr_free(handle); } static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { int done; - grpc_uv_tcp_connect *connect = acp; + grpc_uv_tcp_connect *connect = (grpc_uv_tcp_connect *)acp; if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", @@ -77,7 +77,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, } static void uv_tc_on_connect(uv_connect_t *req, int status) { - grpc_uv_tcp_connect *connect = req->data; + grpc_uv_tcp_connect *connect = (grpc_uv_tcp_connect *)req->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error = GRPC_ERROR_NONE; int done; @@ -119,7 +119,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *resolved_addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_uv_tcp_connect *connect; grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); (void)channel_args; @@ -132,20 +132,20 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); + (grpc_resource_quota *)channel_args->args[i].value.pointer.p); } } } - connect = gpr_zalloc(sizeof(grpc_uv_tcp_connect)); + connect = (grpc_uv_tcp_connect *)gpr_zalloc(sizeof(grpc_uv_tcp_connect)); connect->closure = closure; connect->endpoint = ep; - connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); + connect->tcp_handle = (uv_tcp_t *)gpr_malloc(sizeof(uv_tcp_t)); connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; - connect->refs = 1; + connect->refs = 2; // One for the connect operation, one for the timer. if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", @@ -158,24 +158,24 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tc_on_connect); GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &connect->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - &connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &connect->alarm, deadline, &connect->on_alarm); } // overridden by api_fuzzer.c +extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; +} void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.cc index fc62105cc9..9adf7ee4e9 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/port.h" +#include <inttypes.h> + #ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/sockaddr_windows.h" @@ -41,7 +43,6 @@ typedef struct { grpc_closure *on_done; gpr_mu mu; grpc_winsocket *socket; - gpr_timespec deadline; grpc_timer alarm; grpc_closure on_alarm; char *addr_name; @@ -66,7 +67,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, } static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { - async_connect *ac = acp; + async_connect *ac = (async_connect *)acp; gpr_mu_lock(&ac->mu); grpc_winsocket *socket = ac->socket; ac->socket = NULL; @@ -77,7 +78,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { } static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { - async_connect *ac = acp; + async_connect *ac = (async_connect *)acp; grpc_endpoint **ep = ac->endpoint; GPR_ASSERT(*ep == NULL); grpc_closure *on_done = ac->on_done; @@ -124,7 +125,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { static void tcp_client_connect_impl( grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, - const grpc_resolved_address *addr, gpr_timespec deadline) { + const grpc_resolved_address *addr, grpc_millis deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; @@ -193,7 +194,7 @@ static void tcp_client_connect_impl( } } - ac = gpr_malloc(sizeof(async_connect)); + ac = (async_connect *)gpr_malloc(sizeof(async_connect)); ac->on_done = on_done; ac->socket = socket; gpr_mu_init(&ac->mu); @@ -204,8 +205,7 @@ static void tcp_client_connect_impl( GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); return; @@ -226,18 +226,20 @@ failure: } // overridden by api_fuzzer.c +extern "C" { void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) = tcp_client_connect_impl; + grpc_millis deadline) = tcp_client_connect_impl; +} void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, const grpc_resolved_address *addr, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, channel_args, addr, deadline); } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.cc index 7e271294fd..7fcaef7679 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -135,13 +135,11 @@ static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec deadline = - gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC; GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); - GRPC_LOG_IF_ERROR("backup_poller:pollset_work", - grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, - now, deadline)); + GRPC_LOG_IF_ERROR( + "backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, deadline)); gpr_mu_unlock(p->pollset_mu); /* last "uncovered" notification is the ref that keeps us polling, if we get * there try a cas to release it */ diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 6831a4a57f..dda78b2f8e 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -33,6 +33,10 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" +#ifdef __cplusplus +extern "C" { +#endif + extern grpc_tracer_flag grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. @@ -53,4 +57,8 @@ int grpc_tcp_fd(grpc_endpoint *ep); void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int *fd, grpc_closure *done); -#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 8a126b6dee..3f190ac285 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -25,6 +25,10 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" +#ifdef __cplusplus +extern "C" { +#endif + /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; @@ -98,4 +102,8 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s); -#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.cc index 06612d639c..06612d639c 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.cc diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 85dea515d9..4bb0660f09 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -24,6 +24,10 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_server.h" +#ifdef __cplusplus +extern "C" { +#endif + /* one listening port */ typedef struct grpc_tcp_listener { int fd; @@ -117,4 +121,8 @@ grpc_error *grpc_tcp_server_prepare_socket(int fd, /* Ruturn true if the platform supports ifaddrs */ bool grpc_tcp_server_have_ifaddrs(void); -#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index a828bee074..a828bee074 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc index a243b69f35..a243b69f35 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc index 34eab20d6a..34eab20d6a 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.cc index 3b9332321f..348838c495 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.cc @@ -77,14 +77,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { - grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + grpc_tcp_server *s = (grpc_tcp_server *)gpr_malloc(sizeof(grpc_tcp_server)); s->resource_quota = grpc_resource_quota_create(NULL); for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); + s->resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota *)args->args[i].value.pointer.p); } else { grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); @@ -190,15 +190,16 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - uv_tcp_t *client; + grpc_tcp_server_acceptor *acceptor = + (grpc_tcp_server_acceptor *)gpr_malloc(sizeof(*acceptor)); + uv_tcp_t *client = NULL; grpc_endpoint *ep = NULL; grpc_resolved_address peer_name; char *peer_name_string; int err; uv_tcp_t *server = sp->handle; - client = gpr_malloc(sizeof(uv_tcp_t)); + client = (uv_tcp_t *)gpr_malloc(sizeof(uv_tcp_t)); uv_tcp_init(uv_default_loop(), client); // UV documentation says this is guaranteed to succeed uv_accept((uv_stream_t *)server, (uv_stream_t *)client); @@ -303,7 +304,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_zalloc(sizeof(grpc_tcp_listener)); + sp = (grpc_tcp_listener *)gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -355,7 +356,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, (int *)&sockname_temp.len)) { *port = grpc_sockaddr_get_port(&sockname_temp); if (*port > 0) { - allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + allocated_addr = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address)); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, *port); addr = allocated_addr; @@ -376,7 +378,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, addr = &wildcard; } - handle = gpr_malloc(sizeof(uv_tcp_t)); + handle = (uv_tcp_t *)gpr_malloc(sizeof(uv_tcp_t)); family = grpc_sockaddr_get_family(addr); status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.cc index 0162afc1ad..f198aaaa5b 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -22,6 +22,7 @@ #include "src/core/lib/iomgr/sockaddr.h" +#include <inttypes.h> #include <io.h> #include <grpc/support/alloc.h> @@ -97,7 +98,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { - grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + grpc_tcp_server *s = (grpc_tcp_server *)gpr_malloc(sizeof(grpc_tcp_server)); s->channel_args = grpc_channel_args_copy(args); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); @@ -115,7 +116,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_tcp_server *s = arg; + grpc_tcp_server *s = (grpc_tcp_server *)arg; /* Now that the accepts have been aborted, we can destroy the sockets. The IOCP won't get notified on these, so we can flag them as already @@ -188,6 +189,7 @@ static grpc_error *prepare_socket(SOCKET sock, int *port) { grpc_resolved_address sockname_temp; grpc_error *error = GRPC_ERROR_NONE; + int sockname_temp_len; error = grpc_tcp_prepare_socket(sock); if (error != GRPC_ERROR_NONE) { @@ -205,7 +207,7 @@ static grpc_error *prepare_socket(SOCKET sock, goto failure; } - int sockname_temp_len = sizeof(struct sockaddr_storage); + sockname_temp_len = sizeof(struct sockaddr_storage); if (getsockname(sock, (struct sockaddr *)sockname_temp.addr, &sockname_temp_len) == SOCKET_ERROR) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"); @@ -294,7 +296,7 @@ failure: /* Event manager callback when reads are ready. */ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_tcp_listener *sp = arg; + grpc_tcp_listener *sp = (grpc_tcp_listener *)arg; SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; @@ -368,7 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { managed to accept a connection, and created an endpoint. */ if (ep) { // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + grpc_tcp_server_acceptor *acceptor = + (grpc_tcp_server_acceptor *)gpr_malloc(sizeof(*acceptor)); acceptor->from_server = sp->server; acceptor->port_index = sp->port_index; acceptor->fd_index = 0; @@ -421,7 +424,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, GPR_ASSERT(port >= 0); gpr_mu_lock(&s->mu); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = (grpc_tcp_listener *)gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -472,7 +475,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, sockname_temp.len = (size_t)sockname_temp_len; *port = grpc_sockaddr_get_port(&sockname_temp); if (*port > 0) { - allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + allocated_addr = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address)); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, *port); addr = allocated_addr; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.cc index a05c19b4ac..e311964dbc 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -114,7 +114,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void uv_close_callback(uv_handle_t *handle) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp *tcp = handle->data; + grpc_tcp *tcp = (grpc_tcp *)handle->data; TCP_UNREF(&exec_ctx, tcp, "destroy"); grpc_exec_ctx_finish(&exec_ctx); } @@ -128,7 +128,7 @@ static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp *tcp = handle->data; + grpc_tcp *tcp = (grpc_tcp *)handle->data; (void)suggested_size; buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); @@ -140,7 +140,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, grpc_slice sub; grpc_error *error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_tcp *tcp = stream->data; + grpc_tcp *tcp = (grpc_tcp *)stream->data; grpc_closure *cb = tcp->read_cb; if (nread == 0) { // Nothing happened. Wait for the next callback @@ -207,7 +207,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } static void write_callback(uv_write_t *req, int status) { - grpc_tcp *tcp = req->data; + grpc_tcp *tcp = (grpc_tcp *)req->data; grpc_error *error; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure *cb = tcp->write_cb; @@ -269,7 +269,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; - buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); + buffers = (uv_buf_t *)gpr_malloc(sizeof(uv_buf_t) * buffer_count); grpc_resource_user_alloc(exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index 0e67481d35..ba7db8a0f7 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -38,8 +38,16 @@ extern grpc_tracer_flag grpc_tcp_trace; #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 +#ifdef __cplusplus +extern "C" { +#endif + grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, char *peer_string); -#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.cc index 2cbb97403b..dc84e564a9 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -37,6 +37,7 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" @@ -158,7 +159,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { - grpc_tcp *tcp = tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_closure *cb = tcp->read_cb; grpc_winsocket *socket = tcp->socket; grpc_slice sub; @@ -418,14 +419,14 @@ static grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, - char *peer_string) { + const char *peer_string) { grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); + (grpc_resource_quota *)channel_args->args[i].value.pointer.p); } } } diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 864184ce84..f3697f707c 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -32,13 +32,21 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/socket_windows.h" +#ifdef __cplusplus +extern "C" { +#endif + /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, - char *peer_string); + const char *peer_string); grpc_error *grpc_tcp_prepare_socket(SOCKET sock); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_TCP_WINDOWS_H */ diff --git a/src/core/lib/iomgr/time_averaged_stats.c b/src/core/lib/iomgr/time_averaged_stats.cc index 3bddec04dd..3bddec04dd 100644 --- a/src/core/lib/iomgr/time_averaged_stats.c +++ b/src/core/lib/iomgr/time_averaged_stats.cc diff --git a/src/core/lib/iomgr/time_averaged_stats.h b/src/core/lib/iomgr/time_averaged_stats.h index 8745f7fa13..e255b58fee 100644 --- a/src/core/lib/iomgr/time_averaged_stats.h +++ b/src/core/lib/iomgr/time_averaged_stats.h @@ -19,6 +19,10 @@ #ifndef GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H #define GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H +#ifdef __cplusplus +extern "C" { +#endif + /* This tracks a time-decaying weighted average. It works by collecting batches of samples and then mixing their average into a time-decaying weighted mean. It is designed for batch operations where we do many adds @@ -70,4 +74,8 @@ void grpc_time_averaged_stats_add_sample(grpc_time_averaged_stats* stats, value. */ double grpc_time_averaged_stats_update_average(grpc_time_averaged_stats* stats); -#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TIME_AVERAGED_STATS_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index ac392f87fe..419e834cf1 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -32,6 +32,10 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_timer grpc_timer; /* Initialize *timer. When expired or canceled, closure will be called with @@ -41,8 +45,7 @@ typedef struct grpc_timer grpc_timer; application callback is also responsible for maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now); + grpc_millis deadline, grpc_closure *closure); /* Initialize *timer without setting it. This can later be passed through the regular init or cancel */ @@ -92,8 +95,8 @@ typedef enum { with high probability at least one thread in the system will see an update at any time slice. */ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next); -void grpc_timer_list_init(gpr_timespec now); + grpc_millis *next); +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx); void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* Consume a kick issued by grpc_kick_poller */ @@ -103,4 +106,8 @@ void grpc_timer_consume_kick(void); void grpc_kick_poller(void); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_TIMER_H */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.cc index 2472cf26be..b8e895de6f 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.cc @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/port.h" +#include <inttypes.h> + #ifdef GRPC_TIMER_USE_GENERIC #include "src/core/lib/iomgr/timer.h" @@ -41,9 +43,11 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 +extern "C" { grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false, "timer_check"); +} /* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with * deadlines earlier than 'queue_deadline" cap are maintained in the heap and @@ -216,9 +220,6 @@ struct shared_mutables { static struct shared_mutables g_shared_mutables; -static gpr_clock_type g_clock_type; -static gpr_timespec g_start_time; - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -231,52 +232,19 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_atm *next, grpc_error *error); -static gpr_timespec dbl_to_ts(double d) { - gpr_timespec ts; - ts.tv_sec = (int64_t)d; - ts.tv_nsec = (int32_t)(1e9 * (d - (double)ts.tv_sec)); - ts.clock_type = GPR_TIMESPAN; - return ts; -} - -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = GPR_MS_PER_SEC * (double)ts.tv_sec + - (double)ts.tv_nsec / GPR_NS_PER_MS + - (double)(GPR_NS_PER_SEC - 1) / (double)GPR_NS_PER_SEC; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { - ts = gpr_time_sub(ts, g_start_time); - double x = - GPR_MS_PER_SEC * (double)ts.tv_sec + (double)ts.tv_nsec / GPR_NS_PER_MS; - if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return (gpr_atm)x; -} - -static gpr_timespec atm_to_timespec(gpr_atm x) { - return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); -} - static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; } -void grpc_timer_list_init(gpr_timespec now) { +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) { uint32_t i; g_shared_mutables.initialized = true; g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; gpr_mu_init(&g_shared_mutables.mu); - g_clock_type = now.clock_type; - g_start_time = now; - g_shared_mutables.min_timer = timespec_to_atm_round_down(now); + g_shared_mutables.min_timer = grpc_exec_ctx_now(exec_ctx); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); grpc_register_tracer(&grpc_timer_trace); @@ -313,10 +281,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { g_shared_mutables.initialized = false; } -static double ts_to_dbl(gpr_timespec ts) { - return (double)ts.tv_sec + 1e-9 * ts.tv_nsec; -} - /* returns true if the first element in the list */ static void list_join(grpc_timer *head, grpc_timer *timer) { timer->next = head; @@ -357,24 +321,20 @@ static void note_deadline_change(timer_shard *shard) { void grpc_timer_init_unset(grpc_timer *timer) { timer->pending = false; } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { int is_first_timer = 0; timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; - GPR_ASSERT(deadline.clock_type == g_clock_type); - GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; - gpr_atm deadline_atm = timer->deadline = timespec_to_atm_round_up(deadline); + timer->deadline = deadline; #ifndef NDEBUG timer->hash_table_next = NULL; #endif if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: SET %" PRId64 ".%09d [%" PRIdPTR - "] now %" PRId64 ".%09d [%" PRIdPTR "] call %p[%p]", - timer, deadline.tv_sec, deadline.tv_nsec, deadline_atm, now.tv_sec, - now.tv_nsec, timespec_to_atm_round_down(now), closure, closure->cb); + gpr_log(GPR_DEBUG, + "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", timer, + deadline, grpc_exec_ctx_now(exec_ctx), closure, closure->cb); } if (!g_shared_mutables.initialized) { @@ -387,7 +347,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_mu_lock(&shard->mu); timer->pending = true; - if (gpr_time_cmp(deadline, now) <= 0) { + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (deadline <= now) { timer->pending = false; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); @@ -396,11 +357,11 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } grpc_time_averaged_stats_add_sample(&shard->stats, - ts_to_dbl(gpr_time_sub(deadline, now))); + (double)(deadline - now) / 1000.0); ADD_TO_HASH_TABLE(timer); - if (deadline_atm < shard->queue_deadline_cap) { + if (deadline < shard->queue_deadline_cap) { is_first_timer = grpc_timer_heap_add(&shard->heap, timer); } else { timer->heap_index = INVALID_HEAP_INDEX; @@ -431,12 +392,12 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_log(GPR_DEBUG, " .. old shard min_deadline=%" PRIdPTR, shard->min_deadline); } - if (deadline_atm < shard->min_deadline) { + if (deadline < shard->min_deadline) { gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; - shard->min_deadline = deadline_atm; + shard->min_deadline = deadline; note_deadline_change(shard); - if (shard->shard_queue_index == 0 && deadline_atm < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline_atm); + if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { + gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); grpc_kick_poller(); } } @@ -540,8 +501,9 @@ static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { } if (timer->deadline > now) return NULL; if (GRPC_TRACER_ON(grpc_timer_trace)) { - gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late", timer, - now - timer->deadline); + gpr_log(GPR_DEBUG, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", + timer, now - timer->deadline, + timer->closure->scheduler->vtable->name); } timer->pending = false; grpc_timer_heap_pop(&shard->heap); @@ -563,6 +525,10 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, " .. shard[%d] popped %" PRIdPTR, + (int)(shard - g_shards), n); + } return n; } @@ -635,29 +601,27 @@ static grpc_timer_check_result run_some_expired_timers(grpc_exec_ctx *exec_ctx, } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { // prelude - GPR_ASSERT(now.clock_type == g_clock_type); - gpr_atm now_atm = timespec_to_atm_round_down(now); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ - gpr_atm min_timer = gpr_tls_get(&g_last_seen_min_timer); - if (now_atm < min_timer) { + grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); + if (now < min_timer) { if (next != NULL) { - *next = - atm_to_timespec(GPR_MIN(timespec_to_atm_round_up(*next), min_timer)); + *next = GPR_MIN(*next, min_timer); } if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, - "TIMER CHECK SKIP: now_atm=%" PRIdPTR " min_timer=%" PRIdPTR, - now_atm, min_timer); + "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, + min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } grpc_error *shutdown_error = - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0 + now != GRPC_MILLIS_INF_FUTURE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutting down timer system"); @@ -667,34 +631,24 @@ grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, timespec_to_atm_round_down(*next)); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } - gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRId64 ".%09d [%" PRIdPTR - "] next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, - now.tv_sec, now.tv_nsec, now_atm, next_str, - gpr_tls_get(&g_last_seen_min_timer), + gpr_log(GPR_DEBUG, "TIMER CHECK BEGIN: now=%" PRIdPTR + " next=%s tls_min=%" PRIdPTR " glob_min=%" PRIdPTR, + now, next_str, gpr_tls_get(&g_last_seen_min_timer), gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); gpr_free(next_str); } // actual code - grpc_timer_check_result r; - gpr_atm next_atm; - if (next == NULL) { - r = run_some_expired_timers(exec_ctx, now_atm, NULL, shutdown_error); - } else { - next_atm = timespec_to_atm_round_down(*next); - r = run_some_expired_timers(exec_ctx, now_atm, &next_atm, shutdown_error); - *next = atm_to_timespec(next_atm); - } + grpc_timer_check_result r = + run_some_expired_timers(exec_ctx, now, next, shutdown_error); // tracing if (GRPC_TRACER_ON(grpc_timer_check_trace)) { char *next_str; if (next == NULL) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRId64 ".%09d [%" PRIdPTR "]", next->tv_sec, - next->tv_nsec, next_atm); + gpr_asprintf(&next_str, "%" PRIdPTR, *next); } gpr_log(GPR_DEBUG, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.cc index 2648d5da5d..2648d5da5d 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.cc diff --git a/src/core/lib/iomgr/timer_heap.h b/src/core/lib/iomgr/timer_heap.h index 0d64199ab9..f15e8a3abb 100644 --- a/src/core/lib/iomgr/timer_heap.h +++ b/src/core/lib/iomgr/timer_heap.h @@ -21,6 +21,10 @@ #include "src/core/lib/iomgr/timer.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { grpc_timer **timers; uint32_t timer_count; @@ -39,4 +43,8 @@ void grpc_timer_heap_pop(grpc_timer_heap *heap); int grpc_timer_heap_is_empty(grpc_timer_heap *heap); -#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_HEAP_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.cc index 04ca44563d..1248f82189 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.cc @@ -20,8 +20,11 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/port_platform.h> #include <grpc/support/thd.h> +#include <inttypes.h> + #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/timer.h" @@ -30,7 +33,7 @@ typedef struct completed_thread { struct completed_thread *next; } completed_thread; -extern grpc_tracer_flag grpc_timer_check_trace; +extern "C" grpc_tracer_flag grpc_timer_check_trace; // global mutex static gpr_mu g_mu; @@ -52,7 +55,7 @@ static bool g_kicked; static bool g_has_timed_waiter; // the deadline of the current timed waiter thread (only relevant if // g_has_timed_waiter is true) -static gpr_timespec g_timed_waiter_deadline; +static grpc_millis g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -96,9 +99,8 @@ static void start_timer_thread_and_unlock(void) { void grpc_timer_manager_tick() { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_timespec next = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_timer_check(&exec_ctx, now, &next); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_timer_check(&exec_ctx, &next); grpc_exec_ctx_finish(&exec_ctx); } @@ -121,6 +123,9 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { gpr_mu_unlock(&g_mu); } // without our lock, flush the exec_ctx + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "flush exec_ctx"); + } grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&g_mu); // garbage collect any threads hanging out that are dead @@ -133,8 +138,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { // wait until 'next' (or forever if there is already a timed waiter in the pool) // returns true if the thread should continue executing (false if it should // shutdown) -static bool wait_until(gpr_timespec next) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); +static bool wait_until(grpc_exec_ctx *exec_ctx, grpc_millis next) { gpr_mu_lock(&g_mu); // if we're not threaded anymore, leave if (!g_threaded) { @@ -168,30 +172,29 @@ static bool wait_until(gpr_timespec next) { unless their 'next' is earlier than the current timed-waiter's deadline (in which case the thread with earlier 'next' takes over as the new timed waiter) */ - if (gpr_time_cmp(next, inf_future) != 0) { - if (!g_has_timed_waiter || - (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + if (next != GRPC_MILLIS_INF_FUTURE) { + if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) { my_timed_waiter_generation = ++g_timed_waiter_generation; g_has_timed_waiter = true; g_timed_waiter_deadline = next; if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = - gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + grpc_millis wait_time = next - grpc_exec_ctx_now(exec_ctx); + gpr_log(GPR_DEBUG, "sleep for a %" PRIdPTR " milliseconds", + wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; } } if (GRPC_TRACER_ON(grpc_timer_check_trace) && - gpr_time_cmp(next, inf_future) == 0) { + next == GRPC_MILLIS_INF_FUTURE) { gpr_log(GPR_DEBUG, "sleep until kicked"); } - gpr_cv_wait(&g_cv_wait, &g_mu, next); + gpr_cv_wait(&g_cv_wait, &g_mu, + grpc_millis_to_timespec(next, GPR_CLOCK_REALTIME)); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", @@ -203,7 +206,7 @@ static bool wait_until(gpr_timespec next) { // there's work to do after checking timers (code above) if (my_timed_waiter_generation == g_timed_waiter_generation) { g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; } } @@ -219,12 +222,11 @@ static bool wait_until(gpr_timespec next) { } static void timer_main_loop(grpc_exec_ctx *exec_ctx) { - const gpr_timespec inf_future = gpr_inf_future(GPR_CLOCK_MONOTONIC); for (;;) { - gpr_timespec next = inf_future; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_millis next = GRPC_MILLIS_INF_FUTURE; + grpc_exec_ctx_invalidate_now(exec_ctx); // check timer state, updates next to the next time to run a check - switch (grpc_timer_check(exec_ctx, now, &next)) { + switch (grpc_timer_check(exec_ctx, &next)) { case GRPC_TIMERS_FIRED: run_some_timers(exec_ctx); break; @@ -241,10 +243,10 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "timers not checked: expect another thread to"); } - next = inf_future; + next = GRPC_MILLIS_INF_FUTURE; /* fall through */ case GRPC_TIMERS_CHECKED_AND_EMPTY: - if (!wait_until(next)) { + if (!wait_until(exec_ctx, next)) { return; } break; @@ -300,7 +302,7 @@ void grpc_timer_manager_init(void) { g_completed_threads = NULL; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; start_threads(); } @@ -347,7 +349,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; - g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE; ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/iomgr/timer_manager.h b/src/core/lib/iomgr/timer_manager.h index 0ba502928a..d8a59a9477 100644 --- a/src/core/lib/iomgr/timer_manager.h +++ b/src/core/lib/iomgr/timer_manager.h @@ -21,6 +21,10 @@ #include <stdbool.h> +#ifdef __cplusplus +extern "C" { +#endif + /* Timer Manager tries to keep one thread waiting for the next timeout at all times */ @@ -34,4 +38,8 @@ void grpc_timer_manager_set_threading(bool enabled); * disabled */ void grpc_timer_manager_tick(void); -#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_MANAGER_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.cc index adced41f53..ccbbe357ae 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.cc @@ -29,9 +29,11 @@ #include <uv.h> +extern "C" { grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false, "timer_check"); +} static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); } @@ -53,20 +55,19 @@ void run_expired_timer(uv_timer_t *handle) { } void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, - gpr_timespec deadline, grpc_closure *closure, - gpr_timespec now) { + grpc_millis deadline, grpc_closure *closure) { uint64_t timeout; uv_timer_t *uv_timer; GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; - if (gpr_time_cmp(deadline, now) <= 0) { + if (deadline <= grpc_exec_ctx_now(exec_ctx)) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } timer->pending = 1; - timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); - uv_timer = gpr_malloc(sizeof(uv_timer_t)); + timeout = (uint64_t)(deadline - grpc_exec_ctx_now(exec_ctx)); + uv_timer = (uv_timer_t *)gpr_malloc(sizeof(uv_timer_t)); uv_timer_init(uv_default_loop(), uv_timer); uv_timer->data = timer; timer->uv_timer = uv_timer; @@ -89,11 +90,11 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { } grpc_timer_check_result grpc_timer_check(grpc_exec_ctx *exec_ctx, - gpr_timespec now, gpr_timespec *next) { + grpc_millis *next) { return GRPC_TIMERS_NOT_CHECKED; } -void grpc_timer_list_init(gpr_timespec now) {} +void grpc_timer_list_init(grpc_exec_ctx *exec_ctx) {} void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {} void grpc_timer_consume_kick(void) {} diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.cc index 00b2e68bb5..00b2e68bb5 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.cc diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 881468ea2c..bcd8572260 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -23,6 +23,10 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" +#ifdef __cplusplus +extern "C" { +#endif + /* Forward decl of struct grpc_server */ /* This is not typedef'ed to avoid a typedef-redefinition error */ struct grpc_server; @@ -73,4 +77,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, grpc_closure *on_done); -#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_UDP_SERVER_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.cc index 35f898f13a..35f898f13a 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.cc diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h index 25b64b3eec..b96131ae1c 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.h +++ b/src/core/lib/iomgr/unix_sockets_posix.h @@ -25,6 +25,10 @@ #include "src/core/lib/iomgr/resolve_address.h" +#ifdef __cplusplus +extern "C" { +#endif + void grpc_create_socketpair_if_unix(int sv[2]); grpc_error *grpc_resolve_unix_domain_address( @@ -38,4 +42,8 @@ void grpc_unlink_if_unix_domain_socket( char *grpc_sockaddr_to_uri_unix_if_possible( const grpc_resolved_address *resolved_addr); -#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */ +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */
\ No newline at end of file diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.cc index e46b1c003d..e46b1c003d 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.cc diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.cc index 268e0175dd..268e0175dd 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index dc170ad5b4..dcd7bdb560 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,6 +40,10 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) +#ifdef __cplusplus +extern "C" { +#endif + typedef struct cv_node { gpr_cv* cv; struct cv_node* next; @@ -62,4 +66,10 @@ typedef struct cv_fd_table { grpc_poll_function_type poll; } cv_fd_table; +extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_CV_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.cc index 81cb7ee280..81cb7ee280 100644 --- a/src/core/lib/iomgr/wakeup_fd_eventfd.c +++ b/src/core/lib/iomgr/wakeup_fd_eventfd.cc diff --git a/src/core/lib/iomgr/wakeup_fd_nospecial.c b/src/core/lib/iomgr/wakeup_fd_nospecial.cc index 4c20b8c1b7..4c20b8c1b7 100644 --- a/src/core/lib/iomgr/wakeup_fd_nospecial.c +++ b/src/core/lib/iomgr/wakeup_fd_nospecial.cc diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.cc index 4189488f8a..05d69dc9cc 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.cc @@ -20,6 +20,7 @@ #ifdef GRPC_POSIX_WAKEUP_FD +#include "src/core/lib/iomgr/wakeup_fd_pipe.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include <errno.h> diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.h b/src/core/lib/iomgr/wakeup_fd_pipe.h index f860406bda..9bbb5e2ff7 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.h +++ b/src/core/lib/iomgr/wakeup_fd_pipe.h @@ -21,6 +21,14 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" -extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable; +#ifdef __cplusplus +extern "C" { +#endif + +extern const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable; + +#ifdef __cplusplus +} +#endif #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_PIPE_H */ diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.cc index 25daa7d3fb..9af96d314b 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.cc @@ -25,7 +25,6 @@ #include "src/core/lib/iomgr/wakeup_fd_pipe.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -extern grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; int grpc_allow_specialized_wakeup_fd = 1; diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index a9584d0d48..ae7849f98c 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -49,6 +49,10 @@ #include "src/core/lib/iomgr/error.h" +#ifdef __cplusplus +extern "C" { +#endif + void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -91,4 +95,8 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); * wakeup_fd_nospecial.c if no such implementation exists. */ extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable; +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H */ |