diff options
| Field | Value | Date |
|---|---|---|
| author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-09-02 16:40:14 -0700 |
| committer | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-09-02 16:40:14 -0700 |
| commit | aca3211b0f41aa862a424327de6d3a36e455a3c4 (patch) | |
| tree | 2efde70412cb2d79c859a6b776ad18ca5d5c2f8f /src/core | |
| parent | bfe8719e78be1e51a98bbf6a555f40adcae3ea22 (diff) | |
| parent | b059ae54c362833b287d4342cb96aae69792279f (diff) | |
Merge pull request #3183 from ctiller/second-coming
Refactor Endpoint API
Diffstat (limited to 'src/core')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/core/httpcli/httpcli.c | 95 |
| -rw-r--r-- | src/core/iomgr/endpoint.c | 17 |
| -rw-r--r-- | src/core/iomgr/endpoint.h | 63 |
| -rw-r--r-- | src/core/iomgr/iocp_windows.c | 21 |
| -rw-r--r-- | src/core/iomgr/iocp_windows.h | 1 |
| -rw-r--r-- | src/core/iomgr/socket_windows.c | 40 |
| -rw-r--r-- | src/core/iomgr/socket_windows.h | 16 |
| -rw-r--r-- | src/core/iomgr/tcp_client_windows.c | 9 |
| -rw-r--r-- | src/core/iomgr/tcp_posix.c | 525 |
| -rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 79 |
| -rw-r--r-- | src/core/iomgr/tcp_windows.c | 262 |
| -rw-r--r-- | src/core/security/secure_endpoint.c | 188 |
| -rw-r--r-- | src/core/security/secure_transport_setup.c | 119 |
| -rw-r--r-- | src/core/support/slice_buffer.c | 22 |
| -rw-r--r-- | src/core/transport/chttp2/internal.h | 15 |
| -rw-r--r-- | src/core/transport/chttp2/writing.c | 21 |
| -rw-r--r-- | src/core/transport/chttp2_transport.c | 198 |
17 files changed, 775 insertions, 916 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 9012070e8e..1e38479eb1 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -61,6 +61,10 @@ typedef struct { grpc_httpcli_context *context; grpc_pollset *pollset; grpc_iomgr_object iomgr_obj; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; + grpc_iomgr_closure on_read; + grpc_iomgr_closure done_write; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) { gpr_slice_unref(req->request_text); gpr_free(req->host); grpc_iomgr_unregister_object(&req->iomgr_obj); + gpr_slice_buffer_destroy(&req->incoming); + gpr_slice_buffer_destroy(&req->outgoing); gpr_free(req); } -static void on_read(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status status) { +static void on_read(void *user_data, int success); + +static void do_read(internal_request *req) { + switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) { + case GRPC_ENDPOINT_DONE: + on_read(req, 1); + break; + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_ERROR: + on_read(req, 0); + break; + } +} + +static void on_read(void *user_data, int success) { internal_request *req = user_data; size_t i; - for (i = 0; i < nslices; i++) { - if (GPR_SLICE_LENGTH(slices[i])) { + for (i = 0; i < req->incoming.count; i++) { + if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; - if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) { + if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) { finish(req, 0); - goto done; + return; } } } - switch (status) { - case GRPC_ENDPOINT_CB_OK: - grpc_endpoint_notify_on_read(req->ep, on_read, req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - case GRPC_ENDPOINT_CB_SHUTDOWN: - if (!req->have_read_byte) { - next_address(req); - } else { - finish(req, 
grpc_httpcli_parser_eof(&req->parser)); - } - break; - } - -done: - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); + if (success) { + do_read(req); + } else if (!req->have_read_byte) { + next_address(req); + } else { + finish(req, grpc_httpcli_parser_eof(&req->parser)); } } -static void on_written(internal_request *req) { - grpc_endpoint_notify_on_read(req->ep, on_read, req); -} +static void on_written(internal_request *req) { do_read(req); } -static void done_write(void *arg, grpc_endpoint_cb_status status) { +static void done_write(void *arg, int success) { internal_request *req = arg; - switch (status) { - case GRPC_ENDPOINT_CB_OK: - on_written(req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_ERROR: - next_address(req); - break; + if (success) { + on_written(req); + } else { + next_address(req); } } static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); - switch ( - grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { - case GRPC_ENDPOINT_WRITE_DONE: + gpr_slice_buffer_add(&req->outgoing, req->request_text); + switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) { + case GRPC_ENDPOINT_DONE: on_written(req); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: finish(req, 0); break; } @@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? 
request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); @@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; - gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); + gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); req->host = gpr_strdup(request->host); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 8ee14bce9b..a7878e31dd 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -33,17 +33,16 @@ #include "src/core/iomgr/endpoint.h" -void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { - ep->vtable->notify_on_read(ep, cb, user_data); +grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { + return ep->vtable->read(ep, slices, cb); } -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { - return ep->vtable->write(ep, slices, nslices, cb, user_data); +grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { + return 
ep->vtable->write(ep, slices, cb); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index ea92a500e8..d14d52d561 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -37,6 +37,7 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_set.h" #include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> #include <grpc/support/time.h> /* An endpoint caps a streaming channel between two communicating processes. @@ -45,31 +46,17 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; -typedef enum grpc_endpoint_cb_status { - GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ - GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ - GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ -} grpc_endpoint_cb_status; - -typedef enum grpc_endpoint_write_status { - GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */ - GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */ - GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */ -} grpc_endpoint_write_status; - -typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error); -typedef void (*grpc_endpoint_write_cb)(void *user_data, - grpc_endpoint_cb_status error); +typedef enum grpc_endpoint_op_status { + GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */ + GRPC_ENDPOINT_PENDING, /* cb will be called when completed */ + GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */ +} grpc_endpoint_op_status; struct grpc_endpoint_vtable { - void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); - grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, - 
size_t nslices, grpc_endpoint_write_cb cb, - void *user_data); + grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb); + grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); @@ -77,26 +64,32 @@ struct grpc_endpoint_vtable { char *(*get_peer)(grpc_endpoint *ep); }; -/* When data is available on the connection, calls the callback with slices. */ -void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); +/* When data is available on the connection, calls the callback with slices. + Callback success indicates that the endpoint can accept more reads, failure + indicates the endpoint is closed. + Valid slices may be placed into \a slices even on callback success == 0. */ +grpc_endpoint_op_status grpc_endpoint_read( + grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; char *grpc_endpoint_get_peer(grpc_endpoint *ep); /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it - returns GRPC_ENDPOINT_WRITE_DONE. - Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the - connection is ready for more data. */ -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data); + returns GRPC_ENDPOINT_DONE. + Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the + connection is ready for more data. + \a slices may be mutated at will by the endpoint until cb is called. + No guarantee is made to the content of slices after a write EXCEPT that + it is a valid slice buffer. 
+ */ +grpc_endpoint_op_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; /* Causes any pending read/write callbacks to run immediately with - GRPC_ENDPOINT_CB_SHUTDOWN status */ + success==0 */ void grpc_endpoint_shutdown(grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_endpoint *ep); diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 09a457dd9a..006f8b2abf 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -52,7 +52,6 @@ static OVERLAPPED g_iocp_custom_overlap; static gpr_event g_shutdown_iocp; static gpr_event g_iocp_done; -static gpr_atm g_orphans = 0; static gpr_atm g_custom_events = 0; static HANDLE g_iocp; @@ -92,22 +91,13 @@ static void do_iocp_work() { gpr_log(GPR_ERROR, "Unknown IOCP operation"); abort(); } - GPR_ASSERT(info->outstanding); - if (socket->orphan) { - info->outstanding = 0; - if (!socket->read_info.outstanding && !socket->write_info.outstanding) { - grpc_winsocket_destroy(socket); - gpr_atm_full_fetch_add(&g_orphans, -1); - } - return; - } success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, FALSE, &flags); info->bytes_transfered = bytes; info->wsa_error = success ? 
0 : WSAGetLastError(); GPR_ASSERT(overlapped == &info->overlapped); - gpr_mu_lock(&socket->state_mu); GPR_ASSERT(!info->has_pending_iocp); + gpr_mu_lock(&socket->state_mu); if (info->cb) { f = info->cb; opaque = info->opaque; @@ -120,9 +110,8 @@ static void do_iocp_work() { } static void iocp_loop(void *p) { - while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) || + while (gpr_atm_acq_load(&g_custom_events) || !gpr_event_get(&g_shutdown_iocp)) { - grpc_maybe_call_delayed_callbacks(NULL, 1); do_iocp_work(); } @@ -175,12 +164,6 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { GPR_ASSERT(ret == g_iocp); } -void grpc_iocp_socket_orphan(grpc_winsocket *socket) { - GPR_ASSERT(!socket->orphan); - gpr_atm_full_fetch_add(&g_orphans, 1); - socket->orphan = 1; -} - /* Calling notify_on_read or write means either of two things: -) The IOCP already completed in the background, and we need to call the callback now. diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index ee3847a229..7d2dc45176 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -42,7 +42,6 @@ void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); -void grpc_iocp_socket_orphan(grpc_winsocket *); void grpc_socket_notify_on_write(grpc_winsocket *, void (*cb)(void *, int success), void *opaque); diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 7d8421376b..2cbe945ca3 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -62,46 +62,12 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { operations to abort them. We need to do that this way because of the various callsites of that function, which happens to be in various mutex hold states, and that'd be unsafe to call them directly. 
*/ -int grpc_winsocket_shutdown(grpc_winsocket *winsocket) { - int callbacks_set = 0; - SOCKET socket; - gpr_mu_lock(&winsocket->state_mu); - socket = winsocket->socket; - if (winsocket->read_info.cb) { - callbacks_set++; - grpc_iomgr_closure_init(&winsocket->shutdown_closure, - winsocket->read_info.cb, - winsocket->read_info.opaque); - grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); - } - if (winsocket->write_info.cb) { - callbacks_set++; - grpc_iomgr_closure_init(&winsocket->shutdown_closure, - winsocket->write_info.cb, - winsocket->write_info.opaque); - grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); - } - gpr_mu_unlock(&winsocket->state_mu); - closesocket(socket); - return callbacks_set; -} - -/* Abandons a socket. Either we're going to queue it up for garbage collecting - from the IO Completion Port thread, or destroy it immediately. Note that this - mechanisms assumes that we're either always waiting for an operation, or we - explicitly know that we don't. If there is a future case where we can have - an "idle" socket which is neither trying to read or write, we'd start leaking - both memory and sockets. 
*/ -void grpc_winsocket_orphan(grpc_winsocket *winsocket) { - grpc_iomgr_unregister_object(&winsocket->iomgr_object); - if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { - grpc_iocp_socket_orphan(winsocket); - } else { - grpc_winsocket_destroy(winsocket); - } +void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { + closesocket(winsocket->socket); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { + grpc_iomgr_unregister_object(&winsocket->iomgr_object); gpr_mu_destroy(&winsocket->state_mu); gpr_free(winsocket); } diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index ecf2530173..498921e0fd 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -68,8 +68,6 @@ typedef struct grpc_winsocket_callback_info { /* The results of the overlapped operation. */ DWORD bytes_transfered; int wsa_error; - /* A boolean indicating that we started an operation. */ - int outstanding; } grpc_winsocket_callback_info; /* This is a wrapper to a Windows socket. A socket can have one outstanding @@ -92,10 +90,6 @@ typedef struct grpc_winsocket { /* You can't add the same socket twice to the same IO Completion Port. This prevents that. */ int added_to_iocp; - /* A boolean to indicate that the caller has abandoned that socket, but - there is a pending operation that the IO Completion Port will have to - wait for. The socket will be collected at that time. */ - int orphan; grpc_iomgr_closure shutdown_closure; @@ -108,14 +102,10 @@ typedef struct grpc_winsocket { grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name); /* Initiate an asynchronous shutdown of the socket. Will call off any pending - operation to cancel them. Returns the number of callbacks that got setup. */ -int grpc_winsocket_shutdown(grpc_winsocket *socket); + operation to cancel them. */ +void grpc_winsocket_shutdown(grpc_winsocket *socket); -/* Abandon a socket. 
*/ -void grpc_winsocket_orphan(grpc_winsocket *socket); - -/* Destroy a socket. Should only be called by the IO Completion Port thread, - or by grpc_winsocket_orphan if there's no pending operation. */ +/* Destroy a socket. Should only be called if there's no pending operation. */ void grpc_winsocket_destroy(grpc_winsocket *socket); #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 79a58fe2af..665ef2885f 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -102,7 +102,6 @@ static void on_connect(void *acp, int from_iocp) { DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); - info->outstanding = 0; GPR_ASSERT(transfered_bytes == 0); if (!wsa_success) { char *utf8_message = gpr_format_message(WSAGetLastError()); @@ -125,12 +124,10 @@ static void on_connect(void *acp, int from_iocp) { return; } - ac->socket->write_info.outstanding = 0; - /* If we don't have an endpoint, it means the connection failed, so it doesn't matter if it aborted or failed. We need to orphan that socket. */ - if (!ep || aborted) grpc_winsocket_orphan(ac->socket); + if (!ep || aborted) grpc_winsocket_destroy(ac->socket); async_connect_cleanup(ac); /* If the connection was aborted, the callback was already called when the deadline was met. */ @@ -196,7 +193,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - info->outstanding = 1; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. 
But we'll still get @@ -220,7 +216,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); - socket->write_info.outstanding = 1; grpc_socket_notify_on_write(socket, on_connect, ac); return; @@ -229,7 +224,7 @@ failure: gpr_log(GPR_ERROR, message, utf8_message); gpr_free(utf8_message); if (socket) { - grpc_winsocket_orphan(socket); + grpc_winsocket_destroy(socket); } else if (sock != INVALID_SOCKET) { closesocket(sock); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 360e6ebd8c..0db7cd9f0e 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -61,209 +61,8 @@ #define SENDMSG_FLAGS 0 #endif -/* Holds a slice array and associated state. */ -typedef struct grpc_tcp_slice_state { - gpr_slice *slices; /* Array of slices */ - size_t nslices; /* Size of slices array. */ - ssize_t first_slice; /* First valid slice in array */ - ssize_t last_slice; /* Last valid slice in array */ - gpr_slice working_slice; /* pointer to original final slice */ - int working_slice_valid; /* True if there is a working slice */ - int memory_owned; /* True if slices array is owned */ -} grpc_tcp_slice_state; - int grpc_tcp_trace = 0; -static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices, - size_t nslices, size_t valid_slices) { - state->slices = slices; - state->nslices = nslices; - if (valid_slices == 0) { - state->first_slice = -1; - } else { - state->first_slice = 0; - } - state->last_slice = valid_slices - 1; - state->working_slice_valid = 0; - state->memory_owned = 0; -} - -/* Returns true if there is still available data */ -static int slice_state_has_available(grpc_tcp_slice_state *state) { - return state->first_slice != -1 && state->last_slice >= state->first_slice; -} - -static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) { - if (state->first_slice == -1) { - return 0; - } else { - return 
state->last_slice - state->first_slice + 1; - } -} - -static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) { - /* TODO(klempner): use realloc instead when first_slice is 0 */ - /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */ - gpr_slice *slices = state->slices; - size_t original_size = slice_state_slices_allocated(state); - size_t i; - gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size); - - for (i = 0; i < original_size; ++i) { - new_slices[i] = slices[i + state->first_slice]; - } - - state->slices = new_slices; - state->last_slice = original_size - 1; - if (original_size > 0) { - state->first_slice = 0; - } else { - state->first_slice = -1; - } - state->nslices = new_size; - - if (state->memory_owned) { - gpr_free(slices); - } - state->memory_owned = 1; -} - -static void slice_state_remove_prefix(grpc_tcp_slice_state *state, - size_t prefix_bytes) { - gpr_slice *current_slice = &state->slices[state->first_slice]; - size_t current_slice_size; - - while (slice_state_has_available(state)) { - current_slice_size = GPR_SLICE_LENGTH(*current_slice); - if (current_slice_size > prefix_bytes) { - /* TODO(klempner): Get rid of the extra refcount created here by adding a - native "trim the first N bytes" operation to splice */ - /* TODO(klempner): This really shouldn't be modifying the current slice - unless we own the slices array. 
*/ - gpr_slice tail; - tail = gpr_slice_split_tail(current_slice, prefix_bytes); - gpr_slice_unref(*current_slice); - *current_slice = tail; - return; - } else { - gpr_slice_unref(*current_slice); - ++state->first_slice; - ++current_slice; - prefix_bytes -= current_slice_size; - } - } -} - -static void slice_state_destroy(grpc_tcp_slice_state *state) { - while (slice_state_has_available(state)) { - gpr_slice_unref(state->slices[state->first_slice]); - ++state->first_slice; - } - - if (state->memory_owned) { - gpr_free(state->slices); - state->memory_owned = 0; - } -} - -void slice_state_transfer_ownership(grpc_tcp_slice_state *state, - gpr_slice **slices, size_t *nslices) { - *slices = state->slices + state->first_slice; - *nslices = state->last_slice - state->first_slice + 1; - - state->first_slice = -1; - state->last_slice = -1; -} - -/* Fills iov with the first min(iov_size, available) slices, returns number - filled */ -static size_t slice_state_to_iovec(grpc_tcp_slice_state *state, - struct iovec *iov, size_t iov_size) { - size_t nslices = state->last_slice - state->first_slice + 1; - gpr_slice *slices = state->slices + state->first_slice; - size_t i; - if (nslices < iov_size) { - iov_size = nslices; - } - - for (i = 0; i < iov_size; ++i) { - iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]); - iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]); - } - return iov_size; -} - -/* Makes n blocks available at the end of state, writes them into iov, and - returns the number of bytes allocated */ -static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state, - struct iovec *iov, size_t n, - size_t slice_size) { - size_t target_size; - size_t i; - size_t allocated_bytes; - ssize_t allocated_slices = slice_state_slices_allocated(state); - - if (n - state->working_slice_valid >= state->nslices - state->last_slice) { - /* Need to grow the slice array */ - target_size = state->nslices; - do { - target_size = target_size * 2; - } while (target_size < 
allocated_slices + n - state->working_slice_valid); - /* TODO(klempner): If this ever needs to support both prefix removal and - append, we should be smarter about the growth logic here */ - slice_state_realloc(state, target_size); - } - - i = 0; - allocated_bytes = 0; - - if (state->working_slice_valid) { - iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]); - iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) - - GPR_SLICE_LENGTH(state->slices[state->last_slice]); - allocated_bytes += iov[0].iov_len; - ++i; - state->slices[state->last_slice] = state->working_slice; - state->working_slice_valid = 0; - } - - for (; i < n; ++i) { - ++state->last_slice; - state->slices[state->last_slice] = gpr_slice_malloc(slice_size); - iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]); - iov[i].iov_len = slice_size; - allocated_bytes += slice_size; - } - if (state->first_slice == -1) { - state->first_slice = 0; - } - return allocated_bytes; -} - -/* Remove the last n bytes from state */ -/* TODO(klempner): Consider having this defer actual deletion until later */ -static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) { - while (bytes > 0 && slice_state_has_available(state)) { - if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) { - state->working_slice = state->slices[state->last_slice]; - state->working_slice_valid = 1; - /* TODO(klempner): Combine these into a single operation that doesn't need - to refcount */ - gpr_slice_unref(gpr_slice_split_tail( - &state->slices[state->last_slice], - GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes)); - bytes = 0; - } else { - bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]); - gpr_slice_unref(state->slices[state->last_slice]); - --state->last_slice; - if (state->last_slice == -1) { - state->first_slice = -1; - } - } - } -} - typedef struct { grpc_endpoint base; grpc_fd *em_fd; @@ -273,80 +72,111 @@ typedef struct { size_t slice_size; 
gpr_refcount refcount; - grpc_endpoint_read_cb read_cb; - void *read_user_data; - grpc_endpoint_write_cb write_cb; - void *write_user_data; + gpr_slice_buffer *incoming_buffer; + gpr_slice_buffer *outgoing_buffer; + /** slice within outgoing_buffer to write next */ + size_t outgoing_slice_idx; + /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + size_t outgoing_byte_idx; - grpc_tcp_slice_state write_state; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; grpc_iomgr_closure read_closure; grpc_iomgr_closure write_closure; - grpc_iomgr_closure handle_read_closure; - char *peer_string; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); +static void tcp_handle_read(void *arg /* grpc_tcp */, int success); +static void tcp_handle_write(void *arg /* grpc_tcp */, int success); -static void grpc_tcp_shutdown(grpc_endpoint *ep) { +static void tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_fd_shutdown(tcp->em_fd); } -static void grpc_tcp_unref(grpc_tcp *tcp) { - int refcount_zero = gpr_unref(&tcp->refcount); - if (refcount_zero) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); - gpr_free(tcp->peer_string); - gpr_free(tcp); +static void tcp_free(grpc_tcp *tcp) { + grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} + +/*#define GRPC_TCP_REFCOUNT_DEBUG*/ +#ifdef GRPC_TCP_REFCOUNT_DEBUG +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); } } -static void 
grpc_tcp_destroy(grpc_endpoint *ep) { +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +static void tcp_unref(grpc_tcp *tcp) { + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + +static void tcp_destroy(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_tcp_unref(tcp); + TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status status) { - grpc_endpoint_read_cb cb = tcp->read_cb; +static void call_read_cb(grpc_tcp *tcp, int success) { + grpc_iomgr_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; - gpr_log(GPR_DEBUG, "read: status=%d", status); - for (i = 0; i < nslices; i++) { - char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "read: success=%d", success); + for (i = 0; i < tcp->incoming_buffer->count; i++) { + char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_free(dump); } } tcp->read_cb = NULL; - cb(tcp->read_user_data, slices, nslices, status); + tcp->incoming_buffer = NULL; + cb->cb(cb->cb_arg, success); } -#define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_continue_read(grpc_tcp *tcp) { - gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; +static void tcp_continue_read(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; - ssize_t allocated_bytes; - struct grpc_tcp_slice_state read_state; - gpr_slice *final_slices; - size_t final_nslices; + size_t i; 
GPR_ASSERT(!tcp->finished_edge); + GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC); + GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); - slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, - 0); - allocated_bytes = slice_state_append_blocks_into_iovec( - &read_state, iov, tcp->iov_size, tcp->slice_size); + while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { + gpr_slice_buffer_add_indexed(tcp->incoming_buffer, + gpr_slice_malloc(tcp->slice_size)); + } + for (i = 0; i < tcp->incoming_buffer->count; i++) { + iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); + iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); + } msg.msg_name = NULL; msg.msg_namelen = 0; @@ -362,106 +192,105 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { } while (read_bytes < 0 && errno == EINTR); GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); - if (read_bytes < allocated_bytes) { - /* TODO(klempner): Consider a second read first, in hopes of getting a - * quick EAGAIN and saving a bunch of allocations. */ - slice_state_remove_last(&read_state, read_bytes < 0 - ? allocated_bytes - : allocated_bytes - read_bytes); - } - if (read_bytes < 0) { - /* NB: After calling the user_cb a parallel call of the read handler may + /* NB: After calling call_read_cb a parallel call of the read handler may * be running. 
*/ if (errno == EAGAIN) { if (tcp->iov_size > 1) { tcp->iov_size /= 2; } - if (slice_state_has_available(&read_state)) { - /* TODO(klempner): We should probably do the call into the application - without all this junk on the stack */ - /* FIXME(klempner): Refcount properly */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - tcp->finished_edge = 1; - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); - } else { - /* We've consumed the edge, request a new one */ - slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); - } + /* We've consumed the edge, request a new one */ + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { /* TODO(klempner): Log interesting errors */ - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ - if (slice_state_has_available(&read_state)) { - /* there were bytes already read: pass them up to the application */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); - } else { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); - } - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } else { - if (tcp->iov_size < MAX_READ_IOVEC) { + GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); + if ((size_t)read_bytes < tcp->incoming_buffer->length) { + gpr_slice_buffer_trim_end(tcp->incoming_buffer, + tcp->incoming_buffer->length - read_bytes); + } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } - 
GPR_ASSERT(slice_state_has_available(&read_state)); - slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); + call_read_cb(tcp, 1); + TCP_UNREF(tcp, "read"); } GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); if (!success) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } else { - grpc_tcp_continue_read(tcp); + tcp_continue_read(tcp); } } -static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { +static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, + gpr_slice_buffer *incoming_buffer, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; - tcp->read_user_data = user_data; - gpr_ref(&tcp->refcount); + tcp->incoming_buffer = incoming_buffer; + gpr_slice_buffer_reset_and_unref(incoming_buffer); + TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - tcp->handle_read_closure.cb_arg = tcp; - grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); + grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); } + /* TODO(ctiller): immediate return */ + return GRPC_ENDPOINT_PENDING; } #define MAX_WRITE_IOVEC 16 -static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { +static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; int iov_size; ssize_t sent_length; - 
grpc_tcp_slice_state *state = &tcp->write_state; + ssize_t sending_length; + ssize_t trailing; + ssize_t unwind_slice_idx; + ssize_t unwind_byte_idx; for (;;) { - iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC); + sending_length = 0; + unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_byte_idx = tcp->outgoing_byte_idx; + for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + iov_size != MAX_WRITE_IOVEC; + iov_size++) { + iov[iov_size].iov_base = + GPR_SLICE_START_PTR( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_byte_idx; + iov[iov_size].iov_len = + GPR_SLICE_LENGTH( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + tcp->outgoing_byte_idx; + sending_length += iov[iov_size].iov_len; + tcp->outgoing_slice_idx++; + tcp->outgoing_byte_idx = 0; + } + GPR_ASSERT(iov_size > 0); msg.msg_name = NULL; msg.msg_namelen = 0; @@ -480,70 +309,75 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { if (sent_length < 0) { if (errno == EAGAIN) { - return GRPC_ENDPOINT_WRITE_PENDING; + tcp->outgoing_slice_idx = unwind_slice_idx; + tcp->outgoing_byte_idx = unwind_byte_idx; + return GRPC_ENDPOINT_PENDING; } else { /* TODO(klempner): Log some of these */ - slice_state_destroy(state); - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } } - /* TODO(klempner): Probably better to batch this after we finish flushing */ - slice_state_remove_prefix(state, sent_length); + GPR_ASSERT(tcp->outgoing_byte_idx == 0); + trailing = sending_length - sent_length; + while (trailing > 0) { + ssize_t slice_length; + + tcp->outgoing_slice_idx--; + slice_length = GPR_SLICE_LENGTH( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + if (slice_length > trailing) { + tcp->outgoing_byte_idx = slice_length - trailing; + break; + } else { + trailing -= slice_length; + } + } - if (!slice_state_has_available(state)) { - return GRPC_ENDPOINT_WRITE_DONE; + if (tcp->outgoing_slice_idx == 
tcp->outgoing_buffer->count) { + return GRPC_ENDPOINT_DONE; } }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_endpoint_write_status write_status; - grpc_endpoint_cb_status cb_status; - grpc_endpoint_write_cb cb; + grpc_endpoint_op_status status; + grpc_iomgr_closure *cb; if (!success) { - slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); + cb->cb(cb->cb_arg, 0); + TCP_UNREF(tcp, "write"); return; } GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); - write_status = grpc_tcp_flush(tcp); - if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + status = tcp_flush(tcp); + if (status == GRPC_ENDPOINT_PENDING) { grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } else { - slice_state_destroy(&tcp->write_state); - if (write_status == GRPC_ENDPOINT_WRITE_DONE) { - cb_status = GRPC_ENDPOINT_CB_OK; - } else { - cb_status = GRPC_ENDPOINT_CB_ERROR; - } cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, cb_status); - grpc_tcp_unref(tcp); + cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE); + TCP_UNREF(tcp, "write"); } GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } -static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, + gpr_slice_buffer *buf, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_endpoint_write_status status; + grpc_endpoint_op_status status; if (grpc_tcp_trace) { size_t i; - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < buf->count; i++) { + char *data = + gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p: 
%s", tcp, data); gpr_free(data); } @@ -551,15 +385,19 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); GPR_ASSERT(tcp->write_cb == NULL); - slice_state_init(&tcp->write_state, slices, nslices, nslices); - status = grpc_tcp_flush(tcp); - if (status == GRPC_ENDPOINT_WRITE_PENDING) { - /* TODO(klempner): Consider inlining rather than malloc for small nslices */ - slice_state_realloc(&tcp->write_state, nslices); - gpr_ref(&tcp->refcount); + if (buf->length == 0) { + GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + return GRPC_ENDPOINT_DONE; + } + tcp->outgoing_buffer = buf; + tcp->outgoing_slice_idx = 0; + tcp->outgoing_byte_idx = 0; + + status = tcp_flush(tcp); + if (status == GRPC_ENDPOINT_PENDING) { + TCP_REF(tcp, "write"); tcp->write_cb = cb; - tcp->write_user_data = user_data; grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } @@ -567,27 +405,25 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, return status; } -static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { +static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_pollset_add_fd(pollset, tcp->em_fd); } -static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { +static void tcp_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); } -static char *grpc_tcp_get_peer(grpc_endpoint *ep) { +static char *tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); } static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, - grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set, - grpc_tcp_shutdown, grpc_tcp_destroy, - grpc_tcp_get_peer}; + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_shutdown, tcp_destroy, 
tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { @@ -597,21 +433,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; - tcp->read_user_data = NULL; - tcp->write_user_data = NULL; + tcp->incoming_buffer = NULL; tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = 1; - slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); tcp->em_fd = em_fd; - tcp->read_closure.cb = grpc_tcp_handle_read; + tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; - tcp->write_closure.cb = grpc_tcp_handle_write; + tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; - tcp->handle_read_closure.cb = grpc_tcp_handle_read; return &tcp->base; } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d0478d3604..b513d854aa 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -75,18 +75,18 @@ struct grpc_tcp_server { void *cb_arg; gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ int active_ports; - /* number of iomgr callbacks that have been explicitly scheduled during - * shutdown */ - int iomgr_callbacks_pending; /* all listening ports */ server_port *ports; size_t nports; size_t port_capacity; + + /* shutdown callback */ + void(*shutdown_complete)(void *); + void *shutdown_complete_arg; }; /* Public function. 
Allocates the proper data structures to hold a @@ -94,48 +94,61 @@ struct grpc_tcp_server { grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; - s->iomgr_callbacks_pending = 0; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->shutdown_complete = NULL; return s; } +static void dont_care_about_shutdown_completion(void *arg) {} + +static void finish_shutdown(grpc_tcp_server *s) { + size_t i; + + s->shutdown_complete(s->shutdown_complete_arg); + + /* Now that the accepts have been aborted, we can destroy the sockets. + The IOCP won't get notified on these, so we can flag them as already + closed by the system. */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_winsocket_destroy(sp->socket); + } + gpr_free(s->ports); + gpr_free(s); +} + /* Public function. Stops and destroys a grpc_tcp_server. */ void grpc_tcp_server_destroy(grpc_tcp_server *s, - void (*shutdown_done)(void *shutdown_done_arg), - void *shutdown_done_arg) { + void (*shutdown_complete)(void *shutdown_done_arg), + void *shutdown_complete_arg) { size_t i; + int immediately_done = 0; gpr_mu_lock(&s->mu); + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ + if (s->active_ports == 0) { + immediately_done = 1; + } for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; sp->shutting_down = 1; - s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket); - } - /* This happens asynchronously. Wait while that happens. 
*/ - while (s->active_ports || s->iomgr_callbacks_pending) { - gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_winsocket_shutdown(sp->socket); } gpr_mu_unlock(&s->mu); - /* Now that the accepts have been aborted, we can destroy the sockets. - The IOCP won't get notified on these, so we can flag them as already - closed by the system. */ - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - grpc_winsocket_orphan(sp->socket); - } - gpr_free(s->ports); - gpr_free(s); - - if (shutdown_done) { - shutdown_done(shutdown_done_arg); + if (immediately_done) { + finish_shutdown(s); } } @@ -188,14 +201,17 @@ error: } static void decrement_active_ports_and_notify(server_port *sp) { + int notify = 0; sp->shutting_down = 0; - sp->socket->read_info.outstanding = 0; gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); + if (0 == --sp->server->active_ports && sp->server->shutdown_complete != NULL) { + notify = 1; } gpr_mu_unlock(&sp->server->mu); + if (notify) { + finish_shutdown(sp->server); + } } /* start_accept will reference that for the IOCP notification request. */ @@ -280,12 +296,6 @@ static void on_accept(void *arg, int from_iocp) { this is necessary in the read/write case, it's useless for the accept case. 
We only need to adjust the pending callback count */ if (!from_iocp) { - gpr_mu_lock(&sp->server->mu); - GPR_ASSERT(sp->server->iomgr_callbacks_pending > 0); - if (0 == --sp->server->iomgr_callbacks_pending) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); return; } @@ -462,7 +472,6 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - s->ports[i].socket->read_info.outstanding = 1; start_accept(s->ports + i); s->active_ports++; } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 901793ec43..fe3673c607 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -82,13 +82,11 @@ typedef struct grpc_tcp { /* Refcounting how many operations are in progress. */ gpr_refcount refcount; - grpc_endpoint_read_cb read_cb; - void *read_user_data; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; gpr_slice read_slice; - - grpc_endpoint_write_cb write_cb; - void *write_user_data; - gpr_slice_buffer write_slices; + gpr_slice_buffer *write_slices; + gpr_slice_buffer *read_slices; /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. 
*/ @@ -98,82 +96,91 @@ typedef struct grpc_tcp { char *peer_string; } grpc_tcp; -static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +static void tcp_free(grpc_tcp *tcp) { + grpc_winsocket_destroy(tcp->socket); + gpr_mu_destroy(&tcp->mu); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} +/*#define GRPC_TCP_REFCOUNT_DEBUG*/ +#ifdef GRPC_TCP_REFCOUNT_DEBUG +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) static void tcp_unref(grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - gpr_slice_buffer_destroy(&tcp->write_slices); - grpc_winsocket_orphan(tcp->socket); - gpr_mu_destroy(&tcp->mu); - gpr_free(tcp->peer_string); - gpr_free(tcp); + tcp_free(tcp); } } +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + /* Asynchronous callback from the IOCP, or the background thread. 
*/ -static void on_read(void *tcpp, int from_iocp) { - grpc_tcp *tcp = (grpc_tcp *)tcpp; +static int on_read(grpc_tcp *tcp, int success) { grpc_winsocket *socket = tcp->socket; gpr_slice sub; gpr_slice *slice = NULL; size_t nslices = 0; - grpc_endpoint_cb_status status; - grpc_endpoint_read_cb cb; grpc_winsocket_callback_info *info = &socket->read_info; - void *opaque = tcp->read_user_data; int do_abort = 0; - gpr_mu_lock(&tcp->mu); - cb = tcp->read_cb; - tcp->read_cb = NULL; - if (!from_iocp || tcp->shutting_down) { - /* If we are here with from_iocp set to true, it means we got raced to - shutting down the endpoint. No actual abort callback will happen - though, so we're going to do it from here. */ - do_abort = 1; - } - gpr_mu_unlock(&tcp->mu); - - if (do_abort) { - if (from_iocp) { - tcp->socket->read_info.outstanding = 0; + if (success) { + if (socket->read_info.wsa_error != 0) { + if (socket->read_info.wsa_error != WSAECONNRESET) { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); + gpr_free(utf8_message); + } + success = 0; gpr_slice_unref(tcp->read_slice); - } - tcp_unref(tcp); - if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - return; - } - - GPR_ASSERT(tcp->socket->read_info.outstanding); - - if (socket->read_info.wsa_error != 0) { - if (socket->read_info.wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); - gpr_free(utf8_message); - } - gpr_slice_unref(tcp->read_slice); - status = GRPC_ENDPOINT_CB_ERROR; - } else { - if (info->bytes_transfered != 0) { - sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); - status = GRPC_ENDPOINT_CB_OK; - slice = &sub; - nslices = 1; } else { - gpr_slice_unref(tcp->read_slice); - status = GRPC_ENDPOINT_CB_EOF; + if (info->bytes_transfered != 0) { + sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, 
info->bytes_transfered); + gpr_slice_buffer_add(tcp->read_slices, sub); + success = 1; + } else { + gpr_slice_unref(tcp->read_slice); + success = 0; + } } } - tcp->socket->read_info.outstanding = 0; + return success; +} - tcp_unref(tcp); - cb(opaque, slice, nslices, status); +static void on_read_cb(void *tcpp, int from_iocp) { + grpc_tcp *tcp = tcpp; + grpc_iomgr_closure *cb = tcp->read_cb; + int success = on_read(tcp, from_iocp); + tcp->read_cb = NULL; + TCP_UNREF(tcp, "read"); + if (cb) { + cb->cb(cb->cb_arg, success); + } } -static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *arg) { +static grpc_endpoint_op_status win_read(grpc_endpoint *ep, + gpr_slice_buffer *read_slices, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -182,15 +189,13 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, DWORD flags = 0; WSABUF buffer; - GPR_ASSERT(!tcp->socket->read_info.outstanding); if (tcp->shutting_down) { - cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - return; + return GRPC_ENDPOINT_ERROR; } - tcp_ref(tcp); - tcp->socket->read_info.outstanding = 1; + tcp->read_cb = cb; - tcp->read_user_data = arg; + tcp->read_slices = read_slices; + gpr_slice_buffer_reset_and_unref(read_slices); tcp->read_slice = gpr_slice_malloc(8192); @@ -204,12 +209,14 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { + int ok; info->bytes_transfered = bytes_read; - /* This might heavily recurse. */ - on_read(tcp, 1); - return; + ok = on_read(tcp, 1); + return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } + TCP_REF(tcp, "read"); + /* Otherwise, let's retry, by queuing a read. 
*/ memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, @@ -218,71 +225,51 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { + int ok; info->wsa_error = wsa_error; - on_read(tcp, 1); - return; + ok = on_read(tcp, 1); + return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } } - grpc_socket_notify_on_read(tcp->socket, on_read, tcp); + grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp); + return GRPC_ENDPOINT_PENDING; } /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_write(void *tcpp, int from_iocp) { +static void on_write(void *tcpp, int success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; - grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; - grpc_endpoint_write_cb cb; - void *opaque = tcp->write_user_data; + grpc_iomgr_closure *cb; int do_abort = 0; gpr_mu_lock(&tcp->mu); cb = tcp->write_cb; tcp->write_cb = NULL; - if (!from_iocp || tcp->shutting_down) { - /* If we are here with from_iocp set to true, it means we got raced to - shutting down the endpoint. No actual abort callback will happen - though, so we're going to do it from here. 
*/ - do_abort = 1; - } gpr_mu_unlock(&tcp->mu); - if (do_abort) { - if (from_iocp) { - tcp->socket->write_info.outstanding = 0; - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - } - tcp_unref(tcp); - if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); - return; - } - - GPR_ASSERT(tcp->socket->write_info.outstanding); - - if (info->wsa_error != 0) { - if (info->wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); - gpr_free(utf8_message); + if (success) { + if (info->wsa_error != 0) { + if (info->wsa_error != WSAECONNRESET) { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); + gpr_free(utf8_message); + } + success = 0; + } else { + GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length); } - status = GRPC_ENDPOINT_CB_ERROR; - } else { - GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); } - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - - tcp_unref(tcp); - cb(opaque, status); + TCP_UNREF(tcp, "write"); + cb->cb(cb->cb_arg, success); } /* Initiates a write. 
*/ -static grpc_endpoint_write_status win_write(grpc_endpoint *ep, - gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, - void *arg) { +static grpc_endpoint_op_status win_write(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; @@ -293,30 +280,25 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, WSABUF *allocated = NULL; WSABUF *buffers = local_buffers; - GPR_ASSERT(!tcp->socket->write_info.outstanding); if (tcp->shutting_down) { - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } - tcp_ref(tcp); - tcp->socket->write_info.outstanding = 1; tcp->write_cb = cb; - tcp->write_user_data = arg; + tcp->write_slices = slices; - gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); - - if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { - buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count); + if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) { + buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count); allocated = buffers; } - for (i = 0; i < tcp->write_slices.count; i++) { - buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]); - buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]); + for (i = 0; i < tcp->write_slices->count; i++) { + buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]); + buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]); } /* First, let's try a synchronous, non-blocking write. */ - status = WSASend(socket->socket, buffers, tcp->write_slices.count, + status = WSASend(socket->socket, buffers, tcp->write_slices->count, &bytes_sent, 0, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); @@ -324,10 +306,10 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, connection that has its send queue filled up. 
But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR; + grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR; if (status == 0) { - ret = GRPC_ENDPOINT_WRITE_DONE; - GPR_ASSERT(bytes_sent == tcp->write_slices.length); + ret = GRPC_ENDPOINT_DONE; + GPR_ASSERT(bytes_sent == tcp->write_slices->length); } else { if (socket->read_info.wsa_error != WSAECONNRESET) { char *utf8_message = gpr_format_message(info->wsa_error); @@ -336,33 +318,30 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, } } if (allocated) gpr_free(allocated); - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - tcp_unref(tcp); return ret; } + TCP_REF(tcp, "write"); + /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same operation, this time asynchronously. */ memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); - status = WSASend(socket->socket, buffers, tcp->write_slices.count, + status = WSASend(socket->socket, buffers, tcp->write_slices->count, &bytes_sent, 0, &socket->write_info.overlapped, NULL); if (allocated) gpr_free(allocated); if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - tcp_unref(tcp); - return GRPC_ENDPOINT_WRITE_ERROR; + TCP_UNREF(tcp, "write"); + return GRPC_ENDPOINT_ERROR; } } /* As all is now setup, we can now ask for the IOCP notification. It may trigger the callback immediately however, but no matter. 
*/ grpc_socket_notify_on_write(socket, on_write, tcp); - return GRPC_ENDPOINT_WRITE_PENDING; + return GRPC_ENDPOINT_PENDING; } static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) { @@ -387,19 +366,17 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { concurrent access of the data structure in that regard. */ static void win_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - int extra_refs = 0; gpr_mu_lock(&tcp->mu); /* At that point, what may happen is that we're already inside the IOCP callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; - extra_refs = grpc_winsocket_shutdown(tcp->socket); - while (extra_refs--) tcp_ref(tcp); + grpc_winsocket_shutdown(tcp->socket); gpr_mu_unlock(&tcp->mu); } static void win_destroy(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_unref(tcp); + TCP_UNREF(tcp, "destroy"); } static char *win_get_peer(grpc_endpoint *ep) { @@ -408,8 +385,8 @@ static char *win_get_peer(grpc_endpoint *ep) { } static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; + win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, + win_shutdown, win_destroy, win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); @@ -417,7 +394,6 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_slice_buffer_init(&tcp->write_slices); gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 81b3e33cb2..b696e384fc 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -49,15 +49,15 @@ typedef struct { struct 
tsi_frame_protector *protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ - grpc_endpoint_read_cb read_cb; - void *read_user_data; - grpc_endpoint_write_cb write_cb; - void *write_user_data; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; + grpc_iomgr_closure on_read; + gpr_slice_buffer *read_buffer; + gpr_slice_buffer source_buffer; /* saved handshaker leftover data to unprotect. */ gpr_slice_buffer leftover_bytes; /* buffers for read and write */ gpr_slice read_staging_buffer; - gpr_slice_buffer input_buffer; gpr_slice write_staging_buffer; gpr_slice_buffer output_buffer; @@ -67,62 +67,91 @@ typedef struct { int grpc_trace_secure_endpoint = 0; -static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } - static void destroy(secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; grpc_endpoint_destroy(ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); gpr_slice_buffer_destroy(&ep->leftover_bytes); gpr_slice_unref(ep->read_staging_buffer); - gpr_slice_buffer_destroy(&ep->input_buffer); gpr_slice_unref(ep->write_staging_buffer); gpr_slice_buffer_destroy(&ep->output_buffer); + gpr_slice_buffer_destroy(&ep->source_buffer); gpr_mu_destroy(&ep->protector_mu); gpr_free(ep); } +/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/ +#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG +#define SECURE_ENDPOINT_UNREF(ep, reason) \ + secure_endpoint_unref((ep), (reason), __FILE__, __LINE__) +#define SECURE_ENDPOINT_REF(ep, reason) \ + secure_endpoint_ref((ep), (reason), __FILE__, __LINE__) +static void secure_endpoint_unref(secure_endpoint *ep, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d", + ep, reason, ep->ref.count, ep->ref.count - 1); + if (gpr_unref(&ep->ref)) { + destroy(ep); + } +} + +static void secure_endpoint_ref(secure_endpoint *ep, const char *reason, + const char *file, int line) { + gpr_log(file, line, 
GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d", + ep, reason, ep->ref.count, ep->ref.count + 1); + gpr_ref(&ep->ref); +} +#else +#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) static void secure_endpoint_unref(secure_endpoint *ep) { if (gpr_unref(&ep->ref)) { destroy(ep); } } +static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } +#endif + static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, gpr_uint8 **end) { - gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer); + gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static void call_read_cb(secure_endpoint *ep, int success) { if (grpc_trace_secure_endpoint) { size_t i; - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < ep->read_buffer->count; i++) { + char *data = gpr_dump_slice(ep->read_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p: %s", ep, data); gpr_free(data); } } - ep->read_cb(ep->read_user_data, slices, nslices, error); - secure_endpoint_unref(ep); + ep->read_buffer = NULL; + ep->read_cb->cb(ep->read_cb->cb_arg, success); + SECURE_ENDPOINT_UNREF(ep, "read"); } -static void on_read(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static int on_read(void *user_data, int success) { unsigned i; gpr_uint8 keep_looping = 0; - size_t input_buffer_count = 0; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)user_data; gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); gpr_uint8 *end = 
GPR_SLICE_END_PTR(ep->read_staging_buffer); + if (!success) { + gpr_slice_buffer_reset_and_unref(ep->read_buffer); + return 0; + } + /* TODO(yangg) check error, maybe bail out early */ - for (i = 0; i < nslices; i++) { - gpr_slice encrypted = slices[i]; + for (i = 0; i < ep->source_buffer.count; i++) { + gpr_slice encrypted = ep->source_buffer.slices[i]; gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted); size_t message_size = GPR_SLICE_LENGTH(encrypted); @@ -161,7 +190,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) { gpr_slice_buffer_add( - &ep->input_buffer, + ep->read_buffer, gpr_slice_split_head( &ep->read_staging_buffer, (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer)))); @@ -169,38 +198,53 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, /* TODO(yangg) experiment with moving this block after read_cb to see if it helps latency */ - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } + gpr_slice_buffer_reset_and_unref(&ep->source_buffer); if (result != TSI_OK) { - gpr_slice_buffer_reset_and_unref(&ep->input_buffer); - call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - return; + gpr_slice_buffer_reset_and_unref(ep->read_buffer); + return 0; } - /* The upper level will unref the slices. 
*/ - input_buffer_count = ep->input_buffer.count; - ep->input_buffer.count = 0; - call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error); + + return 1; +} + +static void on_read_cb(void *user_data, int success) { + call_read_cb(user_data, on_read(user_data, success)); } -static void endpoint_notify_on_read(grpc_endpoint *secure_ep, - grpc_endpoint_read_cb cb, void *user_data) { +static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { secure_endpoint *ep = (secure_endpoint *)secure_ep; + int immediate_read_success = -1; ep->read_cb = cb; - ep->read_user_data = user_data; - - secure_endpoint_ref(ep); + ep->read_buffer = slices; + gpr_slice_buffer_reset_and_unref(ep->read_buffer); if (ep->leftover_bytes.count) { - size_t leftover_nslices = ep->leftover_bytes.count; - ep->leftover_bytes.count = 0; - on_read(ep, ep->leftover_bytes.slices, leftover_nslices, - GRPC_ENDPOINT_CB_OK); - return; + gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); + GPR_ASSERT(ep->leftover_bytes.count == 0); + return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } - grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep); + SECURE_ENDPOINT_REF(ep, "read"); + + switch ( + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) { + case GRPC_ENDPOINT_DONE: + immediate_read_success = on_read(ep, 1); + break; + case GRPC_ENDPOINT_PENDING: + return GRPC_ENDPOINT_PENDING; + case GRPC_ENDPOINT_ERROR: + immediate_read_success = on_read(ep, 0); + break; + } + + GPR_ASSERT(immediate_read_success != -1); + SECURE_ENDPOINT_UNREF(ep, "read"); + + return immediate_read_success ? 
GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -211,36 +255,28 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); } -static void on_write(void *data, grpc_endpoint_cb_status error) { - secure_endpoint *ep = data; - ep->write_cb(ep->write_user_data, error); - secure_endpoint_unref(ep); -} - -static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { unsigned i; - size_t output_buffer_count = 0; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); - grpc_endpoint_write_status status; - GPR_ASSERT(ep->output_buffer.count == 0); + + gpr_slice_buffer_reset_and_unref(&ep->output_buffer); if (grpc_trace_secure_endpoint) { - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < slices->count; i++) { + char *data = + gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data); gpr_free(data); } } - for (i = 0; i < nslices; i++) { - gpr_slice plain = slices[i]; + for (i = 0; i < slices->count; i++) { + gpr_slice plain = slices->slices[i]; gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); size_t message_size = GPR_SLICE_LENGTH(plain); while (message_size > 0) { @@ -290,29 +326,13 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, } } - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } - if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? 
*/ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } - /* clear output_buffer and let the lower level handle its slices. */ - output_buffer_count = ep->output_buffer.count; - ep->output_buffer.count = 0; - ep->write_cb = cb; - ep->write_user_data = user_data; - /* Need to keep the endpoint alive across a transport */ - secure_endpoint_ref(ep); - status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, - output_buffer_count, on_write, ep); - if (status != GRPC_ENDPOINT_WRITE_PENDING) { - secure_endpoint_unref(ep); - } - return status; + return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); } static void endpoint_shutdown(grpc_endpoint *secure_ep) { @@ -320,9 +340,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) { grpc_endpoint_shutdown(ep->wrapped_ep); } -static void endpoint_unref(grpc_endpoint *secure_ep) { +static void endpoint_destroy(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - secure_endpoint_unref(ep); + SECURE_ENDPOINT_UNREF(ep, "destroy"); } static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, @@ -343,9 +363,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { } static const grpc_endpoint_vtable vtable = { - endpoint_notify_on_read, endpoint_write, + endpoint_read, endpoint_write, endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_unref, + endpoint_shutdown, endpoint_destroy, endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( @@ -363,8 +383,10 @@ grpc_endpoint *grpc_secure_endpoint_create( } ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); - gpr_slice_buffer_init(&ep->input_buffer); gpr_slice_buffer_init(&ep->output_buffer); + gpr_slice_buffer_init(&ep->source_buffer); + ep->read_buffer = NULL; + grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep); 
gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 0c3572b53c..bf0079577e 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -50,16 +50,17 @@ typedef struct { grpc_endpoint *wrapped_endpoint; grpc_endpoint *secure_endpoint; gpr_slice_buffer left_overs; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; grpc_secure_transport_setup_done_cb cb; void *user_data; + grpc_iomgr_closure on_handshake_data_sent_to_peer; + grpc_iomgr_closure on_handshake_data_received_from_peer; } grpc_secure_transport_setup; -static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error); +static void on_handshake_data_received_from_peer(void *setup, int success); -static void on_handshake_data_sent_to_peer(void *setup, - grpc_endpoint_cb_status error); +static void on_handshake_data_sent_to_peer(void *setup, int success); static void secure_transport_setup_done(grpc_secure_transport_setup *s, int is_success) { @@ -78,6 +79,8 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s, if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); gpr_slice_buffer_destroy(&s->left_overs); + gpr_slice_buffer_destroy(&s->outgoing); + gpr_slice_buffer_destroy(&s->incoming); GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); gpr_free(s); } @@ -102,6 +105,8 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { s->secure_endpoint = grpc_secure_endpoint_create(protector, s->wrapped_endpoint, s->left_overs.slices, s->left_overs.count); + s->left_overs.count = 0; + s->left_overs.length = 0; secure_transport_setup_done(s, 1); return; } @@ -132,7 +137,6 @@ static void 
send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { size_t offset = 0; tsi_result result = TSI_OK; gpr_slice to_send; - grpc_endpoint_write_status write_status; do { size_t to_send_size = s->handshake_buffer_size - offset; @@ -155,28 +159,25 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { to_send = gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); + gpr_slice_buffer_reset_and_unref(&s->outgoing); + gpr_slice_buffer_add(&s->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, - on_handshake_data_sent_to_peer, s); - if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { - gpr_log(GPR_ERROR, "Could not send handshake data to peer."); - secure_transport_setup_done(s, 0); - } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) { - on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK); - } -} - -static void cleanup_slices(gpr_slice *slices, size_t num_slices) { - size_t i; - for (i = 0; i < num_slices; i++) { - gpr_slice_unref(slices[i]); + switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, + &s->on_handshake_data_sent_to_peer)) { + case GRPC_ENDPOINT_ERROR: + gpr_log(GPR_ERROR, "Could not send handshake data to peer."); + secure_transport_setup_done(s, 0); + break; + case GRPC_ENDPOINT_DONE: + on_handshake_data_sent_to_peer(s, 1); + break; + case GRPC_ENDPOINT_PENDING: + break; } } -static void on_handshake_data_received_from_peer( - void *setup, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static void on_handshake_data_received_from_peer(void *setup, int success) { grpc_secure_transport_setup *s = setup; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; @@ -184,32 +185,37 @@ static void on_handshake_data_received_from_peer( size_t num_left_overs; int has_left_overs_in_current_slice = 0; - if (error != GRPC_ENDPOINT_CB_OK) { + if (!success) { 
gpr_log(GPR_ERROR, "Read failed."); - cleanup_slices(slices, nslices); secure_transport_setup_done(s, 0); return; } - for (i = 0; i < nslices; i++) { - consumed_slice_size = GPR_SLICE_LENGTH(slices[i]); + for (i = 0; i < s->incoming.count; i++) { + consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]); result = tsi_handshaker_process_bytes_from_peer( - s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size); + s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]), + &consumed_slice_size); if (!tsi_handshaker_is_in_progress(s->handshaker)) break; } if (tsi_handshaker_is_in_progress(s->handshaker)) { /* We may need more data. */ if (result == TSI_INCOMPLETE_DATA) { - /* TODO(klempner,jboeuf): This should probably use the client setup - deadline */ - grpc_endpoint_notify_on_read(s->wrapped_endpoint, - on_handshake_data_received_from_peer, setup); - cleanup_slices(slices, nslices); + switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer)) { + case GRPC_ENDPOINT_DONE: + on_handshake_data_received_from_peer(s, 1); + break; + case GRPC_ENDPOINT_ERROR: + on_handshake_data_received_from_peer(s, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + } return; } else { send_handshake_bytes_to_peer(s); - cleanup_slices(slices, nslices); return; } } @@ -217,42 +223,40 @@ static void on_handshake_data_received_from_peer( if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - cleanup_slices(slices, nslices); secure_transport_setup_done(s, 0); return; } /* Handshake is done and successful this point. */ has_left_overs_in_current_slice = - (consumed_slice_size < GPR_SLICE_LENGTH(slices[i])); - num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1; + (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i])); + num_left_overs = + (has_left_overs_in_current_slice ? 
1 : 0) + s->incoming.count - i - 1; if (num_left_overs == 0) { - cleanup_slices(slices, nslices); check_peer(s); return; } - cleanup_slices(slices, nslices - num_left_overs); - /* Put the leftovers in our buffer (ownership transfered). */ if (has_left_overs_in_current_slice) { - gpr_slice_buffer_add(&s->left_overs, - gpr_slice_split_tail(&slices[i], consumed_slice_size)); - gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */ + gpr_slice_buffer_add( + &s->left_overs, + gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size)); + gpr_slice_unref( + s->incoming.slices[i]); /* split_tail above increments refcount. */ } gpr_slice_buffer_addn( - &s->left_overs, &slices[i + 1], + &s->left_overs, &s->incoming.slices[i + 1], num_left_overs - (size_t)has_left_overs_in_current_slice); check_peer(s); } /* If setup is NULL, the setup is done. */ -static void on_handshake_data_sent_to_peer(void *setup, - grpc_endpoint_cb_status error) { +static void on_handshake_data_sent_to_peer(void *setup, int success) { grpc_secure_transport_setup *s = setup; /* Make sure that write is OK. 
*/ - if (error != GRPC_ENDPOINT_CB_OK) { - gpr_log(GPR_ERROR, "Write failed with error %d.", error); + if (!success) { + gpr_log(GPR_ERROR, "Write failed."); if (setup != NULL) secure_transport_setup_done(s, 0); return; } @@ -261,8 +265,17 @@ static void on_handshake_data_sent_to_peer(void *setup, if (tsi_handshaker_is_in_progress(s->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->wrapped_endpoint, - on_handshake_data_received_from_peer, setup); + switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer)) { + case GRPC_ENDPOINT_ERROR: + on_handshake_data_received_from_peer(s, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_DONE: + on_handshake_data_received_from_peer(s, 1); + break; + } } else { check_peer(s); } @@ -288,6 +301,12 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; + grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, s); + grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, s); gpr_slice_buffer_init(&s->left_overs); + gpr_slice_buffer_init(&s->outgoing); + gpr_slice_buffer_init(&s->incoming); send_handshake_bytes_to_peer(s); } diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 987d5cb9b5..6482ef9c9f 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -207,3 +207,25 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) { src->count = 0; src->length = 0; } + +void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) { + GPR_ASSERT(n <= sb->length); + sb->length -= n; + for (;;) { + size_t idx = sb->count - 1; + gpr_slice slice = sb->slices[idx]; + size_t slice_len = GPR_SLICE_LENGTH(slice); + if (slice_len > 
n) { + sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n); + return; + } else if (slice_len == n) { + gpr_slice_unref(slice); + sb->count = idx; + return; + } else { + gpr_slice_unref(slice); + n -= slice_len; + sb->count = idx; + } + } +} diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 42cf0ecd5b..7a42de9245 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -214,6 +214,8 @@ typedef struct { grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ gpr_uint8 is_client; + /** callback for when writing is done */ + grpc_iomgr_closure done_cb; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { @@ -291,6 +293,9 @@ struct grpc_chttp2_transport { gpr_refcount refs; char *peer_string; + /** when this drops to zero it's safe to shutdown the endpoint */ + gpr_refcount shutdown_ep_refs; + gpr_mu mu; /** is the transport destroying itself? */ @@ -329,8 +334,11 @@ struct grpc_chttp2_transport { /** closure to execute writing */ grpc_iomgr_closure writing_action; - /** closure to start reading from the endpoint */ - grpc_iomgr_closure reading_action; + /** closure to finish reading from the endpoint */ + grpc_iomgr_closure recv_data; + + /** incoming read bytes */ + gpr_slice_buffer read_buffer; /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to @@ -463,8 +471,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_terminate_writing(void *transport_writing, int success); void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing 
*writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 123061b3fc..2c8c48f47b 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -37,7 +37,6 @@ #include <grpc/support/log.h> static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_global *transport_global, @@ -165,16 +164,15 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(endpoint); - switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, - transport_writing->outbuf.count, finish_write_cb, - transport_writing)) { - case GRPC_ENDPOINT_WRITE_DONE: + switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf, + &transport_writing->done_cb)) { + case GRPC_ENDPOINT_DONE: grpc_chttp2_terminate_writing(transport_writing, 1); break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: grpc_chttp2_terminate_writing(transport_writing, 0); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; } } @@ -209,12 +207,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { } } -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { - grpc_chttp2_transport_writing *transport_writing = tw; - grpc_chttp2_terminate_writing(transport_writing, - write_status == GRPC_ENDPOINT_CB_OK); -} - void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { @@ -243,6 +235,5 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } - transport_writing->outbuf.count = 0; - transport_writing->outbuf.length = 0; + gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); } diff --git 
a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1bbd210e46..aa6a860c67 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -84,15 +84,13 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); -static void reading_action(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error); +static void recv_data(void *tp, int success); /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t); @@ -143,6 +141,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); gpr_slice_buffer_destroy(&t->parsing.qbuf); + gpr_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); @@ -223,6 +222,8 @@ static void init_transport(grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy, the other for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); + /* ref is dropped at transport close() */ + gpr_ref_init(&t->shutdown_ep_refs, 1); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); t->peer_string = grpc_endpoint_get_peer(ep); @@ -249,12 +250,16 @@ static void init_transport(grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); grpc_iomgr_closure_init(&t->writing_action, writing_action, t); - grpc_iomgr_closure_init(&t->reading_action, reading_action, t); 
gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); + grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); + grpc_iomgr_closure_init(&t->recv_data, recv_data, t); + gpr_slice_buffer_init(&t->read_buffer); + if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -333,13 +338,45 @@ static void destroy_transport(grpc_transport *gt) { UNREF_TRANSPORT(t, "destroy"); } +/** block grpc_endpoint_shutdown being called until a paired + allow_endpoint_shutdown is made */ +static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { + GPR_ASSERT(t->ep); + gpr_ref(&t->shutdown_ep_refs); +} + +static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + } +} + +static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + gpr_mu_lock(&t->mu); + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + gpr_mu_unlock(&t->mu); + } +} + +static void destroy_endpoint(grpc_chttp2_transport *t) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + UNREF_TRANSPORT( + t, "disconnect"); /* safe because we'll still have the ref for write */ +} + static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, "close_transport"); if (t->ep) { - grpc_endpoint_shutdown(t->ep); + allow_endpoint_shutdown_locked(t); } } } @@ -468,6 +505,7 @@ static void unlock(grpc_chttp2_transport *t) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + prevent_endpoint_shutdown(t); } run_closures = t->global.pending_closures_head; @@ -502,12 +540,14 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } 
-void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success) { +void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) { + grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); + allow_endpoint_shutdown_locked(t); + if (!success) { drop_connection(t); } @@ -519,10 +559,7 @@ void grpc_chttp2_terminate_writing( from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - UNREF_TRANSPORT( - t, "disconnect"); /* safe because we'll still have the ref for write */ + destroy_endpoint(t); } unlock(t); @@ -1052,82 +1089,90 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { static void read_error_locked(grpc_chttp2_transport *t) { t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(t, "disconnect"); + destroy_endpoint(t); } } /* tcp read callback */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - grpc_chttp2_transport *t = tp; +static int recv_data_loop(grpc_chttp2_transport *t, int *success) { size_t i; - int unref = 0; + int keep_reading = 0; - switch (error) { - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - lock(t); + lock(t); + i = 0; + GPR_ASSERT(!t->parsing_active); + if (!t->closed) { + t->parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + gpr_mu_unlock(&t->mu); + for (; i < t->read_buffer.count && + grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]); + i++) + ; + gpr_mu_lock(&t->mu); + if (i != t->read_buffer.count) { drop_connection(t); - 
read_error_locked(t); - unlock(t); - unref = 1; - for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); - break; - case GRPC_ENDPOINT_CB_OK: - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); - if (!t->closed) { - t->parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); - gpr_mu_unlock(&t->mu); - for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); - i++) { - gpr_slice_unref(slices[i]); - } - gpr_mu_lock(&t->mu); - if (i != nslices) { - drop_connection(t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); - t->parsing.initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->parsing_active = 0; - } - if (i == nslices) { - grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); - } else { - read_error_locked(t); - unref = 1; - } - unlock(t); - for (; i < nslices; i++) gpr_slice_unref(slices[i]); - break; + } + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + t->global.concurrent_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + t->parsing.initial_window_update = 0; + } + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; } - if (unref) { + if (!*success || i != t->read_buffer.count) { + drop_connection(t); + read_error_locked(t); + } else if (!t->closed) { + keep_reading = 1; + 
REF_TRANSPORT(t, "keep_reading"); + prevent_endpoint_shutdown(t); + } + gpr_slice_buffer_reset_and_unref(&t->read_buffer); + unlock(t); + + if (keep_reading) { + int ret = -1; + switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) { + case GRPC_ENDPOINT_DONE: + *success = 1; + ret = 1; + break; + case GRPC_ENDPOINT_ERROR: + *success = 0; + ret = 1; + break; + case GRPC_ENDPOINT_PENDING: + ret = 0; + break; + } + allow_endpoint_shutdown_unlocked(t); + UNREF_TRANSPORT(t, "keep_reading"); + return ret; + } else { UNREF_TRANSPORT(t, "recv_data"); + return 0; } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); } -static void reading_action(void *pt, int iomgr_success_ignored) { - grpc_chttp2_transport *t = pt; - grpc_endpoint_notify_on_read(t->ep, recv_data, t); +static void recv_data(void *tp, int success) { + grpc_chttp2_transport *t = tp; + + while (recv_data_loop(t, &success)) + ; } /* @@ -1240,5 +1285,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ - recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); + gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); + recv_data(t, 1); } |