aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-08-27 07:37:48 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-08-27 07:37:48 -0700
commit4ff47d9ea361251274247998625808305c7f7c4a (patch)
tree8396780e126a8db360700b182ed2722eef2527a9
parentb6c912b75b14cebc0527ccbac91964af347e7df7 (diff)
parent99d7b661bede39143d1be6040fb67c81b8117ae3 (diff)
Merge pull request #3095 from grpc/revert-3025-endpoints
Revert "Refactor Endpoint API"
-rw-r--r--include/grpc/support/slice_buffer.h2
-rw-r--r--src/core/httpcli/httpcli.c95
-rw-r--r--src/core/iomgr/endpoint.c17
-rw-r--r--src/core/iomgr/endpoint.h63
-rw-r--r--src/core/iomgr/tcp_posix.c525
-rw-r--r--src/core/iomgr/tcp_windows.c192
-rw-r--r--src/core/security/secure_endpoint.c188
-rw-r--r--src/core/security/secure_transport_setup.c119
-rw-r--r--src/core/support/slice_buffer.c22
-rw-r--r--src/core/transport/chttp2/internal.h12
-rw-r--r--src/core/transport/chttp2/writing.c21
-rw-r--r--src/core/transport/chttp2_transport.c140
-rw-r--r--test/core/bad_client/bad_client.c17
-rw-r--r--test/core/iomgr/endpoint_tests.c204
-rw-r--r--test/core/iomgr/tcp_posix_test.c148
-rw-r--r--test/core/security/secure_endpoint_test.c55
-rw-r--r--test/core/util/port_posix.c14
-rw-r--r--test/core/util/port_windows.c91
18 files changed, 978 insertions, 947 deletions
diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h
index 04db003ac5..ec048e8c91 100644
--- a/include/grpc/support/slice_buffer.h
+++ b/include/grpc/support/slice_buffer.h
@@ -86,8 +86,6 @@ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb);
void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
/* move all of the elements of src into dst */
void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
-/* remove n bytes from the end of a slice buffer */
-void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n);
#ifdef __cplusplus
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1e38479eb1..9012070e8e 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -61,10 +61,6 @@ typedef struct {
grpc_httpcli_context *context;
grpc_pollset *pollset;
grpc_iomgr_object iomgr_obj;
- gpr_slice_buffer incoming;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure on_read;
- grpc_iomgr_closure done_write;
} internal_request;
static grpc_httpcli_get_override g_get_override = NULL;
@@ -103,70 +99,73 @@ static void finish(internal_request *req, int success) {
gpr_slice_unref(req->request_text);
gpr_free(req->host);
grpc_iomgr_unregister_object(&req->iomgr_obj);
- gpr_slice_buffer_destroy(&req->incoming);
- gpr_slice_buffer_destroy(&req->outgoing);
gpr_free(req);
}
-static void on_read(void *user_data, int success);
-
-static void do_read(internal_request *req) {
- switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
- case GRPC_ENDPOINT_DONE:
- on_read(req, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- on_read(req, 0);
- break;
- }
-}
-
-static void on_read(void *user_data, int success) {
+static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status status) {
internal_request *req = user_data;
size_t i;
- for (i = 0; i < req->incoming.count; i++) {
- if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
+ for (i = 0; i < nslices; i++) {
+ if (GPR_SLICE_LENGTH(slices[i])) {
req->have_read_byte = 1;
- if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
+ if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) {
finish(req, 0);
- return;
+ goto done;
}
}
}
- if (success) {
- do_read(req);
- } else if (!req->have_read_byte) {
- next_address(req);
- } else {
- finish(req, grpc_httpcli_parser_eof(&req->parser));
+ switch (status) {
+ case GRPC_ENDPOINT_CB_OK:
+ grpc_endpoint_notify_on_read(req->ep, on_read, req);
+ break;
+ case GRPC_ENDPOINT_CB_EOF:
+ case GRPC_ENDPOINT_CB_ERROR:
+ case GRPC_ENDPOINT_CB_SHUTDOWN:
+ if (!req->have_read_byte) {
+ next_address(req);
+ } else {
+ finish(req, grpc_httpcli_parser_eof(&req->parser));
+ }
+ break;
+ }
+
+done:
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_unref(slices[i]);
}
}
-static void on_written(internal_request *req) { do_read(req); }
+static void on_written(internal_request *req) {
+ grpc_endpoint_notify_on_read(req->ep, on_read, req);
+}
-static void done_write(void *arg, int success) {
+static void done_write(void *arg, grpc_endpoint_cb_status status) {
internal_request *req = arg;
- if (success) {
- on_written(req);
- } else {
- next_address(req);
+ switch (status) {
+ case GRPC_ENDPOINT_CB_OK:
+ on_written(req);
+ break;
+ case GRPC_ENDPOINT_CB_EOF:
+ case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_ERROR:
+ next_address(req);
+ break;
}
}
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- gpr_slice_buffer_add(&req->outgoing, req->request_text);
- switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
- case GRPC_ENDPOINT_DONE:
+ switch (
+ grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
on_written(req);
break;
- case GRPC_ENDPOINT_PENDING:
+ case GRPC_ENDPOINT_WRITE_PENDING:
break;
- case GRPC_ENDPOINT_ERROR:
+ case GRPC_ENDPOINT_WRITE_ERROR:
finish(req, 0);
break;
}
@@ -238,10 +237,6 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
- grpc_iomgr_closure_init(&req->on_read, on_read, req);
- grpc_iomgr_closure_init(&req->done_write, done_write, req);
- gpr_slice_buffer_init(&req->incoming);
- gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
@@ -275,11 +270,7 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
- grpc_iomgr_closure_init(&req->on_read, on_read, req);
- grpc_iomgr_closure_init(&req->done_write, done_write, req);
- gpr_slice_buffer_init(&req->incoming);
- gpr_slice_buffer_init(&req->outgoing);
- gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
+ gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
req->host = gpr_strdup(request->host);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index a7878e31dd..8ee14bce9b 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,16 +33,17 @@
#include "src/core/iomgr/endpoint.h"
-grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
- return ep->vtable->read(ep, slices, cb);
+void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *user_data) {
+ ep->vtable->notify_on_read(ep, cb, user_data);
}
-grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
- return ep->vtable->write(ep, slices, cb);
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data) {
+ return ep->vtable->write(ep, slices, nslices, cb, user_data);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index d14d52d561..ea92a500e8 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -37,7 +37,6 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h"
#include <grpc/support/slice.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h>
/* An endpoint caps a streaming channel between two communicating processes.
@@ -46,17 +45,31 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
-typedef enum grpc_endpoint_op_status {
- GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */
- GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
- GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */
-} grpc_endpoint_op_status;
+typedef enum grpc_endpoint_cb_status {
+ GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
+ GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
+ GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
+ GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
+} grpc_endpoint_cb_status;
+
+typedef enum grpc_endpoint_write_status {
+ GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */
+ GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
+ GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */
+} grpc_endpoint_write_status;
+
+typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error);
+typedef void (*grpc_endpoint_write_cb)(void *user_data,
+ grpc_endpoint_cb_status error);
struct grpc_endpoint_vtable {
- grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
- grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
+ void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *user_data);
+ grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
+ size_t nslices, grpc_endpoint_write_cb cb,
+ void *user_data);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@@ -64,32 +77,26 @@ struct grpc_endpoint_vtable {
char *(*get_peer)(grpc_endpoint *ep);
};
-/* When data is available on the connection, calls the callback with slices.
- Callback success indicates that the endpoint can accept more reads, failure
- indicates the endpoint is closed.
- Valid slices may be placed into \a slices even on callback success == 0. */
-grpc_endpoint_op_status grpc_endpoint_read(
- grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+/* When data is available on the connection, calls the callback with slices. */
+void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *user_data);
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
- returns GRPC_ENDPOINT_DONE.
- Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the
- connection is ready for more data.
- \a slices may be mutated at will by the endpoint until cb is called.
- No guarantee is made to the content of slices after a write EXCEPT that
- it is a valid slice buffer.
- */
-grpc_endpoint_op_status grpc_endpoint_write(
- grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+ returns GRPC_ENDPOINT_WRITE_DONE.
+ Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
+ connection is ready for more data. */
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data);
/* Causes any pending read/write callbacks to run immediately with
- success==0 */
+ GRPC_ENDPOINT_CB_SHUTDOWN status */
void grpc_endpoint_shutdown(grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_endpoint *ep);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 0db7cd9f0e..360e6ebd8c 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -61,8 +61,209 @@
#define SENDMSG_FLAGS 0
#endif
+/* Holds a slice array and associated state. */
+typedef struct grpc_tcp_slice_state {
+ gpr_slice *slices; /* Array of slices */
+ size_t nslices; /* Size of slices array. */
+ ssize_t first_slice; /* First valid slice in array */
+ ssize_t last_slice; /* Last valid slice in array */
+ gpr_slice working_slice; /* pointer to original final slice */
+ int working_slice_valid; /* True if there is a working slice */
+ int memory_owned; /* True if slices array is owned */
+} grpc_tcp_slice_state;
+
int grpc_tcp_trace = 0;
+static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
+ size_t nslices, size_t valid_slices) {
+ state->slices = slices;
+ state->nslices = nslices;
+ if (valid_slices == 0) {
+ state->first_slice = -1;
+ } else {
+ state->first_slice = 0;
+ }
+ state->last_slice = valid_slices - 1;
+ state->working_slice_valid = 0;
+ state->memory_owned = 0;
+}
+
+/* Returns true if there is still available data */
+static int slice_state_has_available(grpc_tcp_slice_state *state) {
+ return state->first_slice != -1 && state->last_slice >= state->first_slice;
+}
+
+static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
+ if (state->first_slice == -1) {
+ return 0;
+ } else {
+ return state->last_slice - state->first_slice + 1;
+ }
+}
+
+static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
+ /* TODO(klempner): use realloc instead when first_slice is 0 */
+ /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
+ gpr_slice *slices = state->slices;
+ size_t original_size = slice_state_slices_allocated(state);
+ size_t i;
+ gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
+
+ for (i = 0; i < original_size; ++i) {
+ new_slices[i] = slices[i + state->first_slice];
+ }
+
+ state->slices = new_slices;
+ state->last_slice = original_size - 1;
+ if (original_size > 0) {
+ state->first_slice = 0;
+ } else {
+ state->first_slice = -1;
+ }
+ state->nslices = new_size;
+
+ if (state->memory_owned) {
+ gpr_free(slices);
+ }
+ state->memory_owned = 1;
+}
+
+static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
+ size_t prefix_bytes) {
+ gpr_slice *current_slice = &state->slices[state->first_slice];
+ size_t current_slice_size;
+
+ while (slice_state_has_available(state)) {
+ current_slice_size = GPR_SLICE_LENGTH(*current_slice);
+ if (current_slice_size > prefix_bytes) {
+ /* TODO(klempner): Get rid of the extra refcount created here by adding a
+ native "trim the first N bytes" operation to splice */
+ /* TODO(klempner): This really shouldn't be modifying the current slice
+ unless we own the slices array. */
+ gpr_slice tail;
+ tail = gpr_slice_split_tail(current_slice, prefix_bytes);
+ gpr_slice_unref(*current_slice);
+ *current_slice = tail;
+ return;
+ } else {
+ gpr_slice_unref(*current_slice);
+ ++state->first_slice;
+ ++current_slice;
+ prefix_bytes -= current_slice_size;
+ }
+ }
+}
+
+static void slice_state_destroy(grpc_tcp_slice_state *state) {
+ while (slice_state_has_available(state)) {
+ gpr_slice_unref(state->slices[state->first_slice]);
+ ++state->first_slice;
+ }
+
+ if (state->memory_owned) {
+ gpr_free(state->slices);
+ state->memory_owned = 0;
+ }
+}
+
+void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
+ gpr_slice **slices, size_t *nslices) {
+ *slices = state->slices + state->first_slice;
+ *nslices = state->last_slice - state->first_slice + 1;
+
+ state->first_slice = -1;
+ state->last_slice = -1;
+}
+
+/* Fills iov with the first min(iov_size, available) slices, returns number
+ filled */
+static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
+ struct iovec *iov, size_t iov_size) {
+ size_t nslices = state->last_slice - state->first_slice + 1;
+ gpr_slice *slices = state->slices + state->first_slice;
+ size_t i;
+ if (nslices < iov_size) {
+ iov_size = nslices;
+ }
+
+ for (i = 0; i < iov_size; ++i) {
+ iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
+ iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
+ }
+ return iov_size;
+}
+
+/* Makes n blocks available at the end of state, writes them into iov, and
+ returns the number of bytes allocated */
+static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
+ struct iovec *iov, size_t n,
+ size_t slice_size) {
+ size_t target_size;
+ size_t i;
+ size_t allocated_bytes;
+ ssize_t allocated_slices = slice_state_slices_allocated(state);
+
+ if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
+ /* Need to grow the slice array */
+ target_size = state->nslices;
+ do {
+ target_size = target_size * 2;
+ } while (target_size < allocated_slices + n - state->working_slice_valid);
+ /* TODO(klempner): If this ever needs to support both prefix removal and
+ append, we should be smarter about the growth logic here */
+ slice_state_realloc(state, target_size);
+ }
+
+ i = 0;
+ allocated_bytes = 0;
+
+ if (state->working_slice_valid) {
+ iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
+ iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
+ GPR_SLICE_LENGTH(state->slices[state->last_slice]);
+ allocated_bytes += iov[0].iov_len;
+ ++i;
+ state->slices[state->last_slice] = state->working_slice;
+ state->working_slice_valid = 0;
+ }
+
+ for (; i < n; ++i) {
+ ++state->last_slice;
+ state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
+ iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
+ iov[i].iov_len = slice_size;
+ allocated_bytes += slice_size;
+ }
+ if (state->first_slice == -1) {
+ state->first_slice = 0;
+ }
+ return allocated_bytes;
+}
+
+/* Remove the last n bytes from state */
+/* TODO(klempner): Consider having this defer actual deletion until later */
+static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
+ while (bytes > 0 && slice_state_has_available(state)) {
+ if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
+ state->working_slice = state->slices[state->last_slice];
+ state->working_slice_valid = 1;
+ /* TODO(klempner): Combine these into a single operation that doesn't need
+ to refcount */
+ gpr_slice_unref(gpr_slice_split_tail(
+ &state->slices[state->last_slice],
+ GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
+ bytes = 0;
+ } else {
+ bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
+ gpr_slice_unref(state->slices[state->last_slice]);
+ --state->last_slice;
+ if (state->last_slice == -1) {
+ state->first_slice = -1;
+ }
+ }
+ }
+}
+
typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
@@ -72,111 +273,80 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
- gpr_slice_buffer *incoming_buffer;
- gpr_slice_buffer *outgoing_buffer;
- /** slice within outgoing_buffer to write next */
- size_t outgoing_slice_idx;
- /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
- size_t outgoing_byte_idx;
+ grpc_endpoint_read_cb read_cb;
+ void *read_user_data;
+ grpc_endpoint_write_cb write_cb;
+ void *write_user_data;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_tcp_slice_state write_state;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
+ grpc_iomgr_closure handle_read_closure;
+
char *peer_string;
} grpc_tcp;
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
-static void tcp_shutdown(grpc_endpoint *ep) {
+static void grpc_tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(tcp->em_fd);
}
-static void tcp_free(grpc_tcp *tcp) {
- grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
-}
-
-/*#define GRPC_TCP_REFCOUNT_DEBUG*/
-#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
- reason, tcp->refcount.count, tcp->refcount.count - 1);
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+static void grpc_tcp_unref(grpc_tcp *tcp) {
+ int refcount_zero = gpr_unref(&tcp->refcount);
+ if (refcount_zero) {
+ grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
}
}
-static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
- reason, tcp->refcount.count, tcp->refcount.count + 1);
- gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
- }
-}
-
-static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
-static void tcp_destroy(grpc_endpoint *ep) {
+static void grpc_tcp_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- TCP_UNREF(tcp, "destroy");
+ grpc_tcp_unref(tcp);
}
-static void call_read_cb(grpc_tcp *tcp, int success) {
- grpc_iomgr_closure *cb = tcp->read_cb;
+static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status status) {
+ grpc_endpoint_read_cb cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: success=%d", success);
- for (i = 0; i < tcp->incoming_buffer->count; i++) {
- char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "read: status=%d", status);
+ for (i = 0; i < nslices; i++) {
+ char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
gpr_free(dump);
}
}
tcp->read_cb = NULL;
- tcp->incoming_buffer = NULL;
- cb->cb(cb->cb_arg, success);
+ cb(tcp->read_user_data, slices, nslices, status);
}
+#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_tcp *tcp) {
+static void grpc_tcp_continue_read(grpc_tcp *tcp) {
+ gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
- size_t i;
+ ssize_t allocated_bytes;
+ struct grpc_tcp_slice_state read_state;
+ gpr_slice *final_slices;
+ size_t final_nslices;
GPR_ASSERT(!tcp->finished_edge);
- GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
- GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
+ slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
+ 0);
- while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
- gpr_slice_malloc(tcp->slice_size));
- }
- for (i = 0; i < tcp->incoming_buffer->count; i++) {
- iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
- iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
- }
+ allocated_bytes = slice_state_append_blocks_into_iovec(
+ &read_state, iov, tcp->iov_size, tcp->slice_size);
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -192,105 +362,106 @@ static void tcp_continue_read(grpc_tcp *tcp) {
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
+ if (read_bytes < allocated_bytes) {
+ /* TODO(klempner): Consider a second read first, in hopes of getting a
+ * quick EAGAIN and saving a bunch of allocations. */
+ slice_state_remove_last(&read_state, read_bytes < 0
+ ? allocated_bytes
+ : allocated_bytes - read_bytes);
+ }
+
if (read_bytes < 0) {
- /* NB: After calling call_read_cb a parallel call of the read handler may
+ /* NB: After calling the user_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
tcp->iov_size /= 2;
}
- /* We've consumed the edge, request a new one */
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ if (slice_state_has_available(&read_state)) {
+ /* TODO(klempner): We should probably do the call into the application
+ without all this junk on the stack */
+ /* FIXME(klempner): Refcount properly */
+ slice_state_transfer_ownership(&read_state, &final_slices,
+ &final_nslices);
+ tcp->finished_edge = 1;
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
+ } else {
+ /* We've consumed the edge, request a new one */
+ slice_state_destroy(&read_state);
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
+ }
} else {
/* TODO(klempner): Log interesting errors */
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ if (slice_state_has_available(&read_state)) {
+ /* there were bytes already read: pass them up to the application */
+ slice_state_transfer_ownership(&read_state, &final_slices,
+ &final_nslices);
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
+ } else {
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
+ }
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
} else {
- GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
- if ((size_t)read_bytes < tcp->incoming_buffer->length) {
- gpr_slice_buffer_trim_end(tcp->incoming_buffer,
- tcp->incoming_buffer->length - read_bytes);
- } else if (tcp->iov_size < MAX_READ_IOVEC) {
+ if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
- GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(tcp, 1);
- TCP_UNREF(tcp, "read");
+ GPR_ASSERT(slice_state_has_available(&read_state));
+ slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
+ call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
+ slice_state_destroy(&read_state);
+ grpc_tcp_unref(tcp);
}
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
-static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(tcp, 0);
- TCP_UNREF(tcp, "read");
+ call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ grpc_tcp_unref(tcp);
} else {
- tcp_continue_read(tcp);
+ grpc_tcp_continue_read(tcp);
}
}
-static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
- gpr_slice_buffer *incoming_buffer,
- grpc_iomgr_closure *cb) {
+static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
- tcp->incoming_buffer = incoming_buffer;
- gpr_slice_buffer_reset_and_unref(incoming_buffer);
- TCP_REF(tcp, "read");
+ tcp->read_user_data = user_data;
+ gpr_ref(&tcp->refcount);
if (tcp->finished_edge) {
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+ tcp->handle_read_closure.cb_arg = tcp;
+ grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
}
- /* TODO(ctiller): immediate return */
- return GRPC_ENDPOINT_PENDING;
}
#define MAX_WRITE_IOVEC 16
-static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
+static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
int iov_size;
ssize_t sent_length;
- ssize_t sending_length;
- ssize_t trailing;
- ssize_t unwind_slice_idx;
- ssize_t unwind_byte_idx;
+ grpc_tcp_slice_state *state = &tcp->write_state;
for (;;) {
- sending_length = 0;
- unwind_slice_idx = tcp->outgoing_slice_idx;
- unwind_byte_idx = tcp->outgoing_byte_idx;
- for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
- iov_size != MAX_WRITE_IOVEC;
- iov_size++) {
- iov[iov_size].iov_base =
- GPR_SLICE_START_PTR(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
- tcp->outgoing_byte_idx;
- iov[iov_size].iov_len =
- GPR_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
- tcp->outgoing_byte_idx;
- sending_length += iov[iov_size].iov_len;
- tcp->outgoing_slice_idx++;
- tcp->outgoing_byte_idx = 0;
- }
- GPR_ASSERT(iov_size > 0);
+ iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -309,75 +480,70 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
if (sent_length < 0) {
if (errno == EAGAIN) {
- tcp->outgoing_slice_idx = unwind_slice_idx;
- tcp->outgoing_byte_idx = unwind_byte_idx;
- return GRPC_ENDPOINT_PENDING;
+ return GRPC_ENDPOINT_WRITE_PENDING;
} else {
/* TODO(klempner): Log some of these */
- return GRPC_ENDPOINT_ERROR;
+ slice_state_destroy(state);
+ return GRPC_ENDPOINT_WRITE_ERROR;
}
}
- GPR_ASSERT(tcp->outgoing_byte_idx == 0);
- trailing = sending_length - sent_length;
- while (trailing > 0) {
- ssize_t slice_length;
-
- tcp->outgoing_slice_idx--;
- slice_length = GPR_SLICE_LENGTH(
- tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
- if (slice_length > trailing) {
- tcp->outgoing_byte_idx = slice_length - trailing;
- break;
- } else {
- trailing -= slice_length;
- }
- }
+ /* TODO(klempner): Probably better to batch this after we finish flushing */
+ slice_state_remove_prefix(state, sent_length);
- if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
- return GRPC_ENDPOINT_DONE;
+ if (!slice_state_has_available(state)) {
+ return GRPC_ENDPOINT_WRITE_DONE;
}
};
}
-static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_endpoint_op_status status;
- grpc_iomgr_closure *cb;
+ grpc_endpoint_write_status write_status;
+ grpc_endpoint_cb_status cb_status;
+ grpc_endpoint_write_cb cb;
if (!success) {
+ slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, 0);
- TCP_UNREF(tcp, "write");
+ cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
+ grpc_tcp_unref(tcp);
return;
}
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
- status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
+ write_status = grpc_tcp_flush(tcp);
+ if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} else {
+ slice_state_destroy(&tcp->write_state);
+ if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
+ cb_status = GRPC_ENDPOINT_CB_OK;
+ } else {
+ cb_status = GRPC_ENDPOINT_CB_ERROR;
+ }
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
- TCP_UNREF(tcp, "write");
+ cb(tcp->write_user_data, cb_status);
+ grpc_tcp_unref(tcp);
}
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
-static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
- gpr_slice_buffer *buf,
- grpc_iomgr_closure *cb) {
+static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_endpoint_op_status status;
+ grpc_endpoint_write_status status;
if (grpc_tcp_trace) {
size_t i;
- for (i = 0; i < buf->count; i++) {
- char *data =
- gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < nslices; i++) {
+ char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
gpr_free(data);
}
@@ -385,19 +551,15 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
GPR_ASSERT(tcp->write_cb == NULL);
+ slice_state_init(&tcp->write_state, slices, nslices, nslices);
- if (buf->length == 0) {
- GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
- return GRPC_ENDPOINT_DONE;
- }
- tcp->outgoing_buffer = buf;
- tcp->outgoing_slice_idx = 0;
- tcp->outgoing_byte_idx = 0;
-
- status = tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_PENDING) {
- TCP_REF(tcp, "write");
+ status = grpc_tcp_flush(tcp);
+ if (status == GRPC_ENDPOINT_WRITE_PENDING) {
+ /* TODO(klempner): Consider inlining rather than malloc for small nslices */
+ slice_state_realloc(&tcp->write_state, nslices);
+ gpr_ref(&tcp->refcount);
tcp->write_cb = cb;
+ tcp->write_user_data = user_data;
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}
@@ -405,25 +567,27 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
return status;
}
-static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_add_fd(pollset, tcp->em_fd);
}
-static void tcp_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
+static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
-static char *tcp_get_peer(grpc_endpoint *ep) {
+static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
- tcp_shutdown, tcp_destroy, tcp_get_peer};
+ grpc_tcp_notify_on_read, grpc_tcp_write,
+ grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
+ grpc_tcp_shutdown, grpc_tcp_destroy,
+ grpc_tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
@@ -433,18 +597,21 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
- tcp->incoming_buffer = NULL;
+ tcp->read_user_data = NULL;
+ tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
+ slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = tcp_handle_read;
+ tcp->read_closure.cb = grpc_tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = tcp_handle_write;
+ tcp->write_closure.cb = grpc_tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+ tcp->handle_read_closure.cb = grpc_tcp_handle_read;
return &tcp->base;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 58f9160ef9..901793ec43 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,11 +82,13 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_endpoint_read_cb read_cb;
+ void *read_user_data;
gpr_slice read_slice;
- gpr_slice_buffer *write_slices;
- gpr_slice_buffer *read_slices;
+
+ grpc_endpoint_write_cb write_cb;
+ void *write_user_data;
+ gpr_slice_buffer write_slices;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
@@ -96,55 +98,34 @@ typedef struct grpc_tcp {
char *peer_string;
} grpc_tcp;
-static void tcp_free(grpc_tcp *tcp) {
- grpc_winsocket_orphan(tcp->socket);
- gpr_mu_destroy(&tcp->mu);
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
-}
-
-/*#define GRPC_TCP_REFCOUNT_DEBUG*/
-#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
- reason, tcp->refcount.count, tcp->refcount.count - 1);
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
- }
-}
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
-static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
- reason, tcp->refcount.count, tcp->refcount.count + 1);
- gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ gpr_slice_buffer_destroy(&tcp->write_slices);
+ grpc_winsocket_orphan(tcp->socket);
+ gpr_mu_destroy(&tcp->mu);
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
}
}
-static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
/* Asynchronous callback from the IOCP, or the background thread. */
-static int on_read(grpc_tcp *tcp, int from_iocp) {
+static void on_read(void *tcpp, int from_iocp) {
+ grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
size_t nslices = 0;
- int success;
+ grpc_endpoint_cb_status status;
+ grpc_endpoint_read_cb cb;
grpc_winsocket_callback_info *info = &socket->read_info;
+ void *opaque = tcp->read_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
+ cb = tcp->read_cb;
+ tcp->read_cb = NULL;
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
@@ -158,7 +139,9 @@ static int on_read(grpc_tcp *tcp, int from_iocp) {
tcp->socket->read_info.outstanding = 0;
gpr_slice_unref(tcp->read_slice);
}
- return 0;
+ tcp_unref(tcp);
+ if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ return;
}
GPR_ASSERT(tcp->socket->read_info.outstanding);
@@ -169,38 +152,28 @@ static int on_read(grpc_tcp *tcp, int from_iocp) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
- success = 0;
gpr_slice_unref(tcp->read_slice);
+ status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
- gpr_slice_buffer_add(tcp->read_slices, sub);
- success = 1;
+ status = GRPC_ENDPOINT_CB_OK;
+ slice = &sub;
+ nslices = 1;
} else {
gpr_slice_unref(tcp->read_slice);
- success = 0;
+ status = GRPC_ENDPOINT_CB_EOF;
}
}
tcp->socket->read_info.outstanding = 0;
- return success;
-}
-
-static void on_read_cb(void *tcpp, int from_iocp) {
- grpc_tcp *tcp = tcpp;
- grpc_iomgr_closure *cb = tcp->read_cb;
- int success = on_read(tcp, from_iocp);
- tcp->read_cb = NULL;
- TCP_UNREF(tcp, "read");
- if (cb) {
- cb->cb(cb->cb_arg, success);
- }
+ tcp_unref(tcp);
+ cb(opaque, slice, nslices, status);
}
-static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
- gpr_slice_buffer *read_slices,
- grpc_iomgr_closure *cb) {
+static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
+ void *arg) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -211,15 +184,13 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
GPR_ASSERT(!tcp->socket->read_info.outstanding);
if (tcp->shutting_down) {
- return GRPC_ENDPOINT_ERROR;
+ cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
+ return;
}
-
- TCP_REF(tcp, "read");
-
+ tcp_ref(tcp);
tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
- tcp->read_slices = read_slices;
- gpr_slice_buffer_reset_and_unref(read_slices);
+ tcp->read_user_data = arg;
tcp->read_slice = gpr_slice_malloc(8192);
@@ -233,11 +204,10 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- int ok;
info->bytes_transfered = bytes_read;
- ok = on_read(tcp, 1);
- TCP_UNREF(tcp, "read");
- return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ /* This might heavily recurse. */
+ on_read(tcp, 1);
+ return;
}
/* Otherwise, let's retry, by queuing a read. */
@@ -248,15 +218,13 @@ static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- int ok;
info->wsa_error = wsa_error;
- ok = on_read(tcp, 1);
- return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ on_read(tcp, 1);
+ return;
}
}
- grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
- return GRPC_ENDPOINT_PENDING;
+ grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
}
/* Asynchronous callback from the IOCP, or the background thread. */
@@ -264,8 +232,9 @@ static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
- grpc_iomgr_closure *cb;
- int success;
+ grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
+ grpc_endpoint_write_cb cb;
+ void *opaque = tcp->write_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
@@ -282,11 +251,10 @@ static void on_write(void *tcpp, int from_iocp) {
if (do_abort) {
if (from_iocp) {
tcp->socket->write_info.outstanding = 0;
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
}
- TCP_UNREF(tcp, "write");
- if (cb) {
- cb->cb(cb->cb_arg, 0);
- }
+ tcp_unref(tcp);
+ if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
return;
}
@@ -298,22 +266,23 @@ static void on_write(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
- success = 0;
+ status = GRPC_ENDPOINT_CB_ERROR;
} else {
- GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
- success = 1;
+ GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
}
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- TCP_UNREF(tcp, "write");
- cb->cb(cb->cb_arg, success);
+ tcp_unref(tcp);
+ cb(opaque, status);
}
/* Initiates a write. */
-static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
- gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
+ gpr_slice *slices, size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *arg) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
@@ -326,26 +295,28 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
GPR_ASSERT(!tcp->socket->write_info.outstanding);
if (tcp->shutting_down) {
- return GRPC_ENDPOINT_ERROR;
+ return GRPC_ENDPOINT_WRITE_ERROR;
}
- TCP_REF(tcp, "write");
+ tcp_ref(tcp);
tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
- tcp->write_slices = slices;
+ tcp->write_user_data = arg;
+
+ gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
- if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
- buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
+ if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
+ buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
allocated = buffers;
}
- for (i = 0; i < tcp->write_slices->count; i++) {
- buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
- buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
+ for (i = 0; i < tcp->write_slices.count; i++) {
+ buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
+ buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
}
/* First, let's try a synchronous, non-blocking write. */
- status = WSASend(socket->socket, buffers, tcp->write_slices->count,
+ status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
@@ -353,10 +324,10 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
+ grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
if (status == 0) {
- ret = GRPC_ENDPOINT_DONE;
- GPR_ASSERT(bytes_sent == tcp->write_slices->length);
+ ret = GRPC_ENDPOINT_WRITE_DONE;
+ GPR_ASSERT(bytes_sent == tcp->write_slices.length);
} else {
if (socket->read_info.wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
@@ -365,31 +336,33 @@ static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- TCP_UNREF(tcp, "write");
+ tcp_unref(tcp);
return ret;
}
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
- status = WSASend(socket->socket, buffers, tcp->write_slices->count,
+ status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
+ gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- TCP_UNREF(tcp, "write");
- return GRPC_ENDPOINT_ERROR;
+ tcp_unref(tcp);
+ return GRPC_ENDPOINT_WRITE_ERROR;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp);
- return GRPC_ENDPOINT_PENDING;
+ return GRPC_ENDPOINT_WRITE_PENDING;
}
static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
@@ -414,17 +387,19 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
+ int extra_refs = 0;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
- grpc_winsocket_shutdown(tcp->socket);
+ extra_refs = grpc_winsocket_shutdown(tcp->socket);
+ while (extra_refs--) tcp_ref(tcp);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- TCP_UNREF(tcp, "destroy");
+ tcp_unref(tcp);
}
static char *win_get_peer(grpc_endpoint *ep) {
@@ -433,8 +408,8 @@ static char *win_get_peer(grpc_endpoint *ep) {
}
static grpc_endpoint_vtable vtable = {
- win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+ win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
+ win_shutdown, win_destroy, win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
@@ -442,6 +417,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
+ gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base;
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index b696e384fc..81b3e33cb2 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,15 +49,15 @@ typedef struct {
struct tsi_frame_protector *protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
- grpc_iomgr_closure on_read;
- gpr_slice_buffer *read_buffer;
- gpr_slice_buffer source_buffer;
+ grpc_endpoint_read_cb read_cb;
+ void *read_user_data;
+ grpc_endpoint_write_cb write_cb;
+ void *write_user_data;
/* saved handshaker leftover data to unprotect. */
gpr_slice_buffer leftover_bytes;
/* buffers for read and write */
gpr_slice read_staging_buffer;
+ gpr_slice_buffer input_buffer;
gpr_slice write_staging_buffer;
gpr_slice_buffer output_buffer;
@@ -67,91 +67,62 @@ typedef struct {
int grpc_trace_secure_endpoint = 0;
+static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
+
static void destroy(secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer);
+ gpr_slice_buffer_destroy(&ep->input_buffer);
gpr_slice_unref(ep->write_staging_buffer);
gpr_slice_buffer_destroy(&ep->output_buffer);
- gpr_slice_buffer_destroy(&ep->source_buffer);
gpr_mu_destroy(&ep->protector_mu);
gpr_free(ep);
}
-/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
-#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
-#define SECURE_ENDPOINT_UNREF(ep, reason) \
- secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
-#define SECURE_ENDPOINT_REF(ep, reason) \
- secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
-static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
- const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
- ep, reason, ep->ref.count, ep->ref.count - 1);
- if (gpr_unref(&ep->ref)) {
- destroy(ep);
- }
-}
-
-static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
- const char *file, int line) {
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d",
- ep, reason, ep->ref.count, ep->ref.count + 1);
- gpr_ref(&ep->ref);
-}
-#else
-#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
-#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint *ep) {
if (gpr_unref(&ep->ref)) {
destroy(ep);
}
}
-static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
-#endif
-
static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
gpr_uint8 **end) {
- gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
+ gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
*cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint *ep, int success) {
+static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
if (grpc_trace_secure_endpoint) {
size_t i;
- for (i = 0; i < ep->read_buffer->count; i++) {
- char *data = gpr_dump_slice(ep->read_buffer->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < nslices; i++) {
+ char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
gpr_free(data);
}
}
- ep->read_buffer = NULL;
- ep->read_cb->cb(ep->read_cb->cb_arg, success);
- SECURE_ENDPOINT_UNREF(ep, "read");
+ ep->read_cb(ep->read_user_data, slices, nslices, error);
+ secure_endpoint_unref(ep);
}
-static int on_read(void *user_data, int success) {
+static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
unsigned i;
gpr_uint8 keep_looping = 0;
+ size_t input_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)user_data;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
- if (!success) {
- gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
- }
-
/* TODO(yangg) check error, maybe bail out early */
- for (i = 0; i < ep->source_buffer.count; i++) {
- gpr_slice encrypted = ep->source_buffer.slices[i];
+ for (i = 0; i < nslices; i++) {
+ gpr_slice encrypted = slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
size_t message_size = GPR_SLICE_LENGTH(encrypted);
@@ -190,7 +161,7 @@ static int on_read(void *user_data, int success) {
if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
gpr_slice_buffer_add(
- ep->read_buffer,
+ &ep->input_buffer,
gpr_slice_split_head(
&ep->read_staging_buffer,
(size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@@ -198,53 +169,38 @@ static int on_read(void *user_data, int success) {
/* TODO(yangg) experiment with moving this block after read_cb to see if it
helps latency */
- gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_unref(slices[i]);
+ }
if (result != TSI_OK) {
- gpr_slice_buffer_reset_and_unref(ep->read_buffer);
- return 0;
+ gpr_slice_buffer_reset_and_unref(&ep->input_buffer);
+ call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
+ return;
}
-
- return 1;
-}
-
-static void on_read_cb(void *user_data, int success) {
- call_read_cb(user_data, on_read(user_data, success));
+ /* The upper level will unref the slices. */
+ input_buffer_count = ep->input_buffer.count;
+ ep->input_buffer.count = 0;
+ call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
}
-static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
+ grpc_endpoint_read_cb cb, void *user_data) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- int immediate_read_success = -1;
ep->read_cb = cb;
- ep->read_buffer = slices;
- gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ ep->read_user_data = user_data;
- if (ep->leftover_bytes.count) {
- gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
- GPR_ASSERT(ep->leftover_bytes.count == 0);
- return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
- }
+ secure_endpoint_ref(ep);
- SECURE_ENDPOINT_REF(ep, "read");
-
- switch (
- grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
- case GRPC_ENDPOINT_DONE:
- immediate_read_success = on_read(ep, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- return GRPC_ENDPOINT_PENDING;
- case GRPC_ENDPOINT_ERROR:
- immediate_read_success = on_read(ep, 0);
- break;
+ if (ep->leftover_bytes.count) {
+ size_t leftover_nslices = ep->leftover_bytes.count;
+ ep->leftover_bytes.count = 0;
+ on_read(ep, ep->leftover_bytes.slices, leftover_nslices,
+ GRPC_ENDPOINT_CB_OK);
+ return;
}
- GPR_ASSERT(immediate_read_success != -1);
- SECURE_ENDPOINT_UNREF(ep, "read");
-
- return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
+ grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -255,28 +211,36 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
}
-static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+static void on_write(void *data, grpc_endpoint_cb_status error) {
+ secure_endpoint *ep = data;
+ ep->write_cb(ep->write_user_data, error);
+ secure_endpoint_unref(ep);
+}
+
+static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data) {
unsigned i;
+ size_t output_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
-
- gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
+ grpc_endpoint_write_status status;
+ GPR_ASSERT(ep->output_buffer.count == 0);
if (grpc_trace_secure_endpoint) {
- for (i = 0; i < slices->count; i++) {
- char *data =
- gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < nslices; i++) {
+ char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
gpr_free(data);
}
}
- for (i = 0; i < slices->count; i++) {
- gpr_slice plain = slices->slices[i];
+ for (i = 0; i < nslices; i++) {
+ gpr_slice plain = slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
size_t message_size = GPR_SLICE_LENGTH(plain);
while (message_size > 0) {
@@ -326,13 +290,29 @@ static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
}
}
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_unref(slices[i]);
+ }
+
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- return GRPC_ENDPOINT_ERROR;
+ return GRPC_ENDPOINT_WRITE_ERROR;
}
- return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
+ /* clear output_buffer and let the lower level handle its slices. */
+ output_buffer_count = ep->output_buffer.count;
+ ep->output_buffer.count = 0;
+ ep->write_cb = cb;
+ ep->write_user_data = user_data;
+ /* Need to keep the endpoint alive across a transport */
+ secure_endpoint_ref(ep);
+ status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
+ output_buffer_count, on_write, ep);
+ if (status != GRPC_ENDPOINT_WRITE_PENDING) {
+ secure_endpoint_unref(ep);
+ }
+ return status;
}
static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@@ -340,9 +320,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) {
grpc_endpoint_shutdown(ep->wrapped_ep);
}
-static void endpoint_destroy(grpc_endpoint *secure_ep) {
+static void endpoint_unref(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- SECURE_ENDPOINT_UNREF(ep, "destroy");
+ secure_endpoint_unref(ep);
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@@ -363,9 +343,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
}
static const grpc_endpoint_vtable vtable = {
- endpoint_read, endpoint_write,
+ endpoint_notify_on_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_destroy,
+ endpoint_shutdown, endpoint_unref,
endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
@@ -383,10 +363,8 @@ grpc_endpoint *grpc_secure_endpoint_create(
}
ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
+ gpr_slice_buffer_init(&ep->input_buffer);
gpr_slice_buffer_init(&ep->output_buffer);
- gpr_slice_buffer_init(&ep->source_buffer);
- ep->read_buffer = NULL;
- grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index bf0079577e..0c3572b53c 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -50,17 +50,16 @@ typedef struct {
grpc_endpoint *wrapped_endpoint;
grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs;
- gpr_slice_buffer incoming;
- gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
- grpc_iomgr_closure on_handshake_data_sent_to_peer;
- grpc_iomgr_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
-static void on_handshake_data_received_from_peer(void *setup, int success);
+static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error);
-static void on_handshake_data_sent_to_peer(void *setup, int success);
+static void on_handshake_data_sent_to_peer(void *setup,
+ grpc_endpoint_cb_status error);
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) {
@@ -79,8 +78,6 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
gpr_slice_buffer_destroy(&s->left_overs);
- gpr_slice_buffer_destroy(&s->outgoing);
- gpr_slice_buffer_destroy(&s->incoming);
GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
gpr_free(s);
}
@@ -105,8 +102,6 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
s->secure_endpoint =
grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
s->left_overs.slices, s->left_overs.count);
- s->left_overs.count = 0;
- s->left_overs.length = 0;
secure_transport_setup_done(s, 1);
return;
}
@@ -137,6 +132,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
size_t offset = 0;
tsi_result result = TSI_OK;
gpr_slice to_send;
+ grpc_endpoint_write_status write_status;
do {
size_t to_send_size = s->handshake_buffer_size - offset;
@@ -159,25 +155,28 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
to_send =
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
- gpr_slice_buffer_reset_and_unref(&s->outgoing);
- gpr_slice_buffer_add(&s->outgoing, to_send);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
- &s->on_handshake_data_sent_to_peer)) {
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
- secure_transport_setup_done(s, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_sent_to_peer(s, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
+ write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
+ on_handshake_data_sent_to_peer, s);
+ if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
+ gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
+ secure_transport_setup_done(s, 0);
+ } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
+ on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK);
+ }
+}
+
+static void cleanup_slices(gpr_slice *slices, size_t num_slices) {
+ size_t i;
+ for (i = 0; i < num_slices; i++) {
+ gpr_slice_unref(slices[i]);
}
}
-static void on_handshake_data_received_from_peer(void *setup, int success) {
+static void on_handshake_data_received_from_peer(
+ void *setup, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
grpc_secure_transport_setup *s = setup;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -185,37 +184,32 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
size_t num_left_overs;
int has_left_overs_in_current_slice = 0;
- if (!success) {
+ if (error != GRPC_ENDPOINT_CB_OK) {
gpr_log(GPR_ERROR, "Read failed.");
+ cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
- for (i = 0; i < s->incoming.count; i++) {
- consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
+ for (i = 0; i < nslices; i++) {
+ consumed_slice_size = GPR_SLICE_LENGTH(slices[i]);
result = tsi_handshaker_process_bytes_from_peer(
- s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
- &consumed_slice_size);
+ s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size);
if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
}
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* We may need more data. */
if (result == TSI_INCOMPLETE_DATA) {
- switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
- &s->on_handshake_data_received_from_peer)) {
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_received_from_peer(s, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- on_handshake_data_received_from_peer(s, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ /* TODO(klempner,jboeuf): This should probably use the client setup
+ deadline */
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
+ on_handshake_data_received_from_peer, setup);
+ cleanup_slices(slices, nslices);
return;
} else {
send_handshake_bytes_to_peer(s);
+ cleanup_slices(slices, nslices);
return;
}
}
@@ -223,40 +217,42 @@ static void on_handshake_data_received_from_peer(void *setup, int success) {
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result));
+ cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
/* Handshake is done and successful this point. */
has_left_overs_in_current_slice =
- (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
- num_left_overs =
- (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
+ (consumed_slice_size < GPR_SLICE_LENGTH(slices[i]));
+ num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1;
if (num_left_overs == 0) {
+ cleanup_slices(slices, nslices);
check_peer(s);
return;
}
+ cleanup_slices(slices, nslices - num_left_overs);
+
/* Put the leftovers in our buffer (ownership transfered). */
if (has_left_overs_in_current_slice) {
- gpr_slice_buffer_add(
- &s->left_overs,
- gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
- gpr_slice_unref(
- s->incoming.slices[i]); /* split_tail above increments refcount. */
+ gpr_slice_buffer_add(&s->left_overs,
+ gpr_slice_split_tail(&slices[i], consumed_slice_size));
+ gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */
}
gpr_slice_buffer_addn(
- &s->left_overs, &s->incoming.slices[i + 1],
+ &s->left_overs, &slices[i + 1],
num_left_overs - (size_t)has_left_overs_in_current_slice);
check_peer(s);
}
/* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup, int success) {
+static void on_handshake_data_sent_to_peer(void *setup,
+ grpc_endpoint_cb_status error) {
grpc_secure_transport_setup *s = setup;
/* Make sure that write is OK. */
- if (!success) {
- gpr_log(GPR_ERROR, "Write failed.");
+ if (error != GRPC_ENDPOINT_CB_OK) {
+ gpr_log(GPR_ERROR, "Write failed with error %d.", error);
if (setup != NULL) secure_transport_setup_done(s, 0);
return;
}
@@ -265,17 +261,8 @@ static void on_handshake_data_sent_to_peer(void *setup, int success) {
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
- &s->on_handshake_data_received_from_peer)) {
- case GRPC_ENDPOINT_ERROR:
- on_handshake_data_received_from_peer(s, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_DONE:
- on_handshake_data_received_from_peer(s, 1);
- break;
- }
+ grpc_endpoint_notify_on_read(s->wrapped_endpoint,
+ on_handshake_data_received_from_peer, setup);
} else {
check_peer(s);
}
@@ -301,12 +288,6 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
- grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, s);
- grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs);
- gpr_slice_buffer_init(&s->outgoing);
- gpr_slice_buffer_init(&s->incoming);
send_handshake_bytes_to_peer(s);
}
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 6482ef9c9f..987d5cb9b5 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -207,25 +207,3 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->count = 0;
src->length = 0;
}
-
-void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
- GPR_ASSERT(n <= sb->length);
- sb->length -= n;
- for (;;) {
- size_t idx = sb->count - 1;
- gpr_slice slice = sb->slices[idx];
- size_t slice_len = GPR_SLICE_LENGTH(slice);
- if (slice_len > n) {
- sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
- return;
- } else if (slice_len == n) {
- gpr_slice_unref(slice);
- sb->count = idx;
- return;
- } else {
- gpr_slice_unref(slice);
- n -= slice_len;
- sb->count = idx;
- }
- }
-}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a1b773b1ca..42cf0ecd5b 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -214,8 +214,6 @@ typedef struct {
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
gpr_uint8 is_client;
- /** callback for when writing is done */
- grpc_iomgr_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@@ -331,11 +329,8 @@ struct grpc_chttp2_transport {
/** closure to execute writing */
grpc_iomgr_closure writing_action;
- /** closure to finish reading from the endpoint */
- grpc_iomgr_closure recv_data;
-
- /** incoming read bytes */
- gpr_slice_buffer read_buffer;
+ /** closure to start reading from the endpoint */
+ grpc_iomgr_closure reading_action;
/** address to place a newly accepted stream - set and unset by
grpc_chttp2_parsing_accept_stream; used by init_stream to
@@ -468,7 +463,8 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(void *transport_writing, int success);
+void grpc_chttp2_terminate_writing(
+ grpc_chttp2_transport_writing *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 2c8c48f47b..123061b3fc 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
@@ -164,15 +165,16 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
- &transport_writing->done_cb)) {
- case GRPC_ENDPOINT_DONE:
+ switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
+ transport_writing->outbuf.count, finish_write_cb,
+ transport_writing)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
grpc_chttp2_terminate_writing(transport_writing, 1);
break;
- case GRPC_ENDPOINT_ERROR:
+ case GRPC_ENDPOINT_WRITE_ERROR:
grpc_chttp2_terminate_writing(transport_writing, 0);
break;
- case GRPC_ENDPOINT_PENDING:
+ case GRPC_ENDPOINT_WRITE_PENDING:
break;
}
}
@@ -207,6 +209,12 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
}
}
+static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
+ grpc_chttp2_transport_writing *transport_writing = tw;
+ grpc_chttp2_terminate_writing(transport_writing,
+ write_status == GRPC_ENDPOINT_CB_OK);
+}
+
void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
@@ -235,5 +243,6 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
- gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
+ transport_writing->outbuf.count = 0;
+ transport_writing->outbuf.length = 0;
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 8caa10c938..1bbd210e46 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,13 +84,15 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
+static void reading_action(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
/** Endpoint callback to process incoming data */
-static void recv_data(void *tp, int success);
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error);
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
@@ -141,7 +143,6 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf);
- gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
@@ -248,16 +249,12 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
+ grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
- grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
- grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
- gpr_slice_buffer_init(&t->read_buffer);
-
if (is_client) {
gpr_slice_buffer_add(
&t->global.qbuf,
@@ -505,8 +502,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
- grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
+void grpc_chttp2_terminate_writing(
+ grpc_chttp2_transport_writing *transport_writing, int success) {
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@@ -1063,76 +1060,74 @@ static void read_error_locked(grpc_chttp2_transport *t) {
}
/* tcp read callback */
-static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
+static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ grpc_chttp2_transport *t = tp;
size_t i;
- int keep_reading = 0;
+ int unref = 0;
- lock(t);
- i = 0;
- GPR_ASSERT(!t->parsing_active);
- if (!t->closed) {
- t->parsing_active = 1;
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
- gpr_mu_unlock(&t->mu);
- for (; i < t->read_buffer.count &&
- grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
- i++)
- ;
- gpr_mu_lock(&t->mu);
- if (i != t->read_buffer.count) {
+ switch (error) {
+ case GRPC_ENDPOINT_CB_SHUTDOWN:
+ case GRPC_ENDPOINT_CB_EOF:
+ case GRPC_ENDPOINT_CB_ERROR:
+ lock(t);
drop_connection(t);
- }
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- t->global.concurrent_stream_count =
- grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (t->parsing.initial_window_update != 0) {
- grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
- t->parsing.initial_window_update = 0;
- }
- /* handle higher level things */
- grpc_chttp2_publish_reads(&t->global, &t->parsing);
- t->parsing_active = 0;
- }
- if (!*success || i != t->read_buffer.count) {
- drop_connection(t);
- read_error_locked(t);
- } else {
- keep_reading = 1;
+ read_error_locked(t);
+ unlock(t);
+ unref = 1;
+ for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
+ break;
+ case GRPC_ENDPOINT_CB_OK:
+ lock(t);
+ i = 0;
+ GPR_ASSERT(!t->parsing_active);
+ if (!t->closed) {
+ t->parsing_active = 1;
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+ gpr_mu_unlock(&t->mu);
+ for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
+ i++) {
+ gpr_slice_unref(slices[i]);
+ }
+ gpr_mu_lock(&t->mu);
+ if (i != nslices) {
+ drop_connection(t);
+ }
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ if (t->parsing.initial_window_update != 0) {
+ grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+ update_global_window, t);
+ t->parsing.initial_window_update = 0;
+ }
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ t->parsing_active = 0;
+ }
+ if (i == nslices) {
+ grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
+ } else {
+ read_error_locked(t);
+ unref = 1;
+ }
+ unlock(t);
+ for (; i < nslices; i++) gpr_slice_unref(slices[i]);
+ break;
}
- gpr_slice_buffer_reset_and_unref(&t->read_buffer);
- unlock(t);
-
- if (keep_reading) {
- switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
- case GRPC_ENDPOINT_DONE:
- *success = 1;
- return 1;
- case GRPC_ENDPOINT_ERROR:
- *success = 0;
- return 1;
- case GRPC_ENDPOINT_PENDING:
- return 0;
- }
- } else {
+ if (unref) {
UNREF_TRANSPORT(t, "recv_data");
- return 0;
}
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
}
-static void recv_data(void *tp, int success) {
- grpc_chttp2_transport *t = tp;
-
- while (recv_data_loop(t, &success))
- ;
+static void reading_action(void *pt, int iomgr_success_ignored) {
+ grpc_chttp2_transport *t = pt;
+ grpc_endpoint_notify_on_read(t->ep, recv_data, t);
}
/*
@@ -1245,6 +1240,5 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
- gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
- recv_data(t, 1);
+ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
}
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 1d98879662..24bf5d3625 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -59,7 +59,7 @@ static void thd_func(void *arg) {
gpr_event_set(&a->done_thd, (void *)1);
}
-static void done_write(void *arg, int success) {
+static void done_write(void *arg, grpc_endpoint_cb_status status) {
thd_args *a = arg;
gpr_event_set(&a->done_write, (void *)1);
}
@@ -85,8 +85,6 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_mdctx *mdctx = grpc_mdctx_create();
gpr_slice slice =
gpr_slice_from_copied_buffer(client_payload, client_payload_length);
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_write_closure;
hex = gpr_dump(client_payload, client_payload_length,
GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -124,18 +122,14 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
/* Start validator */
gpr_thd_new(&id, thd_func, &a, NULL);
- gpr_slice_buffer_init(&outgoing);
- gpr_slice_buffer_add(&outgoing, slice);
- grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
-
/* Write data */
- switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
- case GRPC_ENDPOINT_DONE:
+ switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
done_write(&a, 1);
break;
- case GRPC_ENDPOINT_PENDING:
+ case GRPC_ENDPOINT_WRITE_PENDING:
break;
- case GRPC_ENDPOINT_ERROR:
+ case GRPC_ENDPOINT_WRITE_ERROR:
done_write(&a, 0);
break;
}
@@ -161,7 +155,6 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(a.server);
grpc_completion_queue_destroy(a.cq);
- gpr_slice_buffer_destroy(&outgoing);
grpc_shutdown();
}
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index ef673747a1..6ef8e9ca3b 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -59,7 +59,8 @@
static grpc_pollset *g_pollset;
-size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
+size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
size_t num_bytes = 0;
size_t i;
size_t j;
@@ -71,6 +72,7 @@ size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ gpr_slice_unref(slices[i]);
}
return num_bytes;
}
@@ -119,76 +121,86 @@ struct read_and_write_test_state {
int current_write_data;
int read_done;
int write_done;
- gpr_slice_buffer incoming;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_read;
- grpc_iomgr_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data;
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+ if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+ gpr_log(GPR_INFO, "Read handler shutdown");
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ state->read_done = 1;
+ grpc_pollset_kick(g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ return;
+ }
- state->bytes_read += count_slices(
- state->incoming.slices, state->incoming.count, &state->current_read_data);
- if (state->bytes_read == state->target_bytes || !success) {
+ state->bytes_read +=
+ count_and_unref_slices(slices, nslices, &state->current_read_data);
+ if (state->bytes_read == state->target_bytes) {
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1 + success;
+ state->read_done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- } else if (success) {
- switch (grpc_endpoint_read(state->read_ep, &state->incoming,
- &state->done_read)) {
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(data, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(data, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ } else {
+ grpc_endpoint_notify_on_read(state->read_ep,
+ read_and_write_test_read_handler, data);
}
}
-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(void *data,
+ grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_op_status write_status;
-
- if (success) {
- for (;;) {
- /* Need to do inline writes until they don't succeed synchronously or we
- finish writing */
- state->bytes_written += state->current_write_size;
- if (state->target_bytes - state->bytes_written <
- state->current_write_size) {
- state->current_write_size = state->target_bytes - state->bytes_written;
- }
- if (state->current_write_size == 0) {
- break;
- }
-
- slices = allocate_blocks(state->current_write_size, 8192, &nslices,
- &state->current_write_data);
- gpr_slice_buffer_reset_and_unref(&state->outgoing);
- gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
- write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
- GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
- free(slices);
- if (write_status == GRPC_ENDPOINT_PENDING) {
- return;
- }
+ grpc_endpoint_write_status write_status;
+
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+
+ gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
+ error);
+
+ if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+ gpr_log(GPR_INFO, "Write handler shutdown");
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ state->write_done = 1;
+ grpc_pollset_kick(g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ return;
+ }
+
+ for (;;) {
+ /* Need to do inline writes until they don't succeed synchronously or we
+ finish writing */
+ state->bytes_written += state->current_write_size;
+ if (state->target_bytes - state->bytes_written <
+ state->current_write_size) {
+ state->current_write_size = state->target_bytes - state->bytes_written;
+ }
+ if (state->current_write_size == 0) {
+ break;
+ }
+
+ slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+ &state->current_write_data);
+ write_status =
+ grpc_endpoint_write(state->write_ep, slices, nslices,
+ read_and_write_test_write_handler, state);
+ gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+ GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
+ free(slices);
+ if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+ return;
}
- GPR_ASSERT(state->bytes_written == state->target_bytes);
}
+ GPR_ASSERT(state->bytes_written == state->target_bytes);
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1 + success;
+ state->write_done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
@@ -222,31 +234,16 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
- grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
- &state);
- grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
- &state);
- gpr_slice_buffer_init(&state.outgoing);
- gpr_slice_buffer_init(&state.incoming);
/* Get started by pretending an initial write completed */
/* NOTE: Sets up initial conditions so we can have the same write handler
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, 1);
+ read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
- switch (
- grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(&state, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(&state, 1);
- break;
- }
+ grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
+ &state);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
@@ -266,8 +263,6 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
- gpr_slice_buffer_destroy(&state.outgoing);
- gpr_slice_buffer_destroy(&state.incoming);
end_test(config);
}
@@ -278,40 +273,36 @@ struct timeout_test_state {
typedef struct {
int done;
grpc_endpoint *ep;
- gpr_slice_buffer incoming;
- grpc_iomgr_closure done_read;
} shutdown_during_write_test_state;
-static void shutdown_during_write_test_read_handler(void *user_data,
- int success) {
+static void shutdown_during_write_test_read_handler(
+ void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ size_t i;
shutdown_during_write_test_state *st = user_data;
- if (!success) {
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_unref(slices[i]);
+ }
+
+ if (error != GRPC_ENDPOINT_CB_OK) {
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- st->done = 1;
+ st->done = error;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
- switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- shutdown_during_write_test_read_handler(user_data, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- shutdown_during_write_test_read_handler(user_data, 1);
- break;
- }
+ grpc_endpoint_notify_on_read(
+ st->ep, shutdown_during_write_test_read_handler, user_data);
}
}
-static void shutdown_during_write_test_write_handler(void *user_data,
- int success) {
+static void shutdown_during_write_test_write_handler(
+ void *user_data, grpc_endpoint_cb_status error) {
shutdown_during_write_test_state *st = user_data;
- gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
- success);
- if (success) {
+ gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
+ error);
+ if (error == 0) {
/* This happens about 0.5% of the time when run under TSAN, and is entirely
legitimate, but means we aren't testing the path we think we are. */
/* TODO(klempner): Change this test to retry the write in that case */
@@ -334,8 +325,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st;
gpr_slice *slices;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_write;
grpc_endpoint_test_fixture f =
begin_test(config, "shutdown_during_write_test", slice_size);
@@ -346,26 +335,19 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
read_st.done = 0;
write_st.done = 0;
- grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
- &write_st);
- grpc_iomgr_closure_init(&read_st.done_read,
- shutdown_during_write_test_read_handler, &read_st);
- gpr_slice_buffer_init(&read_st.incoming);
- gpr_slice_buffer_init(&outgoing);
-
- GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
- &read_st.done_read) == GRPC_ENDPOINT_PENDING);
+ grpc_endpoint_notify_on_read(
+ read_st.ep, shutdown_during_write_test_read_handler, &read_st);
for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data);
- gpr_slice_buffer_reset_and_unref(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, nblocks);
- switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
- case GRPC_ENDPOINT_DONE:
+ switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
+ shutdown_during_write_test_write_handler,
+ &write_st)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
break;
- case GRPC_ENDPOINT_ERROR:
+ case GRPC_ENDPOINT_WRITE_ERROR:
gpr_log(GPR_ERROR, "error writing");
abort();
- case GRPC_ENDPOINT_PENDING:
+ case GRPC_ENDPOINT_WRITE_PENDING:
grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -386,8 +368,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);
- gpr_slice_buffer_destroy(&read_st.incoming);
- gpr_slice_buffer_destroy(&outgoing);
end_test(config);
return;
}
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 8acaa433bb..6ad832231f 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,12 +118,10 @@ struct read_socket_state {
grpc_endpoint *ep;
ssize_t read_bytes;
ssize_t target_read_bytes;
- gpr_slice_buffer incoming;
- grpc_iomgr_closure read_cb;
};
-static ssize_t count_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
ssize_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
@@ -134,41 +132,31 @@ static ssize_t count_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ gpr_slice_unref(slices[i]);
}
return num_bytes;
}
-static void read_cb(void *user_data, int success) {
+static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
struct read_socket_state *state = (struct read_socket_state *)user_data;
ssize_t read_bytes;
int current_data;
- GPR_ASSERT(success);
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
- read_bytes = count_slices(state->incoming.slices, state->incoming.count,
- &current_data);
+ read_bytes = count_and_unref_slices(slices, nslices, &current_data);
state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ /* empty */
} else {
- switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
- case GRPC_ENDPOINT_DONE:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- break;
- }
+ grpc_endpoint_notify_on_read(state->ep, read_cb, state);
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Write to a socket, then read from it using the grpc_tcp API. */
@@ -193,19 +181,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_notify_on_read(ep, read_cb, &state);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -216,7 +193,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -243,19 +219,8 @@ static void large_read_test(ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_notify_on_read(ep, read_cb, &state);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -266,7 +231,6 @@ static void large_read_test(ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -298,7 +262,8 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size,
return slices;
}
-static void write_done(void *user_data /* write_socket_state */, int success) {
+static void write_done(void *user_data /* write_socket_state */,
+ grpc_endpoint_cb_status error) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -374,8 +339,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -392,21 +355,74 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- gpr_slice_buffer_init(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
- grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
+ if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
+ GRPC_ENDPOINT_WRITE_DONE) {
+ /* Write completed immediately */
+ read_bytes = drain_socket(sv[0]);
+ GPR_ASSERT(read_bytes == num_bytes);
+ } else {
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ for (;;) {
+ grpc_pollset_worker worker;
+ if (state.write_done) {
+ break;
+ }
+ grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ deadline);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ }
+
+ grpc_endpoint_destroy(ep);
+ gpr_free(slices);
+}
+
+static void read_done_for_write_error(void *ud, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error) {
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(nslices == 0);
+}
+
+/* Write to a socket using the grpc_tcp API, then drain it directly.
+ Note that if the write does not complete immediately we need to drain the
+ socket in parallel with the read. */
+static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
+ int sv[2];
+ grpc_endpoint *ep;
+ struct write_socket_state state;
+ size_t num_blocks;
+ gpr_slice *slices;
+ int current_data = 0;
+ grpc_pollset_worker worker;
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+
+ gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
+ num_bytes, slice_size);
+
+ create_sockets(sv);
- switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
- case GRPC_ENDPOINT_DONE:
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
+ close(sv[0]);
+
+ state.ep = ep;
+ state.write_done = 0;
+
+ slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
+
+ switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ case GRPC_ENDPOINT_WRITE_ERROR:
/* Write completed immediately */
- read_bytes = drain_socket(sv[0]);
- GPR_ASSERT(read_bytes == num_bytes);
break;
- case GRPC_ENDPOINT_PENDING:
- drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
- grpc_pollset_worker worker;
if (state.write_done) {
break;
}
@@ -415,14 +431,10 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "endpoint got error");
- abort();
}
- gpr_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(ep);
- gpr_free(slices);
+ free(slices);
}
void run_tests(void) {
@@ -442,6 +454,10 @@ void run_tests(void) {
write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+ write_error_test(40320, i);
+ }
+
+ for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i);
}
}
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index c76ddcd194..a8368fc842 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -135,26 +135,62 @@ static grpc_endpoint_test_config configs[] = {
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
};
-static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
- grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
- gpr_slice_buffer incoming;
+static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
gpr_slice s =
gpr_slice_from_copied_string("hello world 12345678900987654321");
+
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(nslices == 1);
+
+ GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
+ gpr_slice_unref(slices[0]);
+ gpr_slice_unref(s);
+ *(int *)user_data = 1;
+}
+
+static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
+ grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
+ int verified = 0;
gpr_log(GPR_INFO, "Start test left over");
- gpr_slice_buffer_init(&incoming);
- GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
- GRPC_ENDPOINT_DONE);
- GPR_ASSERT(incoming.count == 1);
- GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
+ grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified);
+ GPR_ASSERT(verified == 1);
grpc_endpoint_shutdown(f.client_ep);
grpc_endpoint_shutdown(f.server_ep);
grpc_endpoint_destroy(f.client_ep);
grpc_endpoint_destroy(f.server_ep);
+ clean_up();
+}
+
+static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ grpc_endpoint_test_fixture *f = user_data;
+ gpr_slice s =
+ gpr_slice_from_copied_string("hello world 12345678900987654321");
+
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(nslices == 1);
+
+ grpc_endpoint_shutdown(f->client_ep);
+ grpc_endpoint_destroy(f->client_ep);
+
+ GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
+ gpr_slice_unref(slices[0]);
gpr_slice_unref(s);
- gpr_slice_buffer_destroy(&incoming);
+}
+/* test which destroys the ep before finishing reading */
+static void test_destroy_ep_early(grpc_endpoint_test_config config,
+ size_t slice_size) {
+ grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
+ gpr_log(GPR_INFO, "Start test destroy early");
+
+ grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
+
+ grpc_endpoint_shutdown(f.server_ep);
+ grpc_endpoint_destroy(f.server_ep);
clean_up();
}
@@ -167,6 +203,7 @@ int main(int argc, char **argv) {
grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset);
test_leftover(configs[1], 1);
+ test_destroy_ep_early(configs[1], 1);
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown();
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 4781d334e2..836e62a541 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -198,13 +198,14 @@ int grpc_pick_unused_port(void) {
races with other processes on kernels that want to reuse the same
port numbers over and over. */
- /* In alternating iterations we trial UDP ports before TCP ports UDP
+ /* In alternating iterations we try UDP ports before TCP ports UDP
ports -- it could be the case that this machine has been using up
UDP ports and they are scarcer. */
/* Type of port to first pick in next iteration */
int is_tcp = 1;
- int trial = 0;
+ int try
+ = 0;
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
if (env) {
@@ -217,10 +218,11 @@ int grpc_pick_unused_port(void) {
for (;;) {
int port;
- trial++;
- if (trial == 1) {
+ try
+ ++;
+ if (try == 1) {
port = getpid() % (65536 - 30000) + 30000;
- } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) {
+ } else if (try <= NUM_RANDOM_PORTS_TO_PICK) {
port = rand() % (65536 - 30000) + 30000;
} else {
port = 0;
@@ -237,7 +239,7 @@ int grpc_pick_unused_port(void) {
GPR_ASSERT(port > 0);
/* Check that the port # is free for the other type of socket also */
if (!is_port_available(&port, !is_tcp)) {
- /* In the next iteration trial to bind to the other type first
+ /* In the next iteration try to bind to the other type first
because perhaps it is more rare. */
is_tcp = !is_tcp;
continue;
diff --git a/test/core/util/port_windows.c b/test/core/util/port_windows.c
index 2f64626cf3..5b072f805a 100644
--- a/test/core/util/port_windows.c
+++ b/test/core/util/port_windows.c
@@ -35,6 +35,7 @@
#include "test/core/util/test_config.h"
#if defined(GPR_WINSOCK_SOCKET) && defined(GRPC_TEST_PICK_PORT)
+#include "src/core/iomgr/sockaddr_utils.h"
#include "test/core/util/port.h"
#include <process.h>
@@ -42,14 +43,8 @@
#include <errno.h>
#include <string.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/support/env.h"
-#include "src/core/httpcli/httpcli.h"
-#include "src/core/iomgr/sockaddr_utils.h"
-
#define NUM_RANDOM_PORTS_TO_PICK 100
static int is_port_available(int *port, int is_tcp) {
@@ -104,67 +99,6 @@ static int is_port_available(int *port, int is_tcp) {
return 1;
}
-typedef struct portreq {
- grpc_pollset pollset;
- int port;
-} portreq;
-
-static void got_port_from_server(void *arg,
- const grpc_httpcli_response *response) {
- size_t i;
- int port = 0;
- portreq *pr = arg;
- GPR_ASSERT(response);
- GPR_ASSERT(response->status == 200);
- for (i = 0; i < response->body_length; i++) {
- GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
- port = port * 10 + response->body[i] - '0';
- }
- GPR_ASSERT(port > 1024);
- gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset));
- pr->port = port;
- grpc_pollset_kick(&pr->pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset));
-}
-
-static void destroy_pollset_and_shutdown(void *p) {
- grpc_pollset_destroy(p);
- grpc_shutdown();
-}
-
-static int pick_port_using_server(char *server) {
- grpc_httpcli_context context;
- grpc_httpcli_request req;
- portreq pr;
-
- grpc_init();
-
- memset(&pr, 0, sizeof(pr));
- memset(&req, 0, sizeof(req));
- grpc_pollset_init(&pr.pollset);
- pr.port = -1;
-
- req.host = server;
- req.path = "/get";
-
- grpc_httpcli_context_init(&context);
- grpc_httpcli_get(&context, &pr.pollset, &req,
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
- &pr);
- gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
- while (pr.port == -1) {
- grpc_pollset_worker worker;
- grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
-
- grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset);
-
- return pr.port;
-}
-
int grpc_pick_unused_port(void) {
/* We repeatedly pick a port and then see whether or not it is
available for use both as a TCP socket and a UDP socket. First, we
@@ -174,29 +108,22 @@ int grpc_pick_unused_port(void) {
races with other processes on kernels that want to reuse the same
port numbers over and over. */
- /* In alternating iterations we trial UDP ports before TCP ports UDP
+ /* In alternating iterations we try UDP ports before TCP ports UDP
ports -- it could be the case that this machine has been using up
UDP ports and they are scarcer. */
/* Type of port to first pick in next iteration */
int is_tcp = 1;
- int trial = 0;
-
- char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
- if (env) {
- int port = pick_port_using_server(env);
- gpr_free(env);
- if (port != 0) {
- return port;
- }
- }
+ int try
+ = 0;
for (;;) {
int port;
- trial++;
- if (trial == 1) {
+ try
+ ++;
+ if (try == 1) {
port = _getpid() % (65536 - 30000) + 30000;
- } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) {
+ } else if (try <= NUM_RANDOM_PORTS_TO_PICK) {
port = rand() % (65536 - 30000) + 30000;
} else {
port = 0;
@@ -209,7 +136,7 @@ int grpc_pick_unused_port(void) {
GPR_ASSERT(port > 0);
/* Check that the port # is free for the other type of socket also */
if (!is_port_available(&port, !is_tcp)) {
- /* In the next iteration trial to bind to the other type first
+ /* In the next iteration try to bind to the other type first
because perhaps it is more rare. */
is_tcp = !is_tcp;
continue;