aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-12-09 10:55:30 -0800
committerGravatar Mark D. Roth <roth@google.com>2016-12-09 10:55:30 -0800
commit32f35f85769f92f87541ea56a8aee7071a2b13d6 (patch)
treea51cf76ff97d76ac76d045c3fda94362e9965dd1 /src/core/lib
parent170a073b26b79200bbbaac2613965434a76df434 (diff)
parentcf9e317e836fb0619fdb970be04e000d628f9d43 (diff)
Merge remote-tracking branch 'upstream/master' into revert-8949-revert-8922-slice_cleanup
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/iomgr/tcp_posix.c21
-rw-r--r--src/core/lib/surface/call.c4
-rw-r--r--src/core/lib/surface/completion_queue.c2
3 files changed, 21 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 12a4797e6f..fd80779519 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -107,6 +107,12 @@ typedef struct {
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
+ GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string);
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -230,13 +236,15 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed"));
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@@ -366,7 +374,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
tcp->outgoing_byte_idx = unwind_byte_idx;
return false;
} else {
- *error = GRPC_OS_ERROR(errno, "sendmsg");
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
return true;
}
}
@@ -447,9 +455,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
- ? GRPC_ERROR_CREATE("EOF")
- : GRPC_ERROR_NONE,
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE,
NULL);
return;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 1e0f3eeca5..8ca3cab9d5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
+ /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
+ from server.c. In that case, it's coming from accept_stream, and in
+ that case we're not necessarily covered by a poller. */
+ stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 4e0feb56ac..184c1a1a16 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -354,11 +354,13 @@ static void dump_pending_tags(grpc_completion_queue *cc) {
gpr_strvec v;
gpr_strvec_init(&v);
gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
+ gpr_mu_lock(cc->mu);
for (size_t i = 0; i < cc->outstanding_tag_count; i++) {
char *s;
gpr_asprintf(&s, " %p", cc->outstanding_tags[i]);
gpr_strvec_add(&v, s);
}
+ gpr_mu_unlock(cc->mu);
char *out = gpr_strvec_flatten(&v, NULL);
gpr_strvec_destroy(&v);
gpr_log(GPR_DEBUG, "%s", out);