aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc7
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.cc17
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.h30
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc3
-rw-r--r--src/core/lib/iomgr/buffer_list.cc13
-rw-r--r--src/core/lib/iomgr/buffer_list.h7
-rw-r--r--src/core/lib/iomgr/iomgr.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc17
9 files changed, 41 insertions, 59 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index da29ff1b37..4ca0f49adf 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -157,7 +157,6 @@ bool g_flow_control_enabled = true;
*/
grpc_chttp2_transport::~grpc_chttp2_transport() {
- gpr_log(GPR_INFO, "destruct transport %p", t);
size_t i;
if (channelz_socket != nullptr) {
@@ -172,11 +171,12 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
grpc_chttp2_hpack_compressor_destroy(&hpack_compressor);
grpc_core::ContextList::Execute(cl, nullptr, GRPC_ERROR_NONE);
+ cl = nullptr;
+
grpc_slice_buffer_destroy_internal(&read_buffer);
grpc_chttp2_hpack_parser_destroy(&hpack_parser);
grpc_chttp2_goaway_parser_destroy(&goaway_parser);
-
for (i = 0; i < STREAM_LIST_COUNT; i++) {
GPR_ASSERT(lists[i].head == nullptr);
GPR_ASSERT(lists[i].tail == nullptr);
@@ -1072,9 +1072,6 @@ static void write_action(void* gt, grpc_error* error) {
grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt);
void* cl = t->cl;
t->cl = nullptr;
- if (cl) {
- gpr_log(GPR_INFO, "cleared for write");
- }
grpc_endpoint_write(
t->ep, &t->outbuf,
GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t,
diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc
index 91c26a5bca..11f5c14a39 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.cc
+++ b/src/core/ext/transport/chttp2/transport/context_list.cc
@@ -21,33 +21,30 @@
#include "src/core/ext/transport/chttp2/transport/context_list.h"
namespace {
-void (*cb)(void*, const char*) = nullptr;
+void (*cb)(void*, grpc_core::Timestamps*) = nullptr;
}
namespace grpc_core {
void ContextList::Execute(void* arg, grpc_core::Timestamps* ts,
grpc_error* error) {
- gpr_log(GPR_INFO, "execute");
ContextList* head = static_cast<ContextList*>(arg);
ContextList* ptr;
while (head != nullptr) {
if (error == GRPC_ERROR_NONE && ts != nullptr) {
if (cb) {
- cb(head->s->context, ts);
+ cb(head->s_->context, ts);
}
}
- gpr_log(GPR_INFO, "one iteration %p %p", head, arg);
- GRPC_CHTTP2_STREAM_UNREF(static_cast<grpc_chttp2_stream *>(head->s),
- "timestamp exec");
- //grpc_stream_unref(head->s->refcount);
+ GRPC_CHTTP2_STREAM_UNREF(static_cast<grpc_chttp2_stream*>(head->s_),
+ "timestamp");
ptr = head;
- head = head->next;
- gpr_free(ptr);
+ head = head->next_;
+ grpc_core::Delete(ptr);
}
}
void grpc_http2_set_write_timestamps_callback(
- void (*fn)(void*, const char*)) {
+ void (*fn)(void*, grpc_core::Timestamps*)) {
cb = fn;
}
} /* namespace grpc_core */
diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h
index 23a49d5b32..0cf7ba4dc3 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.h
+++ b/src/core/ext/transport/chttp2/transport/context_list.h
@@ -35,29 +35,25 @@ class ContextList {
/* Make sure context is not already present */
ContextList* ptr = *head;
GRPC_CHTTP2_STREAM_REF(s, "timestamp");
- //grpc_stream_ref(s->refcount);
while (ptr != nullptr) {
- if (ptr->s == s) {
- GPR_ASSERT(false);
+ if (ptr->s_ == s) {
+ GPR_ASSERT(
+ false &&
+ "Trying to append a stream that is already present in the list");
}
- ptr = ptr->next;
+ ptr = ptr->next_;
}
- ContextList* elem =
- static_cast<ContextList*>(gpr_malloc(sizeof(ContextList)));
- elem->s = s;
- elem->next = nullptr;
+ ContextList* elem = grpc_core::New<ContextList>();
+ elem->s_ = s;
if (*head == nullptr) {
*head = elem;
- gpr_log(GPR_INFO, "new head");
- gpr_log(GPR_INFO, "append %p %p", elem, *head);
return;
}
- gpr_log(GPR_INFO, "append %p %p", elem, *head);
ptr = *head;
- while (ptr->next != nullptr) {
- ptr = ptr->next;
+ while (ptr->next_ != nullptr) {
+ ptr = ptr->next_;
}
- ptr->next = elem;
+ ptr->next_ = elem;
}
/* Executes a function \a fn with each context in the list and \a ts. It also
@@ -65,12 +61,12 @@ class ContextList {
static void Execute(void* arg, grpc_core::Timestamps* ts, grpc_error* error);
private:
- grpc_chttp2_stream* s;
- ContextList* next;
+ grpc_chttp2_stream* s_ = nullptr;
+ ContextList* next_ = nullptr;
};
void grpc_http2_set_write_timestamps_callback(
- void (*fn)(void*, const char*));
+ void (*fn)(void*, grpc_core::Timestamps*));
} /* namespace grpc_core */
#endif /* GRPC_CORE_EXT_TRANSPORT_CONTEXT_LIST_H */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 8a83f4894c..877b8aba77 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -485,7 +485,7 @@ struct grpc_chttp2_transport {
bool keepalive_permit_without_calls = false;
/** keep-alive state machine state */
grpc_chttp2_keepalive_state keepalive_state;
- grpc_core::ContextList* cl;
+ grpc_core::ContextList* cl = nullptr;
grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
uint32_t num_messages_in_next_write = 0;
};
@@ -640,6 +640,8 @@ struct grpc_chttp2_stream {
bool unprocessed_incoming_frames_decompressed = false;
/** gRPC header bytes that are already decompressed */
size_t decompressed_header_bytes = 0;
+ /** Whether the bytes needs to be traced using Fathom */
+ bool traced = false;
};
/** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 3b3367d0f3..77320b496f 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -488,8 +488,7 @@ class StreamWriteContext {
return; // early out: nothing to do
}
- if (/* traced && */ grpc_endpoint_can_track_err(t_->ep)) {
- gpr_log(GPR_INFO, "for transport %p", t_);
+ if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
grpc_core::ContextList::Append(&t_->cl, s_);
}
while ((s_->flow_controlled_buffer.length > 0 ||
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
index d7884a5965..e20dab15b1 100644
--- a/src/core/lib/iomgr/buffer_list.cc
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -31,7 +31,6 @@
namespace grpc_core {
void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
void* arg) {
- gpr_log(GPR_INFO, "new entry %u", seq_no);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
@@ -56,16 +55,21 @@ void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) {
gts->clock_type = GPR_CLOCK_REALTIME;
}
+void default_timestamps_callback(void* arg, grpc_core::Timestamps* ts,
+ grpc_error* shudown_err) {
+ gpr_log(GPR_DEBUG, "Timestamps callback has not been registered");
+}
+
/** The saved callback function that will be invoked when we get all the
* timestamps that we are going to get for a TracedBuffer. */
void (*timestamps_callback)(void*, grpc_core::Timestamps*,
- grpc_error* shutdown_err);
+ grpc_error* shutdown_err) =
+ default_timestamps_callback;
} /* namespace */
void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
struct sock_extended_err* serr,
struct scm_timestamping* tss) {
- gpr_log(GPR_INFO, "process timestamp %u", serr->ee_data);
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
TracedBuffer* next = nullptr;
@@ -87,7 +91,6 @@ void TracedBuffer::ProcessTimestamp(TracedBuffer** head,
/* Got all timestamps. Do the callback and free this TracedBuffer.
* The thing below can be passed by value if we don't want the
* restriction on the lifetime. */
- gpr_log(GPR_INFO, "calling");
timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE);
next = elem->next_;
Delete<TracedBuffer>(elem);
@@ -106,10 +109,8 @@ void TracedBuffer::Shutdown(TracedBuffer** head, void* remaining,
grpc_error* shutdown_err) {
GPR_DEBUG_ASSERT(head != nullptr);
TracedBuffer* elem = *head;
- gpr_log(GPR_INFO, "shutdown");
while (elem != nullptr) {
timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err);
- gpr_log(GPR_INFO, "iter");
auto* next = elem->next_;
Delete<TracedBuffer>(elem);
elem = next;
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
index f7d2f6c370..87d74f9ce2 100644
--- a/src/core/lib/iomgr/buffer_list.h
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -82,7 +82,12 @@ class TracedBuffer {
grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */
};
#else /* GRPC_LINUX_ERRQUEUE */
-class TracedBuffer {};
+class TracedBuffer {
+ public:
+ /* Dummy shutdown function */
+ static void Shutdown(grpc_core::TracedBuffer** head, void* remaining,
+ grpc_error* shutdown_err) {}
+};
#endif /* GRPC_LINUX_ERRQUEUE */
/** Sets the callback function to call when timestamps for a write are
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 7c0f19d0dd..30b68db4df 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -33,7 +33,6 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/thd.h"
-#include "src/core/ext/transport/chttp2/transport/context_list.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
@@ -59,7 +58,6 @@ void grpc_iomgr_init() {
g_root_object.name = (char*)"root";
grpc_network_status_init();
grpc_iomgr_platform_init();
- grpc_tcp_set_write_timestamps_callback(grpc_core::ContextList::Execute);
}
void grpc_iomgr_start() { grpc_timer_manager_init(); }
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 78c8d1eed8..cb4c9db7a6 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -382,7 +382,6 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
- gpr_log(GPR_INFO, "tcp destroy %p %p", ep, tcp->outgoing_buffer_arg);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
gpr_mu_lock(&tcp->tb_mu);
@@ -594,7 +593,6 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
ssize_t* sent_length,
grpc_error** error) {
if (!tcp->socket_ts_enabled) {
- gpr_log(GPR_INFO, "set timestamps");
uint32_t opt = grpc_core::kTimestampingSocketOptions;
if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
static_cast<void*>(&opt), sizeof(opt)) != 0) {
@@ -627,7 +625,6 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
*sent_length = length;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) {
- gpr_log(GPR_INFO, "tcp new entry %p %p", tcp, tcp->outgoing_buffer_arg);
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
&tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
@@ -687,7 +684,6 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
* non-linux platforms, error processing is not used/enabled currently.
*/
static bool process_errors(grpc_tcp* tcp) {
- gpr_log(GPR_INFO, "process errors");
while (true) {
struct iovec iov;
iov.iov_base = nullptr;
@@ -750,8 +746,6 @@ static bool process_errors(grpc_tcp* tcp) {
}
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
- gpr_log(GPR_INFO, "handle error %p", arg);
- GRPC_LOG_IF_ERROR("handle error", GRPC_ERROR_REF(error));
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
@@ -761,8 +755,6 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
/* We aren't going to register to hear on error anymore, so it is safe to
* unref. */
- gpr_log(GPR_INFO, "%p %d early return", error,
- static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification)));
TCP_UNREF(tcp, "error-tracking");
return;
}
@@ -797,6 +789,8 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
}
#endif /* GRPC_LINUX_ERRQUEUE */
+/* If outgoing_buffer_arg is filled, shuts down the list early, so that any
+ * release operations needed can be performed on the arg */
void tcp_shutdown_buffer_list(grpc_tcp* tcp) {
if (tcp->outgoing_buffer_arg) {
gpr_mu_lock(&tcp->tb_mu);
@@ -856,7 +850,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
if (tcp->outgoing_buffer_arg != nullptr) {
if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
error)) {
- gpr_log(GPR_INFO, "something went wrong");
tcp_shutdown_buffer_list(tcp);
return true; /* something went wrong with timestamps */
}
@@ -881,13 +874,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
}
return false;
} else if (errno == EPIPE) {
- gpr_log(GPR_INFO, "something went wrong");
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
return true;
} else {
- gpr_log(GPR_INFO, "something went wrong");
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
@@ -951,7 +942,6 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
grpc_closure* cb, void* arg) {
GPR_TIMER_SCOPE("tcp_write", 0);
- gpr_log(GPR_INFO, "tcp_write %p %p", ep, arg);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_error* error = GRPC_ERROR_NONE;
@@ -992,7 +982,6 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
}
notify_on_write(tcp);
} else {
- gpr_log(GPR_INFO, "imm sched");
if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
@@ -1041,7 +1030,6 @@ static bool tcp_can_track_err(grpc_endpoint* ep) {
struct sockaddr addr;
socklen_t len = sizeof(addr);
if (getsockname(tcp->fd, &addr, &len) < 0) {
- gpr_log(GPR_ERROR, "getsockname");
return false;
}
if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) {
@@ -1160,7 +1148,6 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
- gpr_log(GPR_INFO, "destroy and release %p %p", ep, tcp->outgoing_buffer_arg);
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;