aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc64
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc11
-rw-r--r--src/core/lib/surface/call.cc5
3 files changed, 53 insertions, 27 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 5f373cdb25..f2bb409680 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -379,6 +379,9 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
+ /* Finished sending initial request. */
+ grpc_closure lb_on_sent_initial_request;
+
/* Status from the LB server has been received. This signals the end of the LB
* call. */
grpc_closure lb_on_server_status_received;
@@ -418,6 +421,7 @@ typedef struct glb_lb_policy {
/** LB fallback timer */
grpc_timer lb_fallback_timer;
+ bool initial_request_sent;
bool seen_initial_response;
/* Stats for client-side load reporting. Should be unreffed and
@@ -1359,6 +1363,22 @@ static void client_load_report_done_locked(void* arg, grpc_error* error) {
schedule_next_client_load_report(glb_policy);
}
+static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
+ grpc_op op;
+ memset(&op, 0, sizeof(op));
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message.send_message = glb_policy->client_load_report_payload;
+ GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
+ client_load_report_done_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
+ grpc_call_error call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
+ if (call_error != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+ GPR_ASSERT(GRPC_CALL_OK == call_error);
+ }
+}
+
static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
grpc_grpclb_dropped_call_counts* drop_entries =
(grpc_grpclb_dropped_call_counts*)
@@ -1402,22 +1422,15 @@ static void send_client_load_report_locked(void* arg, grpc_error* error) {
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
- // Send load report message.
- grpc_op op;
- memset(&op, 0, sizeof(op));
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message.send_message = glb_policy->client_load_report_payload;
- GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
- client_load_report_done_locked, glb_policy,
- grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
- if (call_error != GRPC_CALL_OK) {
- gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
+ // If we've already sent the initial request, then we can go ahead and send
+ // the load report. Otherwise, we need to wait until the initial request has
+ // been sent to send this (see lb_on_sent_initial_request_locked() below).
+ if (glb_policy->initial_request_sent) {
+ do_send_client_load_report_locked(glb_policy);
}
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
static void lb_on_response_received_locked(void* arg, grpc_error* error);
static void lb_call_init_locked(glb_lb_policy* glb_policy) {
@@ -1457,6 +1470,9 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
grpc_slice_unref_internal(request_payload_slice);
grpc_grpclb_request_destroy(request);
+ GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
+ lb_on_sent_initial_request_locked, glb_policy,
+ grpc_combiner_scheduler(glb_policy->base.combiner));
GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
lb_on_server_status_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
@@ -1473,6 +1489,7 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
glb_policy->lb_call_backoff.Init(backoff_options);
+ glb_policy->initial_request_sent = false;
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
}
@@ -1531,8 +1548,13 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
op->flags = 0;
op->reserved = nullptr;
op++;
- call_error = grpc_call_start_batch_and_execute(glb_policy->lb_call, ops,
- (size_t)(op - ops), nullptr);
+ /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
+ * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
+ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
+ "lb_on_sent_initial_request_locked");
+ call_error = grpc_call_start_batch_and_execute(
+ glb_policy->lb_call, ops, (size_t)(op - ops),
+ &glb_policy->lb_on_sent_initial_request);
GPR_ASSERT(GRPC_CALL_OK == call_error);
op = ops;
@@ -1569,6 +1591,18 @@ static void query_for_backends_locked(glb_lb_policy* glb_policy) {
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
+ glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+ glb_policy->initial_request_sent = true;
+ // If we attempted to send a client load report before the initial request was
+ // sent, send the load report now.
+ if (glb_policy->client_load_report_payload != nullptr) {
+ do_send_client_load_report_locked(glb_policy);
+ }
+ GRPC_LB_POLICY_WEAK_UNREF(&glb_policy->base,
+ "lb_on_sent_initial_request_locked");
+}
+
static void lb_on_response_received_locked(void* arg, grpc_error* error) {
glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
grpc_op ops[2];
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 7ea1dfaa80..5fa02017f0 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -71,7 +71,6 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
- gpr_atm pollhup;
grpc_error* shutdown_error;
/* The watcher list.
@@ -336,7 +335,6 @@ static grpc_fd* fd_create(int fd, const char* name) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
- gpr_atm_no_barrier_store(&r->pollhup, 0);
r->read_notifier_pollset = nullptr;
char* name2;
@@ -952,8 +950,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[0].events = POLLIN;
pfds[0].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- if (fd_is_orphaned(pollset->fds[i]) ||
- gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
+ if (fd_is_orphaned(pollset->fds[i])) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -1020,12 +1017,6 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
}
- /* This is a mitigation to prevent poll() from spinning on a
- ** POLLHUP https://github.com/grpc/grpc/pull/13665
- */
- if (pfds[i].revents & POLLHUP) {
- gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
- }
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index a457aaa7a2..d677576c14 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -1851,8 +1851,9 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
{
grpc_error* override_error = GRPC_ERROR_NONE;
if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
- override_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Error from server send status");
+ override_error =
+ error_from_status(op->data.send_status_from_server.status,
+ "Returned non-ok status");
}
if (op->data.send_status_from_server.status_details != nullptr) {
call->send_extra_metadata[1].md = grpc_mdelem_from_slices(