aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc/support/slice_buffer.h2
-rw-r--r--src/core/httpcli/httpcli.c95
-rw-r--r--src/core/iomgr/endpoint.c17
-rw-r--r--src/core/iomgr/endpoint.h51
-rw-r--r--src/core/iomgr/tcp_posix.c520
-rw-r--r--src/core/security/secure_endpoint.c188
-rw-r--r--src/core/security/secure_transport_setup.c119
-rw-r--r--src/core/support/slice_buffer.c22
-rw-r--r--src/core/transport/chttp2/internal.h10
-rw-r--r--src/core/transport/chttp2/writing.c21
-rw-r--r--src/core/transport/chttp2_transport.c125
-rw-r--r--test/core/bad_client/bad_client.c17
-rw-r--r--test/core/iomgr/endpoint_tests.c204
-rw-r--r--test/core/iomgr/tcp_posix_test.c147
-rw-r--r--test/core/security/secure_endpoint_test.c55
15 files changed, 734 insertions, 859 deletions
diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h
index ec048e8c91..04db003ac5 100644
--- a/include/grpc/support/slice_buffer.h
+++ b/include/grpc/support/slice_buffer.h
@@ -86,6 +86,8 @@ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb);
void gpr_slice_buffer_swap(gpr_slice_buffer *a, gpr_slice_buffer *b);
/* move all of the elements of src into dst */
void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst);
+/* remove n bytes from the end of a slice buffer */
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n);
#ifdef __cplusplus
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 9012070e8e..1e38479eb1 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -61,6 +61,10 @@ typedef struct {
grpc_httpcli_context *context;
grpc_pollset *pollset;
grpc_iomgr_object iomgr_obj;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure on_read;
+ grpc_iomgr_closure done_write;
} internal_request;
static grpc_httpcli_get_override g_get_override = NULL;
@@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) {
gpr_slice_unref(req->request_text);
gpr_free(req->host);
grpc_iomgr_unregister_object(&req->iomgr_obj);
+ gpr_slice_buffer_destroy(&req->incoming);
+ gpr_slice_buffer_destroy(&req->outgoing);
gpr_free(req);
}
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status status) {
+static void on_read(void *user_data, int success);
+
+static void do_read(internal_request *req) {
+ switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
+ case GRPC_ENDPOINT_DONE:
+ on_read(req, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ on_read(req, 0);
+ break;
+ }
+}
+
+static void on_read(void *user_data, int success) {
internal_request *req = user_data;
size_t i;
- for (i = 0; i < nslices; i++) {
- if (GPR_SLICE_LENGTH(slices[i])) {
+ for (i = 0; i < req->incoming.count; i++) {
+ if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
req->have_read_byte = 1;
- if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) {
+ if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
finish(req, 0);
- goto done;
+ return;
}
}
}
- switch (status) {
- case GRPC_ENDPOINT_CB_OK:
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
- break;
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_ERROR:
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- if (!req->have_read_byte) {
- next_address(req);
- } else {
- finish(req, grpc_httpcli_parser_eof(&req->parser));
- }
- break;
- }
-
-done:
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
+ if (success) {
+ do_read(req);
+ } else if (!req->have_read_byte) {
+ next_address(req);
+ } else {
+ finish(req, grpc_httpcli_parser_eof(&req->parser));
}
}
-static void on_written(internal_request *req) {
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
-}
+static void on_written(internal_request *req) { do_read(req); }
-static void done_write(void *arg, grpc_endpoint_cb_status status) {
+static void done_write(void *arg, int success) {
internal_request *req = arg;
- switch (status) {
- case GRPC_ENDPOINT_CB_OK:
- on_written(req);
- break;
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- case GRPC_ENDPOINT_CB_ERROR:
- next_address(req);
- break;
+ if (success) {
+ on_written(req);
+ } else {
+ next_address(req);
}
}
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- switch (
- grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ gpr_slice_buffer_add(&req->outgoing, req->request_text);
+ switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
+ case GRPC_ENDPOINT_DONE:
on_written(req);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
finish(req, 0);
break;
}
@@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
+ grpc_iomgr_closure_init(&req->on_read, on_read, req);
+ grpc_iomgr_closure_init(&req->done_write, done_write, req);
+ gpr_slice_buffer_init(&req->incoming);
+ gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
@@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
- gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
+ grpc_iomgr_closure_init(&req->on_read, on_read, req);
+ grpc_iomgr_closure_init(&req->done_write, done_write, req);
+ gpr_slice_buffer_init(&req->incoming);
+ gpr_slice_buffer_init(&req->outgoing);
+ gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
req->host = gpr_strdup(request->host);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 8ee14bce9b..a7878e31dd 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,17 +33,16 @@
#include "src/core/iomgr/endpoint.h"
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
- ep->vtable->notify_on_read(ep, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
+ return ep->vtable->read(ep, slices, cb);
}
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
- return ep->vtable->write(ep, slices, nslices, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
+ return ep->vtable->write(ep, slices, cb);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index ea92a500e8..38f1e46d67 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -37,6 +37,7 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h"
#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h>
/* An endpoint caps a streaming channel between two communicating processes.
@@ -45,31 +46,17 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
-typedef enum grpc_endpoint_cb_status {
- GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
- GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
- GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
- GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
-} grpc_endpoint_cb_status;
-
-typedef enum grpc_endpoint_write_status {
- GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */
- GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
- GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */
-} grpc_endpoint_write_status;
-
-typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error);
-typedef void (*grpc_endpoint_write_cb)(void *user_data,
- grpc_endpoint_cb_status error);
+typedef enum grpc_endpoint_op_status {
+ GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */
+ GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
+ GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */
+} grpc_endpoint_op_status;
struct grpc_endpoint_vtable {
- void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
- grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
- size_t nslices, grpc_endpoint_write_cb cb,
- void *user_data);
+ grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb);
+ grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@@ -77,9 +64,13 @@ struct grpc_endpoint_vtable {
char *(*get_peer)(grpc_endpoint *ep);
};
-/* When data is available on the connection, calls the callback with slices. */
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+/* When data is available on the connection, calls the callback with slices.
+ Callback success indicates that the endpoint can accept more reads, failure
+ indicates the endpoint is closed.
+ Valid slices may be placed into \a slices even on callback success == 0. */
+grpc_endpoint_op_status grpc_endpoint_read(
+ grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@@ -89,11 +80,9 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
returns GRPC_ENDPOINT_WRITE_DONE.
Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data);
+grpc_endpoint_op_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
/* Causes any pending read/write callbacks to run immediately with
GRPC_ENDPOINT_CB_SHUTDOWN status */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 360e6ebd8c..36ba3a7606 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -61,209 +61,8 @@
#define SENDMSG_FLAGS 0
#endif
-/* Holds a slice array and associated state. */
-typedef struct grpc_tcp_slice_state {
- gpr_slice *slices; /* Array of slices */
- size_t nslices; /* Size of slices array. */
- ssize_t first_slice; /* First valid slice in array */
- ssize_t last_slice; /* Last valid slice in array */
- gpr_slice working_slice; /* pointer to original final slice */
- int working_slice_valid; /* True if there is a working slice */
- int memory_owned; /* True if slices array is owned */
-} grpc_tcp_slice_state;
-
int grpc_tcp_trace = 0;
-static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
- size_t nslices, size_t valid_slices) {
- state->slices = slices;
- state->nslices = nslices;
- if (valid_slices == 0) {
- state->first_slice = -1;
- } else {
- state->first_slice = 0;
- }
- state->last_slice = valid_slices - 1;
- state->working_slice_valid = 0;
- state->memory_owned = 0;
-}
-
-/* Returns true if there is still available data */
-static int slice_state_has_available(grpc_tcp_slice_state *state) {
- return state->first_slice != -1 && state->last_slice >= state->first_slice;
-}
-
-static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
- if (state->first_slice == -1) {
- return 0;
- } else {
- return state->last_slice - state->first_slice + 1;
- }
-}
-
-static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
- /* TODO(klempner): use realloc instead when first_slice is 0 */
- /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
- gpr_slice *slices = state->slices;
- size_t original_size = slice_state_slices_allocated(state);
- size_t i;
- gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
-
- for (i = 0; i < original_size; ++i) {
- new_slices[i] = slices[i + state->first_slice];
- }
-
- state->slices = new_slices;
- state->last_slice = original_size - 1;
- if (original_size > 0) {
- state->first_slice = 0;
- } else {
- state->first_slice = -1;
- }
- state->nslices = new_size;
-
- if (state->memory_owned) {
- gpr_free(slices);
- }
- state->memory_owned = 1;
-}
-
-static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
- size_t prefix_bytes) {
- gpr_slice *current_slice = &state->slices[state->first_slice];
- size_t current_slice_size;
-
- while (slice_state_has_available(state)) {
- current_slice_size = GPR_SLICE_LENGTH(*current_slice);
- if (current_slice_size > prefix_bytes) {
- /* TODO(klempner): Get rid of the extra refcount created here by adding a
- native "trim the first N bytes" operation to splice */
- /* TODO(klempner): This really shouldn't be modifying the current slice
- unless we own the slices array. */
- gpr_slice tail;
- tail = gpr_slice_split_tail(current_slice, prefix_bytes);
- gpr_slice_unref(*current_slice);
- *current_slice = tail;
- return;
- } else {
- gpr_slice_unref(*current_slice);
- ++state->first_slice;
- ++current_slice;
- prefix_bytes -= current_slice_size;
- }
- }
-}
-
-static void slice_state_destroy(grpc_tcp_slice_state *state) {
- while (slice_state_has_available(state)) {
- gpr_slice_unref(state->slices[state->first_slice]);
- ++state->first_slice;
- }
-
- if (state->memory_owned) {
- gpr_free(state->slices);
- state->memory_owned = 0;
- }
-}
-
-void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
- gpr_slice **slices, size_t *nslices) {
- *slices = state->slices + state->first_slice;
- *nslices = state->last_slice - state->first_slice + 1;
-
- state->first_slice = -1;
- state->last_slice = -1;
-}
-
-/* Fills iov with the first min(iov_size, available) slices, returns number
- filled */
-static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
- struct iovec *iov, size_t iov_size) {
- size_t nslices = state->last_slice - state->first_slice + 1;
- gpr_slice *slices = state->slices + state->first_slice;
- size_t i;
- if (nslices < iov_size) {
- iov_size = nslices;
- }
-
- for (i = 0; i < iov_size; ++i) {
- iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
- iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
- }
- return iov_size;
-}
-
-/* Makes n blocks available at the end of state, writes them into iov, and
- returns the number of bytes allocated */
-static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
- struct iovec *iov, size_t n,
- size_t slice_size) {
- size_t target_size;
- size_t i;
- size_t allocated_bytes;
- ssize_t allocated_slices = slice_state_slices_allocated(state);
-
- if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
- /* Need to grow the slice array */
- target_size = state->nslices;
- do {
- target_size = target_size * 2;
- } while (target_size < allocated_slices + n - state->working_slice_valid);
- /* TODO(klempner): If this ever needs to support both prefix removal and
- append, we should be smarter about the growth logic here */
- slice_state_realloc(state, target_size);
- }
-
- i = 0;
- allocated_bytes = 0;
-
- if (state->working_slice_valid) {
- iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
- iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
- GPR_SLICE_LENGTH(state->slices[state->last_slice]);
- allocated_bytes += iov[0].iov_len;
- ++i;
- state->slices[state->last_slice] = state->working_slice;
- state->working_slice_valid = 0;
- }
-
- for (; i < n; ++i) {
- ++state->last_slice;
- state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
- iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
- iov[i].iov_len = slice_size;
- allocated_bytes += slice_size;
- }
- if (state->first_slice == -1) {
- state->first_slice = 0;
- }
- return allocated_bytes;
-}
-
-/* Remove the last n bytes from state */
-/* TODO(klempner): Consider having this defer actual deletion until later */
-static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
- while (bytes > 0 && slice_state_has_available(state)) {
- if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
- state->working_slice = state->slices[state->last_slice];
- state->working_slice_valid = 1;
- /* TODO(klempner): Combine these into a single operation that doesn't need
- to refcount */
- gpr_slice_unref(gpr_slice_split_tail(
- &state->slices[state->last_slice],
- GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
- bytes = 0;
- } else {
- bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
- gpr_slice_unref(state->slices[state->last_slice]);
- --state->last_slice;
- if (state->last_slice == -1) {
- state->first_slice = -1;
- }
- }
- }
-}
-
typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
@@ -273,12 +72,15 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
- grpc_endpoint_read_cb read_cb;
- void *read_user_data;
- grpc_endpoint_write_cb write_cb;
- void *write_user_data;
+ gpr_slice_buffer *incoming_buffer;
+ gpr_slice_buffer *outgoing_buffer;
+ /** slice within outgoing_buffer to write next */
+ size_t outgoing_slice_idx;
+ /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+ size_t outgoing_byte_idx;
- grpc_tcp_slice_state write_state;
+ grpc_iomgr_closure *read_cb;
+ grpc_iomgr_closure *write_cb;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
@@ -288,65 +90,95 @@ typedef struct {
char *peer_string;
} grpc_tcp;
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_shutdown(grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(tcp->em_fd);
}
-static void grpc_tcp_unref(grpc_tcp *tcp) {
- int refcount_zero = gpr_unref(&tcp->refcount);
- if (refcount_zero) {
- grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
+static void tcp_free(grpc_tcp *tcp) {
+ grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
+}
+
+#define GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count - 1);
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count + 1);
+ gpr_ref(&tcp->refcount);
+}
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(grpc_tcp *tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
}
}
-static void grpc_tcp_destroy(grpc_endpoint *ep) {
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void tcp_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_tcp_unref(tcp);
+ TCP_UNREF(tcp, "destroy");
}
-static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status status) {
- grpc_endpoint_read_cb cb = tcp->read_cb;
+static void call_read_cb(grpc_tcp *tcp, int success) {
+ grpc_iomgr_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: status=%d", status);
- for (i = 0; i < nslices; i++) {
- char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "read: success=%d", success);
+ for (i = 0; i < tcp->incoming_buffer->count; i++) {
+ char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
gpr_free(dump);
}
}
tcp->read_cb = NULL;
- cb(tcp->read_user_data, slices, nslices, status);
+ tcp->incoming_buffer = NULL;
+ cb->cb(cb->cb_arg, success);
}
-#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_continue_read(grpc_tcp *tcp) {
- gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
+static void tcp_continue_read(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
- ssize_t allocated_bytes;
- struct grpc_tcp_slice_state read_state;
- gpr_slice *final_slices;
- size_t final_nslices;
+ size_t i;
GPR_ASSERT(!tcp->finished_edge);
+ GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
+ GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
- slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
- 0);
- allocated_bytes = slice_state_append_blocks_into_iovec(
- &read_state, iov, tcp->iov_size, tcp->slice_size);
+ while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+ gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
+ gpr_slice_malloc(tcp->slice_size));
+ }
+ for (i = 0; i < tcp->incoming_buffer->count; i++) {
+ iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
+ }
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -362,87 +194,63 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
- if (read_bytes < allocated_bytes) {
- /* TODO(klempner): Consider a second read first, in hopes of getting a
- * quick EAGAIN and saving a bunch of allocations. */
- slice_state_remove_last(&read_state, read_bytes < 0
- ? allocated_bytes
- : allocated_bytes - read_bytes);
- }
-
if (read_bytes < 0) {
- /* NB: After calling the user_cb a parallel call of the read handler may
+ /* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
tcp->iov_size /= 2;
}
- if (slice_state_has_available(&read_state)) {
- /* TODO(klempner): We should probably do the call into the application
- without all this junk on the stack */
- /* FIXME(klempner): Refcount properly */
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
- tcp->finished_edge = 1;
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
- } else {
- /* We've consumed the edge, request a new one */
- slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
- }
+ /* We've consumed the edge, request a new one */
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
/* TODO(klempner): Log interesting errors */
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- if (slice_state_has_available(&read_state)) {
- /* there were bytes already read: pass them up to the application */
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
- } else {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
- }
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
} else {
- if (tcp->iov_size < MAX_READ_IOVEC) {
+ GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
+ if ((size_t)read_bytes < tcp->incoming_buffer->length) {
+ gpr_slice_buffer_trim_end(tcp->incoming_buffer,
+ tcp->incoming_buffer->length - read_bytes);
+ } else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
- GPR_ASSERT(slice_state_has_available(&read_state));
- slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
+ call_read_cb(tcp, 1);
+ TCP_UNREF(tcp, "read");
}
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
- grpc_tcp_unref(tcp);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
} else {
- grpc_tcp_continue_read(tcp);
+ tcp_continue_read(tcp);
}
}
-static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
+ gpr_slice_buffer *incoming_buffer,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
- tcp->read_user_data = user_data;
- gpr_ref(&tcp->refcount);
+ tcp->incoming_buffer = incoming_buffer;
+ gpr_slice_buffer_reset_and_unref(incoming_buffer);
+ TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
@@ -450,18 +258,41 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->handle_read_closure.cb_arg = tcp;
grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
}
+ /* TODO(ctiller): immediate return */
+ return GRPC_ENDPOINT_PENDING;
}
#define MAX_WRITE_IOVEC 16
-static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
+static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
int iov_size;
ssize_t sent_length;
- grpc_tcp_slice_state *state = &tcp->write_state;
+ ssize_t sending_length;
+ ssize_t trailing;
+ ssize_t unwind_slice_idx;
+ ssize_t unwind_byte_idx;
for (;;) {
- iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
+ sending_length = 0;
+ unwind_slice_idx = tcp->outgoing_slice_idx;
+ unwind_byte_idx = tcp->outgoing_byte_idx;
+ for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+ iov_size != MAX_WRITE_IOVEC;
+ iov_size++) {
+ iov[iov_size].iov_base =
+ GPR_SLICE_START_PTR(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+ tcp->outgoing_byte_idx;
+ iov[iov_size].iov_len =
+ GPR_SLICE_LENGTH(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+ tcp->outgoing_byte_idx;
+ sending_length += iov[iov_size].iov_len;
+ tcp->outgoing_slice_idx++;
+ tcp->outgoing_byte_idx = 0;
+ }
+ GPR_ASSERT(iov_size > 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -480,70 +311,75 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
if (sent_length < 0) {
if (errno == EAGAIN) {
- return GRPC_ENDPOINT_WRITE_PENDING;
+ tcp->outgoing_slice_idx = unwind_slice_idx;
+ tcp->outgoing_byte_idx = unwind_byte_idx;
+ return GRPC_ENDPOINT_PENDING;
} else {
/* TODO(klempner): Log some of these */
- slice_state_destroy(state);
- return GRPC_ENDPOINT_WRITE_ERROR;
+ return GRPC_ENDPOINT_ERROR;
}
}
- /* TODO(klempner): Probably better to batch this after we finish flushing */
- slice_state_remove_prefix(state, sent_length);
+ GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+ trailing = sending_length - sent_length;
+ while (trailing > 0) {
+ ssize_t slice_length;
+
+ tcp->outgoing_slice_idx--;
+ slice_length = GPR_SLICE_LENGTH(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+ if (slice_length > trailing) {
+ tcp->outgoing_byte_idx = slice_length - trailing;
+ break;
+ } else {
+ trailing -= slice_length;
+ }
+ }
- if (!slice_state_has_available(state)) {
- return GRPC_ENDPOINT_WRITE_DONE;
+ if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+ return GRPC_ENDPOINT_DONE;
}
};
}
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_endpoint_write_status write_status;
- grpc_endpoint_cb_status cb_status;
- grpc_endpoint_write_cb cb;
+ grpc_endpoint_op_status status;
+ grpc_iomgr_closure *cb;
if (!success) {
- slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
- grpc_tcp_unref(tcp);
+ cb->cb(cb->cb_arg, 0);
+ TCP_UNREF(tcp, "write");
return;
}
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
- write_status = grpc_tcp_flush(tcp);
- if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+ status = tcp_flush(tcp);
+ if (status == GRPC_ENDPOINT_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} else {
- slice_state_destroy(&tcp->write_state);
- if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
- cb_status = GRPC_ENDPOINT_CB_OK;
- } else {
- cb_status = GRPC_ENDPOINT_CB_ERROR;
- }
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, cb_status);
- grpc_tcp_unref(tcp);
+ cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
+ TCP_UNREF(tcp, "write");
}
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
-static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
+ gpr_slice_buffer *buf,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_endpoint_write_status status;
+ grpc_endpoint_op_status status;
if (grpc_tcp_trace) {
size_t i;
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < buf->count; i++) {
+ char *data =
+ gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
gpr_free(data);
}
@@ -551,15 +387,19 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
GPR_ASSERT(tcp->write_cb == NULL);
- slice_state_init(&tcp->write_state, slices, nslices, nslices);
- status = grpc_tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_WRITE_PENDING) {
- /* TODO(klempner): Consider inlining rather than malloc for small nslices */
- slice_state_realloc(&tcp->write_state, nslices);
- gpr_ref(&tcp->refcount);
+ if (buf->length == 0) {
+ GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
+ return GRPC_ENDPOINT_DONE;
+ }
+ tcp->outgoing_buffer = buf;
+ tcp->outgoing_slice_idx = 0;
+ tcp->outgoing_byte_idx = 0;
+
+ status = tcp_flush(tcp);
+ if (status == GRPC_ENDPOINT_PENDING) {
+ TCP_REF(tcp, "write");
tcp->write_cb = cb;
- tcp->write_user_data = user_data;
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}
@@ -567,27 +407,25 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
return status;
}
-static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_add_fd(pollset, tcp->em_fd);
}
-static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
+static void tcp_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
-static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
+static char *tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static const grpc_endpoint_vtable vtable = {
- grpc_tcp_notify_on_read, grpc_tcp_write,
- grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
- grpc_tcp_shutdown, grpc_tcp_destroy,
- grpc_tcp_get_peer};
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
@@ -597,21 +435,19 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
- tcp->read_user_data = NULL;
- tcp->write_user_data = NULL;
+ tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
- slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = grpc_tcp_handle_read;
+ tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = grpc_tcp_handle_write;
+ tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
- tcp->handle_read_closure.cb = grpc_tcp_handle_read;
+ tcp->handle_read_closure.cb = tcp_handle_read;
return &tcp->base;
}
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 81b3e33cb2..4206c39318 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,15 +49,15 @@ typedef struct {
struct tsi_frame_protector *protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
- grpc_endpoint_read_cb read_cb;
- void *read_user_data;
- grpc_endpoint_write_cb write_cb;
- void *write_user_data;
+ grpc_iomgr_closure *read_cb;
+ grpc_iomgr_closure *write_cb;
+ grpc_iomgr_closure on_read;
+ gpr_slice_buffer *read_buffer;
+ gpr_slice_buffer source_buffer;
/* saved handshaker leftover data to unprotect. */
gpr_slice_buffer leftover_bytes;
/* buffers for read and write */
gpr_slice read_staging_buffer;
- gpr_slice_buffer input_buffer;
gpr_slice write_staging_buffer;
gpr_slice_buffer output_buffer;
@@ -67,62 +67,91 @@ typedef struct {
int grpc_trace_secure_endpoint = 0;
-static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
-
static void destroy(secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer);
- gpr_slice_buffer_destroy(&ep->input_buffer);
gpr_slice_unref(ep->write_staging_buffer);
gpr_slice_buffer_destroy(&ep->output_buffer);
+ gpr_slice_buffer_destroy(&ep->source_buffer);
gpr_mu_destroy(&ep->protector_mu);
gpr_free(ep);
}
+#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
+#define SECURE_ENDPOINT_UNREF(ep, reason) \
+ secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
+#define SECURE_ENDPOINT_REF(ep, reason) \
+ secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
+static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
+ ep, reason, ep->ref.count, ep->ref.count - 1);
+ if (gpr_unref(&ep->ref)) {
+ destroy(ep);
+ }
+}
+
+static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d",
+ ep, reason, ep->ref.count, ep->ref.count + 1);
+ gpr_ref(&ep->ref);
+}
+#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
+#else
+#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint *ep) {
if (gpr_unref(&ep->ref)) {
destroy(ep);
}
}
+static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
+#endif
+
static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
gpr_uint8 **end) {
- gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer);
+ gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
*cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void call_read_cb(secure_endpoint *ep, int success) {
if (grpc_trace_secure_endpoint) {
size_t i;
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < ep->read_buffer->count; i++) {
+ char *data = gpr_dump_slice(ep->read_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
gpr_free(data);
}
}
- ep->read_cb(ep->read_user_data, slices, nslices, error);
- secure_endpoint_unref(ep);
+ ep->read_buffer = NULL;
+ ep->read_cb->cb(ep->read_cb->cb_arg, success);
+ SECURE_ENDPOINT_UNREF(ep, "read");
}
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static int on_read(void *user_data, int success) {
unsigned i;
gpr_uint8 keep_looping = 0;
- size_t input_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)user_data;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
+ if (!success) {
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ return 0;
+ }
+
/* TODO(yangg) check error, maybe bail out early */
- for (i = 0; i < nslices; i++) {
- gpr_slice encrypted = slices[i];
+ for (i = 0; i < ep->source_buffer.count; i++) {
+ gpr_slice encrypted = ep->source_buffer.slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
size_t message_size = GPR_SLICE_LENGTH(encrypted);
@@ -161,7 +190,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
gpr_slice_buffer_add(
- &ep->input_buffer,
+ ep->read_buffer,
gpr_slice_split_head(
&ep->read_staging_buffer,
(size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@@ -169,38 +198,53 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
/* TODO(yangg) experiment with moving this block after read_cb to see if it
helps latency */
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
+ gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
if (result != TSI_OK) {
- gpr_slice_buffer_reset_and_unref(&ep->input_buffer);
- call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- return;
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ return 0;
}
- /* The upper level will unref the slices. */
- input_buffer_count = ep->input_buffer.count;
- ep->input_buffer.count = 0;
- call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
+
+ return 1;
+}
+
+static void on_read_cb(void *user_data, int success) {
+ call_read_cb(user_data, on_read(user_data, success));
}
-static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
- grpc_endpoint_read_cb cb, void *user_data) {
+static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ int immediate_read_success = -1;
ep->read_cb = cb;
- ep->read_user_data = user_data;
-
- secure_endpoint_ref(ep);
+ ep->read_buffer = slices;
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
if (ep->leftover_bytes.count) {
- size_t leftover_nslices = ep->leftover_bytes.count;
- ep->leftover_bytes.count = 0;
- on_read(ep, ep->leftover_bytes.slices, leftover_nslices,
- GRPC_ENDPOINT_CB_OK);
- return;
+ gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
+ GPR_ASSERT(ep->leftover_bytes.count == 0);
+ return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
- grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
+ SECURE_ENDPOINT_REF(ep, "read");
+
+ switch (
+ grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
+ case GRPC_ENDPOINT_DONE:
+ immediate_read_success = on_read(ep, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ return GRPC_ENDPOINT_PENDING;
+ case GRPC_ENDPOINT_ERROR:
+ immediate_read_success = on_read(ep, 0);
+ break;
+ }
+
+ GPR_ASSERT(immediate_read_success != -1);
+ SECURE_ENDPOINT_UNREF(ep, "read");
+
+ return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -211,36 +255,28 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
}
-static void on_write(void *data, grpc_endpoint_cb_status error) {
- secure_endpoint *ep = data;
- ep->write_cb(ep->write_user_data, error);
- secure_endpoint_unref(ep);
-}
-
-static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
unsigned i;
- size_t output_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
- grpc_endpoint_write_status status;
- GPR_ASSERT(ep->output_buffer.count == 0);
+
+ gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
if (grpc_trace_secure_endpoint) {
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < slices->count; i++) {
+ char *data =
+ gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
gpr_free(data);
}
}
- for (i = 0; i < nslices; i++) {
- gpr_slice plain = slices[i];
+ for (i = 0; i < slices->count; i++) {
+ gpr_slice plain = slices->slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
size_t message_size = GPR_SLICE_LENGTH(plain);
while (message_size > 0) {
@@ -290,29 +326,13 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
}
}
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
-
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- return GRPC_ENDPOINT_WRITE_ERROR;
+ return GRPC_ENDPOINT_ERROR;
}
- /* clear output_buffer and let the lower level handle its slices. */
- output_buffer_count = ep->output_buffer.count;
- ep->output_buffer.count = 0;
- ep->write_cb = cb;
- ep->write_user_data = user_data;
- /* Need to keep the endpoint alive across a transport */
- secure_endpoint_ref(ep);
- status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
- output_buffer_count, on_write, ep);
- if (status != GRPC_ENDPOINT_WRITE_PENDING) {
- secure_endpoint_unref(ep);
- }
- return status;
+ return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
}
static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@@ -320,9 +340,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) {
grpc_endpoint_shutdown(ep->wrapped_ep);
}
-static void endpoint_unref(grpc_endpoint *secure_ep) {
+static void endpoint_destroy(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- secure_endpoint_unref(ep);
+ SECURE_ENDPOINT_UNREF(ep, "destroy");
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@@ -343,9 +363,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
}
static const grpc_endpoint_vtable vtable = {
- endpoint_notify_on_read, endpoint_write,
+ endpoint_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_unref,
+ endpoint_shutdown, endpoint_destroy,
endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
@@ -363,8 +383,10 @@ grpc_endpoint *grpc_secure_endpoint_create(
}
ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
- gpr_slice_buffer_init(&ep->input_buffer);
gpr_slice_buffer_init(&ep->output_buffer);
+ gpr_slice_buffer_init(&ep->source_buffer);
+ ep->read_buffer = NULL;
+ grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 0c3572b53c..bf0079577e 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -50,16 +50,17 @@ typedef struct {
grpc_endpoint *wrapped_endpoint;
grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
+ grpc_iomgr_closure on_handshake_data_sent_to_peer;
+ grpc_iomgr_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
-static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error);
+static void on_handshake_data_received_from_peer(void *setup, int success);
-static void on_handshake_data_sent_to_peer(void *setup,
- grpc_endpoint_cb_status error);
+static void on_handshake_data_sent_to_peer(void *setup, int success);
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) {
@@ -78,6 +79,8 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
gpr_slice_buffer_destroy(&s->left_overs);
+ gpr_slice_buffer_destroy(&s->outgoing);
+ gpr_slice_buffer_destroy(&s->incoming);
GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
gpr_free(s);
}
@@ -102,6 +105,8 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
s->secure_endpoint =
grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
s->left_overs.slices, s->left_overs.count);
+ s->left_overs.count = 0;
+ s->left_overs.length = 0;
secure_transport_setup_done(s, 1);
return;
}
@@ -132,7 +137,6 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
size_t offset = 0;
tsi_result result = TSI_OK;
gpr_slice to_send;
- grpc_endpoint_write_status write_status;
do {
size_t to_send_size = s->handshake_buffer_size - offset;
@@ -155,28 +159,25 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
to_send =
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
+ gpr_slice_buffer_reset_and_unref(&s->outgoing);
+ gpr_slice_buffer_add(&s->outgoing, to_send);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
- on_handshake_data_sent_to_peer, s);
- if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
- gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
- secure_transport_setup_done(s, 0);
- } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
- on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK);
- }
-}
-
-static void cleanup_slices(gpr_slice *slices, size_t num_slices) {
- size_t i;
- for (i = 0; i < num_slices; i++) {
- gpr_slice_unref(slices[i]);
+ switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
+ &s->on_handshake_data_sent_to_peer)) {
+ case GRPC_ENDPOINT_ERROR:
+ gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
+ secure_transport_setup_done(s, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_sent_to_peer(s, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
}
}
-static void on_handshake_data_received_from_peer(
- void *setup, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void on_handshake_data_received_from_peer(void *setup, int success) {
grpc_secure_transport_setup *s = setup;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -184,32 +185,37 @@ static void on_handshake_data_received_from_peer(
size_t num_left_overs;
int has_left_overs_in_current_slice = 0;
- if (error != GRPC_ENDPOINT_CB_OK) {
+ if (!success) {
gpr_log(GPR_ERROR, "Read failed.");
- cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
- for (i = 0; i < nslices; i++) {
- consumed_slice_size = GPR_SLICE_LENGTH(slices[i]);
+ for (i = 0; i < s->incoming.count; i++) {
+ consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
result = tsi_handshaker_process_bytes_from_peer(
- s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size);
+ s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
+ &consumed_slice_size);
if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
}
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* We may need more data. */
if (result == TSI_INCOMPLETE_DATA) {
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_notify_on_read(s->wrapped_endpoint,
- on_handshake_data_received_from_peer, setup);
- cleanup_slices(slices, nslices);
+ switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer)) {
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_received_from_peer(s, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ on_handshake_data_received_from_peer(s, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
return;
} else {
send_handshake_bytes_to_peer(s);
- cleanup_slices(slices, nslices);
return;
}
}
@@ -217,42 +223,40 @@ static void on_handshake_data_received_from_peer(
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result));
- cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
/* Handshake is done and successful this point. */
has_left_overs_in_current_slice =
- (consumed_slice_size < GPR_SLICE_LENGTH(slices[i]));
- num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1;
+ (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
+ num_left_overs =
+ (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
if (num_left_overs == 0) {
- cleanup_slices(slices, nslices);
check_peer(s);
return;
}
- cleanup_slices(slices, nslices - num_left_overs);
-
/* Put the leftovers in our buffer (ownership transfered). */
if (has_left_overs_in_current_slice) {
- gpr_slice_buffer_add(&s->left_overs,
- gpr_slice_split_tail(&slices[i], consumed_slice_size));
- gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */
+ gpr_slice_buffer_add(
+ &s->left_overs,
+ gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
+ gpr_slice_unref(
+ s->incoming.slices[i]); /* split_tail above increments refcount. */
}
gpr_slice_buffer_addn(
- &s->left_overs, &slices[i + 1],
+ &s->left_overs, &s->incoming.slices[i + 1],
num_left_overs - (size_t)has_left_overs_in_current_slice);
check_peer(s);
}
/* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup,
- grpc_endpoint_cb_status error) {
+static void on_handshake_data_sent_to_peer(void *setup, int success) {
grpc_secure_transport_setup *s = setup;
/* Make sure that write is OK. */
- if (error != GRPC_ENDPOINT_CB_OK) {
- gpr_log(GPR_ERROR, "Write failed with error %d.", error);
+ if (!success) {
+ gpr_log(GPR_ERROR, "Write failed.");
if (setup != NULL) secure_transport_setup_done(s, 0);
return;
}
@@ -261,8 +265,17 @@ static void on_handshake_data_sent_to_peer(void *setup,
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->wrapped_endpoint,
- on_handshake_data_received_from_peer, setup);
+ switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer)) {
+ case GRPC_ENDPOINT_ERROR:
+ on_handshake_data_received_from_peer(s, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_received_from_peer(s, 1);
+ break;
+ }
} else {
check_peer(s);
}
@@ -288,6 +301,12 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
+ grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
+ on_handshake_data_sent_to_peer, s);
+ grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
+ on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs);
+ gpr_slice_buffer_init(&s->outgoing);
+ gpr_slice_buffer_init(&s->incoming);
send_handshake_bytes_to_peer(s);
}
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 987d5cb9b5..6482ef9c9f 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -207,3 +207,25 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->count = 0;
src->length = 0;
}
+
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
+ GPR_ASSERT(n <= sb->length);
+ sb->length -= n;
+ for (;;) {
+ size_t idx = sb->count - 1;
+ gpr_slice slice = sb->slices[idx];
+ size_t slice_len = GPR_SLICE_LENGTH(slice);
+ if (slice_len > n) {
+ sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
+ return;
+ } else if (slice_len == n) {
+ gpr_slice_unref(slice);
+ sb->count = idx;
+ return;
+ } else {
+ gpr_slice_unref(slice);
+ n -= slice_len;
+ sb->count = idx;
+ }
+ }
+}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 42cf0ecd5b..0e7f94be38 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -214,6 +214,8 @@ typedef struct {
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
gpr_uint8 is_client;
+ /** callback for when writing is done */
+ grpc_iomgr_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@@ -331,6 +333,11 @@ struct grpc_chttp2_transport {
grpc_iomgr_closure writing_action;
/** closure to start reading from the endpoint */
grpc_iomgr_closure reading_action;
+ /** closure to finish reading from the endpoint */
+ grpc_iomgr_closure recv_data;
+
+ /** incoming read bytes */
+ gpr_slice_buffer read_buffer;
/** address to place a newly accepted stream - set and unset by
grpc_chttp2_parsing_accept_stream; used by init_stream to
@@ -463,8 +470,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(
- grpc_chttp2_transport_writing *transport_writing, int success);
+void grpc_chttp2_terminate_writing(void *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 123061b3fc..2c8c48f47b 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -37,7 +37,6 @@
#include <grpc/support/log.h>
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
@@ -165,16 +164,15 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
- transport_writing->outbuf.count, finish_write_cb,
- transport_writing)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
+ &transport_writing->done_cb)) {
+ case GRPC_ENDPOINT_DONE:
grpc_chttp2_terminate_writing(transport_writing, 1);
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
grpc_chttp2_terminate_writing(transport_writing, 0);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
break;
}
}
@@ -209,12 +207,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
}
}
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
- grpc_chttp2_transport_writing *transport_writing = tw;
- grpc_chttp2_terminate_writing(transport_writing,
- write_status == GRPC_ENDPOINT_CB_OK);
-}
-
void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
@@ -243,6 +235,5 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
- transport_writing->outbuf.count = 0;
- transport_writing->outbuf.length = 0;
+ gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1bbd210e46..3d3d708e2f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -91,8 +91,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
/** Endpoint callback to process incoming data */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error);
+static void recv_data(void *tp, int success);
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
@@ -143,6 +142,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf);
+ gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
@@ -255,6 +255,11 @@ static void init_transport(grpc_chttp2_transport *t,
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
+ grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
+ grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
+ gpr_slice_buffer_init(&t->read_buffer);
+
if (is_client) {
gpr_slice_buffer_add(
&t->global.qbuf,
@@ -502,8 +507,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(
- grpc_chttp2_transport_writing *transport_writing, int success) {
+void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
+ grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@@ -1060,66 +1065,56 @@ static void read_error_locked(grpc_chttp2_transport *t) {
}
/* tcp read callback */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void recv_data(void *tp, int success) {
grpc_chttp2_transport *t = tp;
size_t i;
int unref = 0;
- switch (error) {
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_ERROR:
- lock(t);
+ lock(t);
+ i = 0;
+ GPR_ASSERT(!t->parsing_active);
+ if (!t->closed) {
+ t->parsing_active = 1;
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+ gpr_mu_unlock(&t->mu);
+ for (; i < t->read_buffer.count &&
+ grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
+ i++)
+ ;
+ gpr_mu_lock(&t->mu);
+ if (i != t->read_buffer.count) {
drop_connection(t);
- read_error_locked(t);
- unlock(t);
- unref = 1;
- for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
- break;
- case GRPC_ENDPOINT_CB_OK:
- lock(t);
- i = 0;
- GPR_ASSERT(!t->parsing_active);
- if (!t->closed) {
- t->parsing_active = 1;
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
- gpr_mu_unlock(&t->mu);
- for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
- i++) {
- gpr_slice_unref(slices[i]);
- }
- gpr_mu_lock(&t->mu);
- if (i != nslices) {
- drop_connection(t);
- }
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- t->global.concurrent_stream_count =
- grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (t->parsing.initial_window_update != 0) {
- grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
- t->parsing.initial_window_update = 0;
- }
- /* handle higher level things */
- grpc_chttp2_publish_reads(&t->global, &t->parsing);
- t->parsing_active = 0;
- }
- if (i == nslices) {
- grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
- } else {
- read_error_locked(t);
- unref = 1;
- }
- unlock(t);
- for (; i < nslices; i++) gpr_slice_unref(slices[i]);
- break;
+ }
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ if (t->parsing.initial_window_update != 0) {
+ grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+ update_global_window, t);
+ t->parsing.initial_window_update = 0;
+ }
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ t->parsing_active = 0;
+ }
+ if (!success) {
+ drop_connection(t);
+ read_error_locked(t);
+ unref = 1;
+ } else if (i == t->read_buffer.count) {
+ grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
+ } else {
+ read_error_locked(t);
+ unref = 1;
}
+ gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ unlock(t);
+
if (unref) {
UNREF_TRANSPORT(t, "recv_data");
}
@@ -1127,7 +1122,16 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
static void reading_action(void *pt, int iomgr_success_ignored) {
grpc_chttp2_transport *t = pt;
- grpc_endpoint_notify_on_read(t->ep, recv_data, t);
+ switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
+ case GRPC_ENDPOINT_DONE:
+ recv_data(t, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ recv_data(t, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
}
/*
@@ -1240,5 +1244,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
- recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
+ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
+ recv_data(t, 1);
}
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index 24bf5d3625..1d98879662 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -59,7 +59,7 @@ static void thd_func(void *arg) {
gpr_event_set(&a->done_thd, (void *)1);
}
-static void done_write(void *arg, grpc_endpoint_cb_status status) {
+static void done_write(void *arg, int success) {
thd_args *a = arg;
gpr_event_set(&a->done_write, (void *)1);
}
@@ -85,6 +85,8 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_mdctx *mdctx = grpc_mdctx_create();
gpr_slice slice =
gpr_slice_from_copied_buffer(client_payload, client_payload_length);
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure done_write_closure;
hex = gpr_dump(client_payload, client_payload_length,
GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -122,14 +124,18 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
/* Start validator */
gpr_thd_new(&id, thd_func, &a, NULL);
+ gpr_slice_buffer_init(&outgoing);
+ gpr_slice_buffer_add(&outgoing, slice);
+ grpc_iomgr_closure_init(&done_write_closure, done_write, &a);
+
/* Write data */
- switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
+ case GRPC_ENDPOINT_DONE:
done_write(&a, 1);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
done_write(&a, 0);
break;
}
@@ -155,6 +161,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(a.server);
grpc_completion_queue_destroy(a.cq);
+ gpr_slice_buffer_destroy(&outgoing);
grpc_shutdown();
}
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 8186c96da1..b4f170cfbe 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -59,8 +59,7 @@
static grpc_pollset *g_pollset;
-size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
size_t num_bytes = 0;
size_t i;
size_t j;
@@ -72,7 +71,6 @@ size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
- gpr_slice_unref(slices[i]);
}
return num_bytes;
}
@@ -121,86 +119,76 @@ struct read_and_write_test_state {
int current_write_data;
int read_done;
int write_done;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure done_read;
+ grpc_iomgr_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error) {
+static void read_and_write_test_read_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
- if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
- gpr_log(GPR_INFO, "Read handler shutdown");
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- return;
- }
- state->bytes_read +=
- count_and_unref_slices(slices, nslices, &state->current_read_data);
- if (state->bytes_read == state->target_bytes) {
+ state->bytes_read += count_slices(
+ state->incoming.slices, state->incoming.count, &state->current_read_data);
+ if (state->bytes_read == state->target_bytes || !success) {
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1;
+ state->read_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- } else {
- grpc_endpoint_notify_on_read(state->read_ep,
- read_and_write_test_read_handler, data);
+ } else if (success) {
+ switch (grpc_endpoint_read(state->read_ep, &state->incoming,
+ &state->done_read)) {
+ case GRPC_ENDPOINT_ERROR:
+ read_and_write_test_read_handler(data, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ read_and_write_test_read_handler(data, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
}
}
-static void read_and_write_test_write_handler(void *data,
- grpc_endpoint_cb_status error) {
+static void read_and_write_test_write_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_write_status write_status;
-
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
-
- gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
- error);
-
- if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
- gpr_log(GPR_INFO, "Write handler shutdown");
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- return;
- }
-
- for (;;) {
- /* Need to do inline writes until they don't succeed synchronously or we
- finish writing */
- state->bytes_written += state->current_write_size;
- if (state->target_bytes - state->bytes_written <
- state->current_write_size) {
- state->current_write_size = state->target_bytes - state->bytes_written;
- }
- if (state->current_write_size == 0) {
- break;
- }
-
- slices = allocate_blocks(state->current_write_size, 8192, &nslices,
- &state->current_write_data);
- write_status =
- grpc_endpoint_write(state->write_ep, slices, nslices,
- read_and_write_test_write_handler, state);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
- GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
- free(slices);
- if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- return;
+ grpc_endpoint_op_status write_status;
+
+ if (success) {
+ for (;;) {
+ /* Need to do inline writes until they don't succeed synchronously or we
+ finish writing */
+ state->bytes_written += state->current_write_size;
+ if (state->target_bytes - state->bytes_written <
+ state->current_write_size) {
+ state->current_write_size = state->target_bytes - state->bytes_written;
+ }
+ if (state->current_write_size == 0) {
+ break;
+ }
+
+ slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+ &state->current_write_data);
+ gpr_slice_buffer_reset_and_unref(&state->outgoing);
+ gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
+ write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
+ &state->done_write);
+ gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+ GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
+ free(slices);
+ if (write_status == GRPC_ENDPOINT_PENDING) {
+ return;
+ }
}
+ GPR_ASSERT(state->bytes_written == state->target_bytes);
}
- GPR_ASSERT(state->bytes_written == state->target_bytes);
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1;
+ state->write_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
@@ -234,16 +222,31 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
+ grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
+ &state);
+ grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
+ &state);
+ gpr_slice_buffer_init(&state.outgoing);
+ gpr_slice_buffer_init(&state.incoming);
/* Get started by pretending an initial write completed */
/* NOTE: Sets up initial conditions so we can have the same write handler
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
+ read_and_write_test_write_handler(&state, 1);
- grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
- &state);
+ switch (
+ grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_and_write_test_read_handler(&state, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ read_and_write_test_read_handler(&state, 1);
+ break;
+ }
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
@@ -262,6 +265,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
+ gpr_slice_buffer_destroy(&state.outgoing);
+ gpr_slice_buffer_destroy(&state.incoming);
end_test(config);
}
@@ -272,36 +277,40 @@ struct timeout_test_state {
typedef struct {
int done;
grpc_endpoint *ep;
+ gpr_slice_buffer incoming;
+ grpc_iomgr_closure done_read;
} shutdown_during_write_test_state;
-static void shutdown_during_write_test_read_handler(
- void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
- size_t i;
+static void shutdown_during_write_test_read_handler(void *user_data,
+ int success) {
shutdown_during_write_test_state *st = user_data;
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
-
- if (error != GRPC_ENDPOINT_CB_OK) {
+ if (!success) {
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- st->done = error;
+ st->done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
- grpc_endpoint_notify_on_read(
- st->ep, shutdown_during_write_test_read_handler, user_data);
+ switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ shutdown_during_write_test_read_handler(user_data, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ shutdown_during_write_test_read_handler(user_data, 1);
+ break;
+ }
}
}
-static void shutdown_during_write_test_write_handler(
- void *user_data, grpc_endpoint_cb_status error) {
+static void shutdown_during_write_test_write_handler(void *user_data,
+ int success) {
shutdown_during_write_test_state *st = user_data;
- gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
- error);
- if (error == 0) {
+ gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
+ success);
+ if (success) {
/* This happens about 0.5% of the time when run under TSAN, and is entirely
legitimate, but means we aren't testing the path we think we are. */
/* TODO(klempner): Change this test to retry the write in that case */
@@ -324,6 +333,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st;
gpr_slice *slices;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure done_write;
grpc_endpoint_test_fixture f =
begin_test(config, "shutdown_during_write_test", slice_size);
@@ -334,19 +345,26 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
read_st.done = 0;
write_st.done = 0;
- grpc_endpoint_notify_on_read(
- read_st.ep, shutdown_during_write_test_read_handler, &read_st);
+ grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
+ &write_st);
+ grpc_iomgr_closure_init(&read_st.done_read,
+ shutdown_during_write_test_read_handler, &read_st);
+ gpr_slice_buffer_init(&read_st.incoming);
+ gpr_slice_buffer_init(&outgoing);
+
+ GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
+ &read_st.done_read) == GRPC_ENDPOINT_PENDING);
for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data);
- switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
- shutdown_during_write_test_write_handler,
- &write_st)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ gpr_slice_buffer_reset_and_unref(&outgoing);
+ gpr_slice_buffer_addn(&outgoing, slices, nblocks);
+ switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
+ case GRPC_ENDPOINT_DONE:
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
gpr_log(GPR_ERROR, "error writing");
abort();
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -365,6 +383,8 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);
+ gpr_slice_buffer_destroy(&read_st.incoming);
+ gpr_slice_buffer_destroy(&outgoing);
end_test(config);
return;
}
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 17a85ceaec..a2e3adcf29 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,10 +118,12 @@ struct read_socket_state {
grpc_endpoint *ep;
ssize_t read_bytes;
ssize_t target_read_bytes;
+ gpr_slice_buffer incoming;
+ grpc_iomgr_closure read_cb;
};
-static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+static ssize_t count_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
ssize_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
@@ -132,31 +134,41 @@ static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
- gpr_slice_unref(slices[i]);
}
return num_bytes;
}
-static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void read_cb(void *user_data, int success) {
struct read_socket_state *state = (struct read_socket_state *)user_data;
ssize_t read_bytes;
int current_data;
- GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(success);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
- read_bytes = count_and_unref_slices(slices, nslices, &current_data);
+ read_bytes = count_slices(state->incoming.slices, state->incoming.count,
+ &current_data);
state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- /* empty */
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
- grpc_endpoint_notify_on_read(state->ep, read_cb, state);
+ switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ read_cb(user_data, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ read_cb(user_data, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ break;
+ }
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Write to a socket, then read from it using the grpc_tcp API. */
@@ -181,8 +193,19 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ read_cb(&state, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_cb(&state, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -192,6 +215,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -218,8 +242,19 @@ static void large_read_test(ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ read_cb(&state, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_cb(&state, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -229,6 +264,7 @@ static void large_read_test(ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -260,8 +296,7 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size,
return slices;
}
-static void write_done(void *user_data /* write_socket_state */,
- grpc_endpoint_cb_status error) {
+static void write_done(void *user_data /* write_socket_state */, int success) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -336,6 +371,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -352,73 +389,21 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
- GRPC_ENDPOINT_WRITE_DONE) {
- /* Write completed immediately */
- read_bytes = drain_socket(sv[0]);
- GPR_ASSERT(read_bytes == num_bytes);
- } else {
- drain_socket_blocking(sv[0], num_bytes, num_bytes);
- gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- for (;;) {
- grpc_pollset_worker worker;
- if (state.write_done) {
- break;
- }
- grpc_pollset_work(&g_pollset, &worker, deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- }
-
- grpc_endpoint_destroy(ep);
- gpr_free(slices);
-}
-
-static void read_done_for_write_error(void *ud, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error) {
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
- GPR_ASSERT(nslices == 0);
-}
-
-/* Write to a socket using the grpc_tcp API, then drain it directly.
- Note that if the write does not complete immediately we need to drain the
- socket in parallel with the read. */
-static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
- int sv[2];
- grpc_endpoint *ep;
- struct write_socket_state state;
- size_t num_blocks;
- gpr_slice *slices;
- int current_data = 0;
- grpc_pollset_worker worker;
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
-
- gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
- num_bytes, slice_size);
-
- create_sockets(sv);
+ gpr_slice_buffer_init(&outgoing);
+ gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
+ grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
- grpc_endpoint_add_to_pollset(ep, &g_pollset);
-
- close(sv[0]);
-
- state.ep = ep;
- state.write_done = 0;
-
- slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
-
- switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
- case GRPC_ENDPOINT_WRITE_DONE:
- case GRPC_ENDPOINT_WRITE_ERROR:
+ switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
+ case GRPC_ENDPOINT_DONE:
/* Write completed immediately */
+ read_bytes = drain_socket(sv[0]);
+ GPR_ASSERT(read_bytes == num_bytes);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
- grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
+ case GRPC_ENDPOINT_PENDING:
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
+ grpc_pollset_worker worker;
if (state.write_done) {
break;
}
@@ -426,10 +411,14 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
+ case GRPC_ENDPOINT_ERROR:
+ gpr_log(GPR_ERROR, "endpoint got error");
+ abort();
}
+ gpr_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(ep);
- free(slices);
+ gpr_free(slices);
}
void run_tests(void) {
@@ -449,10 +438,6 @@ void run_tests(void) {
write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
- write_error_test(40320, i);
- }
-
- for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i);
}
}
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index a8368fc842..c76ddcd194 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -135,62 +135,26 @@ static grpc_endpoint_test_config configs[] = {
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
};
-static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
- gpr_slice s =
- gpr_slice_from_copied_string("hello world 12345678900987654321");
-
- GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
- GPR_ASSERT(nslices == 1);
-
- GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
- gpr_slice_unref(slices[0]);
- gpr_slice_unref(s);
- *(int *)user_data = 1;
-}
-
static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
- int verified = 0;
+ gpr_slice_buffer incoming;
+ gpr_slice s =
+ gpr_slice_from_copied_string("hello world 12345678900987654321");
gpr_log(GPR_INFO, "Start test left over");
- grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified);
- GPR_ASSERT(verified == 1);
+ gpr_slice_buffer_init(&incoming);
+ GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
+ GRPC_ENDPOINT_DONE);
+ GPR_ASSERT(incoming.count == 1);
+ GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
grpc_endpoint_shutdown(f.client_ep);
grpc_endpoint_shutdown(f.server_ep);
grpc_endpoint_destroy(f.client_ep);
grpc_endpoint_destroy(f.server_ep);
- clean_up();
-}
-
-static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
- grpc_endpoint_test_fixture *f = user_data;
- gpr_slice s =
- gpr_slice_from_copied_string("hello world 12345678900987654321");
-
- GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
- GPR_ASSERT(nslices == 1);
-
- grpc_endpoint_shutdown(f->client_ep);
- grpc_endpoint_destroy(f->client_ep);
-
- GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0]));
- gpr_slice_unref(slices[0]);
gpr_slice_unref(s);
-}
+ gpr_slice_buffer_destroy(&incoming);
-/* test which destroys the ep before finishing reading */
-static void test_destroy_ep_early(grpc_endpoint_test_config config,
- size_t slice_size) {
- grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
- gpr_log(GPR_INFO, "Start test destroy early");
-
- grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f);
-
- grpc_endpoint_shutdown(f.server_ep);
- grpc_endpoint_destroy(f.server_ep);
clean_up();
}
@@ -203,7 +167,6 @@ int main(int argc, char **argv) {
grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset);
test_leftover(configs[1], 1);
- test_destroy_ep_early(configs[1], 1);
grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown();