aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_stack.h2
-rw-r--r--src/core/lib/channel/connected_channel.cc9
-rw-r--r--src/core/lib/debug/trace.h20
-rw-r--r--src/core/lib/gprpp/memory.h1
-rw-r--r--src/core/lib/iomgr/call_combiner.h80
-rw-r--r--src/core/lib/iomgr/cfstream_handle.cc183
-rw-r--r--src/core/lib/iomgr/cfstream_handle.h80
-rw-r--r--src/core/lib/iomgr/closure.h5
-rw-r--r--src/core/lib/iomgr/combiner.cc2
-rw-r--r--src/core/lib/iomgr/combiner.h2
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.cc372
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.h49
-rw-r--r--src/core/lib/iomgr/error_cfstream.cc52
-rw-r--r--src/core/lib/iomgr/error_cfstream.h31
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc6
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_posix.cc8
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc18
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc4
-rw-r--r--src/core/lib/iomgr/polling_entity.cc13
-rw-r--r--src/core/lib/iomgr/pollset_custom.cc4
-rw-r--r--src/core/lib/iomgr/port.h39
-rw-r--r--src/core/lib/iomgr/resolve_address.h2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc2
-rw-r--r--src/core/lib/iomgr/sockaddr_posix.h2
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.cc2
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc216
-rw-r--r--src/core/lib/iomgr/tcp_client_custom.cc6
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc24
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc4
-rw-r--r--src/core/lib/iomgr/timer.h2
-rw-r--r--src/core/lib/iomgr/timer_generic.cc154
-rw-r--r--src/core/lib/iomgr/timer_manager.cc2
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.cc16
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h2
-rw-r--r--src/core/lib/security/util/json_util.cc4
-rw-r--r--src/core/lib/slice/slice_buffer.cc18
-rw-r--r--src/core/lib/surface/call.cc87
-rw-r--r--src/core/lib/surface/channel.cc4
-rw-r--r--src/core/lib/surface/completion_queue.cc3
-rw-r--r--src/core/lib/surface/completion_queue.h1
-rw-r--r--src/core/lib/transport/transport.cc32
-rw-r--r--src/core/lib/transport/transport.h34
-rw-r--r--src/core/lib/transport/transport_op_string.cc9
50 files changed, 1436 insertions, 194 deletions
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 4bf8218664..7581f937b6 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -138,7 +138,7 @@ typedef struct {
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining.
- Implementations may assume that elem->call_data is all zeros. */
+ Implementations may assume that elem->channel_data is all zeros. */
grpc_error* (*init_channel_elem)(grpc_channel_element* elem,
grpc_channel_element_args* args);
/* Destroy per channel data.
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index ddd3029402..e2ea334ded 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
+ callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
+ if (batch->recv_trailing_metadata) {
+ callback_state* state = &calld->recv_trailing_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_trailing_metadata_ready",
+ &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
+ }
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
- } else {
+ } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}
diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h
index 28157c6383..fe6301a3fc 100644
--- a/src/core/lib/debug/trace.h
+++ b/src/core/lib/debug/trace.h
@@ -57,14 +57,14 @@ class TraceFlag {
const char* name() const { return name_; }
-// This following define may be commented out to ensure that the compiler
-// deletes any "if (tracer.enabled()) {...}" codeblocks. This is useful to
-// test the performance impact tracers have on the system.
-//
-// #define COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
-#ifdef COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
- bool enabled() { return false; }
-#else
+// Use the symbol GRPC_USE_TRACERS to determine if tracers will be enabled in
+// opt builds (tracers are always on in dbg builds). The default in OSS is for
+// tracers to be on since we support binary distributions of gRPC for the
+// wrapped language (wr don't want to force recompilation to get tracing).
+// Internally, however, for performance reasons, we compile them out by
+// default, since internal build systems make recompiling trivial.
+#define GRPC_USE_TRACERS // tracers on by default in OSS
+#if defined(GRPC_USE_TRACERS) || !defined(NDEBUG)
bool enabled() {
#ifdef GRPC_THREADSAFE_TRACER
return gpr_atm_no_barrier_load(&value_) != 0;
@@ -72,7 +72,9 @@ class TraceFlag {
return value_;
#endif // GRPC_THREADSAFE_TRACER
}
-#endif // COMPILE_OUT_ALL_TRACERS_IN_OPT_BUILD
+#else
+ bool enabled() { return false; }
+#endif /* defined(GRPC_USE_TRACERS) || !defined(NDEBUG) */
private:
friend void grpc_core::testing::grpc_tracer_enable_flag(TraceFlag* flag);
diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h
index 1354109bf3..28fcdf1779 100644
--- a/src/core/lib/gprpp/memory.h
+++ b/src/core/lib/gprpp/memory.h
@@ -55,6 +55,7 @@ inline T* New(Args&&... args) {
// Alternative to delete, since we cannot use it (for fear of libstdc++)
template <typename T>
inline void Delete(T* p) {
+ if (p == nullptr) return;
p->~T();
if (alignof(T) > kAlignmentForDefaultAllocationInBytes) {
gpr_free_aligned(p);
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 0ccd08ea57..f9ce29f231 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,6 +26,7 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
+namespace grpc_core {
+
+// Helper for running a list of closures in a call combiner.
+//
+// Each callback running in the call combiner will eventually be
+// returned to the surface, at which point the surface will yield the
+// call combiner. So when we are running in the call combiner and have
+// more than one callback to return to the surface, we need to re-enter
+// the call combiner for all but one of those callbacks.
+class CallCombinerClosureList {
+ public:
+ CallCombinerClosureList() {}
+
+ // Adds a closure to the list. The closure must eventually result in
+ // the call combiner being yielded.
+ void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
+ closures_.emplace_back(closure, error, reason);
+ }
+
+ // Runs all closures in the call combiner and yields the call combiner.
+ //
+ // All but one of the closures in the list will be scheduled via
+ // GRPC_CALL_COMBINER_START(), and the remaining closure will be
+ // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
+ // yielding the call combiner. If the list is empty, then the call
+ // combiner will be yielded immediately.
+ void RunClosures(grpc_call_combiner* call_combiner) {
+ for (size_t i = 1; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ if (closures_.size() > 0) {
+ if (grpc_call_combiner_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "CallCombinerClosureList executing closure while already "
+ "holding call_combiner %p: closure=%p error=%s reason=%s",
+ call_combiner, closures_[0].closure,
+ grpc_error_string(closures_[0].error), closures_[0].reason);
+ }
+ // This will release the call combiner.
+ GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
+ } else {
+ GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
+ }
+ closures_.clear();
+ }
+
+ // Runs all closures in the call combiner, but does NOT yield the call
+ // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
+ void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
+ for (size_t i = 0; i < closures_.size(); ++i) {
+ auto& closure = closures_[i];
+ GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
+ closure.reason);
+ }
+ closures_.clear();
+ }
+
+ size_t size() const { return closures_.size(); }
+
+ private:
+ struct CallCombinerClosure {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+
+ CallCombinerClosure(grpc_closure* closure, grpc_error* error,
+ const char* reason)
+ : closure(closure), error(error), reason(reason) {}
+ };
+
+ // There are generally a maximum of 6 closures to run in the call
+ // combiner, one for each pending op.
+ InlinedVector<CallCombinerClosure, 6> closures_;
+};
+
+} // namespace grpc_core
+
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
new file mode 100644
index 0000000000..30f4e65632
--- /dev/null
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/cfstream_handle.h"
+
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+void* CFStreamHandle::Retain(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_HANDLE_REF(handle, "retain");
+ return info;
+}
+
+void CFStreamHandle::Release(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_HANDLE_UNREF(handle, "release");
+}
+
+CFStreamHandle* CFStreamHandle::CreateStreamHandle(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
+ return new CFStreamHandle(read_stream, write_stream);
+}
+
+void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
+ CFStreamEventType type,
+ void* client_callback_info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
+ CFSTREAM_HANDLE_REF(handle, "read callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ handle->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->read_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+ CFSTREAM_HANDLE_UNREF(handle, "read callback");
+ });
+}
+void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
+ void* clientCallBackInfo) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
+ CFSTREAM_HANDLE_REF(handle, "write callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ handle->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->write_event_.SetReady();
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(return );
+ }
+ CFSTREAM_HANDLE_UNREF(handle, "write callback");
+ });
+}
+
+CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
+ gpr_ref_init(&refcount_, 1);
+ open_event_.InitEvent();
+ read_event_.InitEvent();
+ write_event_.InitEvent();
+ CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
+ CFReadStreamSetClient(
+ read_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::ReadCallback, &ctx);
+ CFWriteStreamSetClient(
+ write_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::WriteCallback, &ctx);
+ CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+ CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+}
+
+CFStreamHandle::~CFStreamHandle() {
+ open_event_.DestroyEvent();
+ read_event_.DestroyEvent();
+ write_event_.DestroyEvent();
+}
+
+void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
+ open_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
+ read_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
+ write_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::Shutdown(grpc_error* error) {
+ open_event_.SetShutdown(GRPC_ERROR_REF(error));
+ read_event_.SetShutdown(GRPC_ERROR_REF(error));
+ write_event_.SetShutdown(GRPC_ERROR_REF(error));
+ GRPC_ERROR_UNREF(error);
+}
+
+void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val + 1);
+ }
+ gpr_ref(&refcount_);
+}
+
+void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(GPR_ERROR,
+ "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&refcount_)) {
+ delete this;
+ }
+}
+
+#endif
diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h
new file mode 100644
index 0000000000..4258e72431
--- /dev/null
+++ b/src/core/lib/iomgr/cfstream_handle.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/* The CFStream handle acts as an event synchronization entity for
+ * read/write/open/error/eos events happening on CFStream streams. */
+
+#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
+#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/lockfree_event.h"
+
+class CFStreamHandle final {
+ public:
+ static CFStreamHandle* CreateStreamHandle(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream);
+ ~CFStreamHandle();
+ CFStreamHandle(const CFReadStreamRef& ref) = delete;
+ CFStreamHandle(CFReadStreamRef&& ref) = delete;
+ CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete;
+
+ void NotifyOnOpen(grpc_closure* closure);
+ void NotifyOnRead(grpc_closure* closure);
+ void NotifyOnWrite(grpc_closure* closure);
+ void Shutdown(grpc_error* error);
+
+ void Ref(const char* file = "", int line = 0, const char* reason = nullptr);
+ void Unref(const char* file = "", int line = 0, const char* reason = nullptr);
+
+ private:
+ CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream);
+ static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void* Retain(void* info);
+ static void Release(void* info);
+
+ grpc_core::LockfreeEvent open_event_;
+ grpc_core::LockfreeEvent read_event_;
+ grpc_core::LockfreeEvent write_event_;
+
+ gpr_refcount refcount_;
+};
+
+#ifdef DEBUG
+#define CFSTREAM_HANDLE_REF(handle, reason) \
+ (handle)->Ref(__FILE__, __LINE__, (reason))
+#define CFSTREAM_HANDLE_UNREF(handle, reason) \
+ (handle)->Unref(__FILE__, __LINE__, (reason))
+#else
+#define CFSTREAM_HANDLE_REF(handle, reason) (handle)->Ref()
+#define CFSTREAM_HANDLE_UNREF(handle, reason) (handle)->Unref()
+#endif
+
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 34a494485d..f14c723844 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
- "previously scheduled at: [%s: %d] run?: %s",
+ "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
+ "run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
- c->line_initiated, c->run ? "true" : "false");
+ c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 6789e4d12d..7c0062eb4e 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -31,7 +31,7 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
-grpc_core::TraceFlag grpc_combiner_trace(false, "combiner");
+grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
#define GRPC_COMBINER_TRACE(fn) \
do { \
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index 0d63e468df..3c947bf9d6 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -61,6 +61,6 @@ grpc_closure_scheduler* grpc_combiner_finally_scheduler(grpc_combiner* lock);
bool grpc_combiner_continue_exec_ctx();
-extern grpc_core::TraceFlag grpc_combiner_trace;
+extern grpc_core::DebugOnlyTraceFlag grpc_combiner_trace;
#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
new file mode 100644
index 0000000000..c3bc0cc8fd
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -0,0 +1,372 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_ENDPOINT
+
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/endpoint_cfstream.h"
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error_cfstream.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamHandle* stream_sync;
+
+ grpc_closure* read_cb;
+ grpc_closure* write_cb;
+ grpc_slice_buffer* read_slices;
+ grpc_slice_buffer* write_slices;
+
+ grpc_closure read_action;
+ grpc_closure write_action;
+
+ char* peer_string;
+ grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
+} CFStreamEndpoint;
+
+static void CFStreamFree(CFStreamEndpoint* ep) {
+ grpc_resource_user_unref(ep->resource_user);
+ CFRelease(ep->read_stream);
+ CFRelease(ep->write_stream);
+ CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
+ gpr_free(ep->peer_string);
+ gpr_free(ep);
+}
+
+#ifndef NDEBUG
+#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
+#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
+static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val + 1);
+ }
+ gpr_ref(&ep->refcount);
+}
+#else
+#define EP_REF(ep, reason) CFStreamRef((ep))
+#define EP_UNREF(ep, reason) CFStreamUnref((ep))
+static void CFStreamUnref(CFStreamEndpoint* ep) {
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
+#endif
+
+static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
+ CFStreamEndpoint* ep) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(ep->peer_string));
+}
+
+static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
+ ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < ep->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(ep->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ grpc_closure* cb = ep->read_cb;
+ ep->read_cb = nullptr;
+ ep->read_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
+ ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write: error=%s", str);
+ }
+ grpc_closure* cb = ep->write_cb;
+ ep->write_cb = nullptr;
+ ep->write_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void ReadAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->read_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "read");
+ return;
+ }
+
+ GPR_ASSERT(ep->read_slices->count == 1);
+ grpc_slice slice = ep->read_slices->slices[0];
+ size_t len = GRPC_SLICE_LENGTH(slice);
+ CFIndex read_size =
+ CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
+ if (read_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
+ }
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ } else if (read_size == 0) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep,
+ CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
+ EP_UNREF(ep, "read");
+ } else {
+ if (read_size < len) {
+ grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
+ }
+ CallReadCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void WriteAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->write_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CallWriteCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "write");
+ return;
+ }
+
+ grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
+ size_t slice_len = GRPC_SLICE_LENGTH(slice);
+ CFIndex write_size = CFWriteStreamWrite(
+ ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
+ if (write_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
+ }
+ CallWriteCb(ep, error);
+ EP_UNREF(ep, "write");
+ } else {
+ if (write_size < GRPC_SLICE_LENGTH(slice)) {
+ grpc_slice_buffer_undo_take_first(
+ ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
+ }
+ if (ep->write_slices->length > 0) {
+ ep->stream_sync->NotifyOnWrite(&ep->write_action);
+ } else {
+ CallWriteCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "write");
+ }
+
+ if (grpc_tcp_trace.enabled()) {
+ grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
+ char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ grpc_slice_unref_internal(trace_slice);
+ }
+ }
+ grpc_slice_unref_internal(slice);
+}
+
+static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ if (error == GRPC_ERROR_NONE) {
+ ep->stream_sync->NotifyOnRead(&ep->read_action);
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
+ slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->read_cb == nullptr);
+ ep_impl->read_cb = cb;
+ ep_impl->read_slices = slices;
+ grpc_slice_buffer_reset_and_unref_internal(slices);
+ grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ ep_impl->read_slices);
+ EP_REF(ep_impl, "read");
+}
+
+static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
+ ep_impl, slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->write_cb == nullptr);
+ ep_impl->write_cb = cb;
+ ep_impl->write_slices = slices;
+ EP_REF(ep_impl, "write");
+ ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
+}
+
+void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
+ }
+ CFReadStreamClose(ep_impl->read_stream);
+ CFWriteStreamClose(ep_impl->write_stream);
+ ep_impl->stream_sync->Shutdown(why);
+ grpc_resource_user_shutdown(ep_impl->resource_user);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
+ }
+}
+
+void CFStreamDestroy(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
+ }
+ EP_UNREF(ep_impl, "destroy");
+}
+
+grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return ep_impl->resource_user;
+}
+
+char* CFStreamGetPeer(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return gpr_strdup(ep_impl->peer_string);
+}
+
+int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
+
+void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
+void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
+ grpc_pollset_set* pollset) {}
+
+static const grpc_endpoint_vtable vtable = {CFStreamRead,
+ CFStreamWrite,
+ CFStreamAddToPollset,
+ CFStreamAddToPollsetSet,
+ CFStreamDeleteFromPollsetSet,
+ CFStreamShutdown,
+ CFStreamDestroy,
+ CFStreamGetResourceUser,
+ CFStreamGetPeer,
+ CFStreamGetFD};
+
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync) {
+ CFStreamEndpoint* ep_impl =
+ static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "CFStream endpoint:%p create readStream:%p writeStream: %p",
+ ep_impl, read_stream, write_stream);
+ }
+ ep_impl->base.vtable = &vtable;
+ gpr_ref_init(&ep_impl->refcount, 1);
+ ep_impl->read_stream = read_stream;
+ ep_impl->write_stream = write_stream;
+ CFRetain(read_stream);
+ CFRetain(write_stream);
+ ep_impl->stream_sync = stream_sync;
+ CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
+
+ ep_impl->peer_string = gpr_strdup(peer_string);
+ ep_impl->read_cb = nil;
+ ep_impl->write_cb = nil;
+ ep_impl->read_slices = nil;
+ ep_impl->write_slices = nil;
+ GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ ep_impl->resource_user =
+ grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
+ ep_impl->resource_user,
+ CFStreamReadAllocationDone, ep_impl);
+
+ return &ep_impl->base;
+}
+
+#endif /* GRPC_CFSTREAM_ENDPOINT */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h
new file mode 100644
index 0000000000..ef957c1f11
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_cfstream.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
+/*
+ Low level TCP "bottom half" implementation, for use by transports built on
+ top of a TCP connection.
+
+ Note that this file does not (yet) include APIs for creating the socket in
+ the first place.
+
+ All calls passing slice transfer ownership of a slice refcount unless
+ otherwise specified.
+*/
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/endpoint.h"
+
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync);
+
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/error_cfstream.cc b/src/core/lib/iomgr/error_cfstream.cc
new file mode 100644
index 0000000000..d7af8c377f
--- /dev/null
+++ b/src/core/lib/iomgr/error_cfstream.cc
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+
+#define MAX_ERROR_DESCRIPTION 256
+
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* custom_desc) {
+ CFErrorRef error = static_cast<CFErrorRef>(arg);
+ char buf_domain[MAX_ERROR_DESCRIPTION];
+ char buf_desc[MAX_ERROR_DESCRIPTION];
+ char* error_msg;
+ CFErrorDomain domain = CFErrorGetDomain((error));
+ CFIndex code = CFErrorGetCode((error));
+ CFStringRef desc = CFErrorCopyDescription((error));
+ CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)",
+ custom_desc, buf_domain, code, buf_desc);
+ CFRelease(desc);
+ grpc_error* return_error = grpc_error_create(
+ file, line, grpc_slice_from_copied_string(error_msg), NULL, 0);
+ gpr_free(error_msg);
+ return return_error;
+}
+#endif /* GRPC_CFSTREAM */
diff --git a/src/core/lib/iomgr/error_cfstream.h b/src/core/lib/iomgr/error_cfstream.h
new file mode 100644
index 0000000000..06ab751329
--- /dev/null
+++ b/src/core/lib/iomgr/error_cfstream.h
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H
+
+#ifdef GRPC_CFSTREAM
+// Create an error from Apple Core Foundation CFError object
+#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \
+ grpc_error_create_from_cferror(__FILE__, __LINE__, \
+ static_cast<void*>((error)), (desc))
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* desc);
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_ERROR_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index e5db1be0e0..cf839619cd 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1237,12 +1237,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
}
#else /* defined(GRPC_LINUX_EPOLL) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 4c6cff7fe2..7903297fc6 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1066,7 +1066,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
#endif
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
- "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
+ "PS:%p work hdl=%p worker=%p now=%" PRId64 " deadline=%" PRId64
" kwp=%d pollable=%p",
pollset, worker_hdl, WORKER_PTR, grpc_core::ExecCtx::Get()->Now(),
deadline, pollset->kicked_without_poller, pollset->active_pollable);
@@ -1556,7 +1556,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX)
#include "src/core/lib/iomgr/ev_epollex_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
@@ -1564,6 +1564,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 494bc71c1d..a144817a83 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1721,7 +1721,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 504787e659..70958ed562 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV_POLL
#include "src/core/lib/iomgr/ev_poll_posix.h"
@@ -1761,4 +1761,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) {
return &vtable;
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_EV_POLL */
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6bd1dc8e50..6b7eca0afa 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV
#include "src/core/lib/iomgr/ev_posix.h"
@@ -251,10 +251,10 @@ static void pollset_destroy(grpc_pollset* pollset) {
static grpc_error* pollset_work(grpc_pollset* pollset,
grpc_pollset_worker** worker,
grpc_millis deadline) {
- GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRIdPTR ") begin", pollset,
+ GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") begin", pollset,
deadline);
grpc_error* err = g_event_engine->pollset_work(pollset, worker, deadline);
- GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRIdPTR ") end", pollset,
+ GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") end", pollset,
deadline);
return err;
}
@@ -334,4 +334,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd);
}
-#endif // GRPC_POSIX_SOCKET
+#endif // GRPC_POSIX_SOCKET_EV
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 2f544b20ab..5d5c355ff9 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -53,24 +53,24 @@ static void exec_ctx_sched(grpc_closure* closure, grpc_error* error) {
static gpr_timespec g_start_time;
-static gpr_atm timespec_to_atm_round_down(gpr_timespec ts) {
+static grpc_millis timespec_to_millis_round_down(gpr_timespec ts) {
ts = gpr_time_sub(ts, g_start_time);
double x = GPR_MS_PER_SEC * static_cast<double>(ts.tv_sec) +
static_cast<double>(ts.tv_nsec) / GPR_NS_PER_MS;
if (x < 0) return 0;
- if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
- return static_cast<gpr_atm>(x);
+ if (x > GRPC_MILLIS_INF_FUTURE) return GRPC_MILLIS_INF_FUTURE;
+ return static_cast<grpc_millis>(x);
}
-static gpr_atm timespec_to_atm_round_up(gpr_timespec ts) {
+static grpc_millis timespec_to_millis_round_up(gpr_timespec ts) {
ts = gpr_time_sub(ts, g_start_time);
double x = GPR_MS_PER_SEC * static_cast<double>(ts.tv_sec) +
static_cast<double>(ts.tv_nsec) / GPR_NS_PER_MS +
static_cast<double>(GPR_NS_PER_SEC - 1) /
static_cast<double>(GPR_NS_PER_SEC);
if (x < 0) return 0;
- if (x > GPR_ATM_MAX) return GPR_ATM_MAX;
- return static_cast<gpr_atm>(x);
+ if (x > GRPC_MILLIS_INF_FUTURE) return GRPC_MILLIS_INF_FUTURE;
+ return static_cast<grpc_millis>(x);
}
gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
@@ -92,12 +92,12 @@ gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
}
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec ts) {
- return timespec_to_atm_round_down(
+ return timespec_to_millis_round_down(
gpr_convert_clock_type(ts, g_start_time.clock_type));
}
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec ts) {
- return timespec_to_atm_round_up(
+ return timespec_to_millis_round_up(
gpr_convert_clock_type(ts, g_start_time.clock_type));
}
@@ -138,7 +138,7 @@ bool ExecCtx::Flush() {
grpc_millis ExecCtx::Now() {
if (!now_is_valid_) {
- now_ = timespec_to_atm_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
+ now_ = timespec_to_millis_round_down(gpr_now(GPR_CLOCK_MONOTONIC));
now_is_valid_ = true;
}
return now_;
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 8823dc4b51..cf1118a003 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -29,10 +29,10 @@
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/iomgr/closure.h"
-typedef gpr_atm grpc_millis;
+typedef int64_t grpc_millis;
-#define GRPC_MILLIS_INF_FUTURE GPR_ATM_MAX
-#define GRPC_MILLIS_INF_PAST GPR_ATM_MIN
+#define GRPC_MILLIS_INF_FUTURE INT64_MAX
+#define GRPC_MILLIS_INF_PAST INT64_MIN
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index 66c9cb7ff7..ca7334c9a4 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_IOMGR
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
@@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() {
grpc_set_iomgr_platform_vtable(&vtable);
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_IOMGR */
diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc
index 9f164f65b0..dea07cae53 100644
--- a/src/core/lib/iomgr/polling_entity.cc
+++ b/src/core/lib/iomgr/polling_entity.cc
@@ -61,8 +61,11 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) {
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
- GPR_ASSERT(pollent->pollent.pollset != nullptr);
- grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+ // CFStream does not use file destriptors. When CFStream is used, the fd
+ // pollset is possible to be null.
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+ }
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set);
@@ -75,8 +78,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
+#ifdef GRPC_CFSTREAM
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+ }
+#else
GPR_ASSERT(pollent->pollent.pollset != nullptr);
grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+#endif
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set);
diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc
index 04bd104055..70e8a4596f 100644
--- a/src/core/lib/iomgr/pollset_custom.cc
+++ b/src/core/lib/iomgr/pollset_custom.cc
@@ -69,7 +69,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
gpr_mu_unlock(&pollset->mu);
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- size_t timeout = 0;
+ grpc_millis timeout = 0;
if (deadline > now) {
timeout = deadline - now;
}
@@ -77,7 +77,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
// control back to the application
grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get();
grpc_core::ExecCtx::Set(nullptr);
- poller_vtable->poll(timeout);
+ poller_vtable->poll(static_cast<size_t>(timeout));
grpc_core::ExecCtx::Set(curr);
grpc_core::ExecCtx::Get()->InvalidateNow();
if (grpc_core::ExecCtx::Get()->HasWork()) {
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index a397012003..80d8e63cdd 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -97,7 +97,26 @@
#define GRPC_MSG_IOVLEN_TYPE int
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#ifdef GRPC_CFSTREAM
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_CFSTREAM_ENDPOINT 1
+#define GRPC_CFSTREAM_CLIENT 1
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#else
#define GRPC_POSIX_SOCKET 1
+#endif
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_SYSCONF 1
#define GRPC_POSIX_WAKEUP_FD 1
@@ -131,12 +150,30 @@
#endif
#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
- defined(GRPC_CUSTOM_SOCKET) != \
+ defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \
1
#error \
"Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET"
#endif
+#ifdef GRPC_POSIX_SOCKET
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_CLIENT 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#endif
+
#if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF)
#error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF"
#endif
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index fe0d834582..6afe94a7a9 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -33,7 +33,7 @@
#include <ws2tcpip.h>
#endif
-#ifdef GRPC_POSIX_SOCKET
+#if defined(GRPC_POSIX_SOCKET) || defined(GRPC_CFSTREAM)
#include <sys/socket.h>
#endif
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index a82075542f..7a825643e1 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -19,7 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
#include "src/core/lib/iomgr/sockaddr.h"
diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h
index 5b18bbc465..3cedd9082d 100644
--- a/src/core/lib/iomgr/sockaddr_posix.h
+++ b/src/core/lib/iomgr/sockaddr_posix.h
@@ -23,7 +23,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKADDR
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc
index 1d1e36c0e3..57137769c8 100644
--- a/src/core/lib/iomgr/socket_factory_posix.cc
+++ b/src/core/lib/iomgr/socket_factory_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index 04a1767731..caee652307 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
new file mode 100644
index 0000000000..ffed3bbef6
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -0,0 +1,216 @@
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_CLIENT
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include <netinet/in.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint_cfstream.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/error_cfstream.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct CFStreamConnect {
+ gpr_mu mu;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamHandle* stream_sync;
+
+ grpc_timer alarm;
+ grpc_closure on_alarm;
+ grpc_closure on_open;
+
+ bool read_stream_open;
+ bool write_stream_open;
+ bool failed;
+
+ grpc_closure* closure;
+ grpc_endpoint** endpoint;
+ int refs;
+ char* addr_name;
+ grpc_resource_quota* resource_quota;
+} CFStreamConnect;
+
+static void CFStreamConnectCleanup(CFStreamConnect* connect) {
+ grpc_resource_quota_unref_internal(connect->resource_quota);
+ CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up");
+ CFRelease(connect->read_stream);
+ CFRelease(connect->write_stream);
+ gpr_mu_destroy(&connect->mu);
+ gpr_free(connect->addr_name);
+ gpr_free(connect);
+}
+
+static void OnAlarm(void* arg, grpc_error* error) {
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+ const bool done = (--connect->refs == 0);
+ gpr_mu_unlock(&connect->mu);
+ // Only schedule a callback once, by either OnAlarm or OnOpen. The
+ // first one issues callback while the second one does cleanup.
+ if (done) {
+ CFStreamConnectCleanup(connect);
+ } else {
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void OnOpen(void* arg, grpc_error* error) {
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_timer_cancel(&connect->alarm);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+
+ bool done = (--connect->refs == 0);
+ grpc_endpoint** endpoint = connect->endpoint;
+
+ // Only schedule a callback once, by either OnAlarm or OnOpen. The
+ // first one issues callback while the second one does cleanup.
+ if (done) {
+ gpr_mu_unlock(&connect->mu);
+ CFStreamConnectCleanup(connect);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
+ if (stream_error == NULL) {
+ stream_error = CFWriteStreamCopyError(connect->write_stream);
+ }
+ if (stream_error) {
+ error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
+ CFRelease(stream_error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ *endpoint = grpc_cfstream_endpoint_create(
+ connect->read_stream, connect->write_stream, connect->addr_name,
+ connect->resource_quota, connect->stream_sync);
+ }
+ } else {
+ GRPC_ERROR_REF(error);
+ }
+ gpr_mu_unlock(&connect->mu);
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void ParseResolvedAddress(const grpc_resolved_address* addr,
+ CFStringRef* host, int* port) {
+ char *host_port, *host_string, *port_string;
+ grpc_sockaddr_to_string(&host_port, addr, 1);
+ gpr_split_host_port(host_port, &host_string, &port_string);
+ *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8);
+ gpr_free(host_string);
+ gpr_free(port_string);
+ gpr_free(host_port);
+ *port = grpc_sockaddr_get_port(addr);
+}
+
+static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
+ CFStreamConnect* connect;
+
+ connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ // connect->resource_quota = resource_quota;
+ connect->refs = 2; // One for the connect operation, one for the timer.
+ gpr_ref_init(&connect->refcount, 1);
+ gpr_mu_init(&connect->mu);
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
+ }
+
+ grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+ connect->resource_quota = resource_quota;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+
+ CFStringRef host;
+ int port;
+ ParseResolvedAddress(resolved_addr, &host, &port);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
+ &write_stream);
+ CFRelease(host);
+ connect->read_stream = read_stream;
+ connect->write_stream = write_stream;
+ connect->stream_sync =
+ CFStreamHandle::CreateStreamHandle(read_stream, write_stream);
+ GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
+ grpc_schedule_on_exec_ctx);
+ connect->stream_sync->NotifyOnOpen(&connect->on_open);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(&connect->mu);
+ CFReadStreamOpen(read_stream);
+ CFWriteStreamOpen(write_stream);
+ grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ gpr_mu_unlock(&connect->mu);
+}
+
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect};
+
+#endif /* GRPC_CFSTREAM_CLIENT */
diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc
index 932c79ea0b..9389861d07 100644
--- a/src/core/lib/iomgr/tcp_client_custom.cc
+++ b/src/core/lib/iomgr/tcp_client_custom.cc
@@ -140,12 +140,12 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
socket, connect->addr_name);
}
- grpc_custom_socket_vtable->connect(
- socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
- custom_connect_callback);
GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ grpc_custom_socket_vtable->connect(
+ socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
+ custom_connect_callback);
}
grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect};
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 900c056575..39da7f1637 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
#include "src/core/lib/iomgr/tcp_client_posix.h"
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b79ffe20f1..43d545846d 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@@ -70,7 +70,9 @@ struct grpc_tcp {
grpc_endpoint base;
grpc_fd* em_fd;
int fd;
- bool finished_edge;
+ /* Used by the endpoint read function to distinguish the very first read call
+ * from the rest */
+ bool is_first_read;
double target_length;
double bytes_read_this_round;
gpr_refcount refcount;
@@ -377,7 +379,6 @@ static void tcp_do_read(grpc_tcp* tcp) {
ssize_t read_bytes;
size_t i;
- GPR_ASSERT(!tcp->finished_edge);
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
@@ -473,7 +474,6 @@ static void tcp_continue_read(grpc_tcp* tcp) {
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
- GPR_ASSERT(!tcp->finished_edge);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_read: %s", tcp, grpc_error_string(error));
}
@@ -497,10 +497,17 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
- if (tcp->finished_edge) {
- tcp->finished_edge = false;
+ if (tcp->is_first_read) {
+ /* Endpoint read called for the very first time. Register read callback with
+ * the polling engine */
+ tcp->is_first_read = false;
notify_on_read(tcp);
} else {
+ /* Not the first time. We may or may not have more bytes available. In any
+ * case call tcp->read_done_closure (i.e tcp_handle_read()) which does the
+ * right thing (i.e calls tcp_do_read() which either reads the available
+ * bytes or calls notify_on_read() to be notified when new bytes become
+ * available */
GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
@@ -778,7 +785,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->bytes_read_this_round = 0;
- tcp->finished_edge = true;
+ /* Will be set to false by the very first endpoint read function */
+ tcp->is_first_read = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
@@ -811,4 +819,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
TCP_UNREF(tcp, "destroy");
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 484d2b6077..0a5caca906 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -25,7 +25,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_server.h"
@@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 2d95aa66d6..73afa15e65 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
@@ -217,4 +217,4 @@ error:
return ret;
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 5ff10d3aee..7f534476df 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -28,7 +28,7 @@
#include "src/core/lib/iomgr/iomgr.h"
typedef struct grpc_timer {
- gpr_atm deadline;
+ grpc_millis deadline;
uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
bool pending;
struct grpc_timer* next;
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index de2256f7cb..4294162af7 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
@@ -59,9 +60,9 @@ typedef struct {
gpr_mu mu;
grpc_time_averaged_stats stats;
/* All and only timers with deadlines <= this will be in the heap. */
- gpr_atm queue_deadline_cap;
+ grpc_millis queue_deadline_cap;
/* The deadline of the next timer due in this shard */
- gpr_atm min_deadline;
+ grpc_millis min_deadline;
/* Index of this timer_shard in the g_shard_queue */
uint32_t shard_queue_index;
/* This holds all timers with deadlines < queue_deadline_cap. Timers in this
@@ -209,15 +210,23 @@ static void validate_non_pending_timer(grpc_timer* t) {
#endif
+#if GPR_ARCH_64
+/* NOTE: TODO(sreek) - Currently the thread local storage support in grpc is
+ for intptr_t which means on 32-bit machines it is not wide enough to hold
+ grpc_millis which is 64-bit. Adding thread local support for 64 bit values
+ is a lot of work for very little gain. So we are currently restricting this
+ optimization to only 64 bit machines */
+
/* Thread local variable that stores the deadline of the next timer the thread
* has last-seen. This is an optimization to prevent the thread from checking
* shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock,
* an expensive operation) */
GPR_TLS_DECL(g_last_seen_min_timer);
+#endif
struct shared_mutables {
/* The deadline of the next timer due across all timer shards */
- gpr_atm min_timer;
+ grpc_millis min_timer;
/* Allow only one run_some_expired_timers at once */
gpr_spinlock checker_mu;
bool initialized;
@@ -227,18 +236,18 @@ struct shared_mutables {
static struct shared_mutables g_shared_mutables;
-static gpr_atm saturating_add(gpr_atm a, gpr_atm b) {
- if (a > GPR_ATM_MAX - b) {
- return GPR_ATM_MAX;
+static grpc_millis saturating_add(grpc_millis a, grpc_millis b) {
+ if (a > GRPC_MILLIS_INF_FUTURE - b) {
+ return GRPC_MILLIS_INF_FUTURE;
}
return a + b;
}
-static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
- gpr_atm* next,
+static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
+ grpc_millis* next,
grpc_error* error);
-static gpr_atm compute_min_deadline(timer_shard* shard) {
+static grpc_millis compute_min_deadline(timer_shard* shard) {
return grpc_timer_heap_is_empty(&shard->heap)
? saturating_add(shard->queue_deadline_cap, 1)
: grpc_timer_heap_top(&shard->heap)->deadline;
@@ -257,8 +266,11 @@ static void timer_list_init() {
g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu);
g_shared_mutables.min_timer = grpc_core::ExecCtx::Get()->Now();
+
+#if GPR_ARCH_64
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
+#endif
for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
@@ -287,7 +299,11 @@ static void timer_list_shutdown() {
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_shared_mutables.mu);
+
+#if GPR_ARCH_64
gpr_tls_destroy(&g_last_seen_min_timer);
+#endif
+
gpr_free(g_shards);
gpr_free(g_shard_queue);
g_shared_mutables.initialized = false;
@@ -346,7 +362,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
#endif
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_INFO, "TIMER %p: SET %" PRIdPTR " now %" PRIdPTR " call %p[%p]",
+ gpr_log(GPR_INFO, "TIMER %p: SET %" PRId64 " now %" PRId64 " call %p[%p]",
timer, deadline, grpc_core::ExecCtx::Get()->Now(), closure,
closure->cb);
}
@@ -383,7 +399,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
}
if (grpc_timer_trace.enabled()) {
gpr_log(GPR_INFO,
- " .. add to shard %d with queue_deadline_cap=%" PRIdPTR
+ " .. add to shard %d with queue_deadline_cap=%" PRId64
" => is_first_timer=%s",
static_cast<int>(shard - g_shards), shard->queue_deadline_cap,
is_first_timer ? "true" : "false");
@@ -404,15 +420,27 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu);
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRIdPTR,
+ gpr_log(GPR_INFO, " .. old shard min_deadline=%" PRId64,
shard->min_deadline);
}
if (deadline < shard->min_deadline) {
- gpr_atm old_min_deadline = g_shard_queue[0]->min_deadline;
+ grpc_millis old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
note_deadline_change(shard);
if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
- gpr_atm_no_barrier_store(&g_shared_mutables.min_timer, deadline);
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error
+ // (on mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide.
+ gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
+ deadline);
+#else
+ // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
+ // types (like grpc_millis). So all reads and writes to
+ // g_shared_mutables.min_timer varialbe under g_shared_mutables.mu
+ g_shared_mutables.min_timer = deadline;
+#endif
grpc_kick_poller();
}
}
@@ -421,8 +449,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
}
static void timer_consume_kick(void) {
- /* force re-evaluation of last seeen min */
+#if GPR_ARCH_64
+ /* Force re-evaluation of last seen min */
gpr_tls_set(&g_last_seen_min_timer, 0);
+#endif
}
static void timer_cancel(grpc_timer* timer) {
@@ -459,7 +489,7 @@ static void timer_cancel(grpc_timer* timer) {
'queue_deadline_cap') into into shard->heap.
Returns 'true' if shard->heap has atleast ONE element
REQUIRES: shard->mu locked */
-static int refill_heap(timer_shard* shard, gpr_atm now) {
+static int refill_heap(timer_shard* shard, grpc_millis now) {
/* Compute the new queue window width and bound by the limits: */
double computed_deadline_delta =
grpc_time_averaged_stats_update_average(&shard->stats) *
@@ -472,10 +502,10 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* Compute the new cap and put all timers under it into the queue: */
shard->queue_deadline_cap =
saturating_add(GPR_MAX(now, shard->queue_deadline_cap),
- static_cast<gpr_atm>(deadline_delta * 1000.0));
+ static_cast<grpc_millis>(deadline_delta * 1000.0));
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRIdPTR,
+ gpr_log(GPR_INFO, " .. shard[%d]->queue_deadline_cap --> %" PRId64,
static_cast<int>(shard - g_shards), shard->queue_deadline_cap);
}
for (timer = shard->list.next; timer != &shard->list; timer = next) {
@@ -483,7 +513,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
if (timer->deadline < shard->queue_deadline_cap) {
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_INFO, " .. add timer with deadline %" PRIdPTR " to heap",
+ gpr_log(GPR_INFO, " .. add timer with deadline %" PRId64 " to heap",
timer->deadline);
}
list_remove(timer);
@@ -496,7 +526,7 @@ static int refill_heap(timer_shard* shard, gpr_atm now) {
/* This pops the next non-cancelled timer with deadline <= now from the
queue, or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
-static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
+static grpc_timer* pop_one(timer_shard* shard, grpc_millis now) {
grpc_timer* timer;
for (;;) {
if (grpc_timer_check_trace.enabled()) {
@@ -511,12 +541,12 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
timer = grpc_timer_heap_top(&shard->heap);
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_INFO,
- " .. check top timer deadline=%" PRIdPTR " now=%" PRIdPTR,
+ " .. check top timer deadline=%" PRId64 " now=%" PRId64,
timer->deadline, now);
}
if (timer->deadline > now) return nullptr;
if (grpc_timer_trace.enabled()) {
- gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRIdPTR "ms late via %s scheduler",
+ gpr_log(GPR_INFO, "TIMER %p: FIRE %" PRId64 "ms late via %s scheduler",
timer, now - timer->deadline,
timer->closure->scheduler->vtable->name);
}
@@ -527,8 +557,8 @@ static grpc_timer* pop_one(timer_shard* shard, gpr_atm now) {
}
/* REQUIRES: shard->mu unlocked */
-static size_t pop_timers(timer_shard* shard, gpr_atm now,
- gpr_atm* new_min_deadline, grpc_error* error) {
+static size_t pop_timers(timer_shard* shard, grpc_millis now,
+ grpc_millis* new_min_deadline, grpc_error* error) {
size_t n = 0;
grpc_timer* timer;
gpr_mu_lock(&shard->mu);
@@ -546,13 +576,27 @@ static size_t pop_timers(timer_shard* shard, gpr_atm now,
return n;
}
-static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
- gpr_atm* next,
+static grpc_timer_check_result run_some_expired_timers(grpc_millis now,
+ grpc_millis* next,
grpc_error* error) {
grpc_timer_check_result result = GRPC_TIMERS_NOT_CHECKED;
- gpr_atm min_timer = gpr_atm_no_barrier_load(&g_shared_mutables.min_timer);
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
+ // mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide
+ grpc_millis min_timer = static_cast<grpc_millis>(
+ gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
gpr_tls_set(&g_last_seen_min_timer, min_timer);
+#else
+ // On 32-bit systems, gpr_atm_no_barrier_load does not work on 64-bit types
+ // (like grpc_millis). So all reads and writes to g_shared_mutables.min_timer
+ // are done under g_shared_mutables.mu
+ gpr_mu_lock(&g_shared_mutables.mu);
+ grpc_millis min_timer = g_shared_mutables.min_timer;
+ gpr_mu_unlock(&g_shared_mutables.mu);
+#endif
if (now < min_timer) {
if (next != nullptr) *next = GPR_MIN(*next, min_timer);
return GRPC_TIMERS_CHECKED_AND_EMPTY;
@@ -563,14 +607,15 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRIdPTR,
+ gpr_log(GPR_INFO, " .. shard[%d]->min_deadline = %" PRId64,
static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline);
}
while (g_shard_queue[0]->min_deadline < now ||
- (now != GPR_ATM_MAX && g_shard_queue[0]->min_deadline == now)) {
- gpr_atm new_min_deadline;
+ (now != GRPC_MILLIS_INF_FUTURE &&
+ g_shard_queue[0]->min_deadline == now)) {
+ grpc_millis new_min_deadline;
/* For efficiency, we pop as many available timers as we can from the
shard. This may violate perfect timer deadline ordering, but that
@@ -582,8 +627,8 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
if (grpc_timer_check_trace.enabled()) {
gpr_log(GPR_INFO,
" .. result --> %d"
- ", shard[%d]->min_deadline %" PRIdPTR " --> %" PRIdPTR
- ", now=%" PRIdPTR,
+ ", shard[%d]->min_deadline %" PRId64 " --> %" PRId64
+ ", now=%" PRId64,
result, static_cast<int>(g_shard_queue[0] - g_shards),
g_shard_queue[0]->min_deadline, new_min_deadline, now);
}
@@ -601,8 +646,19 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
*next = GPR_MIN(*next, g_shard_queue[0]->min_deadline);
}
- gpr_atm_no_barrier_store(&g_shared_mutables.min_timer,
+#if GPR_ARCH_64
+ // TODO: sreek - Using c-style cast here. static_cast<> gives an error (on
+ // mac platforms complaining that gpr_atm* is (long *) while
+ // (&g_shared_mutables.min_timer) is a (long long *). The cast should be
+ // safe since we know that both are pointer types and 64-bit wide
+ gpr_atm_no_barrier_store((gpr_atm*)(&g_shared_mutables.min_timer),
g_shard_queue[0]->min_deadline);
+#else
+ // On 32-bit systems, gpr_atm_no_barrier_store does not work on 64-bit
+ // types (like grpc_millis). So all reads and writes to
+ // g_shared_mutables.min_timer are done under g_shared_mutables.mu
+ g_shared_mutables.min_timer = g_shard_queue[0]->min_deadline;
+#endif
gpr_mu_unlock(&g_shared_mutables.mu);
gpr_spinlock_unlock(&g_shared_mutables.checker_mu);
}
@@ -616,17 +672,28 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
// prelude
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+#if GPR_ARCH_64
/* fetch from a thread-local first: this avoids contention on a globally
mutable cacheline in the common case */
grpc_millis min_timer = gpr_tls_get(&g_last_seen_min_timer);
+#else
+ // On 32-bit systems, we currently do not have thread local support for 64-bit
+ // types. In this case, directly read from g_shared_mutables.min_timer.
+ // Also, note that on 32-bit systems, gpr_atm_no_barrier_store does not work
+ // on 64-bit types (like grpc_millis). So all reads and writes to
+ // g_shared_mutables.min_timer are done under g_shared_mutables.mu
+ gpr_mu_lock(&g_shared_mutables.mu);
+ grpc_millis min_timer = g_shared_mutables.min_timer;
+ gpr_mu_unlock(&g_shared_mutables.mu);
+#endif
+
if (now < min_timer) {
if (next != nullptr) {
*next = GPR_MIN(*next, min_timer);
}
if (grpc_timer_check_trace.enabled()) {
- gpr_log(GPR_INFO,
- "TIMER CHECK SKIP: now=%" PRIdPTR " min_timer=%" PRIdPTR, now,
- min_timer);
+ gpr_log(GPR_INFO, "TIMER CHECK SKIP: now=%" PRId64 " min_timer=%" PRId64,
+ now, min_timer);
}
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}
@@ -642,13 +709,18 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
if (next == nullptr) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRIdPTR, *next);
+ gpr_asprintf(&next_str, "%" PRId64, *next);
}
+#if GPR_ARCH_64
gpr_log(GPR_INFO,
- "TIMER CHECK BEGIN: now=%" PRIdPTR " next=%s tls_min=%" PRIdPTR
+ "TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
" glob_min=%" PRIdPTR,
- now, next_str, gpr_tls_get(&g_last_seen_min_timer),
- gpr_atm_no_barrier_load(&g_shared_mutables.min_timer));
+ now, next_str, min_timer,
+ gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
+#else
+ gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
+ now, next_str, min_timer);
+#endif
gpr_free(next_str);
}
// actual code
@@ -660,7 +732,7 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
if (next == nullptr) {
next_str = gpr_strdup("NULL");
} else {
- gpr_asprintf(&next_str, "%" PRIdPTR, *next);
+ gpr_asprintf(&next_str, "%" PRId64, *next);
}
gpr_log(GPR_INFO, "TIMER CHECK END: r=%d; next=%s", r, next_str);
gpr_free(next_str);
diff --git a/src/core/lib/iomgr/timer_manager.cc b/src/core/lib/iomgr/timer_manager.cc
index 35e7914568..9fdae17909 100644
--- a/src/core/lib/iomgr/timer_manager.cc
+++ b/src/core/lib/iomgr/timer_manager.cc
@@ -172,7 +172,7 @@ static bool wait_until(grpc_millis next) {
if (grpc_timer_check_trace.enabled()) {
grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now();
- gpr_log(GPR_INFO, "sleep for a %" PRIdPTR " milliseconds", wait_time);
+ gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time);
}
} else { // g_timed_waiter == true && next >= g_timed_waiter_deadline
next = GRPC_MILLIS_INF_FUTURE;
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
index 2129029737..43dd68e874 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
@@ -219,9 +219,11 @@ static void on_oauth2_token_fetcher_http_response(void* user_data,
gpr_mu_lock(&c->mu);
c->token_fetch_pending = false;
c->access_token_md = GRPC_MDELEM_REF(access_token_md);
- c->token_expiration = status == GRPC_CREDENTIALS_OK
- ? grpc_core::ExecCtx::Get()->Now() + token_lifetime
- : 0;
+ c->token_expiration =
+ status == GRPC_CREDENTIALS_OK
+ ? gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(token_lifetime, GPR_TIMESPAN))
+ : gpr_inf_past(GPR_CLOCK_MONOTONIC);
grpc_oauth2_pending_get_request_metadata* pending_request =
c->pending_requests;
c->pending_requests = nullptr;
@@ -259,8 +261,10 @@ static bool oauth2_token_fetcher_get_request_metadata(
grpc_mdelem cached_access_token_md = GRPC_MDNULL;
gpr_mu_lock(&c->mu);
if (!GRPC_MDISNULL(c->access_token_md) &&
- (c->token_expiration - grpc_core::ExecCtx::Get()->Now() >
- refresh_threshold)) {
+ gpr_time_cmp(
+ gpr_time_sub(c->token_expiration, gpr_now(GPR_CLOCK_MONOTONIC)),
+ gpr_time_from_seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
+ GPR_TIMESPAN)) > 0) {
cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md);
}
if (!GRPC_MDISNULL(cached_access_token_md)) {
@@ -333,7 +337,7 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials* c,
c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2;
gpr_ref_init(&c->base.refcount, 1);
gpr_mu_init(&c->mu);
- c->token_expiration = 0;
+ c->token_expiration = gpr_inf_past(GPR_CLOCK_MONOTONIC);
c->fetch_func = fetch_func;
c->pollent =
grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create());
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index c0dd1546e3..12a1d4484f 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -71,7 +71,7 @@ typedef struct {
grpc_call_credentials base;
gpr_mu mu;
grpc_mdelem access_token_md;
- grpc_millis token_expiration;
+ gpr_timespec token_expiration;
bool token_fetch_pending;
grpc_oauth2_pending_get_request_metadata* pending_requests;
grpc_httpcli_context httpcli_context;
diff --git a/src/core/lib/security/util/json_util.cc b/src/core/lib/security/util/json_util.cc
index 75512a19c9..fe9f5fe3d3 100644
--- a/src/core/lib/security/util/json_util.cc
+++ b/src/core/lib/security/util/json_util.cc
@@ -29,6 +29,10 @@ const char* grpc_json_get_string_property(const grpc_json* json,
const char* prop_name) {
grpc_json* child;
for (child = json->child; child != nullptr; child = child->next) {
+ if (child->key == nullptr) {
+ gpr_log(GPR_ERROR, "Invalid (null) JSON key encountered");
+ return nullptr;
+ }
if (strcmp(child->key, prop_name) == 0) break;
}
if (child == nullptr || child->type != GRPC_JSON_STRING) {
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index fd56997388..1f1c08b159 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (slice_len > n) {
sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n);
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
return;
} else if (slice_len == n) {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
sb->count = idx;
return;
} else {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
n -= slice_len;
sb->count = idx;
}
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index ba7cb92cdc..d44846cd12 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,6 +233,7 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
+ grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@@ -516,7 +517,6 @@ static void release_call(void* call, grpc_error* error) {
grpc_call* c = static_cast<grpc_call*>(call);
grpc_channel* channel = c->channel;
grpc_call_combiner_destroy(&c->call_combiner);
- gpr_free((char*)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
@@ -1132,7 +1132,7 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
return !(flags & invalid_positions);
}
-static int batch_slot_for_op(grpc_op_type type) {
+static size_t batch_slot_for_op(grpc_op_type type) {
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
return 0;
@@ -1155,17 +1155,20 @@ static int batch_slot_for_op(grpc_op_type type) {
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
const grpc_op* ops,
size_t num_ops) {
- int slot = batch_slot_for_op(ops[0].op);
- batch_control** pslot = &call->active_batches[slot];
- if (*pslot == nullptr) {
- *pslot = static_cast<batch_control*>(
+ size_t slot_idx = batch_slot_for_op(ops[0].op);
+ batch_control** pslot = &call->active_batches[slot_idx];
+ batch_control* bctl;
+ if (*pslot != nullptr) {
+ bctl = *pslot;
+ if (bctl->call != nullptr) {
+ return nullptr;
+ }
+ memset(bctl, 0, sizeof(*bctl));
+ } else {
+ bctl = static_cast<batch_control*>(
gpr_arena_alloc(call->arena, sizeof(batch_control)));
+ *pslot = bctl;
}
- batch_control* bctl = *pslot;
- if (bctl->call != nullptr) {
- return nullptr;
- }
- memset(bctl, 0, sizeof(*bctl));
bctl->call = call;
bctl->op.payload = &call->stream_op_payload;
return bctl;
@@ -1207,7 +1210,6 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@@ -1215,14 +1217,9 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
-
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
- grpc_metadata_batch* md =
- &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
-
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@@ -1244,7 +1241,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
-
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@@ -1254,7 +1250,6 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
-
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1536,6 +1531,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
+static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
+ batch_control* bctl = static_cast<batch_control*>(bctlp);
+ grpc_call* call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
+ add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_metadata_batch* md =
+ &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+ recv_trailing_filter(call, md);
+ }
+ finish_batch_step(bctl);
+}
+
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@@ -1556,7 +1564,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
- int num_completion_callbacks_needed = 1;
+ bool has_send_ops = false;
+ int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1662,6 +1671,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@@ -1691,6 +1701,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1711,6 +1722,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1775,6 +1787,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
+ has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1802,7 +1815,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@@ -1824,7 +1837,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
- num_completion_callbacks_needed++;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1850,11 +1863,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1875,11 +1893,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
- stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- stream_op_payload->collect_stats.collect_stats =
+ stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
+ GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
+ receiving_trailing_metadata_ready, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->receiving_trailing_metadata_ready;
+ ++num_recv_ops;
break;
}
}
@@ -1889,13 +1912,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
- gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+ gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
- GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
- grpc_schedule_on_exec_ctx);
- stream_op->on_complete = &bctl->finish_batch;
- gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+ if (has_send_ops) {
+ GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+ grpc_schedule_on_exec_ctx);
+ stream_op->on_complete = &bctl->finish_batch;
+ }
+ gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch);
done:
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 10bc5900a1..a466b325be 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -72,10 +72,6 @@ struct grpc_channel {
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack*)((c) + 1))
-#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
- (((grpc_channel*)(channel_stack)) - 1)
-#define CHANNEL_FROM_TOP_ELEM(top_elem) \
- CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
static void destroy_channel(void* arg, grpc_error* error);
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index f751741712..7da9e6b74c 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -321,8 +321,7 @@ static const cq_vtable g_cq_vtable[] = {
#define POLLSET_FROM_CQ(cq) \
((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
-grpc_core::TraceFlag grpc_cq_pluck_trace(true, "queue_pluck");
-grpc_core::TraceFlag grpc_cq_event_timeout_trace(true, "queue_timeout");
+grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
#define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \
if (grpc_api_trace.enabled() && (grpc_cq_pluck_trace.enabled() || \
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index c9dc2d93c1..84446a4d92 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -30,7 +30,6 @@
/* These trace flags default to 1. The corresponding lines are only traced
if grpc_api_trace is also truthy */
extern grpc_core::TraceFlag grpc_cq_pluck_trace;
-extern grpc_core::TraceFlag grpc_cq_event_timeout_trace;
extern grpc_core::TraceFlag grpc_trace_operation_failures;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags;
extern grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount;
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 6b41e4b37e..cbdb77c844 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -184,7 +184,8 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
nullptr) {
transport->vtable->set_pollset_set(transport, stream, pollset_set);
} else {
- abort();
+ // No-op for empty pollset. Empty pollset is possible when using
+ // non-fd-based event engines such as CFStream.
}
}
@@ -211,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
- if (batch->recv_message) {
- GRPC_CALL_COMBINER_START(
- call_combiner, batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error), "failing recv_message_ready");
+ if (batch->cancel_stream) {
+ GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
+ // Construct a list of closures to execute.
+ grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
- GRPC_CALL_COMBINER_START(
- call_combiner,
+ closures.Add(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
- GRPC_CLOSURE_SCHED(batch->on_complete, error);
- if (batch->cancel_stream) {
- GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+ if (batch->recv_message) {
+ closures.Add(batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error), "failing recv_message_ready");
+ }
+ if (batch->recv_trailing_metadata) {
+ closures.Add(
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
+ GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
+ }
+ if (batch->on_complete != nullptr) {
+ closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
+ "failing on_complete");
}
+ // Execute closures.
+ closures.RunClosures(call_combiner);
+ GRPC_ERROR_UNREF(error);
}
typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 10e9df0f7c..585b9dfae9 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
+ /** Should be scheduled when all of the non-recv operations in the batch
+ are complete.
+
+ The recv ops (recv_initial_metadata, recv_message, and
+ recv_trailing_metadata) each have their own callbacks. If a batch
+ contains both recv ops and non-recv ops, on_complete should be
+ scheduled as soon as the non-recv ops are complete, regardless of
+ whether or not the recv ops are complete. If a batch contains
+ only recv ops, on_complete can be null. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
- /** Collect any stats into provided buffer, zero internal stat counters */
- bool collect_stats : 1;
-
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@@ -168,13 +171,11 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
+ // If non-NULL, will be set by the transport to the peer string (a char*).
+ // The transport retains ownership of the string.
// Note: This pointer may be used by the transport after the
// send_initial_metadata op is completed. It must remain valid
// until the call is destroyed.
- // Note: When a transport sets this, it must free the previous
- // value, if any.
gpr_atm* peer_string;
} send_initial_metadata;
@@ -202,13 +203,11 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool* trailing_metadata_available;
- // If non-NULL, will be set by the transport to the peer string
- // (a char*, which the caller takes ownership of).
+ // If non-NULL, will be set by the transport to the peer string (a char*).
+ // The transport retains ownership of the string.
// Note: This pointer may be used by the transport after the
// recv_initial_metadata op is completed. It must remain valid
// until the call is destroyed.
- // Note: When a transport sets this, it must free the previous
- // value, if any.
gpr_atm* peer_string;
} recv_initial_metadata;
@@ -223,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
- } recv_trailing_metadata;
-
- struct {
grpc_transport_stream_stats* collect_stats;
- } collect_stats;
+ /** Should be enqueued when initial metadata is ready to be processed. */
+ grpc_closure* recv_trailing_metadata_ready;
+ } recv_trailing_metadata;
/** Forcefully close this stream.
The HTTP2 semantics should be:
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 99af7c1931..8c7db642a5 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -52,7 +52,7 @@ static void put_metadata_list(gpr_strvec* b, grpc_metadata_batch md) {
}
if (md.deadline != GRPC_MILLIS_INF_FUTURE) {
char* tmp;
- gpr_asprintf(&tmp, " deadline=%" PRIdPTR, md.deadline);
+ gpr_asprintf(&tmp, " deadline=%" PRId64, md.deadline);
gpr_strvec_add(b, tmp);
}
}
@@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
- if (op->collect_stats) {
- gpr_strvec_add(&b, gpr_strdup(" "));
- gpr_asprintf(&tmp, "COLLECT_STATS:%p",
- op->payload->collect_stats.collect_stats);
- gpr_strvec_add(&b, tmp);
- }
-
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);