diff options
author | Mark D. Roth <roth@google.com> | 2016-12-09 11:04:07 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-12-09 11:04:07 -0800 |
commit | 39b1f913d4e4be5cbf3ba4aea989632a93cad3b7 (patch) | |
tree | af54b23e8d522b9babdac2ecca497773777af17a /src/core/lib | |
parent | 16eaa454154122a5db61cd8058794af6c5fe9077 (diff) | |
parent | cf9e317e836fb0619fdb970be04e000d628f9d43 (diff) |
Merge remote-tracking branch 'upstream/master' into client_channel_init_cleanup
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 21 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 2 |
3 files changed, 21 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 12a4797e6f..fd80779519 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -107,6 +107,12 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), + GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string); +} + static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, @@ -230,13 +236,15 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); + call_read_cb(exec_ctx, tcp, + tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed")); + call_read_cb(exec_ctx, tcp, + tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -366,7 +374,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { tcp->outgoing_byte_idx = unwind_byte_idx; return false; } else { - *error = GRPC_OS_ERROR(errno, "sendmsg"); + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); return true; } } @@ -447,9 +455,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd) - ? GRPC_ERROR_CREATE("EOF") - : GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp) + : GRPC_ERROR_NONE, NULL); return; } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 1e0f3eeca5..8ca3cab9d5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1551,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } + /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come + from server.c. In that case, it's coming from accept_stream, and in + that case we're not necessarily covered by a poller. */ + stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 4e0feb56ac..184c1a1a16 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -354,11 +354,13 @@ static void dump_pending_tags(grpc_completion_queue *cc) { gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); + gpr_mu_lock(cc->mu); for (size_t i = 0; i < cc->outstanding_tag_count; i++) { char *s; gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); gpr_strvec_add(&v, s); } + gpr_mu_unlock(cc->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); |