diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_uv.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.cc | 94 |
1 files changed, 45 insertions, 49 deletions
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index e311964dbc..3628e1cc2c 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -65,20 +65,19 @@ typedef struct { grpc_pollset *pollset; } grpc_tcp; -static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - grpc_slice_unref_internal(exec_ctx, tcp->read_slice); - grpc_resource_user_unref(exec_ctx, tcp->resource_user); +static void tcp_free(grpc_tcp *tcp) { + grpc_slice_unref_internal(tcp->read_slice); + grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->handle); gpr_free(tcp->peer_string); gpr_free(tcp); } #ifndef NDEBUG -#define TCP_UNREF(exec_ctx, tcp, reason) \ - tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, - const char *reason, const char *file, int line) { +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -86,7 +85,7 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, val - 1); } if (gpr_unref(&tcp->refcount)) { - tcp_free(exec_ctx, tcp); + tcp_free(tcp); } } @@ -101,11 +100,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { +static void tcp_unref(grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(exec_ctx, tcp); + tcp_free(tcp); } } @@ -113,40 +112,39 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif static void uv_close_callback(uv_handle_t *handle) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; grpc_tcp *tcp = (grpc_tcp *)handle->data; - TCP_UNREF(&exec_ctx, tcp, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); + TCP_UNREF(tcp, "destroy"); + grpc_exec_ctx_finish(); } -static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user) { - return grpc_resource_user_slice_malloc(exec_ctx, resource_user, +static grpc_slice alloc_read_slice(grpc_resource_user *resource_user) { + return grpc_resource_user_slice_malloc(resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); } static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; grpc_tcp *tcp = (grpc_tcp *)handle->data; (void)suggested_size; buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); } static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { grpc_slice sub; grpc_error *error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; grpc_tcp *tcp = (grpc_tcp *)stream->data; grpc_closure *cb = tcp->read_cb; if (nread == 0) { // Nothing happened. Wait for the next callback return; } - TCP_UNREF(&exec_ctx, tcp, "read"); + TCP_UNREF(tcp, "read"); tcp->read_cb = NULL; // TODO(murgatroid99): figure out what the return value here means uv_read_stop(stream); @@ -156,7 +154,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); grpc_slice_buffer_add(tcp->read_slices, sub); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + tcp->read_slice = alloc_read_slice(tcp->resource_user); error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t i; @@ -175,12 +173,12 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // nread < 0: Error error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); } - GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CLOSURE_SCHED(cb, error); + grpc_exec_ctx_finish(); } -static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_slice_buffer *read_slices, grpc_closure *cb) { +static void uv_endpoint_read(grpc_endpoint *ep, grpc_slice_buffer *read_slices, + grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; @@ -188,7 +186,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); + grpc_slice_buffer_reset_and_unref_internal(read_slices); TCP_REF(tcp, "read"); // TODO(murgatroid99): figure out what the return value here means status = @@ -198,7 +196,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_static_string(uv_strerror(status))); - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(cb, error); } if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); @@ -209,10 +207,10 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void write_callback(uv_write_t *req, int status) { grpc_tcp *tcp = (grpc_tcp *)req->data; grpc_error *error; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; grpc_closure *cb = tcp->write_cb; tcp->write_cb = NULL; - TCP_UNREF(&exec_ctx, tcp, "write"); + TCP_UNREF(tcp, "write"); if (status == 0) { error = GRPC_ERROR_NONE; } else { @@ -223,13 +221,13 @@ static void write_callback(uv_write_t *req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - grpc_resource_user_free(&exec_ctx, tcp->resource_user, + grpc_resource_user_free(tcp->resource_user, sizeof(uv_buf_t) * tcp->write_slices->count); - GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_CLOSURE_SCHED(cb, error); + grpc_exec_ctx_finish(); } -static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, +static void uv_endpoint_write(grpc_endpoint *ep, grpc_slice_buffer *write_slices, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -252,8 +250,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } if (tcp->shutting_down) { - GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "TCP socket is shutting down")); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "TCP socket is shutting down")); return; } @@ -263,15 +261,15 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->write_slices->count == 0) { // No slices means we don't have to do anything, // and libuv doesn't like empty writes - GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); return; } tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = (uv_buf_t *)gpr_malloc(sizeof(uv_buf_t) * buffer_count); - grpc_resource_user_alloc(exec_ctx, tcp->resource_user, - sizeof(uv_buf_t) * buffer_count, NULL); + grpc_resource_user_alloc(tcp->resource_user, sizeof(uv_buf_t) * buffer_count, + NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char *)GRPC_SLICE_START_PTR(*slice); @@ -286,8 +284,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, write_callback); } -static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_pollset *pollset) { +static void uv_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { // No-op. We're ignoring pollsets currently (void)exec_ctx; (void)ep; @@ -296,7 +293,7 @@ static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->pollset = pollset; } -static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, +static void uv_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset) { // No-op. We're ignoring pollsets currently (void)exec_ctx; @@ -306,8 +303,7 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void shutdown_callback(uv_shutdown_t *req, int status) {} -static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - grpc_error *why) { +static void uv_endpoint_shutdown(grpc_endpoint *ep, grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { if (GRPC_TRACER_ON(grpc_tcp_trace)) { @@ -317,12 +313,12 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); - grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); + grpc_resource_user_shutdown(tcp->resource_user); } GRPC_ERROR_UNREF(why); } -static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { +static void uv_destroy(grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); @@ -349,7 +345,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + ExecCtx _local_exec_ctx; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); @@ -366,7 +362,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + tcp->read_slice = alloc_read_slice(tcp->resource_user); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -374,7 +370,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, uv_unref((uv_handle_t *)handle); #endif - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_finish(); return &tcp->base; } |