diff options
Diffstat (limited to 'src/core/lib')
50 files changed, 1436 insertions, 194 deletions
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 4bf8218664..7581f937b6 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -138,7 +138,7 @@ typedef struct { is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining. - Implementations may assume that elem->call_data is all zeros. */ + Implementations may assume that elem->channel_data is all zeros. */ grpc_error* (*init_channel_elem)(grpc_channel_element* elem, grpc_channel_element_args* args); /* Destroy per channel data. diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index ddd3029402..e2ea334ded 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -51,6 +51,7 @@ typedef struct connected_channel_call_data { callback_state on_complete[6]; // Max number of pending batches. callback_state recv_initial_metadata_ready; callback_state recv_message_ready; + callback_state recv_trailing_metadata_ready; } call_data; static void run_in_call_combiner(void* arg, grpc_error* error) { @@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch( intercept_callback(calld, state, false, "recv_message_ready", &batch->payload->recv_message.recv_message_ready); } + if (batch->recv_trailing_metadata) { + callback_state* state = &calld->recv_trailing_metadata_ready; + intercept_callback( + calld, state, false, "recv_trailing_metadata_ready", + &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready); + } if (batch->cancel_stream) { // There can be more than one cancellation batch in flight at any // given time, so we can't just pick out a fixed index into @@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch( static_cast<callback_state*>(gpr_malloc(sizeof(*state))); intercept_callback(calld, state, true, "on_complete (cancel_stream)", &batch->on_complete); - } else { + } else if (batch->on_complete != nullptr) { callback_state* state = get_state_for_batch(calld, batch); intercept_callback(calld, state, false, "on_complete", &batch->on_complete); } diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h index 28157c6383..fe6301a3fc 100644 --- a/src/core/lib/debug/trace.h +++ b/src/core/lib/debug/trace.h @@ -57,14 +57,14 @@ class TraceFlag { const char* name() const { return name_; } -// This following define may be commented out to ensure that the compiler -// deletes any "if (tracer.enabled()) {...}" codeblocks. This is useful to -// test the performance impact tracers have on the system. -// -// #define COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD -#ifdef COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD - bool enabled() { return false; } -#else +// Use the symbol GRPC_USE_TRACERS to determine if tracers will be enabled in +// opt builds (tracers are always on in dbg builds). The default in OSS is for +// tracers to be on since we support binary distributions of gRPC for the +// wrapped language (wr don't want to force recompilation to get tracing). +// Internally, however, for performance reasons, we compile them out by +// default, since internal build systems make recompiling trivial. +#define GRPC_USE_TRACERS // tracers on by default in OSS +#if defined(GRPC_USE_TRACERS) || !defined(NDEBUG) bool enabled() { #ifdef GRPC_THREADSAFE_TRACER return gpr_atm_no_barrier_load(&value_) != 0; @@ -72,7 +72,9 @@ class TraceFlag { return value_; #endif // GRPC_THREADSAFE_TRACER } -#endif // COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD +#else + bool enabled() { return false; } +#endif /* defined(GRPC_USE_TRACERS) || !defined(NDEBUG) */ private: friend void grpc_core::testing::grpc_tracer_enable_flag(TraceFlag* flag); diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h index 1354109bf3..28fcdf1779 100644 --- a/src/core/lib/gprpp/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -55,6 +55,7 @@ inline T* New(Args&&... args) { // Alternative to delete, since we cannot use it (for fear of libstdc++) template <typename T> inline void Delete(T* p) { + if (p == nullptr) return; p->~T(); if (alignof(T) > kAlignmentForDefaultAllocationInBytes) { gpr_free_aligned(p); diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 0ccd08ea57..f9ce29f231 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -26,6 +26,7 @@ #include <grpc/support/atm.h> #include "src/core/lib/gpr/mpscq.h" +#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/iomgr/closure.h" // A simple, lock-free mechanism for serializing activity related to a @@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, grpc_error* error); +namespace grpc_core { + +// Helper for running a list of closures in a call combiner. +// +// Each callback running in the call combiner will eventually be +// returned to the surface, at which point the surface will yield the +// call combiner. So when we are running in the call combiner and have +// more than one callback to return to the surface, we need to re-enter +// the call combiner for all but one of those callbacks. +class CallCombinerClosureList { + public: + CallCombinerClosureList() {} + + // Adds a closure to the list. The closure must eventually result in + // the call combiner being yielded. + void Add(grpc_closure* closure, grpc_error* error, const char* reason) { + closures_.emplace_back(closure, error, reason); + } + + // Runs all closures in the call combiner and yields the call combiner. + // + // All but one of the closures in the list will be scheduled via + // GRPC_CALL_COMBINER_START(), and the remaining closure will be + // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in + // yielding the call combiner. If the list is empty, then the call + // combiner will be yielded immediately. + void RunClosures(grpc_call_combiner* call_combiner) { + for (size_t i = 1; i < closures_.size(); ++i) { + auto& closure = closures_[i]; + GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, + closure.reason); + } + if (closures_.size() > 0) { + if (grpc_call_combiner_trace.enabled()) { + gpr_log(GPR_INFO, + "CallCombinerClosureList executing closure while already " + "holding call_combiner %p: closure=%p error=%s reason=%s", + call_combiner, closures_[0].closure, + grpc_error_string(closures_[0].error), closures_[0].reason); + } + // This will release the call combiner. + GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error); + } else { + GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); + } + closures_.clear(); + } + + // Runs all closures in the call combiner, but does NOT yield the call + // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). + void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) { + for (size_t i = 0; i < closures_.size(); ++i) { + auto& closure = closures_[i]; + GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, + closure.reason); + } + closures_.clear(); + } + + size_t size() const { return closures_.size(); } + + private: + struct CallCombinerClosure { + grpc_closure* closure; + grpc_error* error; + const char* reason; + + CallCombinerClosure(grpc_closure* closure, grpc_error* error, + const char* reason) + : closure(closure), error(error), reason(reason) {} + }; + + // There are generally a maximum of 6 closures to run in the call + // combiner, one for each pending op. + InlinedVector<CallCombinerClosure, 6> closures_; +}; + +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc new file mode 100644 index 0000000000..30f4e65632 --- /dev/null +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -0,0 +1,183 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/cfstream_handle.h" + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +void* CFStreamHandle::Retain(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_HANDLE_REF(handle, "retain"); + return info; +} + +void CFStreamHandle::Release(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_HANDLE_UNREF(handle, "release"); +} + +CFStreamHandle* CFStreamHandle::CreateStreamHandle( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream) { + return new CFStreamHandle(read_stream, write_stream); +} + +void CFStreamHandle::ReadCallback(CFReadStreamRef stream, + CFStreamEventType type, + void* client_callback_info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info); + CFSTREAM_HANDLE_REF(handle, "read callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + handle->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->read_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } + CFSTREAM_HANDLE_UNREF(handle, "read callback"); + }); +} +void CFStreamHandle::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, + void* clientCallBackInfo) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo); + CFSTREAM_HANDLE_REF(handle, "write callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + handle->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->write_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } + CFSTREAM_HANDLE_UNREF(handle, "write callback"); + }); +} + +CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + gpr_ref_init(&refcount_, 1); + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); + CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFReadStreamSetClient( + read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::ReadCallback, &ctx); + CFWriteStreamSetClient( + write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); +} + +CFStreamHandle::~CFStreamHandle() { + open_event_.DestroyEvent(); + read_event_.DestroyEvent(); + write_event_.DestroyEvent(); +} + +void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) { + open_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnRead(grpc_closure* closure) { + read_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) { + write_event_.NotifyOn(closure); +} + +void CFStreamHandle::Shutdown(grpc_error* error) { + open_event_.SetShutdown(GRPC_ERROR_REF(error)); + read_event_.SetShutdown(GRPC_ERROR_REF(error)); + write_event_.SetShutdown(GRPC_ERROR_REF(error)); + GRPC_ERROR_UNREF(error); +} + +void CFStreamHandle::Ref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val + 1); + } + gpr_ref(&refcount_); +} + +void CFStreamHandle::Unref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(GPR_ERROR, + "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val - 1); + } + if (gpr_unref(&refcount_)) { + delete this; + } +} + +#endif diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h new file mode 100644 index 0000000000..4258e72431 --- /dev/null +++ b/src/core/lib/iomgr/cfstream_handle.h @@ -0,0 +1,80 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* The CFStream handle acts as an event synchronization entity for + * read/write/open/error/eos events happening on CFStream streams. */ + +#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H +#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/lockfree_event.h" + +class CFStreamHandle final { + public: + static CFStreamHandle* CreateStreamHandle(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream); + ~CFStreamHandle(); + CFStreamHandle(const CFReadStreamRef& ref) = delete; + CFStreamHandle(CFReadStreamRef&& ref) = delete; + CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete; + + void NotifyOnOpen(grpc_closure* closure); + void NotifyOnRead(grpc_closure* closure); + void NotifyOnWrite(grpc_closure* closure); + void Shutdown(grpc_error* error); + + void Ref(const char* file = "", int line = 0, const char* reason = nullptr); + void Unref(const char* file = "", int line = 0, const char* reason = nullptr); + + private: + CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); + static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void* Retain(void* info); + static void Release(void* info); + + grpc_core::LockfreeEvent open_event_; + grpc_core::LockfreeEvent read_event_; + grpc_core::LockfreeEvent write_event_; + + gpr_refcount refcount_; +}; + +#ifdef DEBUG +#define CFSTREAM_HANDLE_REF(handle, reason) \ + (handle)->Ref(__FILE__, __LINE__, (reason)) +#define CFSTREAM_HANDLE_UNREF(handle, reason) \ + (handle)->Unref(__FILE__, __LINE__, (reason)) +#else +#define CFSTREAM_HANDLE_REF(handle, reason) (handle)->Ref() +#define CFSTREAM_HANDLE_UNREF(handle, reason) (handle)->Unref() +#endif + +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */ diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 34a494485d..f14c723844 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) { if (c->scheduled) { gpr_log(GPR_ERROR, "Closure already scheduled. (closure: %p, created: [%s:%d], " - "previously scheduled at: [%s: %d] run?: %s", + "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], " + "run?: %s", c, c->file_created, c->line_created, c->file_initiated, - c->line_initiated, c->run ? "true" : "false"); + c->line_initiated, file, line, c->run ? "true" : "false"); abort(); } c->scheduled = true; diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 6789e4d12d..7c0062eb4e 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -31,7 +31,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" -grpc_core::TraceFlag grpc_combiner_trace(false, "combiner"); +grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); #define GRPC_COMBINER_TRACE(fn) \ do { \ diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 0d63e468df..3c947bf9d6 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -61,6 +61,6 @@ grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock); bool grpc_combiner_continue_exec_ctx(); -extern grpc_core::TraceFlag grpc_combiner_trace; +extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace; #endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc new file mode 100644 index 0000000000..c3bc0cc8fd --- /dev/null +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -0,0 +1,372 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_ENDPOINT + +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/endpoint_cfstream.h" + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamHandle* stream_sync; + + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_slice_buffer* read_slices; + grpc_slice_buffer* write_slices; + + grpc_closure read_action; + grpc_closure write_action; + + char* peer_string; + grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; +} CFStreamEndpoint; + +static void CFStreamFree(CFStreamEndpoint* ep) { + grpc_resource_user_unref(ep->resource_user); + CFRelease(ep->read_stream); + CFRelease(ep->write_stream); + CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free"); + gpr_free(ep->peer_string); + gpr_free(ep); +} + +#ifndef NDEBUG +#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__) +#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__) +static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val - 1); + } + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val + 1); + } + gpr_ref(&ep->refcount); +} +#else +#define EP_REF(ep, reason) CFStreamRef((ep)) +#define EP_UNREF(ep, reason) CFStreamUnref((ep)) +static void CFStreamUnref(CFStreamEndpoint* ep) { + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); } +#endif + +static grpc_error* CFStreamAnnotateError(grpc_error* src_error, + CFStreamEndpoint* ep) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(ep->peer_string)); +} + +static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep, + ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < ep->read_slices->count; i++) { + char* dump = grpc_dump_slice(ep->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + } + } + grpc_closure* cb = ep->read_cb; + ep->read_cb = nullptr; + ep->read_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep, + ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg); + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: error=%s", str); + } + grpc_closure* cb = ep->write_cb; + ep->write_cb = nullptr; + ep->write_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void ReadAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->read_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "read"); + return; + } + + GPR_ASSERT(ep->read_slices->count == 1); + grpc_slice slice = ep->read_slices->slices[0]; + size_t len = GRPC_SLICE_LENGTH(slice); + CFIndex read_size = + CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len); + if (read_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); + } + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } else if (read_size == 0) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, + CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); + EP_UNREF(ep, "read"); + } else { + if (read_size < len) { + grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); + } + CallReadCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "read"); + } +} + +static void WriteAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->write_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CallWriteCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "write"); + return; + } + + grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices); + size_t slice_len = GRPC_SLICE_LENGTH(slice); + CFIndex write_size = CFWriteStreamWrite( + ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + if (write_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); + } + CallWriteCb(ep, error); + EP_UNREF(ep, "write"); + } else { + if (write_size < GRPC_SLICE_LENGTH(slice)) { + grpc_slice_buffer_undo_take_first( + ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); + } + if (ep->write_slices->length > 0) { + ep->stream_sync->NotifyOnWrite(&ep->write_action); + } else { + CallWriteCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "write"); + } + + if (grpc_tcp_trace.enabled()) { + grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); + char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + grpc_slice_unref_internal(trace_slice); + } + } + grpc_slice_unref_internal(slice); +} + +static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + if (error == GRPC_ERROR_NONE) { + ep->stream_sync->NotifyOnRead(&ep->read_action); + } else { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } +} + +static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, + slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->read_cb == nullptr); + ep_impl->read_cb = cb; + ep_impl->read_slices = slices; + grpc_slice_buffer_reset_and_unref_internal(slices); + grpc_resource_user_alloc_slices(&ep_impl->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + ep_impl->read_slices); + EP_REF(ep_impl, "read"); +} + +static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", + ep_impl, slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->write_cb == nullptr); + ep_impl->write_cb = cb; + ep_impl->write_slices = slices; + EP_REF(ep_impl, "write"); + ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action); +} + +void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why); + } + CFReadStreamClose(ep_impl->read_stream); + CFWriteStreamClose(ep_impl->write_stream); + ep_impl->stream_sync->Shutdown(why); + grpc_resource_user_shutdown(ep_impl->resource_user); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why); + } +} + +void CFStreamDestroy(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl); + } + EP_UNREF(ep_impl, "destroy"); +} + +grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return ep_impl->resource_user; +} + +char* CFStreamGetPeer(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return gpr_strdup(ep_impl->peer_string); +} + +int CFStreamGetFD(grpc_endpoint* ep) { return 0; } + +void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} +void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} +void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, + grpc_pollset_set* pollset) {} + +static const grpc_endpoint_vtable vtable = {CFStreamRead, + CFStreamWrite, + CFStreamAddToPollset, + CFStreamAddToPollsetSet, + CFStreamDeleteFromPollsetSet, + CFStreamShutdown, + CFStreamDestroy, + CFStreamGetResourceUser, + CFStreamGetPeer, + CFStreamGetFD}; + +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync) { + CFStreamEndpoint* ep_impl = + static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint))); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, + "CFStream endpoint:%p create readStream:%p writeStream: %p", + ep_impl, read_stream, write_stream); + } + ep_impl->base.vtable = &vtable; + gpr_ref_init(&ep_impl->refcount, 1); + ep_impl->read_stream = read_stream; + ep_impl->write_stream = write_stream; + CFRetain(read_stream); + CFRetain(write_stream); + ep_impl->stream_sync = stream_sync; + CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create"); + + ep_impl->peer_string = gpr_strdup(peer_string); + ep_impl->read_cb = nil; + ep_impl->write_cb = nil; + ep_impl->read_slices = nil; + ep_impl->write_slices = nil; + GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + ep_impl->resource_user = + grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator, + ep_impl->resource_user, + CFStreamReadAllocationDone, ep_impl); + + return &ep_impl->base; +} + +#endif /* GRPC_CFSTREAM_ENDPOINT */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h new file mode 100644 index 0000000000..ef957c1f11 --- /dev/null +++ b/src/core/lib/iomgr/endpoint_cfstream.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM + +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/endpoint.h" + +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync); + +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/error_cfstream.cc b/src/core/lib/iomgr/error_cfstream.cc new file mode 100644 index 0000000000..d7af8c377f --- /dev/null +++ b/src/core/lib/iomgr/error_cfstream.cc @@ -0,0 +1,52 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM +#include <CoreFoundation/CoreFoundation.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" + +#define MAX_ERROR_DESCRIPTION 256 + +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* custom_desc) { + CFErrorRef error = static_cast<CFErrorRef>(arg); + char buf_domain[MAX_ERROR_DESCRIPTION]; + char buf_desc[MAX_ERROR_DESCRIPTION]; + char* error_msg; + CFErrorDomain domain = CFErrorGetDomain((error)); + CFIndex code = CFErrorGetCode((error)); + CFStringRef desc = CFErrorCopyDescription((error)); + CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", + custom_desc, buf_domain, code, buf_desc); + CFRelease(desc); + grpc_error* return_error = grpc_error_create( + file, line, grpc_slice_from_copied_string(error_msg), NULL, 0); + gpr_free(error_msg); + return return_error; +} +#endif /* GRPC_CFSTREAM */ diff --git a/src/core/lib/iomgr/error_cfstream.h b/src/core/lib/iomgr/error_cfstream.h new file mode 100644 index 0000000000..06ab751329 --- /dev/null +++ b/src/core/lib/iomgr/error_cfstream.h @@ -0,0 +1,31 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H + +#ifdef GRPC_CFSTREAM +// Create an error from Apple Core Foundation CFError object +#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \ + grpc_error_create_from_cferror(__FILE__, __LINE__, \ + static_cast<void*>((error)), (desc)) +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* desc); +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index e5db1be0e0..cf839619cd 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1237,12 +1237,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { } #else /* defined(GRPC_LINUX_EPOLL) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1) #include "src/core/lib/iomgr/ev_epoll1_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */ #endif /* !defined(GRPC_LINUX_EPOLL) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 4c6cff7fe2..7903297fc6 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1066,7 +1066,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, #endif if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, - "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR + "PS:%p work hdl=%p worker=%p now=%" PRId64 " deadline=%" PRId64 " kwp=%d pollable=%p", pollset, worker_hdl, WORKER_PTR, grpc_core::ExecCtx::Get()->Now(), deadline, pollset->kicked_without_poller, pollset->active_pollable); @@ -1556,7 +1556,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) #include "src/core/lib/iomgr/ev_epollex_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ @@ -1564,6 +1564,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */ #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 494bc71c1d..a144817a83 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1721,7 +1721,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG) #include "src/core/lib/iomgr/ev_epollsig_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 504787e659..70958ed562 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV_POLL #include "src/core/lib/iomgr/ev_poll_posix.h" @@ -1761,4 +1761,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) { return &vtable; } -#endif +#endif /* GRPC_POSIX_SOCKET_EV_POLL */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6bd1dc8e50..6b7eca0afa 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV #include "src/core/lib/iomgr/ev_posix.h" @@ -251,10 +251,10 @@ static void pollset_destroy(grpc_pollset* pollset) { static grpc_error* pollset_work(grpc_pollset* pollset, grpc_pollset_worker** worker, grpc_millis deadline) { - GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRIdPTR ") begin", pollset, + GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") begin", pollset, deadline); grpc_error* err = g_event_engine->pollset_work(pollset, worker, deadline); - GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRIdPTR ") end", pollset, + GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") end", pollset, deadline); return err; } @@ -334,4 +334,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } -#endif // GRPC_POSIX_SOCKET +#endif // GRPC_POSIX_SOCKET_EV diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 2f544b20ab..5d5c355ff9 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -53,24 +53,24 @@ static void exec_ctx_sched(grpc_closure* closure, grpc_error* error) { static gpr_timespec g_start_time; -static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) { +static grpc_millis timespec_to_millis_round_down(gpr_timespec ts) { ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * static_cast<double>(ts.tv_sec) + static_cast<double>(ts.tv_nsec) / GPR_NS_PER_MS; if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return static_cast<gpr_atm>(x); + if (x > GRPC_MILLIS_INF_FUTURE) return GRPC_MILLIS_INF_FUTURE; + return static_cast<grpc_millis>(x); } -static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) { +static grpc_millis timespec_to_millis_round_up(gpr_timespec ts) { ts = gpr_time_sub(ts, g_start_time); double x = GPR_MS_PER_SEC * static_cast<double>(ts.tv_sec) + static_cast<double>(ts.tv_nsec) / GPR_NS_PER_MS + static_cast<double>(GPR_NS_PER_SEC - 1) / static_cast<double>(GPR_NS_PER_SEC); if (x < 0) return 0; - if (x > GPR_ATM_MAX) return GPR_ATM_MAX; - return static_cast<gpr_atm>(x); + if (x > GRPC_MILLIS_INF_FUTURE) return GRPC_MILLIS_INF_FUTURE; + return static_cast<grpc_millis>(x); } gpr_timespec grpc_millis_to_timespec(grpc_millis millis, @@ -92,12 +92,12 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis, } grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) { - return timespec_to_atm_round_down( + return timespec_to_millis_round_down( gpr_convert_clock_type(ts, g_start_time.clock_type)); } grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) { - return timespec_to_atm_round_up( + return timespec_to_millis_round_up( gpr_convert_clock_type(ts, g_start_time.clock_type)); } @@ -138,7 +138,7 @@ bool ExecCtx::Flush() { grpc_millis ExecCtx::Now() { if (!now_is_valid_) { - now_ = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); + now_ = timespec_to_millis_round_down(gpr_now(GPR_CLOCK_MONOTONIC)); now_is_valid_ = true; } return now_; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 8823dc4b51..cf1118a003 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -29,10 +29,10 @@ #include "src/core/lib/gprpp/fork.h" #include "src/core/lib/iomgr/closure.h" -typedef gpr_atm grpc_millis; +typedef int64_t grpc_millis; -#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX -#define GRPC_MILLIS_INF_PAST GPR_ATM_MIN +#define GRPC_MILLIS_INF_FUTURE INT64_MAX +#define GRPC_MILLIS_INF_PAST INT64_MIN /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 66c9cb7ff7..ca7334c9a4 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_IOMGR #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" @@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() { grpc_set_iomgr_platform_vtable(&vtable); } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_IOMGR */ diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc index 9f164f65b0..dea07cae53 100644 --- a/src/core/lib/iomgr/polling_entity.cc +++ b/src/core/lib/iomgr/polling_entity.cc @@ -61,8 +61,11 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) { void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { - GPR_ASSERT(pollent->pollent.pollset != nullptr); - grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); + // CFStream does not use file destriptors. When CFStream is used, the fd + // pollset is possible to be null. + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); + } } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set); @@ -75,8 +78,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { +#ifdef GRPC_CFSTREAM + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); + } +#else GPR_ASSERT(pollent->pollent.pollset != nullptr); grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); +#endif } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set); diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc index 04bd104055..70e8a4596f 100644 --- a/src/core/lib/iomgr/pollset_custom.cc +++ b/src/core/lib/iomgr/pollset_custom.cc @@ -69,7 +69,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); gpr_mu_unlock(&pollset->mu); grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - size_t timeout = 0; + grpc_millis timeout = 0; if (deadline > now) { timeout = deadline - now; } @@ -77,7 +77,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, // control back to the application grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get(); grpc_core::ExecCtx::Set(nullptr); - poller_vtable->poll(timeout); + poller_vtable->poll(static_cast<size_t>(timeout)); grpc_core::ExecCtx::Set(curr); grpc_core::ExecCtx::Get()->InvalidateNow(); if (grpc_core::ExecCtx::Get()->HasWork()) { diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a397012003..80d8e63cdd 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -97,7 +97,26 @@ #define GRPC_MSG_IOVLEN_TYPE int #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#ifdef GRPC_CFSTREAM +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_CFSTREAM_ENDPOINT 1 +#define GRPC_CFSTREAM_CLIENT 1 +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#else #define GRPC_POSIX_SOCKET 1 +#endif #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_SYSCONF 1 #define GRPC_POSIX_WAKEUP_FD 1 @@ -131,12 +150,30 @@ #endif #if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \ - defined(GRPC_CUSTOM_SOCKET) != \ + defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \ 1 #error \ "Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET" #endif +#ifdef GRPC_POSIX_SOCKET +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_CLIENT 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#endif + #if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF) #error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF" #endif diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index fe0d834582..6afe94a7a9 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -33,7 +33,7 @@ #include <ws2tcpip.h> #endif -#ifdef GRPC_POSIX_SOCKET +#if defined(GRPC_POSIX_SOCKET) || defined(GRPC_CFSTREAM) #include <sys/socket.h> #endif diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index a82075542f..7a825643e1 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -19,7 +19,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS #include "src/core/lib/iomgr/sockaddr.h" diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h index 5b18bbc465..3cedd9082d 100644 --- a/src/core/lib/iomgr/sockaddr_posix.h +++ b/src/core/lib/iomgr/sockaddr_posix.h @@ -23,7 +23,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKADDR #include <arpa/inet.h> #include <netdb.h> #include <netinet/in.h> diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc index 1d1e36c0e3..57137769c8 100644 --- a/src/core/lib/iomgr/socket_factory_posix.cc +++ b/src/core/lib/iomgr/socket_factory_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index 04a1767731..caee652307 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON #include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc new file mode 100644 index 0000000000..ffed3bbef6 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -0,0 +1,216 @@ + +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_CLIENT + +#include <CoreFoundation/CoreFoundation.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include <netinet/in.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint_cfstream.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/timer.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct CFStreamConnect { + gpr_mu mu; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamHandle* stream_sync; + + grpc_timer alarm; + grpc_closure on_alarm; + grpc_closure on_open; + + bool read_stream_open; + bool write_stream_open; + bool failed; + + grpc_closure* closure; + grpc_endpoint** endpoint; + int refs; + char* addr_name; + grpc_resource_quota* resource_quota; +} CFStreamConnect; + +static void CFStreamConnectCleanup(CFStreamConnect* connect) { + grpc_resource_quota_unref_internal(connect->resource_quota); + CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up"); + CFRelease(connect->read_stream); + CFRelease(connect->write_stream); + gpr_mu_destroy(&connect->mu); + gpr_free(connect->addr_name); + gpr_free(connect); +} + +static void OnAlarm(void* arg, grpc_error* error) { + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_closure* closure = connect->closure; + connect->closure = nil; + const bool done = (--connect->refs == 0); + gpr_mu_unlock(&connect->mu); + // Only schedule a callback once, by either OnAlarm or OnOpen. The + // first one issues callback while the second one does cleanup. + if (done) { + CFStreamConnectCleanup(connect); + } else { + grpc_error* error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void OnOpen(void* arg, grpc_error* error) { + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_timer_cancel(&connect->alarm); + grpc_closure* closure = connect->closure; + connect->closure = nil; + + bool done = (--connect->refs == 0); + grpc_endpoint** endpoint = connect->endpoint; + + // Only schedule a callback once, by either OnAlarm or OnOpen. The + // first one issues callback while the second one does cleanup. + if (done) { + gpr_mu_unlock(&connect->mu); + CFStreamConnectCleanup(connect); + } else { + if (error == GRPC_ERROR_NONE) { + CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); + if (stream_error == NULL) { + stream_error = CFWriteStreamCopyError(connect->write_stream); + } + if (stream_error) { + error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); + CFRelease(stream_error); + } + if (error == GRPC_ERROR_NONE) { + *endpoint = grpc_cfstream_endpoint_create( + connect->read_stream, connect->write_stream, connect->addr_name, + connect->resource_quota, connect->stream_sync); + } + } else { + GRPC_ERROR_REF(error); + } + gpr_mu_unlock(&connect->mu); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void ParseResolvedAddress(const grpc_resolved_address* addr, + CFStringRef* host, int* port) { + char *host_port, *host_string, *port_string; + grpc_sockaddr_to_string(&host_port, addr, 1); + gpr_split_host_port(host_port, &host_string, &port_string); + *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); + gpr_free(host_string); + gpr_free(port_string); + gpr_free(host_port); + *port = grpc_sockaddr_get_port(addr); +} + +static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, + grpc_millis deadline) { + CFStreamConnect* connect; + + connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect)); + connect->closure = closure; + connect->endpoint = ep; + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + // connect->resource_quota = resource_quota; + connect->refs = 2; // One for the connect operation, one for the timer. + gpr_ref_init(&connect->refcount, 1); + gpr_mu_init(&connect->mu); + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + connect->addr_name); + } + + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); + } + } + } + connect->resource_quota = resource_quota; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + + CFStringRef host; + int port; + ParseResolvedAddress(resolved_addr, &host, &port); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, + &write_stream); + CFRelease(host); + connect->read_stream = read_stream; + connect->write_stream = write_stream; + connect->stream_sync = + CFStreamHandle::CreateStreamHandle(read_stream, write_stream); + GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), + grpc_schedule_on_exec_ctx); + connect->stream_sync->NotifyOnOpen(&connect->on_open); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, + grpc_schedule_on_exec_ctx); + gpr_mu_lock(&connect->mu); + CFReadStreamOpen(read_stream); + CFWriteStreamOpen(write_stream); + grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + gpr_mu_unlock(&connect->mu); +} + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect}; + +#endif /* GRPC_CFSTREAM_CLIENT */ diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc index 932c79ea0b..9389861d07 100644 --- a/src/core/lib/iomgr/tcp_client_custom.cc +++ b/src/core/lib/iomgr/tcp_client_custom.cc @@ -140,12 +140,12 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, socket, connect->addr_name); } - grpc_custom_socket_vtable->connect( - socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len, - custom_connect_callback); GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket, grpc_schedule_on_exec_ctx); grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + grpc_custom_socket_vtable->connect( + socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len, + custom_connect_callback); } grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect}; diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 900c056575..39da7f1637 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT #include "src/core/lib/iomgr/tcp_client_posix.h" diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b79ffe20f1..43d545846d 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -70,7 +70,9 @@ struct grpc_tcp { grpc_endpoint base; grpc_fd* em_fd; int fd; - bool finished_edge; + /* Used by the endpoint read function to distinguish the very first read call + * from the rest */ + bool is_first_read; double target_length; double bytes_read_this_round; gpr_refcount refcount; @@ -377,7 +379,6 @@ static void tcp_do_read(grpc_tcp* tcp) { ssize_t read_bytes; size_t i; - GPR_ASSERT(!tcp->finished_edge); GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); for (i = 0; i < tcp->incoming_buffer->count; i++) { @@ -473,7 +474,6 @@ static void tcp_continue_read(grpc_tcp* tcp) { static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); - GPR_ASSERT(!tcp->finished_edge); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); } @@ -497,10 +497,17 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, grpc_slice_buffer_reset_and_unref_internal(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); - if (tcp->finished_edge) { - tcp->finished_edge = false; + if (tcp->is_first_read) { + /* Endpoint read called for the very first time. Register read callback with + * the polling engine */ + tcp->is_first_read = false; notify_on_read(tcp); } else { + /* Not the first time. We may or may not have more bytes available. In any + * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the + * right thing (i.e calls tcp_do_read() which either reads the available + * bytes or calls notify_on_read() to be notified when new bytes become + * available */ GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -778,7 +785,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->min_read_chunk_size = tcp_min_read_chunk_size; tcp->max_read_chunk_size = tcp_max_read_chunk_size; tcp->bytes_read_this_round = 0; - tcp->finished_edge = true; + /* Will be set to false by the very first endpoint read function */ + tcp->is_first_read = true; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -811,4 +819,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, TCP_UNREF(tcp, "destroy"); } -#endif +#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 484d2b6077..0a5caca906 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -25,7 +25,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER #include "src/core/lib/iomgr/tcp_server.h" @@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = { tcp_server_shutdown_starting_add, tcp_server_unref, tcp_server_shutdown_listeners}; -#endif +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 2d95aa66d6..73afa15e65 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON #include "src/core/lib/iomgr/tcp_server_utils_posix.h" @@ -217,4 +217,4 @@ error: return ret; } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index 5ff10d3aee..7f534476df 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -28,7 +28,7 @@ #include "src/core/lib/iomgr/iomgr.h" typedef struct grpc_timer { - gpr_atm deadline; + grpc_millis deadline; uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ bool pending; struct grpc_timer* next; diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index de2256f7cb..4294162af7 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -34,6 +34,7 @@ #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" @@ -59,9 +60,9 @@ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ - gpr_atm queue_deadline_cap; + grpc_millis queue_deadline_cap; /* The deadline of the next timer due in this shard */ - gpr_atm min_deadline; + grpc_millis min_deadline; /* Index of this timer_shard in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. Timers in this @@ -209,15 +210,23 @@ static void validate_non_pending_timer(grpc_timer* t) { #endif +#if GPR_ARCH_64 +/* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is + for intptr_t which means on 32-bit machines it is not wide enough to hold + grpc_millis which is 64-bit. Adding thread local support for 64 bit values + is a lot of work for very little gain. So we are currently restricting this + optimization to only 64 bit machines */ + /* Thread local variable that stores the deadline of the next timer the thread * has last-seen. This is an optimization to prevent the thread from checking * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, * an expensive operation) */ GPR_TLS_DECL(g_last_seen_min_timer); +#endif struct shared_mutables { /* The deadline of the next timer due across all timer shards */ - gpr_atm min_timer; + grpc_millis min_timer; /* Allow only one run_some_expired_timers at once */ gpr_spinlock checker_mu; bool initialized; @@ -227,18 +236,18 @@ struct shared_mutables { static struct shared_mutables g_shared_mutables; -static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { - if (a > GPR_ATM_MAX - b) { - return GPR_ATM_MAX; +static grpc_millis saturating_add(grpc_millis a, grpc_millis b) { + if (a > GRPC_MILLIS_INF_FUTURE - b) { + return GRPC_MILLIS_INF_FUTURE; } return a + b; } -static grpc_timer_check_result run_some_expired_timers(gpr_atm now, - gpr_atm* next, +static grpc_timer_check_result run_some_expired_timers(grpc_millis now, + grpc_millis* next, grpc_error* error); -static gpr_atm compute_min_deadline(timer_shard* shard) { +static grpc_millis compute_min_deadline(timer_shard* shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; @@ -257,8 +266,11 @@ static void timer_list_init() { g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER; gpr_mu_init(&g_shared_mutables.mu); g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now(); + +#if GPR_ARCH_64 gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); +#endif for (i = 0; i < g_num_shards; i++) { timer_shard* shard = &g_shards[i]; @@ -287,7 +299,11 @@ static void timer_list_shutdown() { grpc_timer_heap_destroy(&shard->heap); } gpr_mu_destroy(&g_shared_mutables.mu); + +#if GPR_ARCH_64 gpr_tls_destroy(&g_last_seen_min_timer); +#endif + gpr_free(g_shards); gpr_free(g_shard_queue); g_shared_mutables.initialized = false; @@ -346,7 +362,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, #endif if (grpc_timer_trace.enabled()) { - gpr_log(GPR_INFO, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]", + gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]", timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure, closure->cb); } @@ -383,7 +399,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, } if (grpc_timer_trace.enabled()) { gpr_log(GPR_INFO, - " .. add to shard %d with queue_deadline_cap=%" PRIdPTR + " .. add to shard %d with queue_deadline_cap=%" PRId64 " => is_first_timer=%s", static_cast<int>(shard - g_shards), shard->queue_deadline_cap, is_first_timer ? "true" : "false"); @@ -404,15 +420,27 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, if (is_first_timer) { gpr_mu_lock(&g_shared_mutables.mu); if (grpc_timer_trace.enabled()) { - gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRIdPTR, + gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRId64, shard->min_deadline); } if (deadline < shard->min_deadline) { - gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline; + grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline; shard->min_deadline = deadline; note_deadline_change(shard); if (shard->shard_queue_index == 0 && deadline < old_min_deadline) { - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline); +#if GPR_ARCH_64 + // TODO: sreek - Using c-style cast here. static_cast<> gives an error + // (on mac platforms complaining that gpr_atm* is (long *) while + // (&g_shared_mutables.min_timer) is a (long long *). The cast should be + // safe since we know that both are pointer types and 64-bit wide. + gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer), + deadline); +#else + // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit + // types (like grpc_millis). So all reads and writes to + // g_shared_mutables.min_timer varialbe under g_shared_mutables.mu + g_shared_mutables.min_timer = deadline; +#endif grpc_kick_poller(); } } @@ -421,8 +449,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, } static void timer_consume_kick(void) { - /* force re-evaluation of last seeen min */ +#if GPR_ARCH_64 + /* Force re-evaluation of last seen min */ gpr_tls_set(&g_last_seen_min_timer, 0); +#endif } static void timer_cancel(grpc_timer* timer) { @@ -459,7 +489,7 @@ static void timer_cancel(grpc_timer* timer) { 'queue_deadline_cap') into into shard->heap. Returns 'true' if shard->heap has atleast ONE element REQUIRES: shard->mu locked */ -static int refill_heap(timer_shard* shard, gpr_atm now) { +static int refill_heap(timer_shard* shard, grpc_millis now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -472,10 +502,10 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { /* Compute the new cap and put all timers under it into the queue: */ shard->queue_deadline_cap = saturating_add(GPR_MAX(now, shard->queue_deadline_cap), - static_cast<gpr_atm>(deadline_delta * 1000.0)); + static_cast<grpc_millis>(deadline_delta * 1000.0)); if (grpc_timer_check_trace.enabled()) { - gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR, + gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRId64, static_cast<int>(shard - g_shards), shard->queue_deadline_cap); } for (timer = shard->list.next; timer != &shard->list; timer = next) { @@ -483,7 +513,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { if (timer->deadline < shard->queue_deadline_cap) { if (grpc_timer_check_trace.enabled()) { - gpr_log(GPR_INFO, " .. add timer with deadline %" PRIdPTR " to heap", + gpr_log(GPR_INFO, " .. add timer with deadline %" PRId64 " to heap", timer->deadline); } list_remove(timer); @@ -496,7 +526,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { +static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) { grpc_timer* timer; for (;;) { if (grpc_timer_check_trace.enabled()) { @@ -511,12 +541,12 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { timer = grpc_timer_heap_top(&shard->heap); if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_INFO, - " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR, + " .. check top timer deadline=%" PRId64 " now=%" PRId64, timer->deadline, now); } if (timer->deadline > now) return nullptr; if (grpc_timer_trace.enabled()) { - gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler", + gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler", timer, now - timer->deadline, timer->closure->scheduler->vtable->name); } @@ -527,8 +557,8 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_timers(timer_shard* shard, gpr_atm now, - gpr_atm* new_min_deadline, grpc_error* error) { +static size_t pop_timers(timer_shard* shard, grpc_millis now, + grpc_millis* new_min_deadline, grpc_error* error) { size_t n = 0; grpc_timer* timer; gpr_mu_lock(&shard->mu); @@ -546,13 +576,27 @@ static size_t pop_timers(timer_shard* shard, gpr_atm now, return n; } -static grpc_timer_check_result run_some_expired_timers(gpr_atm now, - gpr_atm* next, +static grpc_timer_check_result run_some_expired_timers(grpc_millis now, + grpc_millis* next, grpc_error* error) { grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED; - gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer); +#if GPR_ARCH_64 + // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on + // mac platforms complaining that gpr_atm* is (long *) while + // (&g_shared_mutables.min_timer) is a (long long *). The cast should be + // safe since we know that both are pointer types and 64-bit wide + grpc_millis min_timer = static_cast<grpc_millis>( + gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); gpr_tls_set(&g_last_seen_min_timer, min_timer); +#else + // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types + // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer + // are done under g_shared_mutables.mu + gpr_mu_lock(&g_shared_mutables.mu); + grpc_millis min_timer = g_shared_mutables.min_timer; + gpr_mu_unlock(&g_shared_mutables.mu); +#endif if (now < min_timer) { if (next != nullptr) *next = GPR_MIN(*next, min_timer); return GRPC_TIMERS_CHECKED_AND_EMPTY; @@ -563,14 +607,15 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now, result = GRPC_TIMERS_CHECKED_AND_EMPTY; if (grpc_timer_check_trace.enabled()) { - gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRIdPTR, + gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRId64, static_cast<int>(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline); } while (g_shard_queue[0]->min_deadline < now || - (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) { - gpr_atm new_min_deadline; + (now != GRPC_MILLIS_INF_FUTURE && + g_shard_queue[0]->min_deadline == now)) { + grpc_millis new_min_deadline; /* For efficiency, we pop as many available timers as we can from the shard. This may violate perfect timer deadline ordering, but that @@ -582,8 +627,8 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now, if (grpc_timer_check_trace.enabled()) { gpr_log(GPR_INFO, " .. result --> %d" - ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR - ", now=%" PRIdPTR, + ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64 + ", now=%" PRId64, result, static_cast<int>(g_shard_queue[0] - g_shards), g_shard_queue[0]->min_deadline, new_min_deadline, now); } @@ -601,8 +646,19 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now, *next = GPR_MIN(*next, g_shard_queue[0]->min_deadline); } - gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, +#if GPR_ARCH_64 + // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on + // mac platforms complaining that gpr_atm* is (long *) while + // (&g_shared_mutables.min_timer) is a (long long *). The cast should be + // safe since we know that both are pointer types and 64-bit wide + gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer), g_shard_queue[0]->min_deadline); +#else + // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit + // types (like grpc_millis). So all reads and writes to + // g_shared_mutables.min_timer are done under g_shared_mutables.mu + g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline; +#endif gpr_mu_unlock(&g_shared_mutables.mu); gpr_spinlock_unlock(&g_shared_mutables.checker_mu); } @@ -616,17 +672,28 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { // prelude grpc_millis now = grpc_core::ExecCtx::Get()->Now(); +#if GPR_ARCH_64 /* fetch from a thread-local first: this avoids contention on a globally mutable cacheline in the common case */ grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer); +#else + // On 32-bit systems, we currently do not have thread local support for 64-bit + // types. In this case, directly read from g_shared_mutables.min_timer. + // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work + // on 64-bit types (like grpc_millis). So all reads and writes to + // g_shared_mutables.min_timer are done under g_shared_mutables.mu + gpr_mu_lock(&g_shared_mutables.mu); + grpc_millis min_timer = g_shared_mutables.min_timer; + gpr_mu_unlock(&g_shared_mutables.mu); +#endif + if (now < min_timer) { if (next != nullptr) { *next = GPR_MIN(*next, min_timer); } if (grpc_timer_check_trace.enabled()) { - gpr_log(GPR_INFO, - "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now, - min_timer); + gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64, + now, min_timer); } return GRPC_TIMERS_CHECKED_AND_EMPTY; } @@ -642,13 +709,18 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { if (next == nullptr) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRIdPTR, *next); + gpr_asprintf(&next_str, "%" PRId64, *next); } +#if GPR_ARCH_64 gpr_log(GPR_INFO, - "TIMER CHECK BEGIN: now=%" PRIdPTR " next=%s tls_min=%" PRIdPTR + "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64 " glob_min=%" PRIdPTR, - now, next_str, gpr_tls_get(&g_last_seen_min_timer), - gpr_atm_no_barrier_load(&g_shared_mutables.min_timer)); + now, next_str, min_timer, + gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer))); +#else + gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64, + now, next_str, min_timer); +#endif gpr_free(next_str); } // actual code @@ -660,7 +732,7 @@ static grpc_timer_check_result timer_check(grpc_millis* next) { if (next == nullptr) { next_str = gpr_strdup("NULL"); } else { - gpr_asprintf(&next_str, "%" PRIdPTR, *next); + gpr_asprintf(&next_str, "%" PRId64, *next); } gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str); gpr_free(next_str); diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc index 35e7914568..9fdae17909 100644 --- a/src/core/lib/iomgr/timer_manager.cc +++ b/src/core/lib/iomgr/timer_manager.cc @@ -172,7 +172,7 @@ static bool wait_until(grpc_millis next) { if (grpc_timer_check_trace.enabled()) { grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now(); - gpr_log(GPR_INFO, "sleep for a %" PRIdPTR " milliseconds", wait_time); + gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time); } } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline next = GRPC_MILLIS_INF_FUTURE; diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 2129029737..43dd68e874 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -219,9 +219,11 @@ static void on_oauth2_token_fetcher_http_response(void* user_data, gpr_mu_lock(&c->mu); c->token_fetch_pending = false; c->access_token_md = GRPC_MDELEM_REF(access_token_md); - c->token_expiration = status == GRPC_CREDENTIALS_OK - ? grpc_core::ExecCtx::Get()->Now() + token_lifetime - : 0; + c->token_expiration = + status == GRPC_CREDENTIALS_OK + ? gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(token_lifetime, GPR_TIMESPAN)) + : gpr_inf_past(GPR_CLOCK_MONOTONIC); grpc_oauth2_pending_get_request_metadata* pending_request = c->pending_requests; c->pending_requests = nullptr; @@ -259,8 +261,10 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (c->token_expiration - grpc_core::ExecCtx::Get()->Now() > - refresh_threshold)) { + gpr_time_cmp( + gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_MONOTONIC)), + gpr_time_from_seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, + GPR_TIMESPAN)) > 0) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { @@ -333,7 +337,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials* c, c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; gpr_ref_init(&c->base.refcount, 1); gpr_mu_init(&c->mu); - c->token_expiration = 0; + c->token_expiration = gpr_inf_past(GPR_CLOCK_MONOTONIC); c->fetch_func = fetch_func; c->pollent = grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index c0dd1546e3..12a1d4484f 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -71,7 +71,7 @@ typedef struct { grpc_call_credentials base; gpr_mu mu; grpc_mdelem access_token_md; - grpc_millis token_expiration; + gpr_timespec token_expiration; bool token_fetch_pending; grpc_oauth2_pending_get_request_metadata* pending_requests; grpc_httpcli_context httpcli_context; diff --git a/src/core/lib/security/util/json_util.cc b/src/core/lib/security/util/json_util.cc index 75512a19c9..fe9f5fe3d3 100644 --- a/src/core/lib/security/util/json_util.cc +++ b/src/core/lib/security/util/json_util.cc @@ -29,6 +29,10 @@ const char* grpc_json_get_string_property(const grpc_json* json, const char* prop_name) { grpc_json* child; for (child = json->child; child != nullptr; child = child->next) { + if (child->key == nullptr) { + gpr_log(GPR_ERROR, "Invalid (null) JSON key encountered"); + return nullptr; + } if (strcmp(child->key, prop_name) == 0) break; } if (child == nullptr || child->type != GRPC_JSON_STRING) { diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index fd56997388..1f1c08b159 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, size_t slice_len = GRPC_SLICE_LENGTH(slice); if (slice_len > n) { sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n); - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } return; } else if (slice_len == n) { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } sb->count = idx; return; } else { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } n -= slice_len; sb->count = idx; } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index ba7cb92cdc..d44846cd12 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -233,6 +233,7 @@ struct grpc_call { grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; + grpc_closure receiving_trailing_metadata_ready; uint32_t test_only_last_message_flags; grpc_closure release_call; @@ -516,7 +517,6 @@ static void release_call(void* call, grpc_error* error) { grpc_call* c = static_cast<grpc_call*>(call); grpc_channel* channel = c->channel; grpc_call_combiner_destroy(&c->call_combiner); - gpr_free((char*)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(channel, "call"); } @@ -1132,7 +1132,7 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { return !(flags & invalid_positions); } -static int batch_slot_for_op(grpc_op_type type) { +static size_t batch_slot_for_op(grpc_op_type type) { switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: return 0; @@ -1155,17 +1155,20 @@ static int batch_slot_for_op(grpc_op_type type) { static batch_control* reuse_or_allocate_batch_control(grpc_call* call, const grpc_op* ops, size_t num_ops) { - int slot = batch_slot_for_op(ops[0].op); - batch_control** pslot = &call->active_batches[slot]; - if (*pslot == nullptr) { - *pslot = static_cast<batch_control*>( + size_t slot_idx = batch_slot_for_op(ops[0].op); + batch_control** pslot = &call->active_batches[slot_idx]; + batch_control* bctl; + if (*pslot != nullptr) { + bctl = *pslot; + if (bctl->call != nullptr) { + return nullptr; + } + memset(bctl, 0, sizeof(*bctl)); + } else { + bctl = static_cast<batch_control*>( gpr_arena_alloc(call->arena, sizeof(batch_control))); + *pslot = bctl; } - batch_control* bctl = *pslot; - if (bctl->call != nullptr) { - return nullptr; - } - memset(bctl, 0, sizeof(*bctl)); bctl->call = call; bctl->op.payload = &call->stream_op_payload; return bctl; @@ -1207,7 +1210,6 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->op.send_initial_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); } if (bctl->op.send_message) { @@ -1215,14 +1217,9 @@ static void post_batch_completion(batch_control* bctl) { } if (bctl->op.send_trailing_metadata) { grpc_metadata_batch_destroy( - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } if (bctl->op.recv_trailing_metadata) { - grpc_metadata_batch* md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(call, md); - /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); parent_call* pc = get_parent_call(call); @@ -1244,7 +1241,6 @@ static void post_batch_completion(batch_control* bctl) { } gpr_mu_unlock(&pc->child_list_mu); } - if (call->is_client) { get_final_status(call, set_status_value_directly, call->final_op.client.status, @@ -1254,7 +1250,6 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } @@ -1536,6 +1531,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { finish_batch_step(bctl); } +static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) { + batch_control* bctl = static_cast<batch_control*>(bctlp); + grpc_call* call = bctl->call; + GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready"); + add_batch_error(bctl, GRPC_ERROR_REF(error), false); + if (error == GRPC_ERROR_NONE) { + grpc_metadata_batch* md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + recv_trailing_filter(call, md); + } + finish_batch_step(bctl); +} + static void finish_batch(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast<batch_control*>(bctlp); grpc_call* call = bctl->call; @@ -1556,7 +1564,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, size_t i; const grpc_op* op; batch_control* bctl; - int num_completion_callbacks_needed = 1; + bool has_send_ops = false; + int num_recv_ops = 0; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; @@ -1662,6 +1671,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->send_initial_metadata.peer_string = &call->peer_string; } + has_send_ops = true; break; } case GRPC_OP_SEND_MESSAGE: { @@ -1691,6 +1701,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, &op->data.send_message.send_message->data.raw.slice_buffer, flags); stream_op_payload->send_message.send_message.reset( call->sending_stream.get()); + has_send_ops = true; break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1711,6 +1722,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->sent_final_op = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { @@ -1775,6 +1787,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, } stream_op_payload->send_trailing_metadata.send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; + has_send_ops = true; break; } case GRPC_OP_RECV_INITIAL_METADATA: { @@ -1802,7 +1815,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, stream_op_payload->recv_initial_metadata.peer_string = &call->peer_string; } - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_MESSAGE: { @@ -1824,7 +1837,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; - num_completion_callbacks_needed++; + ++num_recv_ops; break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { @@ -1850,11 +1863,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { @@ -1875,11 +1893,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; - stream_op->collect_stats = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op_payload->collect_stats.collect_stats = + stream_op_payload->recv_trailing_metadata.collect_stats = &call->final_info.stats.transport_stream_stats; + GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready, + receiving_trailing_metadata_ready, bctl, + grpc_schedule_on_exec_ctx); + stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->receiving_trailing_metadata_ready; + ++num_recv_ops; break; } } @@ -1889,13 +1912,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); } - gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); + gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops); - GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, - grpc_schedule_on_exec_ctx); - stream_op->on_complete = &bctl->finish_batch; - gpr_atm_rel_store(&call->any_ops_sent_atm, 1); + if (has_send_ops) { + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, + grpc_schedule_on_exec_ctx); + stream_op->on_complete = &bctl->finish_batch; + } + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_batch(call, stream_op, &bctl->start_batch); done: diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 10bc5900a1..a466b325be 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -72,10 +72,6 @@ struct grpc_channel { }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1)) -#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ - (((grpc_channel*)(channel_stack)) - 1) -#define CHANNEL_FROM_TOP_ELEM(top_elem) \ - CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) static void destroy_channel(void* arg, grpc_error* error); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index f751741712..7da9e6b74c 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -321,8 +321,7 @@ static const cq_vtable g_cq_vtable[] = { #define POLLSET_FROM_CQ(cq) \ ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq))) -grpc_core::TraceFlag grpc_cq_pluck_trace(true, "queue_pluck"); -grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout"); +grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck"); #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() || \ diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index c9dc2d93c1..84446a4d92 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -30,7 +30,6 @@ /* These trace flags default to 1. The corresponding lines are only traced if grpc_api_trace is also truthy */ extern grpc_core::TraceFlag grpc_cq_pluck_trace; -extern grpc_core::TraceFlag grpc_cq_event_timeout_trace; extern grpc_core::TraceFlag grpc_trace_operation_failures; extern grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags; extern grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount; diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 6b41e4b37e..cbdb77c844 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -184,7 +184,8 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, nullptr) { transport->vtable->set_pollset_set(transport, stream, pollset_set); } else { - abort(); + // No-op for empty pollset. Empty pollset is possible when using + // non-fd-based event engines such as CFStream. } } @@ -211,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure( if (batch->send_message) { batch->payload->send_message.send_message.reset(); } - if (batch->recv_message) { - GRPC_CALL_COMBINER_START( - call_combiner, batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error), "failing recv_message_ready"); + if (batch->cancel_stream) { + GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); } + // Construct a list of closures to execute. + grpc_core::CallCombinerClosureList closures; if (batch->recv_initial_metadata) { - GRPC_CALL_COMBINER_START( - call_combiner, + closures.Add( batch->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } - GRPC_CLOSURE_SCHED(batch->on_complete, error); - if (batch->cancel_stream) { - GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); + if (batch->recv_message) { + closures.Add(batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), "failing recv_message_ready"); + } + if (batch->recv_trailing_metadata) { + closures.Add( + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready, + GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready"); + } + if (batch->on_complete != nullptr) { + closures.Add(batch->on_complete, GRPC_ERROR_REF(error), + "failing on_complete"); } + // Execute closures. + closures.RunClosures(call_combiner); + GRPC_ERROR_UNREF(error); } typedef struct { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 10e9df0f7c..585b9dfae9 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op_batch { - /** Should be enqueued when all requested operations (excluding recv_message - and recv_initial_metadata which have their own closures) in a given batch - have been completed. */ + /** Should be scheduled when all of the non-recv operations in the batch + are complete. + + The recv ops (recv_initial_metadata, recv_message, and + recv_trailing_metadata) each have their own callbacks. If a batch + contains both recv ops and non-recv ops, on_complete should be + scheduled as soon as the non-recv ops are complete, regardless of + whether or not the recv ops are complete. If a batch contains + only recv ops, on_complete can be null. */ grpc_closure* on_complete; /** Values for the stream op (fields set are determined by flags above) */ @@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch { */ bool recv_trailing_metadata : 1; - /** Collect any stats into provided buffer, zero internal stat counters */ - bool collect_stats : 1; - /** Cancel this stream with the provided error */ bool cancel_stream : 1; @@ -168,13 +171,11 @@ struct grpc_transport_stream_op_batch_payload { /** Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ uint32_t send_initial_metadata_flags; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). + // If non-NULL, will be set by the transport to the peer string (a char*). + // The transport retains ownership of the string. // Note: This pointer may be used by the transport after the // send_initial_metadata op is completed. It must remain valid // until the call is destroyed. - // Note: When a transport sets this, it must free the previous - // value, if any. gpr_atm* peer_string; } send_initial_metadata; @@ -202,13 +203,11 @@ struct grpc_transport_stream_op_batch_payload { // immediately available. This may be a signal that we received a // Trailers-Only response. bool* trailing_metadata_available; - // If non-NULL, will be set by the transport to the peer string - // (a char*, which the caller takes ownership of). + // If non-NULL, will be set by the transport to the peer string (a char*). + // The transport retains ownership of the string. // Note: This pointer may be used by the transport after the // recv_initial_metadata op is completed. It must remain valid // until the call is destroyed. - // Note: When a transport sets this, it must free the previous - // value, if any. gpr_atm* peer_string; } recv_initial_metadata; @@ -223,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload { struct { grpc_metadata_batch* recv_trailing_metadata; - } recv_trailing_metadata; - - struct { grpc_transport_stream_stats* collect_stats; - } collect_stats; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure* recv_trailing_metadata_ready; + } recv_trailing_metadata; /** Forcefully close this stream. The HTTP2 semantics should be: diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 99af7c1931..8c7db642a5 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -52,7 +52,7 @@ static void put_metadata_list(gpr_strvec* b, grpc_metadata_batch md) { } if (md.deadline != GRPC_MILLIS_INF_FUTURE) { char* tmp; - gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline); + gpr_asprintf(&tmp, " deadline=%" PRId64, md.deadline); gpr_strvec_add(b, tmp); } } @@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string( gpr_strvec_add(&b, tmp); } - if (op->collect_stats) { - gpr_strvec_add(&b, gpr_strdup(" ")); - gpr_asprintf(&tmp, "COLLECT_STATS:%p", - op->payload->collect_stats.collect_stats); - gpr_strvec_add(&b, tmp); - } - out = gpr_strvec_flatten(&b, nullptr); gpr_strvec_destroy(&b); |