Diffstat (limited to 'src')
76 files changed, 1896 insertions, 1368 deletions
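The heart of this change is a new endpoint I/O contract. The old vtable entries notify_on_read and write took a raw gpr_slice array plus a callback and user_data pointer and reported completion through two separate enums (grpc_endpoint_cb_status and grpc_endpoint_write_status); the new read and write entries take a caller-owned gpr_slice_buffer and a grpc_iomgr_closure, and both return a single grpc_endpoint_op_status: GRPC_ENDPOINT_DONE (completed inline, closure not called), GRPC_ENDPOINT_PENDING (closure called on completion), or GRPC_ENDPOINT_ERROR (failed inline, closure not called). A minimal sketch of the resulting caller pattern, modeled on the do_read loop in the httpcli.c hunk below — my_state, init_my_state, start_read, and on_read_done are illustrative names, not part of this patch, and the snippet only builds inside the gRPC tree at this revision:

#include <grpc/support/slice_buffer.h>
#include "src/core/iomgr/endpoint.h"
#include "src/core/iomgr/iomgr.h"

typedef struct {
  grpc_endpoint *ep;
  gpr_slice_buffer incoming;  /* caller-owned; the endpoint fills it */
  grpc_iomgr_closure on_read; /* runs only when the op returns PENDING */
} my_state;

static void start_read(my_state *st);

static void on_read_done(void *arg, int success) {
  my_state *st = arg;
  if (!success) {
    /* endpoint closed or errored: tear down here */
    return;
  }
  /* consume st->incoming.slices[0 .. st->incoming.count - 1], then ... */
  start_read(st); /* issue the next read */
}

static void init_my_state(my_state *st, grpc_endpoint *ep) {
  st->ep = ep;
  gpr_slice_buffer_init(&st->incoming);
  grpc_iomgr_closure_init(&st->on_read, on_read_done, st);
}

static void start_read(my_state *st) {
  switch (grpc_endpoint_read(st->ep, &st->incoming, &st->on_read)) {
    case GRPC_ENDPOINT_DONE: /* completed inline; closure won't run */
      on_read_done(st, 1);
      break;
    case GRPC_ENDPOINT_PENDING: /* on_read_done fires when data arrives */
      break;
    case GRPC_ENDPOINT_ERROR: /* failed inline; closure won't run */
      on_read_done(st, 0);
      break;
  }
}

Because DONE and ERROR complete inline and never invoke the closure, callers such as do_read below must dispatch those two cases through the handler themselves; that is also why the new grpc_endpoint_read/grpc_endpoint_write declarations are marked GRPC_MUST_USE_RESULT.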
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index e01c9a2ad4..8b6ba1d472 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -36,7 +36,6 @@ #include <stdio.h> #include <string.h> -#include "include/grpc/census.h" #include "src/core/channel/channel_stack.h" #include "src/core/channel/noop_filter.h" #include "src/core/statistics/census_interface.h" diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 2b61d33c29..ec832a0367 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -85,16 +85,14 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { static void hc_on_recv(void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (success) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - calld->got_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); - } + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); } calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 9012070e8e..1e38479eb1 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -61,6 +61,10 @@ typedef struct { grpc_httpcli_context *context; grpc_pollset *pollset; grpc_iomgr_object iomgr_obj; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; + grpc_iomgr_closure on_read; + grpc_iomgr_closure done_write; } internal_request; static grpc_httpcli_get_override g_get_override = NULL; @@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) { gpr_slice_unref(req->request_text); gpr_free(req->host); grpc_iomgr_unregister_object(&req->iomgr_obj); + gpr_slice_buffer_destroy(&req->incoming); + gpr_slice_buffer_destroy(&req->outgoing); gpr_free(req); } -static void on_read(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status status) { +static void on_read(void *user_data, int success); + +static void do_read(internal_request *req) { + switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) { + case GRPC_ENDPOINT_DONE: + on_read(req, 1); + break; + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_ERROR: + on_read(req, 0); + break; + } +} + +static void on_read(void *user_data, int success) { internal_request *req = user_data; size_t i; - for (i = 0; i < nslices; i++) { - if (GPR_SLICE_LENGTH(slices[i])) { + for (i = 0; i < req->incoming.count; i++) { + if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; - if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) { + if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) { finish(req, 0); - goto done; + return; } } } - switch (status) { - case GRPC_ENDPOINT_CB_OK: - grpc_endpoint_notify_on_read(req->ep, on_read, req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - case GRPC_ENDPOINT_CB_SHUTDOWN: - if (!req->have_read_byte) { - next_address(req); - } else { - finish(req, 
grpc_httpcli_parser_eof(&req->parser)); - } - break; - } - -done: - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); + if (success) { + do_read(req); + } else if (!req->have_read_byte) { + next_address(req); + } else { + finish(req, grpc_httpcli_parser_eof(&req->parser)); } } -static void on_written(internal_request *req) { - grpc_endpoint_notify_on_read(req->ep, on_read, req); -} +static void on_written(internal_request *req) { do_read(req); } -static void done_write(void *arg, grpc_endpoint_cb_status status) { +static void done_write(void *arg, int success) { internal_request *req = arg; - switch (status) { - case GRPC_ENDPOINT_CB_OK: - on_written(req); - break; - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_ERROR: - next_address(req); - break; + if (success) { + on_written(req); + } else { + next_address(req); } } static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); - switch ( - grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { - case GRPC_ENDPOINT_WRITE_DONE: + gpr_slice_buffer_add(&req->outgoing, req->request_text); + switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) { + case GRPC_ENDPOINT_DONE: on_written(req); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: finish(req, 0); break; } @@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); @@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, request->handshaker ? 
request->handshaker : &grpc_httpcli_plaintext; req->context = context; req->pollset = pollset; - gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + grpc_iomgr_closure_init(&req->on_read, on_read, req); + grpc_iomgr_closure_init(&req->done_write, done_write, req); + gpr_slice_buffer_init(&req->incoming); + gpr_slice_buffer_init(&req->outgoing); + gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); grpc_iomgr_register_object(&req->iomgr_obj, name); gpr_free(name); req->host = gpr_strdup(request->host); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 8ee14bce9b..a7878e31dd 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -33,17 +33,16 @@ #include "src/core/iomgr/endpoint.h" -void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { - ep->vtable->notify_on_read(ep, cb, user_data); +grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { + return ep->vtable->read(ep, slices, cb); } -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { - return ep->vtable->write(ep, slices, nslices, cb, user_data); +grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { + return ep->vtable->write(ep, slices, cb); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index ea92a500e8..d14d52d561 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -37,6 +37,7 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_set.h" #include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> #include <grpc/support/time.h> /* An endpoint caps a streaming channel between two communicating processes. 
@@ -45,31 +46,17 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; -typedef enum grpc_endpoint_cb_status { - GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ - GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ - GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ -} grpc_endpoint_cb_status; - -typedef enum grpc_endpoint_write_status { - GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */ - GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */ - GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */ -} grpc_endpoint_write_status; - -typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error); -typedef void (*grpc_endpoint_write_cb)(void *user_data, - grpc_endpoint_cb_status error); +typedef enum grpc_endpoint_op_status { + GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */ + GRPC_ENDPOINT_PENDING, /* cb will be called when completed */ + GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */ +} grpc_endpoint_op_status; struct grpc_endpoint_vtable { - void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); - grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_endpoint_write_cb cb, - void *user_data); + grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb); + grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); @@ -77,26 +64,32 @@ struct grpc_endpoint_vtable { char *(*get_peer)(grpc_endpoint *ep); }; -/* When data is available on the connection, calls the callback with slices. */ -void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data); +/* When data is available on the connection, calls the callback with slices. + Callback success indicates that the endpoint can accept more reads, failure + indicates the endpoint is closed. + Valid slices may be placed into \a slices even on callback success == 0. */ +grpc_endpoint_op_status grpc_endpoint_read( + grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; char *grpc_endpoint_get_peer(grpc_endpoint *ep); /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it - returns GRPC_ENDPOINT_WRITE_DONE. - Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the - connection is ready for more data. */ -grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data); + returns GRPC_ENDPOINT_DONE. + Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the + connection is ready for more data. + \a slices may be mutated at will by the endpoint until cb is called. + No guarantee is made to the content of slices after a write EXCEPT that + it is a valid slice buffer. 
+ */ +grpc_endpoint_op_status grpc_endpoint_write( + grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT; /* Causes any pending read/write callbacks to run immediately with - GRPC_ENDPOINT_CB_SHUTDOWN status */ + success==0 */ void grpc_endpoint_shutdown(grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_endpoint *ep); diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 09a457dd9a..006f8b2abf 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -52,7 +52,6 @@ static OVERLAPPED g_iocp_custom_overlap; static gpr_event g_shutdown_iocp; static gpr_event g_iocp_done; -static gpr_atm g_orphans = 0; static gpr_atm g_custom_events = 0; static HANDLE g_iocp; @@ -92,22 +91,13 @@ static void do_iocp_work() { gpr_log(GPR_ERROR, "Unknown IOCP operation"); abort(); } - GPR_ASSERT(info->outstanding); - if (socket->orphan) { - info->outstanding = 0; - if (!socket->read_info.outstanding && !socket->write_info.outstanding) { - grpc_winsocket_destroy(socket); - gpr_atm_full_fetch_add(&g_orphans, -1); - } - return; - } success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes, FALSE, &flags); info->bytes_transfered = bytes; info->wsa_error = success ? 0 : WSAGetLastError(); GPR_ASSERT(overlapped == &info->overlapped); - gpr_mu_lock(&socket->state_mu); GPR_ASSERT(!info->has_pending_iocp); + gpr_mu_lock(&socket->state_mu); if (info->cb) { f = info->cb; opaque = info->opaque; @@ -120,9 +110,8 @@ static void do_iocp_work() { } static void iocp_loop(void *p) { - while (gpr_atm_acq_load(&g_orphans) || gpr_atm_acq_load(&g_custom_events) || + while (gpr_atm_acq_load(&g_custom_events) || !gpr_event_get(&g_shutdown_iocp)) { - grpc_maybe_call_delayed_callbacks(NULL, 1); do_iocp_work(); } @@ -175,12 +164,6 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { GPR_ASSERT(ret == g_iocp); } -void grpc_iocp_socket_orphan(grpc_winsocket *socket) { - GPR_ASSERT(!socket->orphan); - gpr_atm_full_fetch_add(&g_orphans, 1); - socket->orphan = 1; -} - /* Calling notify_on_read or write means either of two things: -) The IOCP already completed in the background, and we need to call the callback now. 
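On the implementation side, the seven-entry vtable declared in the endpoint.h hunk above is all an endpoint must supply. A skeleton of a conforming implementation follows — the my_* names are illustrative stubs, not part of this patch; real versions appear later in this diff in the tcp_posix.c and tcp_windows.c hunks, where the concrete endpoint embeds grpc_endpoint as its first member (as grpc_tcp does) so the ep argument can be downcast:

#include <grpc/support/string_util.h>
#include "src/core/iomgr/endpoint.h"

static grpc_endpoint_op_status my_read(grpc_endpoint *ep,
                                       gpr_slice_buffer *slices,
                                       grpc_iomgr_closure *cb) {
  /* Fill *slices and return GRPC_ENDPOINT_DONE, or retain cb and return
     GRPC_ENDPOINT_PENDING; cb must not run when DONE/ERROR is returned. */
  return GRPC_ENDPOINT_ERROR;
}

static grpc_endpoint_op_status my_write(grpc_endpoint *ep,
                                        gpr_slice_buffer *slices,
                                        grpc_iomgr_closure *cb) {
  /* The endpoint may mutate *slices freely until cb runs (or until it
     returns DONE/ERROR), per the comment on grpc_endpoint_write above. */
  return GRPC_ENDPOINT_ERROR;
}

static void my_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {}
static void my_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *s) {}
static void my_shutdown(grpc_endpoint *ep) {
  /* must cause any pending read/write closure to run with success == 0 */
}
static void my_destroy(grpc_endpoint *ep) {}
static char *my_get_peer(grpc_endpoint *ep) { return gpr_strdup("sketch:0"); }

static const grpc_endpoint_vtable my_vtable = {
    my_read,     my_write,   my_add_to_pollset, my_add_to_pollset_set,
    my_shutdown, my_destroy, my_get_peer};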
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index ee3847a229..7d2dc45176 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -42,7 +42,6 @@ void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); -void grpc_iocp_socket_orphan(grpc_winsocket *); void grpc_socket_notify_on_write(grpc_winsocket *, void (*cb)(void *, int success), void *opaque); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index fdc9adf4af..1dd03992ae 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -108,8 +108,14 @@ static size_t count_objects(void) { return n; } -void grpc_iomgr_shutdown(void) { +static void dump_objects(const char *kind) { grpc_iomgr_object *obj; + for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { + gpr_log(GPR_DEBUG, "%s OBJECT: %s %p", kind, obj->name, obj); + } +} + +void grpc_iomgr_shutdown(void) { grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); @@ -151,12 +157,14 @@ void grpc_iomgr_shutdown(void) { } if (g_root_object.next != &g_root_object) { int timeout = 0; - gpr_timespec short_deadline = gpr_time_add( + while (g_cbs_head == NULL) { + gpr_timespec short_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); - while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { - if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { - timeout = 1; - break; + if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { + if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { + timeout = 1; + break; + } } } if (timeout) { @@ -164,9 +172,7 @@ void grpc_iomgr_shutdown(void) { "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", count_objects()); - for (obj = g_root_object.next; obj != &g_root_object; obj = obj->next) { - gpr_log(GPR_DEBUG, "LEAKED OBJECT: %s %p", obj->name, obj); - } + dump_objects("LEAKED"); break; } } @@ -188,7 +194,7 @@ void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); obj->next = &g_root_object; - obj->prev = obj->next->prev; + obj->prev = g_root_object.prev; obj->next->prev = obj->prev->next = obj; gpr_mu_unlock(&g_mu); } diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 7d8421376b..557ca82226 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -35,8 +35,12 @@ #ifdef GPR_WINSOCK_SOCKET +#include <winsock2.h> +#include <mswsock.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/log_win32.h> #include <grpc/support/string_util.h> #include "src/core/iomgr/iocp_windows.h" @@ -62,46 +66,30 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { operations to abort them. We need to do that this way because of the various callsites of that function, which happens to be in various mutex hold states, and that'd be unsafe to call them directly. 
*/ -int grpc_winsocket_shutdown(grpc_winsocket *winsocket) { - int callbacks_set = 0; - SOCKET socket; - gpr_mu_lock(&winsocket->state_mu); - socket = winsocket->socket; - if (winsocket->read_info.cb) { - callbacks_set++; - grpc_iomgr_closure_init(&winsocket->shutdown_closure, - winsocket->read_info.cb, - winsocket->read_info.opaque); - grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); - } - if (winsocket->write_info.cb) { - callbacks_set++; - grpc_iomgr_closure_init(&winsocket->shutdown_closure, - winsocket->write_info.cb, - winsocket->write_info.opaque); - grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); - } - gpr_mu_unlock(&winsocket->state_mu); - closesocket(socket); - return callbacks_set; -} +void grpc_winsocket_shutdown(grpc_winsocket *winsocket) { + /* Grab the function pointer for DisconnectEx for that specific socket. + It may change depending on the interface. */ + int status; + GUID guid = WSAID_DISCONNECTEX; + LPFN_DISCONNECTEX DisconnectEx; + DWORD ioctl_num_bytes; -/* Abandons a socket. Either we're going to queue it up for garbage collecting - from the IO Completion Port thread, or destroy it immediately. Note that this - mechanisms assumes that we're either always waiting for an operation, or we - explicitly know that we don't. If there is a future case where we can have - an "idle" socket which is neither trying to read or write, we'd start leaking - both memory and sockets. */ -void grpc_winsocket_orphan(grpc_winsocket *winsocket) { - grpc_iomgr_unregister_object(&winsocket->iomgr_object); - if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { - grpc_iocp_socket_orphan(winsocket); + status = WSAIoctl(winsocket->socket, SIO_GET_EXTENSION_FUNCTION_POINTER, + &guid, sizeof(guid), &DisconnectEx, sizeof(DisconnectEx), + &ioctl_num_bytes, NULL, NULL); + + if (status == 0) { + DisconnectEx(winsocket->socket, NULL, 0, 0); } else { - grpc_winsocket_destroy(winsocket); + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "Unable to retrieve DisconnectEx pointer : %s", utf8_message); + gpr_free(utf8_message); } + closesocket(winsocket->socket); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { + grpc_iomgr_unregister_object(&winsocket->iomgr_object); gpr_mu_destroy(&winsocket->state_mu); gpr_free(winsocket); } diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h index ecf2530173..498921e0fd 100644 --- a/src/core/iomgr/socket_windows.h +++ b/src/core/iomgr/socket_windows.h @@ -68,8 +68,6 @@ typedef struct grpc_winsocket_callback_info { /* The results of the overlapped operation. */ DWORD bytes_transfered; int wsa_error; - /* A boolean indicating that we started an operation. */ - int outstanding; } grpc_winsocket_callback_info; /* This is a wrapper to a Windows socket. A socket can have one outstanding @@ -92,10 +90,6 @@ typedef struct grpc_winsocket { /* You can't add the same socket twice to the same IO Completion Port. This prevents that. */ int added_to_iocp; - /* A boolean to indicate that the caller has abandoned that socket, but - there is a pending operation that the IO Completion Port will have to - wait for. The socket will be collected at that time. */ - int orphan; grpc_iomgr_closure shutdown_closure; @@ -108,14 +102,10 @@ typedef struct grpc_winsocket { grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name); /* Initiate an asynchronous shutdown of the socket. Will call off any pending - operation to cancel them. 
Returns the number of callbacks that got setup. */ -int grpc_winsocket_shutdown(grpc_winsocket *socket); + operation to cancel them. */ +void grpc_winsocket_shutdown(grpc_winsocket *socket); -/* Abandon a socket. */ -void grpc_winsocket_orphan(grpc_winsocket *socket); - -/* Destroy a socket. Should only be called by the IO Completion Port thread, - or by grpc_winsocket_orphan if there's no pending operation. */ +/* Destroy a socket. Should only be called if there's no pending operation. */ void grpc_winsocket_destroy(grpc_winsocket *socket); #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 79a58fe2af..05198dbff4 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -60,13 +60,13 @@ typedef struct { grpc_alarm alarm; char *addr_name; int refs; - int aborted; } async_connect; -static void async_connect_cleanup(async_connect *ac) { +static void async_connect_unlock_and_cleanup(async_connect *ac) { int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { + if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); @@ -77,10 +77,11 @@ static void on_alarm(void *acp, int occured) { async_connect *ac = acp; gpr_mu_lock(&ac->mu); /* If the alarm didn't occur, it got cancelled. */ + gpr_log(GPR_DEBUG, "on_alarm: %p", ac->socket); if (ac->socket != NULL && occured) { grpc_winsocket_shutdown(ac->socket); } - async_connect_cleanup(ac); + async_connect_unlock_and_cleanup(ac); } static void on_connect(void *acp, int from_iocp) { @@ -90,51 +91,33 @@ static void on_connect(void *acp, int from_iocp) { grpc_winsocket_callback_info *info = &ac->socket->write_info; void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; - int aborted; - + grpc_alarm_cancel(&ac->alarm); gpr_mu_lock(&ac->mu); - aborted = ac->aborted; + + gpr_log(GPR_DEBUG, "on_connect: %p", ac->socket); if (from_iocp) { DWORD transfered_bytes = 0; DWORD flags; BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); - info->outstanding = 0; GPR_ASSERT(transfered_bytes == 0); if (!wsa_success) { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_free(utf8_message); - } else if (!aborted) { + } else { ep = grpc_tcp_create(ac->socket, ac->addr_name); + ac->socket = NULL; } - } else { - gpr_log(GPR_ERROR, "on_connect is shutting down"); - /* If the connection timeouts, we will still get a notification from - the IOCP whatever happens. So we're just going to flag that connection - as being in the process of being aborted, and wait for the IOCP. We - can't just orphan the socket now, because the IOCP might already have - gotten a successful connection, which is our worst-case scenario. - We need to call our callback now to respect the deadline. */ - ac->aborted = 1; - gpr_mu_unlock(&ac->mu); - cb(cb_arg, NULL); - return; } - ac->socket->write_info.outstanding = 0; - - /* If we don't have an endpoint, it means the connection failed, - so it doesn't matter if it aborted or failed. We need to orphan - that socket. */ - if (!ep || aborted) grpc_winsocket_orphan(ac->socket); - async_connect_cleanup(ac); + async_connect_unlock_and_cleanup(ac); /* If the connection was aborted, the callback was already called when the deadline was met. 
*/ - if (!aborted) cb(cb_arg, ep); + cb(cb_arg, ep); } /* Tries to issue one async connection, then schedules both an IOCP @@ -196,7 +179,6 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - info->outstanding = 1; success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get @@ -216,11 +198,9 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), gpr_mu_init(&ac->mu); ac->refs = 2; ac->addr_name = grpc_sockaddr_to_uri(addr); - ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); - socket->write_info.outstanding = 1; grpc_socket_notify_on_write(socket, on_connect, ac); return; @@ -228,8 +208,8 @@ failure: utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, message, utf8_message); gpr_free(utf8_message); - if (socket) { - grpc_winsocket_orphan(socket); + if (socket != NULL) { + grpc_winsocket_destroy(socket); } else if (sock != INVALID_SOCKET) { closesocket(sock); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 360e6ebd8c..0db7cd9f0e 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -61,209 +61,8 @@ #define SENDMSG_FLAGS 0 #endif -/* Holds a slice array and associated state. */ -typedef struct grpc_tcp_slice_state { - gpr_slice *slices; /* Array of slices */ - size_t nslices; /* Size of slices array. */ - ssize_t first_slice; /* First valid slice in array */ - ssize_t last_slice; /* Last valid slice in array */ - gpr_slice working_slice; /* pointer to original final slice */ - int working_slice_valid; /* True if there is a working slice */ - int memory_owned; /* True if slices array is owned */ -} grpc_tcp_slice_state; - int grpc_tcp_trace = 0; -static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices, - size_t nslices, size_t valid_slices) { - state->slices = slices; - state->nslices = nslices; - if (valid_slices == 0) { - state->first_slice = -1; - } else { - state->first_slice = 0; - } - state->last_slice = valid_slices - 1; - state->working_slice_valid = 0; - state->memory_owned = 0; -} - -/* Returns true if there is still available data */ -static int slice_state_has_available(grpc_tcp_slice_state *state) { - return state->first_slice != -1 && state->last_slice >= state->first_slice; -} - -static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) { - if (state->first_slice == -1) { - return 0; - } else { - return state->last_slice - state->first_slice + 1; - } -} - -static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) { - /* TODO(klempner): use realloc instead when first_slice is 0 */ - /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */ - gpr_slice *slices = state->slices; - size_t original_size = slice_state_slices_allocated(state); - size_t i; - gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size); - - for (i = 0; i < original_size; ++i) { - new_slices[i] = slices[i + state->first_slice]; - } - - state->slices = new_slices; - state->last_slice = original_size - 1; - if (original_size > 0) { - state->first_slice = 0; - } else { - state->first_slice = -1; - } - state->nslices = new_size; - - if (state->memory_owned) { - gpr_free(slices); - } - state->memory_owned = 1; -} - -static void slice_state_remove_prefix(grpc_tcp_slice_state *state, - size_t prefix_bytes) { - 
gpr_slice *current_slice = &state->slices[state->first_slice]; - size_t current_slice_size; - - while (slice_state_has_available(state)) { - current_slice_size = GPR_SLICE_LENGTH(*current_slice); - if (current_slice_size > prefix_bytes) { - /* TODO(klempner): Get rid of the extra refcount created here by adding a - native "trim the first N bytes" operation to splice */ - /* TODO(klempner): This really shouldn't be modifying the current slice - unless we own the slices array. */ - gpr_slice tail; - tail = gpr_slice_split_tail(current_slice, prefix_bytes); - gpr_slice_unref(*current_slice); - *current_slice = tail; - return; - } else { - gpr_slice_unref(*current_slice); - ++state->first_slice; - ++current_slice; - prefix_bytes -= current_slice_size; - } - } -} - -static void slice_state_destroy(grpc_tcp_slice_state *state) { - while (slice_state_has_available(state)) { - gpr_slice_unref(state->slices[state->first_slice]); - ++state->first_slice; - } - - if (state->memory_owned) { - gpr_free(state->slices); - state->memory_owned = 0; - } -} - -void slice_state_transfer_ownership(grpc_tcp_slice_state *state, - gpr_slice **slices, size_t *nslices) { - *slices = state->slices + state->first_slice; - *nslices = state->last_slice - state->first_slice + 1; - - state->first_slice = -1; - state->last_slice = -1; -} - -/* Fills iov with the first min(iov_size, available) slices, returns number - filled */ -static size_t slice_state_to_iovec(grpc_tcp_slice_state *state, - struct iovec *iov, size_t iov_size) { - size_t nslices = state->last_slice - state->first_slice + 1; - gpr_slice *slices = state->slices + state->first_slice; - size_t i; - if (nslices < iov_size) { - iov_size = nslices; - } - - for (i = 0; i < iov_size; ++i) { - iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]); - iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]); - } - return iov_size; -} - -/* Makes n blocks available at the end of state, writes them into iov, and - returns the number of bytes allocated */ -static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state, - struct iovec *iov, size_t n, - size_t slice_size) { - size_t target_size; - size_t i; - size_t allocated_bytes; - ssize_t allocated_slices = slice_state_slices_allocated(state); - - if (n - state->working_slice_valid >= state->nslices - state->last_slice) { - /* Need to grow the slice array */ - target_size = state->nslices; - do { - target_size = target_size * 2; - } while (target_size < allocated_slices + n - state->working_slice_valid); - /* TODO(klempner): If this ever needs to support both prefix removal and - append, we should be smarter about the growth logic here */ - slice_state_realloc(state, target_size); - } - - i = 0; - allocated_bytes = 0; - - if (state->working_slice_valid) { - iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]); - iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) - - GPR_SLICE_LENGTH(state->slices[state->last_slice]); - allocated_bytes += iov[0].iov_len; - ++i; - state->slices[state->last_slice] = state->working_slice; - state->working_slice_valid = 0; - } - - for (; i < n; ++i) { - ++state->last_slice; - state->slices[state->last_slice] = gpr_slice_malloc(slice_size); - iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]); - iov[i].iov_len = slice_size; - allocated_bytes += slice_size; - } - if (state->first_slice == -1) { - state->first_slice = 0; - } - return allocated_bytes; -} - -/* Remove the last n bytes from state */ -/* TODO(klempner): Consider having this defer actual 
deletion until later */ -static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) { - while (bytes > 0 && slice_state_has_available(state)) { - if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) { - state->working_slice = state->slices[state->last_slice]; - state->working_slice_valid = 1; - /* TODO(klempner): Combine these into a single operation that doesn't need - to refcount */ - gpr_slice_unref(gpr_slice_split_tail( - &state->slices[state->last_slice], - GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes)); - bytes = 0; - } else { - bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]); - gpr_slice_unref(state->slices[state->last_slice]); - --state->last_slice; - if (state->last_slice == -1) { - state->first_slice = -1; - } - } - } -} - typedef struct { grpc_endpoint base; grpc_fd *em_fd; @@ -273,80 +72,111 @@ typedef struct { size_t slice_size; gpr_refcount refcount; - grpc_endpoint_read_cb read_cb; - void *read_user_data; - grpc_endpoint_write_cb write_cb; - void *write_user_data; + gpr_slice_buffer *incoming_buffer; + gpr_slice_buffer *outgoing_buffer; + /** slice within outgoing_buffer to write next */ + size_t outgoing_slice_idx; + /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + size_t outgoing_byte_idx; - grpc_tcp_slice_state write_state; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; grpc_iomgr_closure read_closure; grpc_iomgr_closure write_closure; - grpc_iomgr_closure handle_read_closure; - char *peer_string; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); +static void tcp_handle_read(void *arg /* grpc_tcp */, int success); +static void tcp_handle_write(void *arg /* grpc_tcp */, int success); -static void grpc_tcp_shutdown(grpc_endpoint *ep) { +static void tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_fd_shutdown(tcp->em_fd); } -static void grpc_tcp_unref(grpc_tcp *tcp) { - int refcount_zero = gpr_unref(&tcp->refcount); - if (refcount_zero) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); - gpr_free(tcp->peer_string); - gpr_free(tcp); +static void tcp_free(grpc_tcp *tcp) { + grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} + +/*#define GRPC_TCP_REFCOUNT_DEBUG*/ +#ifdef GRPC_TCP_REFCOUNT_DEBUG +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); } } -static void grpc_tcp_destroy(grpc_endpoint *ep) { +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +static void tcp_unref(grpc_tcp *tcp) { + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + +static void tcp_destroy(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - 
grpc_tcp_unref(tcp); + TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status status) { - grpc_endpoint_read_cb cb = tcp->read_cb; +static void call_read_cb(grpc_tcp *tcp, int success) { + grpc_iomgr_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; - gpr_log(GPR_DEBUG, "read: status=%d", status); - for (i = 0; i < nslices; i++) { - char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "read: success=%d", success); + for (i = 0; i < tcp->incoming_buffer->count; i++) { + char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_free(dump); } } tcp->read_cb = NULL; - cb(tcp->read_user_data, slices, nslices, status); + tcp->incoming_buffer = NULL; + cb->cb(cb->cb_arg, success); } -#define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_continue_read(grpc_tcp *tcp) { - gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; +static void tcp_continue_read(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; - ssize_t allocated_bytes; - struct grpc_tcp_slice_state read_state; - gpr_slice *final_slices; - size_t final_nslices; + size_t i; GPR_ASSERT(!tcp->finished_edge); + GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC); + GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC); GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); - slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, - 0); - allocated_bytes = slice_state_append_blocks_into_iovec( - &read_state, iov, tcp->iov_size, tcp->slice_size); + while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { + gpr_slice_buffer_add_indexed(tcp->incoming_buffer, + gpr_slice_malloc(tcp->slice_size)); + } + for (i = 0; i < tcp->incoming_buffer->count; i++) { + iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); + iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); + } msg.msg_name = NULL; msg.msg_namelen = 0; @@ -362,106 +192,105 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { } while (read_bytes < 0 && errno == EINTR); GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); - if (read_bytes < allocated_bytes) { - /* TODO(klempner): Consider a second read first, in hopes of getting a - * quick EAGAIN and saving a bunch of allocations. */ - slice_state_remove_last(&read_state, read_bytes < 0 - ? allocated_bytes - : allocated_bytes - read_bytes); - } - if (read_bytes < 0) { - /* NB: After calling the user_cb a parallel call of the read handler may + /* NB: After calling call_read_cb a parallel call of the read handler may * be running. 
*/ if (errno == EAGAIN) { if (tcp->iov_size > 1) { tcp->iov_size /= 2; } - if (slice_state_has_available(&read_state)) { - /* TODO(klempner): We should probably do the call into the application - without all this junk on the stack */ - /* FIXME(klempner): Refcount properly */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - tcp->finished_edge = 1; - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); - } else { - /* We've consumed the edge, request a new one */ - slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); - } + /* We've consumed the edge, request a new one */ + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { /* TODO(klempner): Log interesting errors */ - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ - if (slice_state_has_available(&read_state)) { - /* there were bytes already read: pass them up to the application */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); - } else { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); - } - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } else { - if (tcp->iov_size < MAX_READ_IOVEC) { + GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); + if ((size_t)read_bytes < tcp->incoming_buffer->length) { + gpr_slice_buffer_trim_end(tcp->incoming_buffer, + tcp->incoming_buffer->length - read_bytes); + } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } - GPR_ASSERT(slice_state_has_available(&read_state)); - slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); + call_read_cb(tcp, 1); + TCP_UNREF(tcp, "read"); } GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); if (!success) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); + gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + call_read_cb(tcp, 0); + TCP_UNREF(tcp, "read"); } else { - grpc_tcp_continue_read(tcp); + tcp_continue_read(tcp); } } -static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data) { +static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, + gpr_slice_buffer *incoming_buffer, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; - tcp->read_user_data = user_data; - gpr_ref(&tcp->refcount); + tcp->incoming_buffer = incoming_buffer; + gpr_slice_buffer_reset_and_unref(incoming_buffer); + TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - tcp->handle_read_closure.cb_arg = tcp; - 
grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); + grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); } + /* TODO(ctiller): immediate return */ + return GRPC_ENDPOINT_PENDING; } #define MAX_WRITE_IOVEC 16 -static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { +static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; int iov_size; ssize_t sent_length; - grpc_tcp_slice_state *state = &tcp->write_state; + ssize_t sending_length; + ssize_t trailing; + ssize_t unwind_slice_idx; + ssize_t unwind_byte_idx; for (;;) { - iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC); + sending_length = 0; + unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_byte_idx = tcp->outgoing_byte_idx; + for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + iov_size != MAX_WRITE_IOVEC; + iov_size++) { + iov[iov_size].iov_base = + GPR_SLICE_START_PTR( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_byte_idx; + iov[iov_size].iov_len = + GPR_SLICE_LENGTH( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + tcp->outgoing_byte_idx; + sending_length += iov[iov_size].iov_len; + tcp->outgoing_slice_idx++; + tcp->outgoing_byte_idx = 0; + } + GPR_ASSERT(iov_size > 0); msg.msg_name = NULL; msg.msg_namelen = 0; @@ -480,70 +309,75 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { if (sent_length < 0) { if (errno == EAGAIN) { - return GRPC_ENDPOINT_WRITE_PENDING; + tcp->outgoing_slice_idx = unwind_slice_idx; + tcp->outgoing_byte_idx = unwind_byte_idx; + return GRPC_ENDPOINT_PENDING; } else { /* TODO(klempner): Log some of these */ - slice_state_destroy(state); - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } } - /* TODO(klempner): Probably better to batch this after we finish flushing */ - slice_state_remove_prefix(state, sent_length); + GPR_ASSERT(tcp->outgoing_byte_idx == 0); + trailing = sending_length - sent_length; + while (trailing > 0) { + ssize_t slice_length; + + tcp->outgoing_slice_idx--; + slice_length = GPR_SLICE_LENGTH( + tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + if (slice_length > trailing) { + tcp->outgoing_byte_idx = slice_length - trailing; + break; + } else { + trailing -= slice_length; + } + } - if (!slice_state_has_available(state)) { - return GRPC_ENDPOINT_WRITE_DONE; + if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { + return GRPC_ENDPOINT_DONE; } }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_endpoint_write_status write_status; - grpc_endpoint_cb_status cb_status; - grpc_endpoint_write_cb cb; + grpc_endpoint_op_status status; + grpc_iomgr_closure *cb; if (!success) { - slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); + cb->cb(cb->cb_arg, 0); + TCP_UNREF(tcp, "write"); return; } GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); - write_status = grpc_tcp_flush(tcp); - if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { + status = tcp_flush(tcp); + if (status == GRPC_ENDPOINT_PENDING) { grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } else { - slice_state_destroy(&tcp->write_state); - if (write_status == GRPC_ENDPOINT_WRITE_DONE) { - cb_status = GRPC_ENDPOINT_CB_OK; - } else { - cb_status = GRPC_ENDPOINT_CB_ERROR; - } cb = tcp->write_cb; 
tcp->write_cb = NULL; - cb(tcp->write_user_data, cb_status); - grpc_tcp_unref(tcp); + cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE); + TCP_UNREF(tcp, "write"); } GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } -static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, + gpr_slice_buffer *buf, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_endpoint_write_status status; + grpc_endpoint_op_status status; if (grpc_tcp_trace) { size_t i; - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < buf->count; i++) { + char *data = + gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data); gpr_free(data); } @@ -551,15 +385,19 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); GPR_ASSERT(tcp->write_cb == NULL); - slice_state_init(&tcp->write_state, slices, nslices, nslices); - status = grpc_tcp_flush(tcp); - if (status == GRPC_ENDPOINT_WRITE_PENDING) { - /* TODO(klempner): Consider inlining rather than malloc for small nslices */ - slice_state_realloc(&tcp->write_state, nslices); - gpr_ref(&tcp->refcount); + if (buf->length == 0) { + GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); + return GRPC_ENDPOINT_DONE; + } + tcp->outgoing_buffer = buf; + tcp->outgoing_slice_idx = 0; + tcp->outgoing_byte_idx = 0; + + status = tcp_flush(tcp); + if (status == GRPC_ENDPOINT_PENDING) { + TCP_REF(tcp, "write"); tcp->write_cb = cb; - tcp->write_user_data = user_data; grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } @@ -567,27 +405,25 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, return status; } -static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { +static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_pollset_add_fd(pollset, tcp->em_fd); } -static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { +static void tcp_add_to_pollset_set(grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); } -static char *grpc_tcp_get_peer(grpc_endpoint *ep) { +static char *tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); } static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, - grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set, - grpc_tcp_shutdown, grpc_tcp_destroy, - grpc_tcp_get_peer}; + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_shutdown, tcp_destroy, tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { @@ -597,21 +433,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; - tcp->read_user_data = NULL; - tcp->write_user_data = NULL; + tcp->incoming_buffer = NULL; tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = 1; - slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); tcp->em_fd = em_fd; - tcp->read_closure.cb = grpc_tcp_handle_read; + tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; - 
tcp->write_closure.cb = grpc_tcp_handle_write; + tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; - tcp->handle_read_closure.cb = grpc_tcp_handle_read; return &tcp->base; } diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d0478d3604..b513d854aa 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -75,18 +75,18 @@ struct grpc_tcp_server { void *cb_arg; gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ int active_ports; - /* number of iomgr callbacks that have been explicitly scheduled during - * shutdown */ - int iomgr_callbacks_pending; /* all listening ports */ server_port *ports; size_t nports; size_t port_capacity; + + /* shutdown callback */ + void(*shutdown_complete)(void *); + void *shutdown_complete_arg; }; /* Public function. Allocates the proper data structures to hold a @@ -94,48 +94,61 @@ struct grpc_tcp_server { grpc_tcp_server *grpc_tcp_server_create(void) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; - s->iomgr_callbacks_pending = 0; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->shutdown_complete = NULL; return s; } +static void dont_care_about_shutdown_completion(void *arg) {} + +static void finish_shutdown(grpc_tcp_server *s) { + size_t i; + + s->shutdown_complete(s->shutdown_complete_arg); + + /* Now that the accepts have been aborted, we can destroy the sockets. + The IOCP won't get notified on these, so we can flag them as already + closed by the system. */ + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + grpc_winsocket_destroy(sp->socket); + } + gpr_free(s->ports); + gpr_free(s); +} + /* Public function. Stops and destroys a grpc_tcp_server. */ void grpc_tcp_server_destroy(grpc_tcp_server *s, - void (*shutdown_done)(void *shutdown_done_arg), - void *shutdown_done_arg) { + void (*shutdown_complete)(void *shutdown_done_arg), + void *shutdown_complete_arg) { size_t i; + int immediately_done = 0; gpr_mu_lock(&s->mu); + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + /* First, shutdown all fd's. This will queue abortion calls for all of the pending accepts due to the normal operation mechanism. */ + if (s->active_ports == 0) { + immediately_done = 1; + } for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; sp->shutting_down = 1; - s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket); - } - /* This happens asynchronously. Wait while that happens. */ - while (s->active_ports || s->iomgr_callbacks_pending) { - gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_winsocket_shutdown(sp->socket); } gpr_mu_unlock(&s->mu); - /* Now that the accepts have been aborted, we can destroy the sockets. - The IOCP won't get notified on these, so we can flag them as already - closed by the system. 
*/ - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - grpc_winsocket_orphan(sp->socket); - } - gpr_free(s->ports); - gpr_free(s); - - if (shutdown_done) { - shutdown_done(shutdown_done_arg); + if (immediately_done) { + finish_shutdown(s); } } @@ -188,14 +201,17 @@ error: } static void decrement_active_ports_and_notify(server_port *sp) { + int notify = 0; sp->shutting_down = 0; - sp->socket->read_info.outstanding = 0; gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); + if (0 == --sp->server->active_ports && sp->server->shutdown_complete != NULL) { + notify = 1; } gpr_mu_unlock(&sp->server->mu); + if (notify) { + finish_shutdown(sp->server); + } } /* start_accept will reference that for the IOCP notification request. */ @@ -280,12 +296,6 @@ static void on_accept(void *arg, int from_iocp) { this is necessary in the read/write case, it's useless for the accept case. We only need to adjust the pending callback count */ if (!from_iocp) { - gpr_mu_lock(&sp->server->mu); - GPR_ASSERT(sp->server->iomgr_callbacks_pending > 0); - if (0 == --sp->server->iomgr_callbacks_pending) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); return; } @@ -462,7 +472,6 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollset, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - s->ports[i].socket->read_info.outstanding = 1; start_accept(s->ports + i); s->active_ports++; } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 901793ec43..fe3673c607 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -82,13 +82,11 @@ typedef struct grpc_tcp { /* Refcounting how many operations are in progress. */ gpr_refcount refcount; - grpc_endpoint_read_cb read_cb; - void *read_user_data; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; gpr_slice read_slice; - - grpc_endpoint_write_cb write_cb; - void *write_user_data; - gpr_slice_buffer write_slices; + gpr_slice_buffer *write_slices; + gpr_slice_buffer *read_slices; /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. 
*/ @@ -98,82 +96,91 @@ typedef struct grpc_tcp { char *peer_string; } grpc_tcp; -static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +static void tcp_free(grpc_tcp *tcp) { + grpc_winsocket_destroy(tcp->socket); + gpr_mu_destroy(&tcp->mu); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} +/*#define GRPC_TCP_REFCOUNT_DEBUG*/ +#ifdef GRPC_TCP_REFCOUNT_DEBUG +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) static void tcp_unref(grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - gpr_slice_buffer_destroy(&tcp->write_slices); - grpc_winsocket_orphan(tcp->socket); - gpr_mu_destroy(&tcp->mu); - gpr_free(tcp->peer_string); - gpr_free(tcp); + tcp_free(tcp); } } +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_read(void *tcpp, int from_iocp) { - grpc_tcp *tcp = (grpc_tcp *)tcpp; +static int on_read(grpc_tcp *tcp, int success) { grpc_winsocket *socket = tcp->socket; gpr_slice sub; gpr_slice *slice = NULL; size_t nslices = 0; - grpc_endpoint_cb_status status; - grpc_endpoint_read_cb cb; grpc_winsocket_callback_info *info = &socket->read_info; - void *opaque = tcp->read_user_data; int do_abort = 0; - gpr_mu_lock(&tcp->mu); - cb = tcp->read_cb; - tcp->read_cb = NULL; - if (!from_iocp || tcp->shutting_down) { - /* If we are here with from_iocp set to true, it means we got raced to - shutting down the endpoint. No actual abort callback will happen - though, so we're going to do it from here. 
*/ - do_abort = 1; - } - gpr_mu_unlock(&tcp->mu); - - if (do_abort) { - if (from_iocp) { - tcp->socket->read_info.outstanding = 0; + if (success) { + if (socket->read_info.wsa_error != 0) { + if (socket->read_info.wsa_error != WSAECONNRESET) { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); + gpr_free(utf8_message); + } + success = 0; gpr_slice_unref(tcp->read_slice); - } - tcp_unref(tcp); - if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - return; - } - - GPR_ASSERT(tcp->socket->read_info.outstanding); - - if (socket->read_info.wsa_error != 0) { - if (socket->read_info.wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message); - gpr_free(utf8_message); - } - gpr_slice_unref(tcp->read_slice); - status = GRPC_ENDPOINT_CB_ERROR; - } else { - if (info->bytes_transfered != 0) { - sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); - status = GRPC_ENDPOINT_CB_OK; - slice = ⊂ - nslices = 1; } else { - gpr_slice_unref(tcp->read_slice); - status = GRPC_ENDPOINT_CB_EOF; + if (info->bytes_transfered != 0) { + sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); + gpr_slice_buffer_add(tcp->read_slices, sub); + success = 1; + } else { + gpr_slice_unref(tcp->read_slice); + success = 0; + } } } - tcp->socket->read_info.outstanding = 0; + return success; +} - tcp_unref(tcp); - cb(opaque, slice, nslices, status); +static void on_read_cb(void *tcpp, int from_iocp) { + grpc_tcp *tcp = tcpp; + grpc_iomgr_closure *cb = tcp->read_cb; + int success = on_read(tcp, from_iocp); + tcp->read_cb = NULL; + TCP_UNREF(tcp, "read"); + if (cb) { + cb->cb(cb->cb_arg, success); + } } -static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *arg) { +static grpc_endpoint_op_status win_read(grpc_endpoint *ep, + gpr_slice_buffer *read_slices, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->read_info; @@ -182,15 +189,13 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, DWORD flags = 0; WSABUF buffer; - GPR_ASSERT(!tcp->socket->read_info.outstanding); if (tcp->shutting_down) { - cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - return; + return GRPC_ENDPOINT_ERROR; } - tcp_ref(tcp); - tcp->socket->read_info.outstanding = 1; + tcp->read_cb = cb; - tcp->read_user_data = arg; + tcp->read_slices = read_slices; + gpr_slice_buffer_reset_and_unref(read_slices); tcp->read_slice = gpr_slice_malloc(8192); @@ -204,12 +209,14 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { + int ok; info->bytes_transfered = bytes_read; - /* This might heavily recurse. */ - on_read(tcp, 1); - return; + ok = on_read(tcp, 1); + return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } + TCP_REF(tcp, "read"); + /* Otherwise, let's retry, by queuing a read. */ memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, @@ -218,71 +225,51 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { + int ok; info->wsa_error = wsa_error; - on_read(tcp, 1); - return; + ok = on_read(tcp, 1); + return ok ? 
GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } } - grpc_socket_notify_on_read(tcp->socket, on_read, tcp); + grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp); + return GRPC_ENDPOINT_PENDING; } /* Asynchronous callback from the IOCP, or the background thread. */ -static void on_write(void *tcpp, int from_iocp) { +static void on_write(void *tcpp, int success) { grpc_tcp *tcp = (grpc_tcp *)tcpp; grpc_winsocket *handle = tcp->socket; grpc_winsocket_callback_info *info = &handle->write_info; - grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK; - grpc_endpoint_write_cb cb; - void *opaque = tcp->write_user_data; + grpc_iomgr_closure *cb; int do_abort = 0; gpr_mu_lock(&tcp->mu); cb = tcp->write_cb; tcp->write_cb = NULL; - if (!from_iocp || tcp->shutting_down) { - /* If we are here with from_iocp set to true, it means we got raced to - shutting down the endpoint. No actual abort callback will happen - though, so we're going to do it from here. */ - do_abort = 1; - } gpr_mu_unlock(&tcp->mu); - if (do_abort) { - if (from_iocp) { - tcp->socket->write_info.outstanding = 0; - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - } - tcp_unref(tcp); - if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN); - return; - } - - GPR_ASSERT(tcp->socket->write_info.outstanding); - - if (info->wsa_error != 0) { - if (info->wsa_error != WSAECONNRESET) { - char *utf8_message = gpr_format_message(info->wsa_error); - gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); - gpr_free(utf8_message); + if (success) { + if (info->wsa_error != 0) { + if (info->wsa_error != WSAECONNRESET) { + char *utf8_message = gpr_format_message(info->wsa_error); + gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message); + gpr_free(utf8_message); + } + success = 0; + } else { + GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length); } - status = GRPC_ENDPOINT_CB_ERROR; - } else { - GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length); } - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - - tcp_unref(tcp); - cb(opaque, status); + TCP_UNREF(tcp, "write"); + cb->cb(cb->cb_arg, success); } /* Initiates a write. 
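A synchronous, non-blocking WSASend is attempted first; only when the socket reports WSAEWOULDBLOCK does the code fall back to an overlapped send and return GRPC_ENDPOINT_PENDING. Every caller of the endpoint vtable in this change consumes the new tri-state result the same way, roughly: switch (grpc_endpoint_write(ep, &buf, &closure)) { case GRPC_ENDPOINT_DONE: completed inline; break; case GRPC_ENDPOINT_ERROR: failed inline; break; case GRPC_ENDPOINT_PENDING: the closure runs later; break; }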
*/ -static grpc_endpoint_write_status win_write(grpc_endpoint *ep, - gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, - void *arg) { +static grpc_endpoint_op_status win_write(grpc_endpoint *ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_winsocket *socket = tcp->socket; grpc_winsocket_callback_info *info = &socket->write_info; @@ -293,30 +280,25 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, WSABUF *allocated = NULL; WSABUF *buffers = local_buffers; - GPR_ASSERT(!tcp->socket->write_info.outstanding); if (tcp->shutting_down) { - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } - tcp_ref(tcp); - tcp->socket->write_info.outstanding = 1; tcp->write_cb = cb; - tcp->write_user_data = arg; + tcp->write_slices = slices; - gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices); - - if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) { - buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count); + if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) { + buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count); allocated = buffers; } - for (i = 0; i < tcp->write_slices.count; i++) { - buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]); - buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]); + for (i = 0; i < tcp->write_slices->count; i++) { + buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]); + buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]); } /* First, let's try a synchronous, non-blocking write. */ - status = WSASend(socket->socket, buffers, tcp->write_slices.count, + status = WSASend(socket->socket, buffers, tcp->write_slices->count, &bytes_sent, 0, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); @@ -324,10 +306,10 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, connection that has its send queue filled up. But if we don't, then we can avoid doing an async write operation at all. */ if (info->wsa_error != WSAEWOULDBLOCK) { - grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR; + grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR; if (status == 0) { - ret = GRPC_ENDPOINT_WRITE_DONE; - GPR_ASSERT(bytes_sent == tcp->write_slices.length); + ret = GRPC_ENDPOINT_DONE; + GPR_ASSERT(bytes_sent == tcp->write_slices->length); } else { if (socket->read_info.wsa_error != WSAECONNRESET) { char *utf8_message = gpr_format_message(info->wsa_error); @@ -336,33 +318,30 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, } } if (allocated) gpr_free(allocated); - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - tcp_unref(tcp); return ret; } + TCP_REF(tcp, "write"); + /* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same operation, this time asynchronously. 
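This time an OVERLAPPED structure is supplied, so the completion is delivered through the IOCP (or the background thread) and on_write runs as the socket callback, dropping the reference taken just above.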
*/ memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); - status = WSASend(socket->socket, buffers, tcp->write_slices.count, + status = WSASend(socket->socket, buffers, tcp->write_slices->count, &bytes_sent, 0, &socket->write_info.overlapped, NULL); if (allocated) gpr_free(allocated); if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { - gpr_slice_buffer_reset_and_unref(&tcp->write_slices); - tcp->socket->write_info.outstanding = 0; - tcp_unref(tcp); - return GRPC_ENDPOINT_WRITE_ERROR; + TCP_UNREF(tcp, "write"); + return GRPC_ENDPOINT_ERROR; } } /* As all is now setup, we can now ask for the IOCP notification. It may trigger the callback immediately however, but no matter. */ grpc_socket_notify_on_write(socket, on_write, tcp); - return GRPC_ENDPOINT_WRITE_PENDING; + return GRPC_ENDPOINT_PENDING; } static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) { @@ -387,19 +366,17 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { concurrent access of the data structure in that regard. */ static void win_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - int extra_refs = 0; gpr_mu_lock(&tcp->mu); /* At that point, what may happen is that we're already inside the IOCP callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; - extra_refs = grpc_winsocket_shutdown(tcp->socket); - while (extra_refs--) tcp_ref(tcp); + grpc_winsocket_shutdown(tcp->socket); gpr_mu_unlock(&tcp->mu); } static void win_destroy(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_unref(tcp); + TCP_UNREF(tcp, "destroy"); } static char *win_get_peer(grpc_endpoint *ep) { @@ -408,8 +385,8 @@ static char *win_get_peer(grpc_endpoint *ep) { } static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; + win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, + win_shutdown, win_destroy, win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); @@ -417,7 +394,6 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { tcp->base.vtable = &vtable; tcp->socket = socket; gpr_mu_init(&tcp->mu); - gpr_slice_buffer_init(&tcp->write_slices); gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 81b3e33cb2..b696e384fc 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -49,15 +49,15 @@ typedef struct { struct tsi_frame_protector *protector; gpr_mu protector_mu; /* saved upper level callbacks and user_data. */ - grpc_endpoint_read_cb read_cb; - void *read_user_data; - grpc_endpoint_write_cb write_cb; - void *write_user_data; + grpc_iomgr_closure *read_cb; + grpc_iomgr_closure *write_cb; + grpc_iomgr_closure on_read; + gpr_slice_buffer *read_buffer; + gpr_slice_buffer source_buffer; /* saved handshaker leftover data to unprotect. 
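These are bytes the handshaker read past the end of the handshake; they already belong to the protected stream, so the first endpoint_read drains them before issuing any read on the wrapped endpoint.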
*/ gpr_slice_buffer leftover_bytes; /* buffers for read and write */ gpr_slice read_staging_buffer; - gpr_slice_buffer input_buffer; gpr_slice write_staging_buffer; gpr_slice_buffer output_buffer; @@ -67,62 +67,91 @@ typedef struct { int grpc_trace_secure_endpoint = 0; -static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } - static void destroy(secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; grpc_endpoint_destroy(ep->wrapped_ep); tsi_frame_protector_destroy(ep->protector); gpr_slice_buffer_destroy(&ep->leftover_bytes); gpr_slice_unref(ep->read_staging_buffer); - gpr_slice_buffer_destroy(&ep->input_buffer); gpr_slice_unref(ep->write_staging_buffer); gpr_slice_buffer_destroy(&ep->output_buffer); + gpr_slice_buffer_destroy(&ep->source_buffer); gpr_mu_destroy(&ep->protector_mu); gpr_free(ep); } +/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/ +#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG +#define SECURE_ENDPOINT_UNREF(ep, reason) \ + secure_endpoint_unref((ep), (reason), __FILE__, __LINE__) +#define SECURE_ENDPOINT_REF(ep, reason) \ + secure_endpoint_ref((ep), (reason), __FILE__, __LINE__) +static void secure_endpoint_unref(secure_endpoint *ep, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d", + ep, reason, ep->ref.count, ep->ref.count - 1); + if (gpr_unref(&ep->ref)) { + destroy(ep); + } +} + +static void secure_endpoint_ref(secure_endpoint *ep, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d", + ep, reason, ep->ref.count, ep->ref.count + 1); + gpr_ref(&ep->ref); +} +#else +#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep)) +#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep)) static void secure_endpoint_unref(secure_endpoint *ep) { if (gpr_unref(&ep->ref)) { destroy(ep); } } +static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } +#endif + static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, gpr_uint8 **end) { - gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer); + gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); *end = GPR_SLICE_END_PTR(ep->read_staging_buffer); } -static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static void call_read_cb(secure_endpoint *ep, int success) { if (grpc_trace_secure_endpoint) { size_t i; - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < ep->read_buffer->count; i++) { + char *data = gpr_dump_slice(ep->read_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p: %s", ep, data); gpr_free(data); } } - ep->read_cb(ep->read_user_data, slices, nslices, error); - secure_endpoint_unref(ep); + ep->read_buffer = NULL; + ep->read_cb->cb(ep->read_cb->cb_arg, success); + SECURE_ENDPOINT_UNREF(ep, "read"); } -static void on_read(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static int on_read(void *user_data, int success) { unsigned i; gpr_uint8 keep_looping = 0; - size_t input_buffer_count = 0; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)user_data; gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer); gpr_uint8 *end = 
GPR_SLICE_END_PTR(ep->read_staging_buffer); + if (!success) { + gpr_slice_buffer_reset_and_unref(ep->read_buffer); + return 0; + } + /* TODO(yangg) check error, maybe bail out early */ - for (i = 0; i < nslices; i++) { - gpr_slice encrypted = slices[i]; + for (i = 0; i < ep->source_buffer.count; i++) { + gpr_slice encrypted = ep->source_buffer.slices[i]; gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted); size_t message_size = GPR_SLICE_LENGTH(encrypted); @@ -161,7 +190,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) { gpr_slice_buffer_add( - &ep->input_buffer, + ep->read_buffer, gpr_slice_split_head( &ep->read_staging_buffer, (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer)))); @@ -169,38 +198,53 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, /* TODO(yangg) experiment with moving this block after read_cb to see if it helps latency */ - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } + gpr_slice_buffer_reset_and_unref(&ep->source_buffer); if (result != TSI_OK) { - gpr_slice_buffer_reset_and_unref(&ep->input_buffer); - call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - return; + gpr_slice_buffer_reset_and_unref(ep->read_buffer); + return 0; } - /* The upper level will unref the slices. */ - input_buffer_count = ep->input_buffer.count; - ep->input_buffer.count = 0; - call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error); + + return 1; +} + +static void on_read_cb(void *user_data, int success) { + call_read_cb(user_data, on_read(user_data, success)); } -static void endpoint_notify_on_read(grpc_endpoint *secure_ep, - grpc_endpoint_read_cb cb, void *user_data) { +static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { secure_endpoint *ep = (secure_endpoint *)secure_ep; + int immediate_read_success = -1; ep->read_cb = cb; - ep->read_user_data = user_data; - - secure_endpoint_ref(ep); + ep->read_buffer = slices; + gpr_slice_buffer_reset_and_unref(ep->read_buffer); if (ep->leftover_bytes.count) { - size_t leftover_nslices = ep->leftover_bytes.count; - ep->leftover_bytes.count = 0; - on_read(ep, ep->leftover_bytes.slices, leftover_nslices, - GRPC_ENDPOINT_CB_OK); - return; + gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer); + GPR_ASSERT(ep->leftover_bytes.count == 0); + return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } - grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep); + SECURE_ENDPOINT_REF(ep, "read"); + + switch ( + grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) { + case GRPC_ENDPOINT_DONE: + immediate_read_success = on_read(ep, 1); + break; + case GRPC_ENDPOINT_PENDING: + return GRPC_ENDPOINT_PENDING; + case GRPC_ENDPOINT_ERROR: + immediate_read_success = on_read(ep, 0); + break; + } + + GPR_ASSERT(immediate_read_success != -1); + SECURE_ENDPOINT_UNREF(ep, "read"); + + return immediate_read_success ? 
GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR; } static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, @@ -211,36 +255,28 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur, *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); } -static void on_write(void *data, grpc_endpoint_cb_status error) { - secure_endpoint *ep = data; - ep->write_cb(ep->write_user_data, error); - secure_endpoint_unref(ep); -} - -static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, - gpr_slice *slices, - size_t nslices, - grpc_endpoint_write_cb cb, - void *user_data) { +static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep, + gpr_slice_buffer *slices, + grpc_iomgr_closure *cb) { unsigned i; - size_t output_buffer_count = 0; tsi_result result = TSI_OK; secure_endpoint *ep = (secure_endpoint *)secure_ep; gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer); gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer); - grpc_endpoint_write_status status; - GPR_ASSERT(ep->output_buffer.count == 0); + + gpr_slice_buffer_reset_and_unref(&ep->output_buffer); if (grpc_trace_secure_endpoint) { - for (i = 0; i < nslices; i++) { - char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + for (i = 0; i < slices->count; i++) { + char *data = + gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data); gpr_free(data); } } - for (i = 0; i < nslices; i++) { - gpr_slice plain = slices[i]; + for (i = 0; i < slices->count; i++) { + gpr_slice plain = slices->slices[i]; gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain); size_t message_size = GPR_SLICE_LENGTH(plain); while (message_size > 0) { @@ -290,29 +326,13 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep, } } - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } - if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - return GRPC_ENDPOINT_WRITE_ERROR; + return GRPC_ENDPOINT_ERROR; } - /* clear output_buffer and let the lower level handle its slices. 
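Ownership of the protected slices passes to the wrapped endpoint for the duration of the write, and the extra ref keeps this endpoint alive until the write callback has run.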
*/ - output_buffer_count = ep->output_buffer.count; - ep->output_buffer.count = 0; - ep->write_cb = cb; - ep->write_user_data = user_data; - /* Need to keep the endpoint alive across a transport */ - secure_endpoint_ref(ep); - status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices, - output_buffer_count, on_write, ep); - if (status != GRPC_ENDPOINT_WRITE_PENDING) { - secure_endpoint_unref(ep); - } - return status; + return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); } static void endpoint_shutdown(grpc_endpoint *secure_ep) { @@ -320,9 +340,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) { grpc_endpoint_shutdown(ep->wrapped_ep); } -static void endpoint_unref(grpc_endpoint *secure_ep) { +static void endpoint_destroy(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; - secure_endpoint_unref(ep); + SECURE_ENDPOINT_UNREF(ep, "destroy"); } static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, @@ -343,9 +363,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { } static const grpc_endpoint_vtable vtable = { - endpoint_notify_on_read, endpoint_write, + endpoint_read, endpoint_write, endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_unref, + endpoint_shutdown, endpoint_destroy, endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( @@ -363,8 +383,10 @@ grpc_endpoint *grpc_secure_endpoint_create( } ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE); - gpr_slice_buffer_init(&ep->input_buffer); gpr_slice_buffer_init(&ep->output_buffer); + gpr_slice_buffer_init(&ep->source_buffer); + ep->read_buffer = NULL; + grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 0c3572b53c..bf0079577e 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -50,16 +50,17 @@ typedef struct { grpc_endpoint *wrapped_endpoint; grpc_endpoint *secure_endpoint; gpr_slice_buffer left_overs; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; grpc_secure_transport_setup_done_cb cb; void *user_data; + grpc_iomgr_closure on_handshake_data_sent_to_peer; + grpc_iomgr_closure on_handshake_data_received_from_peer; } grpc_secure_transport_setup; -static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error); +static void on_handshake_data_received_from_peer(void *setup, int success); -static void on_handshake_data_sent_to_peer(void *setup, - grpc_endpoint_cb_status error); +static void on_handshake_data_sent_to_peer(void *setup, int success); static void secure_transport_setup_done(grpc_secure_transport_setup *s, int is_success) { @@ -78,6 +79,8 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s, if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); gpr_slice_buffer_destroy(&s->left_overs); + gpr_slice_buffer_destroy(&s->outgoing); + gpr_slice_buffer_destroy(&s->incoming); GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup"); gpr_free(s); } @@ -102,6 +105,8 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { s->secure_endpoint = grpc_secure_endpoint_create(protector, 
s->wrapped_endpoint, s->left_overs.slices, s->left_overs.count); + s->left_overs.count = 0; + s->left_overs.length = 0; secure_transport_setup_done(s, 1); return; } @@ -132,7 +137,6 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { size_t offset = 0; tsi_result result = TSI_OK; gpr_slice to_send; - grpc_endpoint_write_status write_status; do { size_t to_send_size = s->handshake_buffer_size - offset; @@ -155,28 +159,25 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { to_send = gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); + gpr_slice_buffer_reset_and_unref(&s->outgoing); + gpr_slice_buffer_add(&s->outgoing, to_send); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, - on_handshake_data_sent_to_peer, s); - if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { - gpr_log(GPR_ERROR, "Could not send handshake data to peer."); - secure_transport_setup_done(s, 0); - } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) { - on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK); - } -} - -static void cleanup_slices(gpr_slice *slices, size_t num_slices) { - size_t i; - for (i = 0; i < num_slices; i++) { - gpr_slice_unref(slices[i]); + switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing, + &s->on_handshake_data_sent_to_peer)) { + case GRPC_ENDPOINT_ERROR: + gpr_log(GPR_ERROR, "Could not send handshake data to peer."); + secure_transport_setup_done(s, 0); + break; + case GRPC_ENDPOINT_DONE: + on_handshake_data_sent_to_peer(s, 1); + break; + case GRPC_ENDPOINT_PENDING: + break; } } -static void on_handshake_data_received_from_peer( - void *setup, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static void on_handshake_data_received_from_peer(void *setup, int success) { grpc_secure_transport_setup *s = setup; size_t consumed_slice_size = 0; tsi_result result = TSI_OK; @@ -184,32 +185,37 @@ static void on_handshake_data_received_from_peer( size_t num_left_overs; int has_left_overs_in_current_slice = 0; - if (error != GRPC_ENDPOINT_CB_OK) { + if (!success) { gpr_log(GPR_ERROR, "Read failed."); - cleanup_slices(slices, nslices); secure_transport_setup_done(s, 0); return; } - for (i = 0; i < nslices; i++) { - consumed_slice_size = GPR_SLICE_LENGTH(slices[i]); + for (i = 0; i < s->incoming.count; i++) { + consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]); result = tsi_handshaker_process_bytes_from_peer( - s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size); + s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]), + &consumed_slice_size); if (!tsi_handshaker_is_in_progress(s->handshaker)) break; } if (tsi_handshaker_is_in_progress(s->handshaker)) { /* We may need more data. 
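TSI_INCOMPLETE_DATA means the handshaker consumed everything it was given without completing a frame, so another read is scheduled below; any other in-progress result means it has produced bytes that must first be sent back to the peer.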
*/ if (result == TSI_INCOMPLETE_DATA) { - /* TODO(klempner,jboeuf): This should probably use the client setup - deadline */ - grpc_endpoint_notify_on_read(s->wrapped_endpoint, - on_handshake_data_received_from_peer, setup); - cleanup_slices(slices, nslices); + switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer)) { + case GRPC_ENDPOINT_DONE: + on_handshake_data_received_from_peer(s, 1); + break; + case GRPC_ENDPOINT_ERROR: + on_handshake_data_received_from_peer(s, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + } return; } else { send_handshake_bytes_to_peer(s); - cleanup_slices(slices, nslices); return; } } @@ -217,42 +223,40 @@ static void on_handshake_data_received_from_peer( if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshake failed with error %s", tsi_result_to_string(result)); - cleanup_slices(slices, nslices); secure_transport_setup_done(s, 0); return; } /* Handshake is done and successful this point. */ has_left_overs_in_current_slice = - (consumed_slice_size < GPR_SLICE_LENGTH(slices[i])); - num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1; + (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i])); + num_left_overs = + (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1; if (num_left_overs == 0) { - cleanup_slices(slices, nslices); check_peer(s); return; } - cleanup_slices(slices, nslices - num_left_overs); - /* Put the leftovers in our buffer (ownership transfered). */ if (has_left_overs_in_current_slice) { - gpr_slice_buffer_add(&s->left_overs, - gpr_slice_split_tail(&slices[i], consumed_slice_size)); - gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */ + gpr_slice_buffer_add( + &s->left_overs, + gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size)); + gpr_slice_unref( + s->incoming.slices[i]); /* split_tail above increments refcount. */ } gpr_slice_buffer_addn( - &s->left_overs, &slices[i + 1], + &s->left_overs, &s->incoming.slices[i + 1], num_left_overs - (size_t)has_left_overs_in_current_slice); check_peer(s); } /* If setup is NULL, the setup is done. */ -static void on_handshake_data_sent_to_peer(void *setup, - grpc_endpoint_cb_status error) { +static void on_handshake_data_sent_to_peer(void *setup, int success) { grpc_secure_transport_setup *s = setup; /* Make sure that write is OK. 
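A failed write tears the setup down; on success we either keep reading handshake data from the peer or, once the handshaker reports completion, move on to verifying the peer.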
*/ - if (error != GRPC_ENDPOINT_CB_OK) { - gpr_log(GPR_ERROR, "Write failed with error %d.", error); + if (!success) { + gpr_log(GPR_ERROR, "Write failed."); if (setup != NULL) secure_transport_setup_done(s, 0); return; } @@ -261,8 +265,17 @@ static void on_handshake_data_sent_to_peer(void *setup, if (tsi_handshaker_is_in_progress(s->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->wrapped_endpoint, - on_handshake_data_received_from_peer, setup); + switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming, + &s->on_handshake_data_received_from_peer)) { + case GRPC_ENDPOINT_ERROR: + on_handshake_data_received_from_peer(s, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_DONE: + on_handshake_data_received_from_peer(s, 1); + break; + } } else { check_peer(s); } @@ -288,6 +301,12 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; + grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer, + on_handshake_data_sent_to_peer, s); + grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer, + on_handshake_data_received_from_peer, s); gpr_slice_buffer_init(&s->left_overs); + gpr_slice_buffer_init(&s->outgoing); + gpr_slice_buffer_init(&s->incoming); send_handshake_bytes_to_peer(s); } diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 987d5cb9b5..6482ef9c9f 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -207,3 +207,25 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) { src->count = 0; src->length = 0; } + +void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) { + GPR_ASSERT(n <= sb->length); + sb->length -= n; + for (;;) { + size_t idx = sb->count - 1; + gpr_slice slice = sb->slices[idx]; + size_t slice_len = GPR_SLICE_LENGTH(slice); + if (slice_len > n) { + sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n); + return; + } else if (slice_len == n) { + gpr_slice_unref(slice); + sb->count = idx; + return; + } else { + gpr_slice_unref(slice); + n -= slice_len; + sb->count = idx; + } + } +} diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c index df23492171..f546477067 100644 --- a/src/core/support/sync_win32.c +++ b/src/core/support/sync_win32.c @@ -88,9 +88,9 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) { SleepConditionVariableCS(cv, &mu->cs, INFINITE); } else { gpr_timespec now = gpr_now(abs_deadline.clock_type); - gpr_int64 now_ms = now.tv_sec * 1000 + now.tv_nsec / 1000000; + gpr_int64 now_ms = (gpr_int64)now.tv_sec * 1000 + now.tv_nsec / 1000000; gpr_int64 deadline_ms = - abs_deadline.tv_sec * 1000 + abs_deadline.tv_nsec / 1000000; + (gpr_int64)abs_deadline.tv_sec * 1000 + abs_deadline.tv_nsec / 1000000; if (now_ms >= deadline_ms) { timeout = 1; } else { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4426bbbce9..a8b4d65fbc 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -630,9 +630,6 @@ static void unlock(grpc_call *call) { call->cancel_alarm = 0; if (!call->receiving && need_more_data(call)) { - op.recv_ops = &call->recv_ops; - op.recv_state = &call->recv_state; - op.on_done_recv = &call->on_done_recv; if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { op.max_recv_bytes = call->incoming_message_length - call->incoming_message.length + 
MAX_RECV_PEEK_AHEAD; @@ -644,9 +641,16 @@ static void unlock(grpc_call *call) { op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; } } - call->receiving = 1; - GRPC_CALL_INTERNAL_REF(call, "receiving"); - start_op = 1; + /* TODO(ctiller): 1024 is basically to cover a bug + I don't understand yet */ + if (op.max_recv_bytes > 1024) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = &call->on_done_recv; + call->receiving = 1; + GRPC_CALL_INTERNAL_REF(call, "receiving"); + start_op = 1; + } } if (!call->sending) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 8de024aaea..74dc09e36e 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H #define GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H -/* Internal API for completion channels */ +/* Internal API for completion queues */ #include "src/core/iomgr/pollset.h" #include <grpc/grpc.h> diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 42cf0ecd5b..7a42de9245 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -214,6 +214,8 @@ typedef struct { grpc_chttp2_hpack_compressor hpack_compressor; /** is this a client? */ gpr_uint8 is_client; + /** callback for when writing is done */ + grpc_iomgr_closure done_cb; } grpc_chttp2_transport_writing; struct grpc_chttp2_transport_parsing { @@ -291,6 +293,9 @@ struct grpc_chttp2_transport { gpr_refcount refs; char *peer_string; + /** when this drops to zero it's safe to shutdown the endpoint */ + gpr_refcount shutdown_ep_refs; + gpr_mu mu; /** is the transport destroying itself? */ @@ -329,8 +334,11 @@ struct grpc_chttp2_transport { /** closure to execute writing */ grpc_iomgr_closure writing_action; - /** closure to start reading from the endpoint */ - grpc_iomgr_closure reading_action; + /** closure to finish reading from the endpoint */ + grpc_iomgr_closure recv_data; + + /** incoming read bytes */ + gpr_slice_buffer read_buffer; /** address to place a newly accepted stream - set and unset by grpc_chttp2_parsing_accept_stream; used by init_stream to @@ -463,8 +471,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); void grpc_chttp2_perform_writes( grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint); -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success); +void grpc_chttp2_terminate_writing(void *transport_writing, int success); void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 123061b3fc..ac79044e08 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -37,7 +37,6 @@ #include <grpc/support/log.h> static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing); -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status); int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_global *transport_global, @@ -114,6 +113,10 @@ int grpc_chttp2_unlocking_check_writes( if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { + GPR_ASSERT(stream_writing->announce_window == 0); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "write", transport_writing, 
stream_writing, announce_window, + stream_global->unannounced_incoming_window); stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( @@ -165,16 +168,15 @@ void grpc_chttp2_perform_writes( GPR_ASSERT(transport_writing->outbuf.count > 0); GPR_ASSERT(endpoint); - switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices, - transport_writing->outbuf.count, finish_write_cb, - transport_writing)) { - case GRPC_ENDPOINT_WRITE_DONE: + switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf, + &transport_writing->done_cb)) { + case GRPC_ENDPOINT_DONE: grpc_chttp2_terminate_writing(transport_writing, 1); break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: grpc_chttp2_terminate_writing(transport_writing, 0); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; } } @@ -198,6 +200,9 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { &transport_writing->outbuf, grpc_chttp2_window_update_create(stream_writing->id, stream_writing->announce_window)); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM( + "write", transport_writing, stream_writing, announce_window, + -(gpr_int64)stream_writing->announce_window); stream_writing->announce_window = 0; } if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) { @@ -209,12 +214,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) { } } -static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) { - grpc_chttp2_transport_writing *transport_writing = tw; - grpc_chttp2_terminate_writing(transport_writing, - write_status == GRPC_ENDPOINT_CB_OK); -} - void grpc_chttp2_cleanup_writing( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing) { @@ -243,6 +242,5 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); } - transport_writing->outbuf.count = 0; - transport_writing->outbuf.length = 0; + gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 1bbd210e46..9e3d7dd551 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -84,15 +84,13 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t); /* forward declarations of various callbacks that we'll build closures around */ static void writing_action(void *t, int iomgr_success_ignored); -static void reading_action(void *t, int iomgr_success_ignored); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); /** Endpoint callback to process incoming data */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error); +static void recv_data(void *tp, int success); /** Start disconnection chain */ static void drop_connection(grpc_chttp2_transport *t); @@ -143,6 +141,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor); gpr_slice_buffer_destroy(&t->parsing.qbuf); + gpr_slice_buffer_destroy(&t->read_buffer); grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser); grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser); @@ -223,6 +222,8 @@ static void init_transport(grpc_chttp2_transport *t, t->ep = ep; /* one ref is for destroy, the other 
for when ep becomes NULL */ gpr_ref_init(&t->refs, 2); + /* ref is dropped at transport close() */ + gpr_ref_init(&t->shutdown_ep_refs, 1); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); t->peer_string = grpc_endpoint_get_peer(ep); @@ -249,12 +250,16 @@ static void init_transport(grpc_chttp2_transport *t, gpr_slice_buffer_init(&t->writing.outbuf); grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx); grpc_iomgr_closure_init(&t->writing_action, writing_action, t); - grpc_iomgr_closure_init(&t->reading_action, reading_action, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context); + grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing, + &t->writing); + grpc_iomgr_closure_init(&t->recv_data, recv_data, t); + gpr_slice_buffer_init(&t->read_buffer); + if (is_client) { gpr_slice_buffer_add( &t->global.qbuf, @@ -333,13 +338,45 @@ static void destroy_transport(grpc_transport *gt) { UNREF_TRANSPORT(t, "destroy"); } +/** block grpc_endpoint_shutdown being called until a paired + allow_endpoint_shutdown is made */ +static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) { + GPR_ASSERT(t->ep); + gpr_ref(&t->shutdown_ep_refs); +} + +static void allow_endpoint_shutdown_locked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + } +} + +static void allow_endpoint_shutdown_unlocked(grpc_chttp2_transport *t) { + if (gpr_unref(&t->shutdown_ep_refs)) { + gpr_mu_lock(&t->mu); + if (t->ep) { + grpc_endpoint_shutdown(t->ep); + } + gpr_mu_unlock(&t->mu); + } +} + +static void destroy_endpoint(grpc_chttp2_transport *t) { + grpc_endpoint_destroy(t->ep); + t->ep = NULL; + UNREF_TRANSPORT( + t, "disconnect"); /* safe because we'll still have the ref for write */ +} + static void close_transport_locked(grpc_chttp2_transport *t) { if (!t->closed) { t->closed = 1; connectivity_state_set(&t->global, GRPC_CHANNEL_FATAL_FAILURE, "close_transport"); if (t->ep) { - grpc_endpoint_shutdown(t->ep); + allow_endpoint_shutdown_locked(t); } } } @@ -468,6 +505,7 @@ static void unlock(grpc_chttp2_transport *t) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1); + prevent_endpoint_shutdown(t); } run_closures = t->global.pending_closures_head; @@ -502,12 +540,14 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, } } -void grpc_chttp2_terminate_writing( - grpc_chttp2_transport_writing *transport_writing, int success) { +void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) { + grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr; grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing); lock(t); + allow_endpoint_shutdown_locked(t); + if (!success) { drop_connection(t); } @@ -519,10 +559,7 @@ void grpc_chttp2_terminate_writing( from starting */ t->writing_active = 0; if (t->ep && !t->endpoint_reading) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - UNREF_TRANSPORT( - t, "disconnect"); /* safe because we'll still have the ref for write */ + destroy_endpoint(t); } unlock(t); @@ -658,9 +695,9 @@ static void perform_stream_op_locked( } grpc_chttp2_incoming_metadata_live_op_buffer_end( &stream_global->outstanding_metadata); + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); if (stream_global->id != 0) { - 
grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } @@ -1052,82 +1089,90 @@ static void update_global_window(void *args, gpr_uint32 id, void *stream) { static void read_error_locked(grpc_chttp2_transport *t) { t->endpoint_reading = 0; if (!t->writing_active && t->ep) { - grpc_endpoint_destroy(t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(t, "disconnect"); + destroy_endpoint(t); } } /* tcp read callback */ -static void recv_data(void *tp, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - grpc_chttp2_transport *t = tp; +static int recv_data_loop(grpc_chttp2_transport *t, int *success) { size_t i; - int unref = 0; + int keep_reading = 0; - switch (error) { - case GRPC_ENDPOINT_CB_SHUTDOWN: - case GRPC_ENDPOINT_CB_EOF: - case GRPC_ENDPOINT_CB_ERROR: - lock(t); + lock(t); + i = 0; + GPR_ASSERT(!t->parsing_active); + if (!t->closed) { + t->parsing_active = 1; + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + grpc_chttp2_prepare_to_read(&t->global, &t->parsing); + gpr_mu_unlock(&t->mu); + for (; i < t->read_buffer.count && + grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]); + i++) + ; + gpr_mu_lock(&t->mu); + if (i != t->read_buffer.count) { drop_connection(t); - read_error_locked(t); - unlock(t); - unref = 1; - for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]); - break; - case GRPC_ENDPOINT_CB_OK: - lock(t); - i = 0; - GPR_ASSERT(!t->parsing_active); - if (!t->closed) { - t->parsing_active = 1; - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - grpc_chttp2_prepare_to_read(&t->global, &t->parsing); - gpr_mu_unlock(&t->mu); - for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]); - i++) { - gpr_slice_unref(slices[i]); - } - gpr_mu_lock(&t->mu); - if (i != nslices) { - drop_connection(t); - } - /* merge stream lists */ - grpc_chttp2_stream_map_move_into(&t->new_stream_map, - &t->parsing_stream_map); - t->global.concurrent_stream_count = - grpc_chttp2_stream_map_size(&t->parsing_stream_map); - if (t->parsing.initial_window_update != 0) { - grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); - t->parsing.initial_window_update = 0; - } - /* handle higher level things */ - grpc_chttp2_publish_reads(&t->global, &t->parsing); - t->parsing_active = 0; - } - if (i == nslices) { - grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1); - } else { - read_error_locked(t); - unref = 1; - } - unlock(t); - for (; i < nslices; i++) gpr_slice_unref(slices[i]); - break; + } + /* merge stream lists */ + grpc_chttp2_stream_map_move_into(&t->new_stream_map, + &t->parsing_stream_map); + t->global.concurrent_stream_count = + grpc_chttp2_stream_map_size(&t->parsing_stream_map); + if (t->parsing.initial_window_update != 0) { + grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, + update_global_window, t); + t->parsing.initial_window_update = 0; + } + /* handle higher level things */ + grpc_chttp2_publish_reads(&t->global, &t->parsing); + t->parsing_active = 0; + } + if (!*success || i != t->read_buffer.count) { + drop_connection(t); + read_error_locked(t); + } else if (!t->closed) { + keep_reading = 1; + REF_TRANSPORT(t, "keep_reading"); + prevent_endpoint_shutdown(t); } - if (unref) { + gpr_slice_buffer_reset_and_unref(&t->read_buffer); + unlock(t); + + 
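/* Now outside the lock: if more data is wanted, the next endpoint read is issued here. A synchronous DONE or ERROR result makes recv_data_loop return nonzero so that recv_data can iterate instead of recursing. */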
if (keep_reading) { + int ret = -1; + switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) { + case GRPC_ENDPOINT_DONE: + *success = 1; + ret = 1; + break; + case GRPC_ENDPOINT_ERROR: + *success = 0; + ret = 1; + break; + case GRPC_ENDPOINT_PENDING: + ret = 0; + break; + } + allow_endpoint_shutdown_unlocked(t); + UNREF_TRANSPORT(t, "keep_reading"); + return ret; + } else { UNREF_TRANSPORT(t, "recv_data"); + return 0; } + + gpr_log(GPR_ERROR, "should never reach here"); + abort(); } -static void reading_action(void *pt, int iomgr_success_ignored) { - grpc_chttp2_transport *t = pt; - grpc_endpoint_notify_on_read(t->ep, recv_data, t); +static void recv_data(void *tp, int success) { + grpc_chttp2_transport *t = tp; + + while (recv_data_loop(t, &success)) + ; } /* @@ -1240,5 +1285,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport, gpr_slice *slices, size_t nslices) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ - recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); + gpr_slice_buffer_addn(&t->read_buffer, slices, nslices); + recv_data(t, 1); } diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index e46e656beb..755234d7e8 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -45,6 +45,12 @@ ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices); } +ByteBuffer::~ByteBuffer() { + if (buffer_) { + grpc_byte_buffer_destroy(buffer_); + } +} + void ByteBuffer::Clear() { if (buffer_) { grpc_byte_buffer_destroy(buffer_); diff --git a/src/node/README.md b/src/node/README.md index 0b97680feb..7719d08290 100644 --- a/src/node/README.md +++ b/src/node/README.md @@ -11,10 +11,10 @@ Beta **Linux (Debian):** -Add [Debian testing][] to your `sources.list` file. Example: +Add [Debian jessie-backports][] to your `sources.list` file. Example: ```sh -echo "deb http://ftp.us.debian.org/debian testing main contrib non-free" | \ +echo "deb http://http.debian.net/debian jessie-backports main" | \ sudo tee -a /etc/apt/sources.list ``` @@ -113,4 +113,4 @@ An object with factory methods for creating credential objects for servers. 
[homebrew]:http://brew.sh [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install -[Debian testing]:https://www.debian.org/releases/stretch/ +[Debian jessie-backports]:http://backports.debian.org/Instructions/ diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 6a8d2633ca..215d42121c 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -44,7 +44,7 @@ var assert = require('assert'); var AUTH_SCOPE = 'https://www.googleapis.com/auth/xapi.zoo'; var AUTH_SCOPE_RESPONSE = 'xapi.zoo'; -var AUTH_USER = ('155450119199-3psnrh1sdr3d8cpj1v46naggf81mhdnk' + +var AUTH_USER = ('155450119199-vefjjaekcc6cmsd5914v6lqufunmh9ue' + '@developer.gserviceaccount.com'); var COMPUTE_ENGINE_USER = ('155450119199-r5aaqa2vqoa9g5mv2m6s3m1l293rlmel' + '@developer.gserviceaccount.com'); @@ -321,7 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { credential.getAccessToken(function(err, token) { assert.ifError(err); var updateMetadata = function(authURI, metadata, callback) { - metadata.Add('authorization', 'Bearer ' + token); + metadata.add('authorization', 'Bearer ' + token); callback(null, metadata); }; var makeTestCall = function(error, client_metadata) { @@ -336,10 +336,10 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { }, client_metadata); }; if (per_rpc) { - updateMetadata('', {}, makeTestCall); + updateMetadata('', new grpc.Metadata(), makeTestCall); } else { client.$updateMetadata = updateMetadata; - makeTestCall(null, {}); + makeTestCall(null, new grpc.Metadata()); } }); }); diff --git a/src/node/src/client.js b/src/node/src/client.js index b427297a8a..7f510231b3 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -637,7 +637,7 @@ exports.makeClientConstructor = function(methods, serviceName) { // Remove the optional DNS scheme, trailing port, and trailing backslash address = address.replace(/^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$/, '$2'); this.$server_address = address; - this.$auth_uri = 'https://' + this.server_address + '/' + serviceName; + this.$auth_uri = 'https://' + this.$server_address + '/' + serviceName; this.$updateMetadata = updateMetadata; } diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 4eda499b1a..35f7e16af7 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -48,11 +48,112 @@ #import <Foundation/Foundation.h> #import <RxLibrary/GRXWriter.h> +#pragma mark gRPC errors + +// Domain of NSError objects produced by gRPC. +extern NSString *const kGRPCErrorDomain; + +// gRPC error codes. +// Note that a few of these are never produced by the gRPC libraries, but are of general utility for +// server applications to produce. +typedef NS_ENUM(NSUInteger, GRPCErrorCode) { + // The operation was cancelled (typically by the caller). + GRPCErrorCodeCancelled = 1, + + // Unknown error. Errors raised by APIs that do not return enough error information may be + // converted to this error. + GRPCErrorCodeUnknown = 2, + + // The client specified an invalid argument. Note that this differs from FAILED_PRECONDITION. + // INVALID_ARGUMENT indicates arguments that are problematic regardless of the state of the + // server (e.g., a malformed file name). + GRPCErrorCodeInvalidArgument = 3, + + // Deadline expired before operation could complete. 
For operations that change the state of the + // server, this error may be returned even if the operation has completed successfully. For + // example, a successful response from the server could have been delayed long enough for the + // deadline to expire. + GRPCErrorCodeDeadlineExceeded = 4, + + // Some requested entity (e.g., file or directory) was not found. + GRPCErrorCodeNotFound = 5, + + // Some entity that we attempted to create (e.g., file or directory) already exists. + GRPCErrorCodeAlreadyExists = 6, + + // The caller does not have permission to execute the specified operation. PERMISSION_DENIED isn't + // used for rejections caused by exhausting some resource (RESOURCE_EXHAUSTED is used instead for + // those errors). PERMISSION_DENIED doesn't indicate a failure to identify the caller + // (UNAUTHENTICATED is used instead for those errors). + GRPCErrorCodePermissionDenied = 7, + + // The request does not have valid authentication credentials for the operation (e.g. the caller's + // identity can't be verified). + GRPCErrorCodeUnauthenticated = 16, + + // Some resource has been exhausted, perhaps a per-user quota. + GRPCErrorCodeResourceExhausted = 8, + + // The RPC was rejected because the server is not in a state required for the procedure's + // execution. For example, a directory to be deleted may be non-empty, etc. + // The client should not retry until the server state has been explicitly fixed (e.g. by + // performing another RPC). The details depend on the service being called, and should be found in + // the NSError's userInfo. + GRPCErrorCodeFailedPrecondition = 9, + + // The RPC was aborted, typically due to a concurrency issue like sequencer check failures, + // transaction aborts, etc. The client should retry at a higher-level (e.g., restarting a read- + // modify-write sequence). + GRPCErrorCodeAborted = 10, + + // The RPC was attempted past the valid range. E.g., enumerating past the end of a list. + // Unlike INVALID_ARGUMENT, this error indicates a problem that may be fixed if the system state + // changes. For example, an RPC to get elements of a list will generate INVALID_ARGUMENT if asked + // to return the element at a negative index, but it will generate OUT_OF_RANGE if asked to return + // the element at an index past the current size of the list. + GRPCErrorCodeOutOfRange = 11, + + // The procedure is not implemented or not supported/enabled in this server. + GRPCErrorCodeUnimplemented = 12, + + // Internal error. Means some invariant expected by the server application or the gRPC library has + // been broken. + GRPCErrorCodeInternal = 13, + + // The server is currently unavailable. This is most likely a transient condition and may be + // corrected by retrying with a backoff. + GRPCErrorCodeUnavailable = 14, + + // Unrecoverable data loss or corruption. + GRPCErrorCodeDataLoss = 15, +}; + // Keys used in |NSError|'s |userInfo| dictionary to store the response headers and trailers sent by // the server. extern id const kGRPCHeadersKey; extern id const kGRPCTrailersKey; +#pragma mark GRPCCall + +// The container of the request headers of an RPC conforms to this protocol, which is a subset of +// NSMutableDictionary's interface. It will become a NSMutableDictionary later on. +// The keys of this container are the header names, which per the HTTP standard are case- +// insensitive. They are stored in lowercase (which is how HTTP/2 mandates them on the wire), and +// can only consist of ASCII characters. 
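+// For example, requestHeaders[@"Authorization"] and requestHeaders[@"AUTHORIZATION"] address the same
+// entry, stored under "authorization" (see the GRPCRequestHeaders implementation below).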
+// A header value is a NSString object (with only ASCII characters), unless the header name has the +// suffix "-bin", in which case the value has to be a NSData object. +@protocol GRPCRequestHeaders <NSObject> + +@property(nonatomic, readonly) NSUInteger count; + +- (id)objectForKeyedSubscript:(NSString *)key; +- (void)setObject:(id)obj forKeyedSubscript:(NSString *)key; + +- (void)removeAllObjects; +- (void)removeObjectForKey:(NSString *)key; + +@end + // Represents a single gRPC remote call. @interface GRPCCall : GRXWriter @@ -68,10 +169,8 @@ extern id const kGRPCTrailersKey; // // After the call is started, trying to modify this property is an error. // -// For convenience, the property is initialized to an empty NSMutableDictionary, and the setter -// accepts (and copies) both mutable and immutable dictionaries. -- (NSMutableDictionary *)requestHeaders; // nonatomic -- (void)setRequestHeaders:(NSDictionary *)requestHeaders; // nonatomic, copy +// The property is initialized to an empty NSMutableDictionary. +@property(atomic, readonly) id<GRPCRequestHeaders> requestHeaders; // This dictionary is populated with the HTTP headers received from the server. This happens before // any response message is received from the server. It has the same structure as the request diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index ff5d1c5aaf..b6986bf59c 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -37,6 +37,7 @@ #include <grpc/support/time.h> #import <RxLibrary/GRXConcurrentWriteable.h> +#import "private/GRPCRequestHeaders.h" #import "private/GRPCWrappedCall.h" #import "private/NSData+GRPC.h" #import "private/NSDictionary+GRPC.h" @@ -93,7 +94,7 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; // the response arrives. GRPCCall *_retainSelf; - NSMutableDictionary *_requestHeaders; + GRPCRequestHeaders *_requestHeaders; } @synthesize state = _state; @@ -124,21 +125,11 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; _requestWriter = requestWriter; - _requestHeaders = [NSMutableDictionary dictionary]; + _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self]; } return self; } -#pragma mark Metadata - -- (NSMutableDictionary *)requestHeaders { - return _requestHeaders; -} - -- (void)setRequestHeaders:(NSDictionary *)requestHeaders { - _requestHeaders = [NSMutableDictionary dictionaryWithDictionary:requestHeaders]; -} - #pragma mark Finish - (void)finishWithError:(NSError *)errorOrNil { @@ -230,10 +221,10 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; #pragma mark Send headers -- (void)sendHeaders:(NSDictionary *)headers { +- (void)sendHeaders:(id<GRPCRequestHeaders>)headers { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] - initWithMetadata:headers ?: @{} handler:nil]]]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers + handler:nil]]]; } #pragma mark GRXWriteable implementation diff --git a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h new file mode 100644 index 0000000000..1391b5725f --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import <Foundation/Foundation.h> +#include <grpc/grpc.h> + +#import "GRPCCall.h" + +@interface GRPCRequestHeaders : NSObject<GRPCRequestHeaders> + +@property(nonatomic, readonly) NSUInteger count; +@property(nonatomic, readonly) grpc_metadata *grpc_metadataArray; + +- (instancetype)initWithCall:(GRPCCall *)call; + +- (id)objectForKeyedSubscript:(NSString *)key; +- (void)setObject:(id)obj forKeyedSubscript:(NSString *)key; + +- (void)removeAllObjects; +- (void)removeObjectForKey:(NSString *)key; + +@end diff --git a/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m new file mode 100644 index 0000000000..761677ce50 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCRequestHeaders.m @@ -0,0 +1,119 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import "GRPCRequestHeaders.h" + +#import <Foundation/Foundation.h> + +#import "../GRPCCall.h" +#import "NSDictionary+GRPC.h" + +// Used by the setter. +static void CheckIsNonNilASCII(NSString *name, NSString* value) { + if (!value) { + [NSException raise:NSInvalidArgumentException format:@"%@ cannot be nil", name]; + } + if (![value canBeConvertedToEncoding:NSASCIIStringEncoding]) { + [NSException raise:NSInvalidArgumentException + format:@"%@ %@ contains non-ASCII characters", name, value]; + } +} + +// Precondition: key isn't nil. +static void CheckKeyValuePairIsValid(NSString *key, id value) { + if ([key hasSuffix:@"-bin"]) { + if (![value isKindOfClass:NSData.class]) { + [NSException raise:NSInvalidArgumentException + format:@"Expected NSData value for header %@ ending in \"-bin\", " + @"instead got %@", key, value]; + } + } else { + if (![value isKindOfClass:NSString.class]) { + [NSException raise:NSInvalidArgumentException + format:@"Expected NSString value for header %@ not ending in \"-bin\", " + @"instead got %@", key, value]; + } + CheckIsNonNilASCII(@"Text header value", (NSString *)value); + } +} + +@implementation GRPCRequestHeaders { + __weak GRPCCall *_call; + NSMutableDictionary *_delegate; +} + +- (instancetype)initWithCall:(GRPCCall *)call { + if ((self = [super init])) { + _call = call; + _delegate = [NSMutableDictionary dictionary]; + } + return self; +} + +- (void)checkCallIsNotStarted { + if (_call.state != GRXWriterStateNotStarted) { + [NSException raise:@"Invalid modification" + format:@"Cannot modify request headers after call is started"]; + } +} + +- (id)objectForKeyedSubscript:(NSString *)key { + return _delegate[key.lowercaseString]; +} + +- (void)setObject:(id)obj forKeyedSubscript:(NSString *)key { + [self checkCallIsNotStarted]; + CheckIsNonNilASCII(@"Header name", key); + key = key.lowercaseString; + CheckKeyValuePairIsValid(key, obj); + _delegate[key] = obj; +} + +- (void)removeObjectForKey:(NSString *)key { + [self checkCallIsNotStarted]; + [_delegate removeObjectForKey:key.lowercaseString]; +} + +- (void)removeAllObjects { + [self checkCallIsNotStarted]; + [_delegate removeAllObjects]; +} + +- (NSUInteger)count { + return _delegate.count; +} + +- (grpc_metadata *)grpc_metadataArray { + return _delegate.grpc_metadataArray; +} +@end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index da11cbb761..4ca2766147 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -35,6 +35,7 @@ #include <grpc/grpc.h> #import "GRPCChannel.h" +#import "GRPCRequestHeaders.h" @interface GRPCOperation : NSObject @property(nonatomic, readonly) grpc_op op; @@ -44,7 +45,7 @@ @interface GRPCOpSendMetadata : GRPCOperation -- (instancetype)initWithMetadata:(NSDictionary *)metadata +- (instancetype)initWithMetadata:(GRPCRequestHeaders *)metadata handler:(void(^)())handler NS_DESIGNATED_INITIALIZER; 
@end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index fe3d51da53..cea7c479e0 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -65,7 +65,7 @@ return [self initWithMetadata:nil handler:nil]; } -- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)())handler { +- (instancetype)initWithMetadata:(GRPCRequestHeaders *)metadata handler:(void (^)())handler { if (self = [super init]) { _op.op = GRPC_OP_SEND_INITIAL_METADATA; _op.data.send_initial_metadata.count = metadata.count; diff --git a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m index 99c890e4ee..7477da7619 100644 --- a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m +++ b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m @@ -40,8 +40,8 @@ @interface NSData (GRPCMetadata) + (instancetype)grpc_dataFromMetadataValue:(grpc_metadata *)metadata; -// Fill a metadata object with the binary value in this NSData and the given key. -- (void)grpc_initMetadata:(grpc_metadata *)metadata withKey:(NSString *)key; +// Fill a metadata object with the binary value in this NSData. +- (void)grpc_initMetadata:(grpc_metadata *)metadata; @end @implementation NSData (GRPCMetadata) @@ -50,9 +50,7 @@ return [self dataWithBytes:metadata->value length:metadata->value_length]; } -- (void)grpc_initMetadata:(grpc_metadata *)metadata withKey:(NSString *)key { - // TODO(jcanizales): Encode Unicode chars as ASCII. - metadata->key = [key stringByAppendingString:@"-bin"].UTF8String; +- (void)grpc_initMetadata:(grpc_metadata *)metadata { metadata->value = self.bytes; metadata->value_length = self.length; } @@ -63,8 +61,8 @@ @interface NSString (GRPCMetadata) + (instancetype)grpc_stringFromMetadataValue:(grpc_metadata *)metadata; -// Fill a metadata object with the textual value in this NSString and the given key. -- (void)grpc_initMetadata:(grpc_metadata *)metadata withKey:(NSString *)key; +// Fill a metadata object with the textual value in this NSString. +- (void)grpc_initMetadata:(grpc_metadata *)metadata; @end @implementation NSString (GRPCMetadata) @@ -74,22 +72,8 @@ encoding:NSASCIIStringEncoding]; } -- (void)grpc_initMetadata:(grpc_metadata *)metadata withKey:(NSString *)key { - if ([key hasSuffix:@"-bin"]) { - // Disallow this, as at best it will confuse the server. If the app really needs to send a - // textual header with a name ending in "-bin", it can be done by removing the suffix and - // encoding the NSString as a NSData object. - // - // Why raise an exception: In the most common case, the developer knows this won't happen in - // their code, so the exception isn't triggered. In the rare cases when the developer can't - // tell, it's easy enough to add a sanitizing filter before the header is set. There, the - // developer can choose whether to drop such a header, or trim its name. Doing either ourselves, - // silently, would be very unintuitive for the user. - [NSException raise:NSInvalidArgumentException - format:@"Metadata keys ending in '-bin' are reserved for NSData values."]; - } - // TODO(jcanizales): Encode Unicode chars as ASCII. - metadata->key = key.UTF8String; +// Precondition: This object contains only ASCII characters. 
+- (void)grpc_initMetadata:(grpc_metadata *)metadata { metadata->value = self.UTF8String; metadata->value_length = self.length; } @@ -105,8 +89,6 @@ + (instancetype)grpc_dictionaryFromMetadata:(grpc_metadata *)entries count:(size_t)count { NSMutableDictionary *metadata = [NSMutableDictionary dictionaryWithCapacity:count]; for (grpc_metadata *entry = entries; entry < entries + count; entry++) { - // TODO(jcanizales): Verify in a C library test that it's converting header names to lower case - // automatically. NSString *name = [NSString stringWithCString:entry->key encoding:NSASCIIStringEncoding]; if (!name || metadata[name]) { // Log if name is nil? @@ -114,7 +96,6 @@ } id value; if ([name hasSuffix:@"-bin"]) { - name = [name substringToIndex:name.length - 4]; value = [NSData grpc_dataFromMetadataValue:entry]; } else { value = [NSString grpc_stringFromMetadataValue:entry]; @@ -124,19 +105,21 @@ return metadata; } +// Preconditions: All keys are ASCII strings. Keys ending in -bin have NSData values; the others +// have NSString values. - (grpc_metadata *)grpc_metadataArray { grpc_metadata *metadata = gpr_malloc([self count] * sizeof(grpc_metadata)); - int i = 0; - for (id key in self) { + grpc_metadata *current = metadata; + for (NSString* key in self) { id value = self[key]; - grpc_metadata *current = &metadata[i]; - if ([value respondsToSelector:@selector(grpc_initMetadata:withKey:)]) { - [value grpc_initMetadata:current withKey:key]; + current->key = key.UTF8String; + if ([value respondsToSelector:@selector(grpc_initMetadata:)]) { + [value grpc_initMetadata:current]; } else { [NSException raise:NSInvalidArgumentException format:@"Metadata values must be NSString or NSData."]; } - i += 1; + ++current; } return metadata; } diff --git a/src/objective-c/GRPCClient/private/NSError+GRPC.h b/src/objective-c/GRPCClient/private/NSError+GRPC.h index e712791271..f4729dc8a1 100644 --- a/src/objective-c/GRPCClient/private/NSError+GRPC.h +++ b/src/objective-c/GRPCClient/private/NSError+GRPC.h @@ -34,29 +34,6 @@ #import <Foundation/Foundation.h> #include <grpc/grpc.h> -// TODO(jcanizales): Make the domain string public. -extern NSString *const kGRPCErrorDomain; - -// TODO(jcanizales): Make this public and document each code. -typedef NS_ENUM(NSInteger, GRPCErrorCode) { - GRPCErrorCodeCancelled = 1, - GRPCErrorCodeUnknown = 2, - GRPCErrorCodeInvalidArgument = 3, - GRPCErrorCodeDeadlineExceeded = 4, - GRPCErrorCodeNotFound = 5, - GRPCErrorCodeAlreadyExists = 6, - GRPCErrorCodePermissionDenied = 7, - GRPCErrorCodeUnauthenticated = 16, - GRPCErrorCodeResourceExhausted = 8, - GRPCErrorCodeFailedPrecondition = 9, - GRPCErrorCodeAborted = 10, - GRPCErrorCodeOutOfRange = 11, - GRPCErrorCodeUnimplemented = 12, - GRPCErrorCodeInternal = 13, - GRPCErrorCodeUnavailable = 14, - GRPCErrorCodeDataLoss = 15 -}; - @interface NSError (GRPC) // Returns nil if the status code is OK. Otherwise, a NSError whose code is one of |GRPCErrorCode| // and whose domain is |kGRPCErrorDomain|. 
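Taken together, the Objective-C client changes above replace the mutable request-headers dictionary with a validating container: header names are ASCII and lowercased on insertion, names ending in "-bin" require NSData values, all other names require ASCII NSString values, and any mutation after the call has started raises an exception. A minimal usage sketch in Objective-C (the host, path, and writer objects are hypothetical placeholders, not part of this change):

// Hypothetical call setup; requestsWriter and responsesWriteable are assumed
// to be a GRXWriter of serialized NSData messages and a GRXWriteable.
GRPCCall *call = [[GRPCCall alloc] initWithHost:@"example.com"
                                           path:@"/grpc.testing.TestService/UnaryCall"
                                 requestsWriter:requestsWriter];

// Text header: must be an ASCII NSString; the name is lowercased on insertion.
call.requestHeaders[@"My-Header"] = @"My value";

// Binary header: a name with the "-bin" suffix requires an NSData value;
// passing an NSString here would raise NSInvalidArgumentException.
call.requestHeaders[@"my-trace-bin"] = [NSData dataWithBytes:"\x01\x02" length:2];

[call startWithWriteable:responsesWriteable];
// From this point on, modifying call.requestHeaders raises an
// "Invalid modification" exception.
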
diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 889d71a308..9bf66f347a 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -37,6 +37,22 @@ #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter+Transformations.h> +static NSError *ErrorForBadProto(id proto, Class expectedClass, NSError *parsingError) { + NSDictionary *info = @{ + NSLocalizedDescriptionKey: @"Unable to parse response from the server", + NSLocalizedRecoverySuggestionErrorKey: @"If this RPC is idempotent, retry " + @"with exponential backoff. Otherwise, query the server status before " + @"retrying.", + NSUnderlyingErrorKey: parsingError, + @"Expected class": expectedClass, + @"Received value": proto, + }; + // TODO(jcanizales): Use kGRPCErrorDomain and GRPCErrorCodeInternal when they're public. + return [NSError errorWithDomain:@"io.grpc" + code:13 + userInfo:info]; +} + @implementation ProtoRPC { id<GRXWriteable> _responseWriteable; } @@ -65,14 +81,25 @@ } // A writer that serializes the proto messages to send. GRXWriter *bytesWriter = [requestsWriter map:^id(GPBMessage *proto) { - // TODO(jcanizales): Fail with an understandable error message if the requestsWriter isn't - // sending GPBMessages. + if (![proto isKindOfClass:GPBMessage.class]) { + [NSException raise:NSInvalidArgumentException + format:@"Request must be a proto message: %@", proto]; + } return [proto data]; }]; if ((self = [super initWithHost:host path:method.HTTPPath requestsWriter:bytesWriter])) { + __weak ProtoRPC *weakSelf = self; + // A writeable that parses the proto messages received. _responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { - [responsesWriteable writeValue:[responseClass parseFromData:value error:NULL]]; + // TODO(jcanizales): This is done in the main thread, and needs to happen in another thread. + NSError *error = nil; + id parsed = [responseClass parseFromData:value error:&error]; + if (parsed) { + [responsesWriteable writeValue:parsed]; + } else { + [weakSelf finishWithError:ErrorForBadProto(value, responseClass, error)]; + } } completionHandler:^(NSError *errorOrNil) { [responsesWriteable writesFinishedWithError:errorOrNil]; }]; diff --git a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec index dcb0c4e500..d4f8084cb5 100644 --- a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec @@ -15,14 +15,14 @@ Pod::Spec.new do |s| ms.source_files = "*.pbobjc.{h,m}" ms.header_mappings_dir = "." ms.requires_arc = false - ms.dependency "Protobuf", "~> 3.0.0-alpha-3" + ms.dependency "Protobuf", "~> 3.0.0-alpha-4" end s.subspec "Services" do |ss| ss.source_files = "*.pbrpc.{h,m}" ss.header_mappings_dir = "." 
ss.requires_arc = true - ss.dependency "gRPC", "~> 0.5" + ss.dependency "gRPC", "~> 0.7" ss.dependency "#{s.name}/Messages" end end diff --git a/src/objective-c/examples/SwiftSample/Bridging-Header.h b/src/objective-c/examples/SwiftSample/Bridging-Header.h index 33db2dd1cb..65f768a760 100644 --- a/src/objective-c/examples/SwiftSample/Bridging-Header.h +++ b/src/objective-c/examples/SwiftSample/Bridging-Header.h @@ -39,6 +39,7 @@ #import <RxLibrary/GRXWriter+Immediate.h> #import <GRPCClient/GRPCCall.h> #import <ProtoRPC/ProtoMethod.h> +#import <ProtoRPC/ProtoRPC.h> #import <RemoteTest/Test.pbrpc.h> #endif diff --git a/src/objective-c/examples/SwiftSample/Podfile b/src/objective-c/examples/SwiftSample/Podfile index 7b5941eef7..3611b00863 100644 --- a/src/objective-c/examples/SwiftSample/Podfile +++ b/src/objective-c/examples/SwiftSample/Podfile @@ -1,6 +1,7 @@ source 'https://github.com/CocoaPods/Specs.git' platform :ios, '8.0' +pod 'Protobuf', :path => "../../../../third_party/protobuf" pod 'gRPC', :path => "../../../.." pod 'RemoteTest', :path => "../RemoteTestClient" diff --git a/src/objective-c/examples/SwiftSample/ViewController.swift b/src/objective-c/examples/SwiftSample/ViewController.swift index e4e7aeae49..76dad9e132 100644 --- a/src/objective-c/examples/SwiftSample/ViewController.swift +++ b/src/objective-c/examples/SwiftSample/ViewController.swift @@ -45,17 +45,37 @@ class ViewController: UIViewController { request.fillUsername = true request.fillOauthScope = true + // Example gRPC call using a generated proto client library: let service = RMTTestService(host: RemoteHost) - service.unaryCallWithRequest(request) { (response: RMTSimpleResponse?, error: NSError?) in + service.unaryCallWithRequest(request) { response, error in + if let response = response { + NSLog("1. Finished successfully with response:\n\(response)") + } else { + NSLog("1. Finished with error: \(error!)") + } + } + + + // Same but manipulating headers: + + var RPC : ProtoRPC! // Needed to convince Swift to capture by reference (__block) + RPC = service.RPCToUnaryCallWithRequest(request) { response, error in if let response = response { - NSLog("Finished successfully with response:\n\(response)") + NSLog("2. Finished successfully with response:\n\(response)") } else { - NSLog("Finished with error: \(error!)") + NSLog("2. Finished with error: \(error!)") } + NSLog("2. Response headers: \(RPC.responseHeaders)") + NSLog("2. Response trailers: \(RPC.responseTrailers)") } + RPC.requestHeaders["My-Header"] = "My value" + + RPC.start() + + // Same example call using the generic gRPC client library: let method = ProtoMethod(package: "grpc.testing", service: "TestService", method: "UnaryCall") @@ -64,14 +84,16 @@ class ViewController: UIViewController { let call = GRPCCall(host: RemoteHost, path: method.HTTPPath, requestsWriter: requestsWriter) - let responsesWriteable = GRXWriteable { (value: AnyObject?, error: NSError?) in - if let value = value as? NSData { - NSLog("Received response:\n\(RMTSimpleResponse(data: value, error: nil))") + call.requestHeaders["My-Header"] = "My value" + + call.startWithWriteable(GRXWriteable { response, error in + if let response = response as? NSData { + NSLog("3. Received response:\n\(RMTSimpleResponse(data: response, error: nil))") } else { - NSLog("Finished with error: \(error!)") + NSLog("3. Finished with error: \(error!)") } - } - - call.startWithWriteable(responsesWriteable) + NSLog("3. Response headers: \(call.responseHeaders)") + NSLog("3. 
Response trailers: \(call.responseTrailers)") + }) } } diff --git a/src/php/README.md b/src/php/README.md index afa09d79a1..51322c7526 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -32,10 +32,10 @@ $ sudo php -d detect_unicode=0 go-pear.phar **Linux (Debian):** -Add [Debian testing][] to your `sources.list` file. Example: +Add [Debian jessie-backports][] to your `sources.list` file. Example: ```sh -echo "deb http://ftp.us.debian.org/debian testing main contrib non-free" | \ +echo "deb http://http.debian.net/debian jessie-backports main" | \ sudo tee -a /etc/apt/sources.list ``` @@ -167,4 +167,4 @@ $ ./bin/run_gen_code_test.sh [homebrew]:http://brew.sh [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Node]:https://github.com/grpc/grpc/tree/master/src/node/examples -[Debian testing]:https://www.debian.org/releases/stretch/ +[Debian jessie-backports]:http://backports.debian.org/Instructions/ diff --git a/src/php/composer.json b/src/php/composer.json index 2d0fe0c87a..1d41f847ac 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -1,7 +1,7 @@ { "name": "grpc/grpc", "description": "gRPC library for PHP", - "version": "0.5.1", + "version": "0.6.0", "homepage": "http://grpc.io", "license": "BSD-3-Clause", "repositories": [ diff --git a/src/python/README.md b/src/python/README.md index 67d1a173a2..afe7c731f1 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -16,10 +16,10 @@ INSTALLATION **Linux (Debian):** -Add [Debian testing][] to your `sources.list` file. Example: +Add [Debian jessie-backports][] to your `sources.list` file. Example: ```sh -echo "deb http://ftp.us.debian.org/debian testing main contrib non-free" | \ +echo "deb http://http.debian.net/debian jessie-backports main" | \ sudo tee -a /etc/apt/sources.list ``` @@ -92,4 +92,4 @@ $ ../../tools/distrib/python/submit.py [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html [detailed example]:http://www.grpc.io/docs/installation/python.html -[Debian testing]:https://www.debian.org/releases/stretch/ +[Debian jessie-backports]:http://backports.debian.org/Instructions/ diff --git a/src/python/grpcio/grpc/_links/_constants.py b/src/python/grpcio/grpc/_links/_constants.py new file mode 100644 index 0000000000..117fc5a639 --- /dev/null +++ b/src/python/grpcio/grpc/_links/_constants.py @@ -0,0 +1,42 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Constants for use within this package.""" + +from grpc._adapter import _intermediary_low +from grpc.beta import interfaces as beta_interfaces + +LOW_STATUS_CODE_TO_HIGH_STATUS_CODE = { + low: high for low, high in zip( + _intermediary_low.Code, beta_interfaces.StatusCode) +} + +HIGH_STATUS_CODE_TO_LOW_STATUS_CODE = { + high: low for low, high in LOW_STATUS_CODE_TO_HIGH_STATUS_CODE.items() +} diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index 1676fe7941..fecb550ae0 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -36,6 +36,7 @@ import threading import time from grpc._adapter import _intermediary_low +from grpc._links import _constants from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay @@ -168,14 +169,17 @@ class _Kernel(object): termination = links.Ticket.Termination.CANCELLATION elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED: termination = links.Ticket.Termination.EXPIRATION + elif event.status.code is _intermediary_low.Code.UNIMPLEMENTED: + termination = links.Ticket.Termination.REMOTE_FAILURE elif event.status.code is _intermediary_low.Code.UNKNOWN: termination = links.Ticket.Termination.LOCAL_FAILURE else: termination = links.Ticket.Termination.TRANSMISSION_FAILURE + code = _constants.LOW_STATUS_CODE_TO_HIGH_STATUS_CODE[event.status.code] ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, - None, None, event.metadata, event.status.code, event.status.details, - termination, None) + None, None, event.metadata, code, event.status.details, termination, + None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 94e7cfc716..34d3b262c9 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -36,6 +36,7 @@ import threading import time from grpc._adapter import _intermediary_low +from grpc._links import _constants from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links @@ -122,13 +123,13 @@ def _metadatafy(call, metadata): call.add_metadata(metadata_key, metadata_value) -def _status(termination_kind, code, details): - effective_details = b'' if details is None else details - if code is None: - effective_code = _TERMINATION_KIND_TO_CODE[termination_kind] +def _status(termination_kind, high_code, details): + low_details = b'' if details is None else details + if high_code is None: + low_code = _TERMINATION_KIND_TO_CODE[termination_kind] else: - effective_code = code - return _intermediary_low.Status(effective_code, effective_details) + low_code = _constants.HIGH_STATUS_CODE_TO_LOW_STATUS_CODE[high_code] + return _intermediary_low.Status(low_code, 
low_details) class _Kernel(object): diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py index 4e46ffd17f..ebf91d80ab 100644 --- a/src/python/grpcio/grpc/beta/_server.py +++ b/src/python/grpcio/grpc/beta/_server.py @@ -32,9 +32,11 @@ import threading from grpc._links import service +from grpc.beta import interfaces from grpc.framework.core import implementations as _core_implementations from grpc.framework.crust import implementations as _crust_implementations from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base from grpc.framework.interfaces.links import utilities _DEFAULT_POOL_SIZE = 8 @@ -42,6 +44,23 @@ _DEFAULT_TIMEOUT = 300 _MAXIMUM_TIMEOUT = 24 * 60 * 60 +class _GRPCServicer(base.Servicer): + + def __init__(self, delegate): + self._delegate = delegate + + def service(self, group, method, context, output_operator): + try: + return self._delegate.service(group, method, context, output_operator) + except base.NoSuchMethodError as e: + if e.code is None and e.details is None: + raise base.NoSuchMethodError( + interfaces.StatusCode.UNIMPLEMENTED, + b'Method "%s" of service "%s" not implemented!' % (method, group)) + else: + raise + + def _disassemble(grpc_link, end_link, pool, event, grace): grpc_link.begin_stop() end_link.stop(grace).wait() @@ -99,8 +118,9 @@ def server( service_thread_pool = thread_pool assembly_thread_pool = None - servicer = _crust_implementations.servicer( - implementations, multi_implementation, service_thread_pool) + servicer = _GRPCServicer( + _crust_implementations.servicer( + implementations, multi_implementation, service_thread_pool)) grpc_link = service.service_link(request_deserializers, response_serializers) diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py new file mode 100644 index 0000000000..25e6a9c66b --- /dev/null +++ b/src/python/grpcio/grpc/beta/interfaces.py @@ -0,0 +1,54 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
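The beta interfaces module introduced just below defines the high-level StatusCode enum that mirrors grpc_status_code in the C core; the _links._constants module added earlier pairs it member-by-member with the adapter's low-level code enum by zipping the two in declaration order, then inverts the result. A self-contained sketch of that idiom, using two stand-in enums rather than the real gRPC types:

import enum

class LowCode(enum.Enum):   # stands in for _intermediary_low.Code
  OK = 0
  CANCELLED = 1
  UNKNOWN = 2

class HighCode(enum.Enum):  # stands in for beta interfaces.StatusCode
  OK = 0
  CANCELLED = 1
  UNKNOWN = 2

# Iterating an Enum yields members in definition order, so this is only
# correct if both enums declare their members in the same order.
LOW_TO_HIGH = {low: high for low, high in zip(LowCode, HighCode)}
HIGH_TO_LOW = {high: low for low, high in LOW_TO_HIGH.items()}

assert LOW_TO_HIGH[LowCode.CANCELLED] is HighCode.CANCELLED
assert HIGH_TO_LOW[HighCode.UNKNOWN] is LowCode.UNKNOWN
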
+ +"""Constants and interfaces of the Beta API of gRPC Python.""" + +import enum + + +@enum.unique +class StatusCode(enum.Enum): + """Mirrors grpc_status_code in the C core.""" + OK = 0 + CANCELLED = 1 + UNKNOWN = 2 + INVALID_ARGUMENT = 3 + DEADLINE_EXCEEDED = 4 + NOT_FOUND = 5 + ALREADY_EXISTS = 6 + PERMISSION_DENIED = 7 + RESOURCE_EXHAUSTED = 8 + FAILED_PRECONDITION = 9 + ABORTED = 10 + OUT_OF_RANGE = 11 + UNIMPLEMENTED = 12 + INTERNAL = 13 + UNAVAILABLE = 14 + DATA_LOSS = 15 + UNAUTHENTICATED = 16 diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py index d3be3a4c4a..0f47cb48e0 100644 --- a/src/python/grpcio/grpc/framework/core/_constants.py +++ b/src/python/grpcio/grpc/framework/core/_constants.py @@ -44,14 +44,15 @@ TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = { # ticket should be sent to the other side in the event of such an # outcome. ABORTION_OUTCOME_TO_TICKET_TERMINATION = { - base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION, - base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION, - base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, - base.Outcome.REMOTE_SHUTDOWN: None, - base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE, - base.Outcome.TRANSMISSION_FAILURE: None, - base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, - base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, + base.Outcome.Kind.CANCELLED: links.Ticket.Termination.CANCELLATION, + base.Outcome.Kind.EXPIRED: links.Ticket.Termination.EXPIRATION, + base.Outcome.Kind.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, + base.Outcome.Kind.REMOTE_SHUTDOWN: None, + base.Outcome.Kind.RECEPTION_FAILURE: + links.Ticket.Termination.RECEPTION_FAILURE, + base.Outcome.Kind.TRANSMISSION_FAILURE: None, + base.Outcome.Kind.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, + base.Outcome.Kind.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, } INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:' diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py index 76b3534530..a346e9d478 100644 --- a/src/python/grpcio/grpc/framework/core/_context.py +++ b/src/python/grpcio/grpc/framework/core/_context.py @@ -33,6 +33,7 @@ import time # _interfaces is referenced from specification in this module. 
from grpc.framework.core import _interfaces # pylint: disable=unused-import +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base @@ -56,11 +57,12 @@ class OperationContext(base.OperationContext): self._transmission_manager = transmission_manager self._expiration_manager = expiration_manager - def _abort(self, outcome): + def _abort(self, outcome_kind): with self._lock: if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, None, None) self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, None, None) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() def outcome(self): @@ -85,8 +87,8 @@ class OperationContext(base.OperationContext): def cancel(self): """See base.OperationContext.cancel for specification.""" - self._abort(base.Outcome.CANCELLED) + self._abort(base.Outcome.Kind.CANCELLED) def fail(self, exception): """See base.OperationContext.fail for specification.""" - self._abort(base.Outcome.LOCAL_FAILURE) + self._abort(base.Outcome.Kind.LOCAL_FAILURE) diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py index 2d7b2e2f10..8ab59dc3e5 100644 --- a/src/python/grpcio/grpc/framework/core/_emission.py +++ b/src/python/grpcio/grpc/framework/core/_emission.py @@ -30,6 +30,7 @@ """State and behavior for handling emitted values.""" from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base @@ -81,9 +82,10 @@ class EmissionManager(_interfaces.EmissionManager): payload_present and self._completion_seen or completion_present and self._completion_seen or allowance_present and allowance <= 0): - self._termination_manager.abort(base.Outcome.LOCAL_FAILURE) - self._transmission_manager.abort( - base.Outcome.LOCAL_FAILURE, None, None) + outcome = _utilities.Outcome( + base.Outcome.Kind.LOCAL_FAILURE, None, None) + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() else: self._initial_metadata_seen |= initial_metadata_present diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index f57cde4e58..8e07d9061e 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -69,7 +69,7 @@ class _Cycle(object): def _abort(operations): for operation in operations: - operation.abort(base.Outcome.LOCAL_SHUTDOWN) + operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN) def _cancel_futures(futures): @@ -90,19 +90,19 @@ def _termination_action(lock, stats, operation_id, cycle): Args: lock: A lock to hold during the termination action. - states: A mapping from base.Outcome values to integers to increment with - the outcome given to the termination action. + stats: A mapping from base.Outcome.Kind values to integers to increment + with the outcome kind given to the termination action. operation_id: The operation ID for the termination action. cycle: A _Cycle value to be updated during the termination action. Returns: - A callable that takes an operation outcome as its sole parameter and that - should be used as the termination action for the operation associated - with the given operation ID. + A callable that takes an operation outcome kind as its sole parameter and + that should be used as the termination action for the operation + associated with the given operation ID. 
""" - def termination_action(outcome): + def termination_action(outcome_kind): with lock: - stats[outcome] += 1 + stats[outcome_kind] += 1 cycle.operations.pop(operation_id, None) if not cycle.operations: for action in cycle.idle_actions: @@ -127,7 +127,7 @@ class _End(End): self._lock = threading.Condition() self._servicer_package = servicer_package - self._stats = {outcome: 0 for outcome in base.Outcome} + self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind} self._mate = None @@ -168,7 +168,7 @@ class _End(End): def operate( self, group, method, subscription, timeout, initial_metadata=None, - payload=None, completion=None): + payload=None, completion=None, protocol_options=None): """See base.End.operate for specification.""" operation_id = uuid.uuid4() with self._lock: @@ -177,9 +177,9 @@ class _End(End): termination_action = _termination_action( self._lock, self._stats, operation_id, self._cycle) operation = _operation.invocation_operate( - operation_id, group, method, subscription, timeout, initial_metadata, - payload, completion, self._mate.accept_ticket, termination_action, - self._cycle.pool) + operation_id, group, method, subscription, timeout, protocol_options, + initial_metadata, payload, completion, self._mate.accept_ticket, + termination_action, self._cycle.pool) self._cycle.operations[operation_id] = operation return operation.context, operation.operator diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py index d8690b3a02..ded0ab6bce 100644 --- a/src/python/grpcio/grpc/framework/core/_expiration.py +++ b/src/python/grpcio/grpc/framework/core/_expiration.py @@ -32,6 +32,7 @@ import time from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import later from grpc.framework.interfaces.base import base @@ -73,7 +74,8 @@ class _ExpirationManager(_interfaces.ExpirationManager): if self._future is not None and index == self._index: self._future = None self._termination_manager.expire() - self._transmission_manager.abort(base.Outcome.EXPIRED, None, None) + self._transmission_manager.abort( + _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None)) return expire def start(self): diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py index 766d57f931..9a7959a2dd 100644 --- a/src/python/grpcio/grpc/framework/core/_ingestion.py +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -35,6 +35,7 @@ import enum from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import abandonment from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base @@ -46,7 +47,7 @@ _INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' class _SubscriptionCreation( collections.namedtuple( '_SubscriptionCreation', - ('kind', 'subscription', 'code', 'message',))): + ('kind', 'subscription', 'code', 'details',))): """A sum type for the outcome of ingestion initialization. Attributes: @@ -56,7 +57,7 @@ class _SubscriptionCreation( code: A code value to be sent to the other side of the operation along with an indication that the operation is being aborted due to an error on the remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. 
- message: A message value to be sent to the other side of the operation + details: A details value to be sent to the other side of the operation along with an indication that the operation is being aborted due to an error on the remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. @@ -190,11 +191,13 @@ class _IngestionManager(_interfaces.IngestionManager): self._pending_payloads = None self._pending_completion = None - def _abort_and_notify(self, outcome, code, message): + def _abort_and_notify(self, outcome_kind, code, details): self._abort_internal_only() - self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, code, message) - self._expiration_manager.terminate() + if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, code, details) + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() def _operator_next(self): """Computes the next step for full-subscription ingestion. @@ -250,12 +253,13 @@ class _IngestionManager(_interfaces.IngestionManager): else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify( + base.Outcome.Kind.LOCAL_FAILURE, None, None) return else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) return def _operator_post_create(self, subscription): @@ -279,17 +283,18 @@ class _IngestionManager(_interfaces.IngestionManager): if outcome.return_value is None: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR: code = outcome.return_value.code - message = outcome.return_value.message + details = outcome.return_value.details with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message) + self._abort_and_notify( + base.Outcome.Kind.REMOTE_FAILURE, code, details) elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: self._operator_post_create(outcome.return_value.subscription) else: diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py index deb5f34f9b..7ac440722c 100644 --- a/src/python/grpcio/grpc/framework/core/_interfaces.py +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -50,13 +50,13 @@ class TerminationManager(object): If the operation has already terminated the callback will not be called. Args: - callback: A callable that will be passed an interfaces.Outcome value. + callback: A callable that will be passed a base.Outcome value. Returns: None if the operation has not yet terminated and the passed callback will - be called when it does, or a base.Outcome value describing the operation - termination if the operation has terminated and the callback will not be - called as a result of this method call. 
+ be called when it does, or a base.Outcome value describing the + operation termination if the operation has terminated and the callback + will not be called as a result of this method call. """ raise NotImplementedError() @@ -76,8 +76,13 @@ class TerminationManager(object): raise NotImplementedError() @abc.abstractmethod - def reception_complete(self): - """Indicates that reception from the other side is complete.""" + def reception_complete(self, code, details): + """Indicates that reception from the other side is complete. + + Args: + code: An application-specific code value. + details: An application-specific details value. + """ raise NotImplementedError() @abc.abstractmethod @@ -95,7 +100,7 @@ class TerminationManager(object): """Indicates that the operation must abort for the indicated reason. Args: - outcome: An interfaces.Outcome indicating operation abortion. + outcome: A base.Outcome indicating operation abortion. """ raise NotImplementedError() @@ -106,8 +111,8 @@ class TransmissionManager(object): @abc.abstractmethod def kick_off( - self, group, method, timeout, initial_metadata, payload, completion, - allowance): + self, group, method, timeout, protocol_options, initial_metadata, + payload, completion, allowance): """Transmits the values associated with operation invocation.""" raise NotImplementedError() @@ -155,19 +160,13 @@ class TransmissionManager(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, outcome, code, message): + def abort(self, outcome): """Indicates that the operation has aborted. Args: - outcome: An interfaces.Outcome for the operation. If None, indicates that - the operation abortion should not be communicated to the other side of - the operation. - code: A code value to communicate to the other side of the operation - along with indication of operation abortion. May be None, and has no - effect if outcome is None. - message: A message value to communicate to the other side of the - operation along with indication of operation abortion. May be None, and - has no effect if outcome is None. + outcome: A base.Outcome for the operation. If None, indicates that the + operation abortion should not be communicated to the other side of the + operation. """ raise NotImplementedError() @@ -279,8 +278,7 @@ class ReceptionManager(object): """Handle a ticket from the other side of the operation. Args: - ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket - appropriate to this end of the operation and this object. + ticket: A links.Ticket for the operation. """ raise NotImplementedError() @@ -305,10 +303,10 @@ class Operation(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, outcome): + def abort(self, outcome_kind): """Aborts the operation. Args: - outcome: A base.Outcome value indicating operation abortion. + outcome_kind: A base.Outcome.Kind value indicating operation abortion. """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py index cc873c03f9..d4eacc5a3f 100644 --- a/src/python/grpcio/grpc/framework/core/_operation.py +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -31,7 +31,6 @@ import threading -# _utilities is referenced from specification in this module. 
from grpc.framework.core import _context from grpc.framework.core import _emission from grpc.framework.core import _expiration @@ -40,7 +39,7 @@ from grpc.framework.core import _interfaces from grpc.framework.core import _reception from grpc.framework.core import _termination from grpc.framework.core import _transmission -from grpc.framework.core import _utilities # pylint: disable=unused-import +from grpc.framework.core import _utilities class _EasyOperation(_interfaces.Operation): @@ -75,17 +74,19 @@ class _EasyOperation(_interfaces.Operation): with self._lock: self._reception_manager.receive_ticket(ticket) - def abort(self, outcome): + def abort(self, outcome_kind): with self._lock: if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, None, None) self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, None, None) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() def invocation_operate( - operation_id, group, method, subscription, timeout, initial_metadata, - payload, completion, ticket_sink, termination_action, pool): + operation_id, group, method, subscription, timeout, protocol_options, + initial_metadata, payload, completion, ticket_sink, termination_action, + pool): """Constructs objects necessary for front-side operation management. Args: @@ -95,6 +96,8 @@ def invocation_operate( subscription: A base.Subscription describing the customer's interest in the results of the operation. timeout: A length of time in seconds to allow for the operation. + protocol_options: A transport-specific, application-specific, and/or + protocol-specific value relating to the invocation. May be None. initial_metadata: An initial metadata value to be sent to the other side of the operation. May be None if the initial metadata will be passed later or if there will be no initial metadata passed at all. 
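A recurring pattern in the core changes above: abort and termination paths no longer pass a bare base.Outcome value plus loose code and message arguments; they construct a single outcome object carrying a kind, a code, and details, and hand that one object to both the termination and transmission managers. The trivial implementation (the _utilities.Outcome class shown further down) is a namedtuple that also subclasses the abstract interface. A self-contained sketch of that pattern, with a stand-in base class instead of the real base.Outcome:

import collections
import enum

class BaseOutcome(object):
  """Stands in for grpc.framework.interfaces.base.base.Outcome."""

  @enum.unique
  class Kind(enum.Enum):
    COMPLETED = 'completed'
    CANCELLED = 'cancelled'
    LOCAL_FAILURE = 'local failure'

class Outcome(
    BaseOutcome,
    collections.namedtuple('Outcome', ('kind', 'code', 'details',))):
  """A trivial immutable implementation, mirroring _utilities.Outcome."""

# One object now travels through every abort path in place of the old
# (outcome, code, message) triple:
outcome = Outcome(BaseOutcome.Kind.LOCAL_FAILURE, None, None)
assert outcome.kind is BaseOutcome.Kind.LOCAL_FAILURE
assert outcome.code is None and outcome.details is None
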
@@ -136,7 +139,8 @@ def invocation_operate( emission_manager.set_ingestion_manager(ingestion_manager) transmission_manager.kick_off( - group, method, timeout, initial_metadata, payload, completion, None) + group, method, timeout, protocol_options, initial_metadata, payload, + completion, None) return _EasyOperation( lock, termination_manager, transmission_manager, expiration_manager, diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py index 1cebe3874b..d374cf0c8c 100644 --- a/src/python/grpcio/grpc/framework/core/_reception.py +++ b/src/python/grpcio/grpc/framework/core/_reception.py @@ -30,21 +30,26 @@ """State and behavior for ticket reception.""" from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base from grpc.framework.interfaces.base import utilities from grpc.framework.interfaces.links import links -_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = { - links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED, - links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED, - links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN, - links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE, +_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND = { + links.Ticket.Termination.CANCELLATION: base.Outcome.Kind.CANCELLED, + links.Ticket.Termination.EXPIRATION: base.Outcome.Kind.EXPIRED, + links.Ticket.Termination.SHUTDOWN: base.Outcome.Kind.REMOTE_SHUTDOWN, + links.Ticket.Termination.RECEPTION_FAILURE: + base.Outcome.Kind.RECEPTION_FAILURE, links.Ticket.Termination.TRANSMISSION_FAILURE: - base.Outcome.TRANSMISSION_FAILURE, - links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE, - links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.LOCAL_FAILURE, + base.Outcome.Kind.TRANSMISSION_FAILURE, + links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.Kind.REMOTE_FAILURE, + links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.Kind.LOCAL_FAILURE, } +_RECEPTION_FAILURE_OUTCOME = _utilities.Outcome( + base.Outcome.Kind.RECEPTION_FAILURE, None, None) + class ReceptionManager(_interfaces.ReceptionManager): """A ReceptionManager based around a _Receiver passed to it.""" @@ -73,7 +78,7 @@ class ReceptionManager(_interfaces.ReceptionManager): self._aborted = True if self._termination_manager.outcome is None: self._termination_manager.abort(outcome) - self._transmission_manager.abort(None, None, None) + self._transmission_manager.abort(None) self._expiration_manager.terminate() def _sequence_failure(self, ticket): @@ -102,6 +107,7 @@ class ReceptionManager(_interfaces.ReceptionManager): else: completion = utilities.completion( ticket.terminal_metadata, ticket.code, ticket.message) + self._termination_manager.reception_complete(ticket.code, ticket.message) self._ingestion_manager.advance( ticket.initial_metadata, ticket.payload, completion, ticket.allowance) if ticket.allowance is not None: @@ -129,10 +135,12 @@ class ReceptionManager(_interfaces.ReceptionManager): if self._aborted: return elif self._sequence_failure(ticket): - self._abort(base.Outcome.RECEPTION_FAILURE) + self._abort(_RECEPTION_FAILURE_OUTCOME) elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION): - outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination] - self._abort(outcome) + outcome_kind = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND[ + ticket.termination] + self._abort( + _utilities.Outcome(outcome_kind, 
ticket.code, ticket.message)) elif ticket.sequence_number == self._lowest_unseen_sequence_number: self._process(ticket) else: diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py index ad9f6123d8..bdb9147e5b 100644 --- a/src/python/grpcio/grpc/framework/core/_termination.py +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -33,6 +33,7 @@ import abc from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base @@ -74,7 +75,8 @@ class _TerminationManager(TerminationManager): predicate: One of _invocation_completion_predicate or _service_completion_predicate to be used to determine when the operation has completed. - action: A behavior to pass the operation outcome on operation termination. + action: A behavior to pass the operation outcome's kind on operation + termination. pool: A thread pool. """ self._predicate = predicate @@ -82,14 +84,19 @@ class _TerminationManager(TerminationManager): self._pool = pool self._expiration_manager = None - self.outcome = None self._callbacks = [] + self._code = None + self._details = None self._emission_complete = False self._transmission_complete = False self._reception_complete = False self._ingestion_complete = False + # The None-ness of outcome is the operation-wide record of whether and how + # the operation has terminated. + self.outcome = None + def set_expiration_manager(self, expiration_manager): self._expiration_manager = expiration_manager @@ -106,8 +113,10 @@ class _TerminationManager(TerminationManager): act = callable_util.with_exceptions_logged( self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) - if outcome is base.Outcome.LOCAL_FAILURE: - self._pool.submit(act, outcome) + # TODO(issue 3202): Don't call the local application's callbacks if it has + # previously shown a programming defect. 
+ if False and outcome.kind is base.Outcome.Kind.LOCAL_FAILURE: + self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE) else: def call_callbacks_and_act(callbacks, outcome): for callback in callbacks: @@ -115,9 +124,11 @@ class _TerminationManager(TerminationManager): callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) if callback_outcome.exception is not None: - outcome = base.Outcome.LOCAL_FAILURE + act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE break - act(outcome) + else: + act_outcome_kind = outcome.kind + act(act_outcome_kind) self._pool.submit( callable_util.with_exceptions_logged( @@ -132,7 +143,9 @@ class _TerminationManager(TerminationManager): if self._predicate( self._emission_complete, self._transmission_complete, self._reception_complete, self._ingestion_complete): - self._terminate_and_notify(base.Outcome.COMPLETED) + self._terminate_and_notify( + _utilities.Outcome( + base.Outcome.Kind.COMPLETED, self._code, self._details)) return True else: return False @@ -163,10 +176,12 @@ class _TerminationManager(TerminationManager): else: return False - def reception_complete(self): + def reception_complete(self, code, details): """See superclass method for specification.""" if self.outcome is None: self._reception_complete = True + self._code = code + self._details = details self._perhaps_complete() def ingestion_complete(self): @@ -177,7 +192,8 @@ class _TerminationManager(TerminationManager): def expire(self): """See _interfaces.TerminationManager.expire for specification.""" - self._terminate_internal_only(base.Outcome.EXPIRED) + self._terminate_internal_only( + _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None)) def abort(self, outcome): """See _interfaces.TerminationManager.abort for specification.""" diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py index 202a71dd71..65b12c4160 100644 --- a/src/python/grpcio/grpc/framework/core/_transmission.py +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -34,12 +34,16 @@ import enum from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base from grpc.framework.interfaces.links import links _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' +_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome( + base.Outcome.Kind.TRANSMISSION_FAILURE, None, None) + def _explode_completion(completion): if completion is None: @@ -194,7 +198,7 @@ class TransmissionManager(_interfaces.TransmissionManager): with self._lock: self._abort = _ABORTED_NO_NOTIFY if self._termination_manager.outcome is None: - self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) + self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME) self._expiration_manager.terminate() return @@ -203,18 +207,19 @@ class TransmissionManager(_interfaces.TransmissionManager): self._transmitting = True def kick_off( - self, group, method, timeout, initial_metadata, payload, completion, - allowance): + self, group, method, timeout, protocol_options, initial_metadata, + payload, completion, allowance): """See _interfaces.TransmissionManager.kickoff for specification.""" # TODO(nathaniel): Support other subscriptions. 
subscription = links.Ticket.Subscription.FULL terminal_metadata, code, message, termination = _explode_completion( completion) self._remote_allowance = 1 if payload is None else 0 + protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options) ticket = links.Ticket( self._operation_id, 0, group, method, subscription, timeout, allowance, initial_metadata, payload, terminal_metadata, code, message, - termination, None) + termination, protocol) self._lowest_unused_sequence_number = 1 self._transmit(ticket) @@ -307,19 +312,24 @@ class TransmissionManager(_interfaces.TransmissionManager): self._remote_complete = True self._local_allowance = 0 - def abort(self, outcome, code, message): + def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" if self._abort.kind is _Abort.Kind.NOT_ABORTED: - termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get( - outcome) - if termination is None: + if outcome is None: self._abort = _ABORTED_NO_NOTIFY - elif self._transmitting: - self._abort = _Abort( - _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, code, message) else: - ticket = links.Ticket( - self._operation_id, self._lowest_unused_sequence_number, None, - None, None, None, None, None, None, None, code, message, - termination, None) - self._transmit(ticket) + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get( + outcome.kind) + if termination is None: + self._abort = _ABORTED_NO_NOTIFY + elif self._transmitting: + self._abort = _Abort( + _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code, + outcome.details) + else: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, outcome.code, + outcome.details, termination, None) + self._transmit(ticket) + self._abort = _ABORTED_NO_NOTIFY diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py index 5b0d798751..abedc727e4 100644 --- a/src/python/grpcio/grpc/framework/core/_utilities.py +++ b/src/python/grpcio/grpc/framework/core/_utilities.py @@ -31,6 +31,8 @@ import collections +from grpc.framework.interfaces.base import base + class ServicerPackage( collections.namedtuple( @@ -44,3 +46,9 @@ class ServicerPackage( maximum_timeout: A float indicating the maximum length of time in seconds to allow for an operation. """ + + +class Outcome( + base.Outcome, + collections.namedtuple('Outcome', ('kind', 'code', 'details',))): + """A trivial implementation of base.Outcome.""" diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py index 4c6bf16f43..68db9fab8e 100644 --- a/src/python/grpcio/grpc/framework/crust/_calls.py +++ b/src/python/grpcio/grpc/framework/crust/_calls.py @@ -38,12 +38,14 @@ _ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' 
_EMPTY_COMPLETION = utilities.completion(None, None, None) -def _invoke(end, group, method, timeout, initial_metadata, payload, complete): +def _invoke( + end, group, method, timeout, protocol_options, initial_metadata, payload, + complete): rendezvous = _control.Rendezvous(None, None) operation_context, operator = end.operate( group, method, utilities.full_subscription(rendezvous), timeout, - initial_metadata=initial_metadata, payload=payload, - completion=_EMPTY_COMPLETION if complete else None) + protocol_options=protocol_options, initial_metadata=initial_metadata, + payload=payload, completion=_EMPTY_COMPLETION if complete else None) rendezvous.set_operator_and_context(operator, operation_context) outcome = operation_context.add_termination_callback(rendezvous.set_outcome) if outcome is not None: @@ -93,36 +95,43 @@ def _event_return_stream( def blocking_unary_unary( - end, group, method, timeout, with_call, initial_metadata, payload): + end, group, method, timeout, with_call, protocol_options, initial_metadata, + payload): """Services in a blocking fashion a unary-unary servicer method.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) if with_call: return next(rendezvous), rendezvous else: return next(rendezvous) -def future_unary_unary(end, group, method, timeout, initial_metadata, payload): +def future_unary_unary( + end, group, method, timeout, protocol_options, initial_metadata, payload): """Services a value-in value-out servicer method by returning a Future.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return rendezvous -def inline_unary_stream(end, group, method, timeout, initial_metadata, payload): +def inline_unary_stream( + end, group, method, timeout, protocol_options, initial_metadata, payload): """Services a value-in stream-out servicer method.""" rendezvous, unused_operation_context, unused_outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return rendezvous def blocking_stream_unary( - end, group, method, timeout, with_call, initial_metadata, payload_iterator, - pool): + end, group, method, timeout, with_call, protocol_options, initial_metadata, + payload_iterator, pool): """Services in a blocking fashion a stream-in value-out servicer method.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -141,10 +150,12 @@ def blocking_stream_unary( def future_stream_unary( - end, group, method, timeout, initial_metadata, payload_iterator, pool): + end, group, method, timeout, protocol_options, initial_metadata, + payload_iterator, pool): """Services a stream-in value-out servicer method by returning a Future.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -155,10 +166,12 @@ def future_stream_unary( def 
inline_stream_stream( - end, group, method, timeout, initial_metadata, payload_iterator, pool): + end, group, method, timeout, protocol_options, initial_metadata, + payload_iterator, pool): """Services a stream-in stream-out servicer method.""" rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) if outcome is None: def in_pool(): for payload in payload_iterator: @@ -169,36 +182,40 @@ def inline_stream_stream( def event_unary_unary( - end, group, method, timeout, initial_metadata, payload, receiver, - abortion_callback, pool): + end, group, method, timeout, protocol_options, initial_metadata, payload, + receiver, abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return _event_return_unary( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_unary_stream( - end, group, method, timeout, initial_metadata, payload, + end, group, method, timeout, protocol_options, initial_metadata, payload, receiver, abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, payload, True) + end, group, method, timeout, protocol_options, initial_metadata, payload, + True) return _event_return_stream( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_stream_unary( - end, group, method, timeout, initial_metadata, receiver, abortion_callback, - pool): + end, group, method, timeout, protocol_options, initial_metadata, receiver, + abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) return _event_return_unary( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) def event_stream_stream( - end, group, method, timeout, initial_metadata, receiver, abortion_callback, - pool): + end, group, method, timeout, protocol_options, initial_metadata, receiver, + abortion_callback, pool): rendezvous, operation_context, outcome = _invoke( - end, group, method, timeout, initial_metadata, None, False) + end, group, method, timeout, protocol_options, initial_metadata, None, + False) return _event_return_stream( receiver, abortion_callback, rendezvous, operation_context, outcome, pool) diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py index 01de3c15bd..e02a41d720 100644 --- a/src/python/grpcio/grpc/framework/crust/_control.py +++ b/src/python/grpcio/grpc/framework/crust/_control.py @@ -110,30 +110,31 @@ class _Termination( _NOT_TERMINATED = _Termination(False, None, None) -_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = { - base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None), - base.Outcome.CANCELLED: lambda *args: _Termination( +_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = { + base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination( + True, None, None), + base.Outcome.Kind.CANCELLED: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.CANCELLED, *args), face.CancellationError(*args)), - base.Outcome.EXPIRED: lambda *args: _Termination( + base.Outcome.Kind.EXPIRED: lambda *args: 
_Termination( True, face.Abortion(face.Abortion.Kind.EXPIRED, *args), face.ExpirationError(*args)), - base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination( + base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args), face.LocalShutdownError(*args)), - base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination( + base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args), face.RemoteShutdownError(*args)), - base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), face.NetworkError(*args)), - base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), face.NetworkError(*args)), - base.Outcome.LOCAL_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args), face.LocalError(*args)), - base.Outcome.REMOTE_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args), face.RemoteError(*args)), } @@ -247,13 +248,17 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): else: initial_metadata = self._up_initial_metadata.value if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: - terminal_metadata, code, details = None, None, None + terminal_metadata = None else: terminal_metadata = self._up_completion.value.terminal_metadata + if outcome.kind is base.Outcome.Kind.COMPLETED: code = self._up_completion.value.code details = self._up_completion.value.message - self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[ - outcome](initial_metadata, terminal_metadata, code, details) + else: + code = outcome.code + details = outcome.details + self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[ + outcome.kind](initial_metadata, terminal_metadata, code, details) self._condition.notify_all() @@ -437,6 +442,10 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): else: return self._termination.abortion + def protocol_context(self): + with self._condition: + raise NotImplementedError('TODO: protocol context implementation!') + def initial_metadata(self): with self._condition: while True: diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py index 6ff7249e75..f1855c2f47 100644 --- a/src/python/grpcio/grpc/framework/crust/_service.py +++ b/src/python/grpcio/grpc/framework/crust/_service.py @@ -52,6 +52,9 @@ class _ServicerContext(face.ServicerContext): def cancel(self): self._rendezvous.cancel() + def protocol_context(self): + return self._rendezvous.protocol_context() + def invocation_metadata(self): return self._rendezvous.initial_metadata() diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py index d38fab8ba0..4ebc4e9ae8 100644 --- a/src/python/grpcio/grpc/framework/crust/implementations.py +++ b/src/python/grpcio/grpc/framework/crust/implementations.py @@ -66,22 +66,23 @@ class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): self._pool = pool def __call__( - self, request, 
timeout, metadata=None, with_call=False): + self, request, timeout, metadata=None, with_call=False, + protocol_options=None): return _calls.blocking_unary_unary( self._end, self._group, self._method, timeout, with_call, - metadata, request) + protocol_options, metadata, request) - def future(self, request, timeout, metadata=None): + def future(self, request, timeout, metadata=None, protocol_options=None): return _calls.future_unary_unary( - self._end, self._group, self._method, timeout, metadata, - request) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request) def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_unary( - self._end, self._group, self._method, timeout, metadata, - request, receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request, receiver, abortion_callback, self._pool) class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): @@ -92,17 +93,17 @@ class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): self._method = method self._pool = pool - def __call__(self, request, timeout, metadata=None): + def __call__(self, request, timeout, metadata=None, protocol_options=None): return _calls.inline_unary_stream( - self._end, self._group, self._method, timeout, metadata, - request) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request) def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_stream( - self._end, self._group, self._method, timeout, metadata, - request, receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request, receiver, abortion_callback, self._pool) class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): @@ -115,21 +116,23 @@ class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): def __call__( self, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): return _calls.blocking_stream_unary( self._end, self._group, self._method, timeout, with_call, - metadata, request_iterator, self._pool) + protocol_options, metadata, request_iterator, self._pool) - def future(self, request_iterator, timeout, metadata=None): + def future( + self, request_iterator, timeout, metadata=None, protocol_options=None): return _calls.future_stream_unary( - self._end, self._group, self._method, timeout, metadata, - request_iterator, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request_iterator, self._pool) def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): return _calls.event_stream_unary( - self._end, self._group, self._method, timeout, metadata, - receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, receiver, abortion_callback, self._pool) class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): @@ -140,16 +143,18 @@ class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): self._method = method self._pool = pool - def __call__(self, request_iterator, timeout, metadata=None): + def __call__( + self, request_iterator, timeout, metadata=None, protocol_options=None): return 
_calls.inline_stream_stream( - self._end, self._group, self._method, timeout, metadata, - request_iterator, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, request_iterator, self._pool) def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): return _calls.event_stream_stream( - self._end, self._group, self._method, timeout, metadata, - receiver, abortion_callback, self._pool) + self._end, self._group, self._method, timeout, protocol_options, + metadata, receiver, abortion_callback, self._pool) class _GenericStub(face.GenericStub): @@ -161,66 +166,70 @@ class _GenericStub(face.GenericStub): def blocking_unary_unary( self, group, method, request, timeout, metadata=None, - with_call=None): + with_call=None, protocol_options=None): return _calls.blocking_unary_unary( - self._end, group, method, timeout, with_call, metadata, - request) + self._end, group, method, timeout, with_call, protocol_options, + metadata, request) def future_unary_unary( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): return _calls.future_unary_unary( - self._end, group, method, timeout, metadata, request) + self._end, group, method, timeout, protocol_options, metadata, request) def inline_unary_stream( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): return _calls.inline_unary_stream( - self._end, group, method, timeout, metadata, request) + self._end, group, method, timeout, protocol_options, metadata, request) def blocking_stream_unary( self, group, method, request_iterator, timeout, metadata=None, - with_call=None): + with_call=None, protocol_options=None): return _calls.blocking_stream_unary( - self._end, group, method, timeout, with_call, metadata, - request_iterator, self._pool) + self._end, group, method, timeout, with_call, protocol_options, + metadata, request_iterator, self._pool) def future_stream_unary( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): return _calls.future_stream_unary( - self._end, group, method, timeout, metadata, + self._end, group, method, timeout, protocol_options, metadata, request_iterator, self._pool) def inline_stream_stream( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): return _calls.inline_stream_stream( - self._end, group, method, timeout, metadata, + self._end, group, method, timeout, protocol_options, metadata, request_iterator, self._pool) def event_unary_unary( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_unary( - self._end, group, method, timeout, metadata, request, + self._end, group, method, timeout, protocol_options, metadata, request, receiver, abortion_callback, self._pool) def event_unary_stream( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_unary_stream( - self._end, group, method, timeout, metadata, request, + self._end, group, method, timeout, protocol_options, metadata, request, receiver, abortion_callback, self._pool) def 
event_stream_unary( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_stream_unary( - self._end, group, method, timeout, metadata, receiver, + self._end, group, method, timeout, protocol_options, metadata, receiver, abortion_callback, self._pool) def event_stream_stream( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): return _calls.event_stream_stream( - self._end, group, method, timeout, metadata, receiver, + self._end, group, method, timeout, protocol_options, metadata, receiver, abortion_callback, self._pool) def unary_unary(self, group, method): diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py index bc52efb4c5..013e7c66f2 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/base.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -40,7 +40,7 @@ applications choose. # threading is referenced from specification in this module. import abc import enum -import threading +import threading # pylint: disable=unused-import # abandonment is referenced from specification in this module. from grpc.framework.foundation import abandonment # pylint: disable=unused-import @@ -69,19 +69,30 @@ class NoSuchMethodError(Exception): self.details = details -@enum.unique -class Outcome(enum.Enum): - """Operation outcomes.""" +class Outcome(object): + """The outcome of an operation. - COMPLETED = 'completed' - CANCELLED = 'cancelled' - EXPIRED = 'expired' - LOCAL_SHUTDOWN = 'local shutdown' - REMOTE_SHUTDOWN = 'remote shutdown' - RECEPTION_FAILURE = 'reception failure' - TRANSMISSION_FAILURE = 'transmission failure' - LOCAL_FAILURE = 'local failure' - REMOTE_FAILURE = 'remote failure' + Attributes: + kind: A Kind value coarsely identifying how the operation terminated. + code: An application-specific code value or None if no such value was + provided. + details: An application-specific details value or None if no such value was + provided. + """ + + @enum.unique + class Kind(enum.Enum): + """Ways in which an operation can terminate.""" + + COMPLETED = 'completed' + CANCELLED = 'cancelled' + EXPIRED = 'expired' + LOCAL_SHUTDOWN = 'local shutdown' + REMOTE_SHUTDOWN = 'remote shutdown' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + LOCAL_FAILURE = 'local failure' + REMOTE_FAILURE = 'remote failure' class Completion(object): @@ -263,7 +274,7 @@ class End(object): @abc.abstractmethod def operate( self, group, method, subscription, timeout, initial_metadata=None, - payload=None, completion=None): + payload=None, completion=None, protocol_options=None): """Commences an operation. Args: @@ -279,6 +290,8 @@ class End(object): payload: An initial payload for the operation. completion: A Completion value indicating the end of transmission to the other side of the operation. + protocol_options: A value specified by the provider of a Base interface + implementation affording custom state and behavior. Returns: A pair of objects affording information about the operation and action @@ -294,8 +307,8 @@ class End(object): """Reports the number of terminated operations broken down by outcome. Returns: - A dictionary from Outcome value to an integer identifying the number - of operations that terminated with that outcome. 
+ A dictionary from Outcome.Kind value to an integer identifying the number + of operations that terminated with that outcome kind. """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py index 948e7505b6..bc9a434a76 100644 --- a/src/python/grpcio/grpc/framework/interfaces/face/face.py +++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py @@ -184,6 +184,16 @@ class RpcContext(object): """ raise NotImplementedError() + @abc.abstractmethod + def protocol_context(self): + """Accesses a custom object specified by an implementation provider. + + Returns: + A value specified by the provider of a Face interface implementation + affording custom state and behavior. + """ + raise NotImplementedError() + class Call(RpcContext): """Invocation-side utility object for an RPC.""" @@ -354,7 +364,8 @@ class UnaryUnaryMultiCallable(object): @abc.abstractmethod def __call__( - self, request, timeout, metadata=None, with_call=False): + self, request, timeout, metadata=None, with_call=False, + protocol_options=None): """Synchronously invokes the underlying RPC. Args: @@ -364,6 +375,8 @@ class UnaryUnaryMultiCallable(object): the RPC. with_call: Whether or not to also return a Call for the RPC in addition to the response. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -375,7 +388,7 @@ class UnaryUnaryMultiCallable(object): raise NotImplementedError() @abc.abstractmethod - def future(self, request, timeout, metadata=None): + def future(self, request, timeout, metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -383,6 +396,8 @@ class UnaryUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -395,7 +410,7 @@ class UnaryUnaryMultiCallable(object): @abc.abstractmethod def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -406,6 +421,8 @@ class UnaryUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -418,7 +435,7 @@ class UnaryStreamMultiCallable(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request, timeout, metadata=None): + def __call__(self, request, timeout, metadata=None, protocol_options=None): """Invokes the underlying RPC. Args: @@ -426,6 +443,8 @@ class UnaryStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior.
Returns: An object that is both a Call for the RPC and an iterator of response @@ -437,7 +456,7 @@ class UnaryStreamMultiCallable(object): @abc.abstractmethod def event( self, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -448,6 +467,8 @@ class UnaryStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call object for the RPC. @@ -462,7 +483,7 @@ class StreamUnaryMultiCallable(object): @abc.abstractmethod def __call__( self, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Synchronously invokes the underlying RPC. Args: @@ -472,6 +493,8 @@ class StreamUnaryMultiCallable(object): the RPC. with_call: Whether or not to also return a Call for the RPC in addition to the response. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -483,7 +506,8 @@ class StreamUnaryMultiCallable(object): raise NotImplementedError() @abc.abstractmethod - def future(self, request_iterator, timeout, metadata=None): + def future( + self, request_iterator, timeout, metadata=None, protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -491,6 +515,8 @@ class StreamUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -502,7 +528,8 @@ class StreamUnaryMultiCallable(object): @abc.abstractmethod def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -512,6 +539,8 @@ class StreamUnaryMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A single object that is both a Call object for the RPC and a @@ -525,7 +554,8 @@ class StreamStreamMultiCallable(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __call__(self, request_iterator, timeout, metadata=None): + def __call__( + self, request_iterator, timeout, metadata=None, protocol_options=None): """Invokes the underlying RPC. Args: @@ -533,6 +563,8 @@ class StreamStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior.
Returns: An object that is both a Call for the RPC and an iterator of response @@ -543,7 +575,8 @@ class StreamStreamMultiCallable(object): @abc.abstractmethod def event( - self, receiver, abortion_callback, timeout, metadata=None): + self, receiver, abortion_callback, timeout, metadata=None, + protocol_options=None): """Asynchronously invokes the underlying RPC. Args: @@ -553,6 +586,8 @@ class StreamStreamMultiCallable(object): timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A single object that is both a Call object for the RPC and a @@ -646,7 +681,7 @@ class GenericStub(object): @abc.abstractmethod def blocking_unary_unary( self, group, method, request, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Invokes a unary-request-unary-response method. This method blocks until either returning the response value of the RPC @@ -661,6 +696,8 @@ class GenericStub(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to also return a Call for the RPC in addition to the response. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -673,7 +710,8 @@ class GenericStub(object): @abc.abstractmethod def future_unary_unary( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): """Invokes a unary-request-unary-response method. Args: @@ -682,6 +720,8 @@ class GenericStub(object): request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -693,7 +733,8 @@ class GenericStub(object): @abc.abstractmethod def inline_unary_stream( - self, group, method, request, timeout, metadata=None): + self, group, method, request, timeout, metadata=None, + protocol_options=None): """Invokes a unary-request-stream-response method. Args: @@ -702,6 +743,8 @@ class GenericStub(object): request: The request value for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -713,7 +756,7 @@ class GenericStub(object): @abc.abstractmethod def blocking_stream_unary( self, group, method, request_iterator, timeout, metadata=None, - with_call=False): + with_call=False, protocol_options=None): """Invokes a stream-request-unary-response method. This method blocks until either returning the response value of the RPC @@ -728,6 +771,8 @@ class GenericStub(object): metadata: A metadata value to be passed to the service-side of the RPC. with_call: Whether or not to also return a Call for the RPC in addition to the response.
+ protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -740,7 +785,8 @@ class GenericStub(object): @abc.abstractmethod def future_stream_unary( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): """Invokes a stream-request-unary-response method. Args: @@ -749,6 +795,8 @@ class GenericStub(object): request_iterator: An iterator that yields request values for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and a future.Future. In the @@ -760,7 +808,8 @@ class GenericStub(object): @abc.abstractmethod def inline_stream_stream( - self, group, method, request_iterator, timeout, metadata=None): + self, group, method, request_iterator, timeout, metadata=None, + protocol_options=None): """Invokes a stream-request-stream-response method. Args: @@ -769,6 +818,8 @@ class GenericStub(object): request_iterator: An iterator that yields request values for the RPC. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: An object that is both a Call for the RPC and an iterator of response @@ -780,7 +831,7 @@ class GenericStub(object): @abc.abstractmethod def event_unary_unary( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-unary-response method. Args: @@ -792,6 +843,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -801,7 +854,7 @@ class GenericStub(object): @abc.abstractmethod def event_unary_stream( self, group, method, request, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a unary-request-stream-response method. Args: @@ -813,6 +866,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A Call for the RPC. @@ -822,7 +877,7 @@ class GenericStub(object): @abc.abstractmethod def event_stream_unary( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a stream-request-unary-response method. Args: @@ -833,6 +888,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC.
+ protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A pair of a Call object for the RPC and a stream.Consumer to which the @@ -843,7 +900,7 @@ class GenericStub(object): @abc.abstractmethod def event_stream_stream( self, group, method, receiver, abortion_callback, timeout, - metadata=None): + metadata=None, protocol_options=None): """Event-driven invocation of a stream-request-stream-response method. Args: @@ -854,6 +911,8 @@ class GenericStub(object): in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. metadata: A metadata value to be passed to the service-side of the RPC. + protocol_options: A value specified by the provider of a Face interface + implementation affording custom state and behavior. Returns: A pair of a Call object for the RPC and a stream.Consumer to which the diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index b98a30a399..24f0e3b354 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -34,14 +34,13 @@ import collections import enum -class Transport(collections.namedtuple('Transport', ('kind', 'value',))): - """A sum type for handles to an underlying transport system. +class Protocol(collections.namedtuple('Protocol', ('kind', 'value',))): + """A sum type for handles to a system that transmits tickets. Attributes: - kind: A Kind value identifying the kind of value being passed to or from - the underlying transport. - value: The value being passed through RPC Framework between the high-level - application and the underlying transport. + kind: A Kind value identifying the kind of value being passed. + value: The value being passed between the high-level application and the + system affording ticket transport. """ @enum.unique @@ -56,8 +55,7 @@ class Ticket( 'Ticket', ('operation_id', 'sequence_number', 'group', 'method', 'subscription', 'timeout', 'allowance', 'initial_metadata', 'payload', - 'terminal_metadata', 'code', 'message', 'termination', - 'transport',))): + 'terminal_metadata', 'code', 'message', 'termination', 'protocol',))): """A sum type for all values sent from a front to a back. Attributes: @@ -99,8 +97,8 @@ class Ticket( termination: A Termination value describing the end of the operation, or None if the operation has not yet terminated. If set, no further tickets may be sent in the same direction. - transport: A Transport value or None, with further semantics being a matter - between high-level application and underlying transport. + protocol: A Protocol value or None, with further semantics being a matter + between high-level application and underlying ticket transport.
""" @enum.unique diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py index f0bd989ea6..cafb6b6eae 100644 --- a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -38,6 +38,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.core import implementations from grpc.framework.interfaces.base import utilities from grpc_test import test_common as grpc_test_common @@ -45,8 +46,6 @@ from grpc_test.framework.common import test_constants from grpc_test.framework.interfaces.base import test_cases from grpc_test.framework.interfaces.base import test_interfaces -_CODE = _intermediary_low.Code.OK - class _SerializationBehaviors( collections.namedtuple( @@ -124,8 +123,8 @@ class _Implementation(test_interfaces.Implementation): def service_completion(self): return utilities.completion( - grpc_test_common.SERVICE_TERMINAL_METADATA, _CODE, - grpc_test_common.DETAILS) + grpc_test_common.SERVICE_TERMINAL_METADATA, + beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS) def metadata_transmitted(self, original_metadata, transmitted_metadata): return original_metadata is None or grpc_test_common.metadata_transmitted( diff --git a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py index 28c0619f7c..a4d4dee38c 100644 --- a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py @@ -35,6 +35,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.core import implementations as core_implementations from grpc.framework.crust import implementations as crust_implementations from grpc.framework.foundation import logging_pool @@ -139,7 +140,7 @@ class _Implementation(test_interfaces.Implementation): return grpc_test_common.SERVICE_TERMINAL_METADATA def code(self): - return _intermediary_low.Code.OK + return beta_interfaces.StatusCode.OK def details(self): return grpc_test_common.DETAILS diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py index 716323cc20..77e83d5561 100644 --- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py +++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py @@ -34,6 +34,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.interfaces.links import links from grpc_test import test_common from grpc_test._links import _proto_scenarios @@ -93,7 +94,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): return None, None def create_service_completion(self): - return _intermediary_low.Code.OK, 'An exuberant test "details" message!' 
+ return ( + beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!') def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): self.assertTrue( @@ -110,7 +112,7 @@ class RoundTripTest(unittest.TestCase): test_group = 'test package.Test Group' test_method = 'test method' identity_transformation = {(test_group, test_method): _IDENTITY} - test_code = _intermediary_low.Code.OK + test_code = beta_interfaces.StatusCode.OK test_message = 'a test message' service_link = service.service_link( @@ -150,11 +152,13 @@ class RoundTripTest(unittest.TestCase): self.assertIs( invocation_mate.tickets()[-1].termination, links.Ticket.Termination.COMPLETION) + self.assertIs(invocation_mate.tickets()[-1].code, test_code) + self.assertEqual(invocation_mate.tickets()[-1].message, test_message) def _perform_scenario_test(self, scenario): test_operation_id = object() test_group, test_method = scenario.group_and_method() - test_code = _intermediary_low.Code.OK + test_code = beta_interfaces.StatusCode.OK test_message = 'a scenario test message' service_link = service.service_link( diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py index ce4c59c0ee..e9087a7949 100644 --- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py @@ -32,8 +32,8 @@ import collections import unittest -from grpc._adapter import _intermediary_low from grpc.beta import beta +from grpc.beta import interfaces from grpc_test import resources from grpc_test import test_common as grpc_test_common from grpc_test.beta import test_utilities @@ -116,7 +116,7 @@ class _Implementation(test_interfaces.Implementation): return grpc_test_common.SERVICE_TERMINAL_METADATA def code(self): - return _intermediary_low.Code.OK + return interfaces.StatusCode.OK def details(self): return grpc_test_common.DETAILS diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py new file mode 100644 index 0000000000..ecd10f2175 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py @@ -0,0 +1,75 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of RPC-method-not-found behavior.""" + +import unittest + +from grpc.beta import beta +from grpc.beta import interfaces +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_constants + + +class NotFoundTest(unittest.TestCase): + + def setUp(self): + self._server = beta.server({}) + port = self._server.add_insecure_port('[::]:0') + channel = beta.create_insecure_channel('localhost', port) + self._generic_stub = beta.generic_stub(channel) + self._server.start() + + def tearDown(self): + self._server.stop(0).wait() + self._generic_stub = None + + def test_blocking_unary_unary_not_found(self): + with self.assertRaises(face.LocalError) as exception_assertion_context: + self._generic_stub.blocking_unary_unary( + 'groop', 'meffod', b'abc', test_constants.LONG_TIMEOUT, + with_call=True) + self.assertIs( + exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) + + def test_future_stream_unary_not_found(self): + rpc_future = self._generic_stub.future_stream_unary( + 'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT) + with self.assertRaises(face.LocalError) as exception_assertion_context: + rpc_future.result() + self.assertIs( + exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) + self.assertIs( + rpc_future.exception().code, interfaces.StatusCode.UNIMPLEMENTED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py index e4d2a7a0d7..46a01876d8 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py @@ -236,8 +236,8 @@ class Instruction( collections.namedtuple( 'Instruction', ('kind', 'advance_args', 'advance_kwargs', 'conclude_success', - 'conclude_message', 'conclude_invocation_outcome', - 'conclude_service_outcome',))): + 'conclude_message', 'conclude_invocation_outcome_kind', + 'conclude_service_outcome_kind',))): """""" @enum.unique @@ -532,24 +532,24 @@ class _SequenceController(Controller): self._state.service_side_outcome = outcome if self._todo is not None or self._remaining_elements: self._failed('Premature service-side outcome %s!' 
% (outcome,)) - elif outcome is not self._sequence.outcome.service: + elif outcome.kind is not self._sequence.outcome_kinds.service: self._failed( - 'Incorrect service-side outcome: %s should have been %s' % ( - outcome, self._sequence.outcome.service)) + 'Incorrect service-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.service)) elif self._state.invocation_side_outcome is not None: - self._passed(self._state.invocation_side_outcome, outcome) + self._passed(self._state.invocation_side_outcome.kind, outcome.kind) def invocation_on_termination(self, outcome): with self._condition: self._state.invocation_side_outcome = outcome if self._todo is not None or self._remaining_elements: self._failed('Premature invocation-side outcome %s!' % (outcome,)) - elif outcome is not self._sequence.outcome.invocation: + elif outcome.kind is not self._sequence.outcome_kinds.invocation: self._failed( - 'Incorrect invocation-side outcome: %s should have been %s' % ( - outcome, self._sequence.outcome.invocation)) + 'Incorrect invocation-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.invocation)) elif self._state.service_side_outcome is not None: - self._passed(outcome, self._state.service_side_outcome) + self._passed(outcome.kind, self._state.service_side_outcome.kind) class _SequenceControllerCreator(ControllerCreator): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py index 1d77aaebe6..f547d91681 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py @@ -103,13 +103,14 @@ class Element(collections.namedtuple('Element', ('kind', 'transmission',))): SERVICE_FAILURE = 'service failure' -class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))): +class OutcomeKinds( + collections.namedtuple('Outcome', ('invocation', 'service',))): """A description of the expected outcome of an operation test. Attributes: - invocation: The base.Outcome value expected on the invocation side of the - operation. - service: The base.Outcome value expected on the service side of the + invocation: The base.Outcome.Kind value expected on the invocation side of + the operation. + service: The base.Outcome.Kind value expected on the service side of the operation. """ @@ -117,7 +118,8 @@ class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))): class Sequence( collections.namedtuple( 'Sequence', - ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))): + ('name', 'maximum_duration', 'invocation', 'elements', + 'outcome_kinds',))): """Describes at a high level steps to perform in a test. Attributes: @@ -128,7 +130,8 @@ class Sequence( under test. elements: A sequence of Element values describing at coarse granularity actions to take during the operation under test. - outcome: An Outcome value describing the expected outcome of the test. + outcome_kinds: An OutcomeKinds value describing the expected outcome kinds + of the test. 
""" _EASY = Sequence( @@ -139,7 +142,7 @@ _EASY = Sequence( Element( Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)), ), - Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) _PEASY = Sequence( 'Peasy', @@ -154,7 +157,7 @@ _PEASY = Sequence( Element( Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)), ), - Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) # TODO(issue 2959): Finish this test suite. This tuple of sequences should diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py index 87332cf612..5065a3f38a 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -44,7 +44,8 @@ from grpc_test.framework.interfaces.base import test_interfaces _SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True)) -_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome} +_EMPTY_OUTCOME_KIND_DICT = { + outcome_kind: 0 for outcome_kind in base.Outcome.Kind} class _Serialization(test_interfaces.Serialization): @@ -119,7 +120,7 @@ class _Operator(base.Operator): class _Servicer(base.Servicer): - """An base.Servicer with instrumented for testing.""" + """A base.Servicer with instrumented for testing.""" def __init__(self, group, method, controllers, pool): self._condition = threading.Condition() @@ -223,11 +224,12 @@ class _OperationTest(unittest.TestCase): self.assertTrue( instruction.conclude_success, msg=instruction.conclude_message) - expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT) - expected_invocation_stats[instruction.conclude_invocation_outcome] += 1 + expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_invocation_stats[ + instruction.conclude_invocation_outcome_kind] += 1 self.assertDictEqual(expected_invocation_stats, invocation_stats) - expected_service_stats = dict(_EMPTY_OUTCOME_DICT) - expected_service_stats[instruction.conclude_service_outcome] += 1 + expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_service_stats[instruction.conclude_service_outcome_kind] += 1 self.assertDictEqual(expected_service_stats, service_stats) diff --git a/src/ruby/README.md b/src/ruby/README.md index 7f75c0e313..8c56ceb135 100644 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -19,10 +19,10 @@ INSTALLATION **Linux (Debian):** -Add [Debian testing][] to your `sources.list` file. Example: +Add [Debian jessie-backports][] to your `sources.list` file. Example: ```sh -echo "deb http://ftp.us.debian.org/debian testing main contrib non-free" | \ +echo "deb http://http.debian.net/debian jessie-backports main" | \ sudo tee -a /etc/apt/sources.list ``` @@ -99,4 +99,4 @@ Directory structure is the layout for [ruby extensions][] [ruby extensions]:http://guides.rubygems.org/gems-with-extensions/ [rubydoc]: http://www.rubydoc.info/gems/grpc [grpc.io]: http://www.grpc.io/docs/installation/ruby.html -[Debian testing]:https://www.debian.org/releases/stretch/ +[Debian jessie-backports]:http://backports.debian.org/Instructions/ |