aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-08-27 07:38:01 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-08-27 07:38:01 -0700
commitb029859e580bfb7281c623622cdc32daf9846769 (patch)
treee3e12c253230021f57fb88f09c36e913cfd02f50 /src
parent4ff47d9ea361251274247998625808305c7f7c4a (diff)
Revert "Revert "Refactor Endpoint API""
Diffstat (limited to 'src')
-rw-r--r--src/core/httpcli/httpcli.c95
-rw-r--r--src/core/iomgr/endpoint.c17
-rw-r--r--src/core/iomgr/endpoint.h63
-rw-r--r--src/core/iomgr/tcp_posix.c525
-rw-r--r--src/core/iomgr/tcp_windows.c192
-rw-r--r--src/core/security/secure_endpoint.c188
-rw-r--r--src/core/security/secure_transport_setup.c119
-rw-r--r--src/core/support/slice_buffer.c22
-rw-r--r--src/core/transport/chttp2/internal.h12
-rw-r--r--src/core/transport/chttp2/writing.c21
-rw-r--r--src/core/transport/chttp2_transport.c140
11 files changed, 658 insertions, 736 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 9012070e8e..1e38479eb1 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -61,6 +61,10 @@ typedef struct {
grpc_httpcli_context *context;
grpc_pollset *pollset;
grpc_iomgr_object iomgr_obj;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure on_read;
+ grpc_iomgr_closure done_write;
} internal_request;
static grpc_httpcli_get_override g_get_override = NULL;
@@ -99,73 +103,70 @@ static void finish(internal_request *req, int success) {
gpr_slice_unref(req->request_text);
gpr_free(req->host);
grpc_iomgr_unregister_object(&req->iomgr_obj);
+ gpr_slice_buffer_destroy(&req->incoming);
+ gpr_slice_buffer_destroy(&req->outgoing);
gpr_free(req);
}
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status status) {
+static void on_read(void *user_data, int success);
+
+static void do_read(internal_request *req) {
+ switch (grpc_endpoint_read(req->ep, &req->incoming, &req->on_read)) {
+ case GRPC_ENDPOINT_DONE:
+ on_read(req, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ on_read(req, 0);
+ break;
+ }
+}
+
+static void on_read(void *user_data, int success) {
internal_request *req = user_data;
size_t i;
- for (i = 0; i < nslices; i++) {
- if (GPR_SLICE_LENGTH(slices[i])) {
+ for (i = 0; i < req->incoming.count; i++) {
+ if (GPR_SLICE_LENGTH(req->incoming.slices[i])) {
req->have_read_byte = 1;
- if (!grpc_httpcli_parser_parse(&req->parser, slices[i])) {
+ if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) {
finish(req, 0);
- goto done;
+ return;
}
}
}
- switch (status) {
- case GRPC_ENDPOINT_CB_OK:
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
- break;
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_ERROR:
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- if (!req->have_read_byte) {
- next_address(req);
- } else {
- finish(req, grpc_httpcli_parser_eof(&req->parser));
- }
- break;
- }
-
-done:
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
+ if (success) {
+ do_read(req);
+ } else if (!req->have_read_byte) {
+ next_address(req);
+ } else {
+ finish(req, grpc_httpcli_parser_eof(&req->parser));
}
}
-static void on_written(internal_request *req) {
- grpc_endpoint_notify_on_read(req->ep, on_read, req);
-}
+static void on_written(internal_request *req) { do_read(req); }
-static void done_write(void *arg, grpc_endpoint_cb_status status) {
+static void done_write(void *arg, int success) {
internal_request *req = arg;
- switch (status) {
- case GRPC_ENDPOINT_CB_OK:
- on_written(req);
- break;
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- case GRPC_ENDPOINT_CB_ERROR:
- next_address(req);
- break;
+ if (success) {
+ on_written(req);
+ } else {
+ next_address(req);
}
}
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- switch (
- grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ gpr_slice_buffer_add(&req->outgoing, req->request_text);
+ switch (grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write)) {
+ case GRPC_ENDPOINT_DONE:
on_written(req);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
finish(req, 0);
break;
}
@@ -237,6 +238,10 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
+ grpc_iomgr_closure_init(&req->on_read, on_read, req);
+ grpc_iomgr_closure_init(&req->done_write, done_write, req);
+ gpr_slice_buffer_init(&req->incoming);
+ gpr_slice_buffer_init(&req->outgoing);
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
@@ -270,7 +275,11 @@ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
- gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path);
+ grpc_iomgr_closure_init(&req->on_read, on_read, req);
+ grpc_iomgr_closure_init(&req->done_write, done_write, req);
+ gpr_slice_buffer_init(&req->incoming);
+ gpr_slice_buffer_init(&req->outgoing);
+ gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path);
grpc_iomgr_register_object(&req->iomgr_obj, name);
gpr_free(name);
req->host = gpr_strdup(request->host);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index 8ee14bce9b..a7878e31dd 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -33,17 +33,16 @@
#include "src/core/iomgr/endpoint.h"
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
- ep->vtable->notify_on_read(ep, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
+ return ep->vtable->read(ep, slices, cb);
}
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
- return ep->vtable->write(ep, slices, nslices, cb, user_data);
+grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
+ return ep->vtable->write(ep, slices, cb);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index ea92a500e8..d14d52d561 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -37,6 +37,7 @@
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_set.h"
#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
#include <grpc/support/time.h>
/* An endpoint caps a streaming channel between two communicating processes.
@@ -45,31 +46,17 @@
typedef struct grpc_endpoint grpc_endpoint;
typedef struct grpc_endpoint_vtable grpc_endpoint_vtable;
-typedef enum grpc_endpoint_cb_status {
- GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
- GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
- GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
- GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
-} grpc_endpoint_cb_status;
-
-typedef enum grpc_endpoint_write_status {
- GRPC_ENDPOINT_WRITE_DONE, /* completed immediately, cb won't be called */
- GRPC_ENDPOINT_WRITE_PENDING, /* cb will be called when completed */
- GRPC_ENDPOINT_WRITE_ERROR /* write errored out, cb won't be called */
-} grpc_endpoint_write_status;
-
-typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error);
-typedef void (*grpc_endpoint_write_cb)(void *user_data,
- grpc_endpoint_cb_status error);
+typedef enum grpc_endpoint_op_status {
+ GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */
+ GRPC_ENDPOINT_PENDING, /* cb will be called when completed */
+ GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */
+} grpc_endpoint_op_status;
struct grpc_endpoint_vtable {
- void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
- grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
- size_t nslices, grpc_endpoint_write_cb cb,
- void *user_data);
+ grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb);
+ grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@@ -77,26 +64,32 @@ struct grpc_endpoint_vtable {
char *(*get_peer)(grpc_endpoint *ep);
};
-/* When data is available on the connection, calls the callback with slices. */
-void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data);
+/* When data is available on the connection, calls the callback with slices.
+ Callback success indicates that the endpoint can accept more reads, failure
+ indicates the endpoint is closed.
+ Valid slices may be placed into \a slices even on callback success == 0. */
+grpc_endpoint_op_status grpc_endpoint_read(
+ grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
- returns GRPC_ENDPOINT_WRITE_DONE.
- Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
- connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data);
+ returns GRPC_ENDPOINT_DONE.
+ Otherwise it returns GRPC_ENDPOINT_PENDING and calls cb when the
+ connection is ready for more data.
+ \a slices may be mutated at will by the endpoint until cb is called.
+ No guarantee is made to the content of slices after a write EXCEPT that
+ it is a valid slice buffer.
+ */
+grpc_endpoint_op_status grpc_endpoint_write(
+ grpc_endpoint *ep, gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
/* Causes any pending read/write callbacks to run immediately with
- GRPC_ENDPOINT_CB_SHUTDOWN status */
+ success==0 */
void grpc_endpoint_shutdown(grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_endpoint *ep);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 360e6ebd8c..0db7cd9f0e 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -61,209 +61,8 @@
#define SENDMSG_FLAGS 0
#endif
-/* Holds a slice array and associated state. */
-typedef struct grpc_tcp_slice_state {
- gpr_slice *slices; /* Array of slices */
- size_t nslices; /* Size of slices array. */
- ssize_t first_slice; /* First valid slice in array */
- ssize_t last_slice; /* Last valid slice in array */
- gpr_slice working_slice; /* pointer to original final slice */
- int working_slice_valid; /* True if there is a working slice */
- int memory_owned; /* True if slices array is owned */
-} grpc_tcp_slice_state;
-
int grpc_tcp_trace = 0;
-static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
- size_t nslices, size_t valid_slices) {
- state->slices = slices;
- state->nslices = nslices;
- if (valid_slices == 0) {
- state->first_slice = -1;
- } else {
- state->first_slice = 0;
- }
- state->last_slice = valid_slices - 1;
- state->working_slice_valid = 0;
- state->memory_owned = 0;
-}
-
-/* Returns true if there is still available data */
-static int slice_state_has_available(grpc_tcp_slice_state *state) {
- return state->first_slice != -1 && state->last_slice >= state->first_slice;
-}
-
-static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
- if (state->first_slice == -1) {
- return 0;
- } else {
- return state->last_slice - state->first_slice + 1;
- }
-}
-
-static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
- /* TODO(klempner): use realloc instead when first_slice is 0 */
- /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
- gpr_slice *slices = state->slices;
- size_t original_size = slice_state_slices_allocated(state);
- size_t i;
- gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
-
- for (i = 0; i < original_size; ++i) {
- new_slices[i] = slices[i + state->first_slice];
- }
-
- state->slices = new_slices;
- state->last_slice = original_size - 1;
- if (original_size > 0) {
- state->first_slice = 0;
- } else {
- state->first_slice = -1;
- }
- state->nslices = new_size;
-
- if (state->memory_owned) {
- gpr_free(slices);
- }
- state->memory_owned = 1;
-}
-
-static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
- size_t prefix_bytes) {
- gpr_slice *current_slice = &state->slices[state->first_slice];
- size_t current_slice_size;
-
- while (slice_state_has_available(state)) {
- current_slice_size = GPR_SLICE_LENGTH(*current_slice);
- if (current_slice_size > prefix_bytes) {
- /* TODO(klempner): Get rid of the extra refcount created here by adding a
- native "trim the first N bytes" operation to splice */
- /* TODO(klempner): This really shouldn't be modifying the current slice
- unless we own the slices array. */
- gpr_slice tail;
- tail = gpr_slice_split_tail(current_slice, prefix_bytes);
- gpr_slice_unref(*current_slice);
- *current_slice = tail;
- return;
- } else {
- gpr_slice_unref(*current_slice);
- ++state->first_slice;
- ++current_slice;
- prefix_bytes -= current_slice_size;
- }
- }
-}
-
-static void slice_state_destroy(grpc_tcp_slice_state *state) {
- while (slice_state_has_available(state)) {
- gpr_slice_unref(state->slices[state->first_slice]);
- ++state->first_slice;
- }
-
- if (state->memory_owned) {
- gpr_free(state->slices);
- state->memory_owned = 0;
- }
-}
-
-void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
- gpr_slice **slices, size_t *nslices) {
- *slices = state->slices + state->first_slice;
- *nslices = state->last_slice - state->first_slice + 1;
-
- state->first_slice = -1;
- state->last_slice = -1;
-}
-
-/* Fills iov with the first min(iov_size, available) slices, returns number
- filled */
-static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
- struct iovec *iov, size_t iov_size) {
- size_t nslices = state->last_slice - state->first_slice + 1;
- gpr_slice *slices = state->slices + state->first_slice;
- size_t i;
- if (nslices < iov_size) {
- iov_size = nslices;
- }
-
- for (i = 0; i < iov_size; ++i) {
- iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
- iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
- }
- return iov_size;
-}
-
-/* Makes n blocks available at the end of state, writes them into iov, and
- returns the number of bytes allocated */
-static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
- struct iovec *iov, size_t n,
- size_t slice_size) {
- size_t target_size;
- size_t i;
- size_t allocated_bytes;
- ssize_t allocated_slices = slice_state_slices_allocated(state);
-
- if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
- /* Need to grow the slice array */
- target_size = state->nslices;
- do {
- target_size = target_size * 2;
- } while (target_size < allocated_slices + n - state->working_slice_valid);
- /* TODO(klempner): If this ever needs to support both prefix removal and
- append, we should be smarter about the growth logic here */
- slice_state_realloc(state, target_size);
- }
-
- i = 0;
- allocated_bytes = 0;
-
- if (state->working_slice_valid) {
- iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
- iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
- GPR_SLICE_LENGTH(state->slices[state->last_slice]);
- allocated_bytes += iov[0].iov_len;
- ++i;
- state->slices[state->last_slice] = state->working_slice;
- state->working_slice_valid = 0;
- }
-
- for (; i < n; ++i) {
- ++state->last_slice;
- state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
- iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
- iov[i].iov_len = slice_size;
- allocated_bytes += slice_size;
- }
- if (state->first_slice == -1) {
- state->first_slice = 0;
- }
- return allocated_bytes;
-}
-
-/* Remove the last n bytes from state */
-/* TODO(klempner): Consider having this defer actual deletion until later */
-static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
- while (bytes > 0 && slice_state_has_available(state)) {
- if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
- state->working_slice = state->slices[state->last_slice];
- state->working_slice_valid = 1;
- /* TODO(klempner): Combine these into a single operation that doesn't need
- to refcount */
- gpr_slice_unref(gpr_slice_split_tail(
- &state->slices[state->last_slice],
- GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
- bytes = 0;
- } else {
- bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
- gpr_slice_unref(state->slices[state->last_slice]);
- --state->last_slice;
- if (state->last_slice == -1) {
- state->first_slice = -1;
- }
- }
- }
-}
-
typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
@@ -273,80 +72,111 @@ typedef struct {
size_t slice_size;
gpr_refcount refcount;
- grpc_endpoint_read_cb read_cb;
- void *read_user_data;
- grpc_endpoint_write_cb write_cb;
- void *write_user_data;
+ gpr_slice_buffer *incoming_buffer;
+ gpr_slice_buffer *outgoing_buffer;
+ /** slice within outgoing_buffer to write next */
+ size_t outgoing_slice_idx;
+ /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
+ size_t outgoing_byte_idx;
- grpc_tcp_slice_state write_state;
+ grpc_iomgr_closure *read_cb;
+ grpc_iomgr_closure *write_cb;
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
- grpc_iomgr_closure handle_read_closure;
-
char *peer_string;
} grpc_tcp;
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success);
-static void grpc_tcp_shutdown(grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(tcp->em_fd);
}
-static void grpc_tcp_unref(grpc_tcp *tcp) {
- int refcount_zero = gpr_unref(&tcp->refcount);
- if (refcount_zero) {
- grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
+static void tcp_free(grpc_tcp *tcp) {
+ grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
+}
+
+/*#define GRPC_TCP_REFCOUNT_DEBUG*/
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count - 1);
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
}
}
-static void grpc_tcp_destroy(grpc_endpoint *ep) {
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count + 1);
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(grpc_tcp *tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void tcp_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_tcp_unref(tcp);
+ TCP_UNREF(tcp, "destroy");
}
-static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status status) {
- grpc_endpoint_read_cb cb = tcp->read_cb;
+static void call_read_cb(grpc_tcp *tcp, int success) {
+ grpc_iomgr_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: status=%d", status);
- for (i = 0; i < nslices; i++) {
- char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "read: success=%d", success);
+ for (i = 0; i < tcp->incoming_buffer->count; i++) {
+ char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
gpr_free(dump);
}
}
tcp->read_cb = NULL;
- cb(tcp->read_user_data, slices, nslices, status);
+ tcp->incoming_buffer = NULL;
+ cb->cb(cb->cb_arg, success);
}
-#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_continue_read(grpc_tcp *tcp) {
- gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
+static void tcp_continue_read(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
- ssize_t allocated_bytes;
- struct grpc_tcp_slice_state read_state;
- gpr_slice *final_slices;
- size_t final_nslices;
+ size_t i;
GPR_ASSERT(!tcp->finished_edge);
+ GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
+ GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
- slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
- 0);
- allocated_bytes = slice_state_append_blocks_into_iovec(
- &read_state, iov, tcp->iov_size, tcp->slice_size);
+ while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+ gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
+ gpr_slice_malloc(tcp->slice_size));
+ }
+ for (i = 0; i < tcp->incoming_buffer->count; i++) {
+ iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
+ }
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -362,106 +192,105 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
} while (read_bytes < 0 && errno == EINTR);
GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
- if (read_bytes < allocated_bytes) {
- /* TODO(klempner): Consider a second read first, in hopes of getting a
- * quick EAGAIN and saving a bunch of allocations. */
- slice_state_remove_last(&read_state, read_bytes < 0
- ? allocated_bytes
- : allocated_bytes - read_bytes);
- }
-
if (read_bytes < 0) {
- /* NB: After calling the user_cb a parallel call of the read handler may
+ /* NB: After calling call_read_cb a parallel call of the read handler may
* be running. */
if (errno == EAGAIN) {
if (tcp->iov_size > 1) {
tcp->iov_size /= 2;
}
- if (slice_state_has_available(&read_state)) {
- /* TODO(klempner): We should probably do the call into the application
- without all this junk on the stack */
- /* FIXME(klempner): Refcount properly */
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
- tcp->finished_edge = 1;
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
- } else {
- /* We've consumed the edge, request a new one */
- slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
- }
+ /* We've consumed the edge, request a new one */
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
/* TODO(klempner): Log interesting errors */
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- if (slice_state_has_available(&read_state)) {
- /* there were bytes already read: pass them up to the application */
- slice_state_transfer_ownership(&read_state, &final_slices,
- &final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
- } else {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
- }
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
} else {
- if (tcp->iov_size < MAX_READ_IOVEC) {
+ GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
+ if ((size_t)read_bytes < tcp->incoming_buffer->length) {
+ gpr_slice_buffer_trim_end(tcp->incoming_buffer,
+ tcp->incoming_buffer->length - read_bytes);
+ } else if (tcp->iov_size < MAX_READ_IOVEC) {
++tcp->iov_size;
}
- GPR_ASSERT(slice_state_has_available(&read_state));
- slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices);
- call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
- slice_state_destroy(&read_state);
- grpc_tcp_unref(tcp);
+ GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
+ call_read_cb(tcp, 1);
+ TCP_UNREF(tcp, "read");
}
GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (!success) {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
- grpc_tcp_unref(tcp);
+ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(tcp, 0);
+ TCP_UNREF(tcp, "read");
} else {
- grpc_tcp_continue_read(tcp);
+ tcp_continue_read(tcp);
}
}
-static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
+ gpr_slice_buffer *incoming_buffer,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
- tcp->read_user_data = user_data;
- gpr_ref(&tcp->refcount);
+ tcp->incoming_buffer = incoming_buffer;
+ gpr_slice_buffer_reset_and_unref(incoming_buffer);
+ TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- tcp->handle_read_closure.cb_arg = tcp;
- grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1);
+ grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
}
+ /* TODO(ctiller): immediate return */
+ return GRPC_ENDPOINT_PENDING;
}
#define MAX_WRITE_IOVEC 16
-static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
+static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
int iov_size;
ssize_t sent_length;
- grpc_tcp_slice_state *state = &tcp->write_state;
+ ssize_t sending_length;
+ ssize_t trailing;
+ ssize_t unwind_slice_idx;
+ ssize_t unwind_byte_idx;
for (;;) {
- iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
+ sending_length = 0;
+ unwind_slice_idx = tcp->outgoing_slice_idx;
+ unwind_byte_idx = tcp->outgoing_byte_idx;
+ for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
+ iov_size != MAX_WRITE_IOVEC;
+ iov_size++) {
+ iov[iov_size].iov_base =
+ GPR_SLICE_START_PTR(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
+ tcp->outgoing_byte_idx;
+ iov[iov_size].iov_len =
+ GPR_SLICE_LENGTH(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
+ tcp->outgoing_byte_idx;
+ sending_length += iov[iov_size].iov_len;
+ tcp->outgoing_slice_idx++;
+ tcp->outgoing_byte_idx = 0;
+ }
+ GPR_ASSERT(iov_size > 0);
msg.msg_name = NULL;
msg.msg_namelen = 0;
@@ -480,70 +309,75 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
if (sent_length < 0) {
if (errno == EAGAIN) {
- return GRPC_ENDPOINT_WRITE_PENDING;
+ tcp->outgoing_slice_idx = unwind_slice_idx;
+ tcp->outgoing_byte_idx = unwind_byte_idx;
+ return GRPC_ENDPOINT_PENDING;
} else {
/* TODO(klempner): Log some of these */
- slice_state_destroy(state);
- return GRPC_ENDPOINT_WRITE_ERROR;
+ return GRPC_ENDPOINT_ERROR;
}
}
- /* TODO(klempner): Probably better to batch this after we finish flushing */
- slice_state_remove_prefix(state, sent_length);
+ GPR_ASSERT(tcp->outgoing_byte_idx == 0);
+ trailing = sending_length - sent_length;
+ while (trailing > 0) {
+ ssize_t slice_length;
+
+ tcp->outgoing_slice_idx--;
+ slice_length = GPR_SLICE_LENGTH(
+ tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
+ if (slice_length > trailing) {
+ tcp->outgoing_byte_idx = slice_length - trailing;
+ break;
+ } else {
+ trailing -= slice_length;
+ }
+ }
- if (!slice_state_has_available(state)) {
- return GRPC_ENDPOINT_WRITE_DONE;
+ if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
+ return GRPC_ENDPOINT_DONE;
}
};
}
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
+static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- grpc_endpoint_write_status write_status;
- grpc_endpoint_cb_status cb_status;
- grpc_endpoint_write_cb cb;
+ grpc_endpoint_op_status status;
+ grpc_iomgr_closure *cb;
if (!success) {
- slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
- grpc_tcp_unref(tcp);
+ cb->cb(cb->cb_arg, 0);
+ TCP_UNREF(tcp, "write");
return;
}
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
- write_status = grpc_tcp_flush(tcp);
- if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+ status = tcp_flush(tcp);
+ if (status == GRPC_ENDPOINT_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
} else {
- slice_state_destroy(&tcp->write_state);
- if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
- cb_status = GRPC_ENDPOINT_CB_OK;
- } else {
- cb_status = GRPC_ENDPOINT_CB_ERROR;
- }
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, cb_status);
- grpc_tcp_unref(tcp);
+ cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
+ TCP_UNREF(tcp, "write");
}
GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
-static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
+ gpr_slice_buffer *buf,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_endpoint_write_status status;
+ grpc_endpoint_op_status status;
if (grpc_tcp_trace) {
size_t i;
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < buf->count; i++) {
+ char *data =
+ gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
gpr_free(data);
}
@@ -551,15 +385,19 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
GPR_ASSERT(tcp->write_cb == NULL);
- slice_state_init(&tcp->write_state, slices, nslices, nslices);
- status = grpc_tcp_flush(tcp);
- if (status == GRPC_ENDPOINT_WRITE_PENDING) {
- /* TODO(klempner): Consider inlining rather than malloc for small nslices */
- slice_state_realloc(&tcp->write_state, nslices);
- gpr_ref(&tcp->refcount);
+ if (buf->length == 0) {
+ GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
+ return GRPC_ENDPOINT_DONE;
+ }
+ tcp->outgoing_buffer = buf;
+ tcp->outgoing_slice_idx = 0;
+ tcp->outgoing_byte_idx = 0;
+
+ status = tcp_flush(tcp);
+ if (status == GRPC_ENDPOINT_PENDING) {
+ TCP_REF(tcp, "write");
tcp->write_cb = cb;
- tcp->write_user_data = user_data;
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}
@@ -567,27 +405,25 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
return status;
}
-static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
+static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_add_fd(pollset, tcp->em_fd);
}
-static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep,
- grpc_pollset_set *pollset_set) {
+static void tcp_add_to_pollset_set(grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
-static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
+static char *tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static const grpc_endpoint_vtable vtable = {
- grpc_tcp_notify_on_read, grpc_tcp_write,
- grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
- grpc_tcp_shutdown, grpc_tcp_destroy,
- grpc_tcp_get_peer};
+ tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
+ tcp_shutdown, tcp_destroy, tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
@@ -597,21 +433,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
- tcp->read_user_data = NULL;
- tcp->write_user_data = NULL;
+ tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
- slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = grpc_tcp_handle_read;
+ tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = grpc_tcp_handle_write;
+ tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
- tcp->handle_read_closure.cb = grpc_tcp_handle_read;
return &tcp->base;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 901793ec43..58f9160ef9 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,13 +82,11 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
- grpc_endpoint_read_cb read_cb;
- void *read_user_data;
+ grpc_iomgr_closure *read_cb;
+ grpc_iomgr_closure *write_cb;
gpr_slice read_slice;
-
- grpc_endpoint_write_cb write_cb;
- void *write_user_data;
- gpr_slice_buffer write_slices;
+ gpr_slice_buffer *write_slices;
+ gpr_slice_buffer *read_slices;
/* The IO Completion Port runs from another thread. We need some mechanism
to protect ourselves when requesting a shutdown. */
@@ -98,34 +96,55 @@ typedef struct grpc_tcp {
char *peer_string;
} grpc_tcp;
-static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+static void tcp_free(grpc_tcp *tcp) {
+ grpc_winsocket_orphan(tcp->socket);
+ gpr_mu_destroy(&tcp->mu);
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
+}
+
+/*#define GRPC_TCP_REFCOUNT_DEBUG*/
+#ifdef GRPC_TCP_REFCOUNT_DEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count - 1);
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp);
+ }
+}
+static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
+ int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
+ reason, tcp->refcount.count, tcp->refcount.count + 1);
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
static void tcp_unref(grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- gpr_slice_buffer_destroy(&tcp->write_slices);
- grpc_winsocket_orphan(tcp->socket);
- gpr_mu_destroy(&tcp->mu);
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
+ tcp_free(tcp);
}
}
+static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(void *tcpp, int from_iocp) {
- grpc_tcp *tcp = (grpc_tcp *)tcpp;
+static int on_read(grpc_tcp *tcp, int from_iocp) {
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
gpr_slice *slice = NULL;
size_t nslices = 0;
- grpc_endpoint_cb_status status;
- grpc_endpoint_read_cb cb;
+ int success;
grpc_winsocket_callback_info *info = &socket->read_info;
- void *opaque = tcp->read_user_data;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
- cb = tcp->read_cb;
- tcp->read_cb = NULL;
if (!from_iocp || tcp->shutting_down) {
/* If we are here with from_iocp set to true, it means we got raced to
shutting down the endpoint. No actual abort callback will happen
@@ -139,9 +158,7 @@ static void on_read(void *tcpp, int from_iocp) {
tcp->socket->read_info.outstanding = 0;
gpr_slice_unref(tcp->read_slice);
}
- tcp_unref(tcp);
- if (cb) cb(opaque, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
- return;
+ return 0;
}
GPR_ASSERT(tcp->socket->read_info.outstanding);
@@ -152,28 +169,38 @@ static void on_read(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "ReadFile overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
+ success = 0;
gpr_slice_unref(tcp->read_slice);
- status = GRPC_ENDPOINT_CB_ERROR;
} else {
if (info->bytes_transfered != 0) {
sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
- status = GRPC_ENDPOINT_CB_OK;
- slice = &sub;
- nslices = 1;
+ gpr_slice_buffer_add(tcp->read_slices, sub);
+ success = 1;
} else {
gpr_slice_unref(tcp->read_slice);
- status = GRPC_ENDPOINT_CB_EOF;
+ success = 0;
}
}
tcp->socket->read_info.outstanding = 0;
- tcp_unref(tcp);
- cb(opaque, slice, nslices, status);
+ return success;
+}
+
+static void on_read_cb(void *tcpp, int from_iocp) {
+ grpc_tcp *tcp = tcpp;
+ grpc_iomgr_closure *cb = tcp->read_cb;
+ int success = on_read(tcp, from_iocp);
+ tcp->read_cb = NULL;
+ TCP_UNREF(tcp, "read");
+ if (cb) {
+ cb->cb(cb->cb_arg, success);
+ }
}
-static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *arg) {
+static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
+ gpr_slice_buffer *read_slices,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -184,13 +211,15 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
GPR_ASSERT(!tcp->socket->read_info.outstanding);
if (tcp->shutting_down) {
- cb(arg, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
- return;
+ return GRPC_ENDPOINT_ERROR;
}
- tcp_ref(tcp);
+
+ TCP_REF(tcp, "read");
+
tcp->socket->read_info.outstanding = 1;
tcp->read_cb = cb;
- tcp->read_user_data = arg;
+ tcp->read_slices = read_slices;
+ gpr_slice_buffer_reset_and_unref(read_slices);
tcp->read_slice = gpr_slice_malloc(8192);
@@ -204,10 +233,11 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
+ int ok;
info->bytes_transfered = bytes_read;
- /* This might heavily recurse. */
- on_read(tcp, 1);
- return;
+ ok = on_read(tcp, 1);
+ TCP_UNREF(tcp, "read");
+ return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
/* Otherwise, let's retry, by queuing a read. */
@@ -218,13 +248,15 @@ static void win_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
+ int ok;
info->wsa_error = wsa_error;
- on_read(tcp, 1);
- return;
+ ok = on_read(tcp, 1);
+ return ok ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
}
- grpc_socket_notify_on_read(tcp->socket, on_read, tcp);
+ grpc_socket_notify_on_read(tcp->socket, on_read_cb, tcp);
+ return GRPC_ENDPOINT_PENDING;
}
/* Asynchronous callback from the IOCP, or the background thread. */
@@ -232,9 +264,8 @@ static void on_write(void *tcpp, int from_iocp) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
- grpc_endpoint_cb_status status = GRPC_ENDPOINT_CB_OK;
- grpc_endpoint_write_cb cb;
- void *opaque = tcp->write_user_data;
+ grpc_iomgr_closure *cb;
+ int success;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
@@ -251,10 +282,11 @@ static void on_write(void *tcpp, int from_iocp) {
if (do_abort) {
if (from_iocp) {
tcp->socket->write_info.outstanding = 0;
- gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
}
- tcp_unref(tcp);
- if (cb) cb(opaque, GRPC_ENDPOINT_CB_SHUTDOWN);
+ TCP_UNREF(tcp, "write");
+ if (cb) {
+ cb->cb(cb->cb_arg, 0);
+ }
return;
}
@@ -266,23 +298,22 @@ static void on_write(void *tcpp, int from_iocp) {
gpr_log(GPR_ERROR, "WSASend overlapped error: %s", utf8_message);
gpr_free(utf8_message);
}
- status = GRPC_ENDPOINT_CB_ERROR;
+ success = 0;
} else {
- GPR_ASSERT(info->bytes_transfered == tcp->write_slices.length);
+ GPR_ASSERT(info->bytes_transfered == tcp->write_slices->length);
+ success = 1;
}
- gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- tcp_unref(tcp);
- cb(opaque, status);
+ TCP_UNREF(tcp, "write");
+ cb->cb(cb->cb_arg, success);
}
/* Initiates a write. */
-static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
- gpr_slice *slices, size_t nslices,
- grpc_endpoint_write_cb cb,
- void *arg) {
+static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
@@ -295,28 +326,26 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
GPR_ASSERT(!tcp->socket->write_info.outstanding);
if (tcp->shutting_down) {
- return GRPC_ENDPOINT_WRITE_ERROR;
+ return GRPC_ENDPOINT_ERROR;
}
- tcp_ref(tcp);
+ TCP_REF(tcp, "write");
tcp->socket->write_info.outstanding = 1;
tcp->write_cb = cb;
- tcp->write_user_data = arg;
-
- gpr_slice_buffer_addn(&tcp->write_slices, slices, nslices);
+ tcp->write_slices = slices;
- if (tcp->write_slices.count > GPR_ARRAY_SIZE(local_buffers)) {
- buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices.count);
+ if (tcp->write_slices->count > GPR_ARRAY_SIZE(local_buffers)) {
+ buffers = (WSABUF *)gpr_malloc(sizeof(WSABUF) * tcp->write_slices->count);
allocated = buffers;
}
- for (i = 0; i < tcp->write_slices.count; i++) {
- buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
- buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
+ for (i = 0; i < tcp->write_slices->count; i++) {
+ buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices->slices[i]);
+ buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices->slices[i]);
}
/* First, let's try a synchronous, non-blocking write. */
- status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+ status = WSASend(socket->socket, buffers, tcp->write_slices->count,
&bytes_sent, 0, NULL, NULL);
info->wsa_error = status == 0 ? 0 : WSAGetLastError();
@@ -324,10 +353,10 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- grpc_endpoint_write_status ret = GRPC_ENDPOINT_WRITE_ERROR;
+ grpc_endpoint_op_status ret = GRPC_ENDPOINT_ERROR;
if (status == 0) {
- ret = GRPC_ENDPOINT_WRITE_DONE;
- GPR_ASSERT(bytes_sent == tcp->write_slices.length);
+ ret = GRPC_ENDPOINT_DONE;
+ GPR_ASSERT(bytes_sent == tcp->write_slices->length);
} else {
if (socket->read_info.wsa_error != WSAECONNRESET) {
char *utf8_message = gpr_format_message(info->wsa_error);
@@ -336,33 +365,31 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
- gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- tcp_unref(tcp);
+ TCP_UNREF(tcp, "write");
return ret;
}
/* If we got a WSAEWOULDBLOCK earlier, then we need to re-do the same
operation, this time asynchronously. */
memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
- status = WSASend(socket->socket, buffers, tcp->write_slices.count,
+ status = WSASend(socket->socket, buffers, tcp->write_slices->count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
if (status != 0) {
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
- gpr_slice_buffer_reset_and_unref(&tcp->write_slices);
tcp->socket->write_info.outstanding = 0;
- tcp_unref(tcp);
- return GRPC_ENDPOINT_WRITE_ERROR;
+ TCP_UNREF(tcp, "write");
+ return GRPC_ENDPOINT_ERROR;
}
}
/* As all is now setup, we can now ask for the IOCP notification. It may
trigger the callback immediately however, but no matter. */
grpc_socket_notify_on_write(socket, on_write, tcp);
- return GRPC_ENDPOINT_WRITE_PENDING;
+ return GRPC_ENDPOINT_PENDING;
}
static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) {
@@ -387,19 +414,17 @@ static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) {
concurrent access of the data structure in that regard. */
static void win_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- int extra_refs = 0;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
tcp->shutting_down = 1;
- extra_refs = grpc_winsocket_shutdown(tcp->socket);
- while (extra_refs--) tcp_ref(tcp);
+ grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp_unref(tcp);
+ TCP_UNREF(tcp, "destroy");
}
static char *win_get_peer(grpc_endpoint *ep) {
@@ -408,8 +433,8 @@ static char *win_get_peer(grpc_endpoint *ep) {
}
static grpc_endpoint_vtable vtable = {
- win_notify_on_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+ win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
+ win_shutdown, win_destroy, win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
@@ -417,7 +442,6 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
- gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base;
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 81b3e33cb2..b696e384fc 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,15 +49,15 @@ typedef struct {
struct tsi_frame_protector *protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
- grpc_endpoint_read_cb read_cb;
- void *read_user_data;
- grpc_endpoint_write_cb write_cb;
- void *write_user_data;
+ grpc_iomgr_closure *read_cb;
+ grpc_iomgr_closure *write_cb;
+ grpc_iomgr_closure on_read;
+ gpr_slice_buffer *read_buffer;
+ gpr_slice_buffer source_buffer;
/* saved handshaker leftover data to unprotect. */
gpr_slice_buffer leftover_bytes;
/* buffers for read and write */
gpr_slice read_staging_buffer;
- gpr_slice_buffer input_buffer;
gpr_slice write_staging_buffer;
gpr_slice_buffer output_buffer;
@@ -67,62 +67,91 @@ typedef struct {
int grpc_trace_secure_endpoint = 0;
-static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
-
static void destroy(secure_endpoint *secure_ep) {
secure_endpoint *ep = secure_ep;
grpc_endpoint_destroy(ep->wrapped_ep);
tsi_frame_protector_destroy(ep->protector);
gpr_slice_buffer_destroy(&ep->leftover_bytes);
gpr_slice_unref(ep->read_staging_buffer);
- gpr_slice_buffer_destroy(&ep->input_buffer);
gpr_slice_unref(ep->write_staging_buffer);
gpr_slice_buffer_destroy(&ep->output_buffer);
+ gpr_slice_buffer_destroy(&ep->source_buffer);
gpr_mu_destroy(&ep->protector_mu);
gpr_free(ep);
}
+/*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
+#ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
+#define SECURE_ENDPOINT_UNREF(ep, reason) \
+ secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
+#define SECURE_ENDPOINT_REF(ep, reason) \
+ secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
+static void secure_endpoint_unref(secure_endpoint *ep, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
+ ep, reason, ep->ref.count, ep->ref.count - 1);
+ if (gpr_unref(&ep->ref)) {
+ destroy(ep);
+ }
+}
+
+static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d",
+ ep, reason, ep->ref.count, ep->ref.count + 1);
+ gpr_ref(&ep->ref);
+}
+#else
+#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
+#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint *ep) {
if (gpr_unref(&ep->ref)) {
destroy(ep);
}
}
+static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
+#endif
+
static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
gpr_uint8 **end) {
- gpr_slice_buffer_add(&ep->input_buffer, ep->read_staging_buffer);
+ gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
*cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
*end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
}
-static void call_read_cb(secure_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void call_read_cb(secure_endpoint *ep, int success) {
if (grpc_trace_secure_endpoint) {
size_t i;
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < ep->read_buffer->count; i++) {
+ char *data = gpr_dump_slice(ep->read_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
gpr_free(data);
}
}
- ep->read_cb(ep->read_user_data, slices, nslices, error);
- secure_endpoint_unref(ep);
+ ep->read_buffer = NULL;
+ ep->read_cb->cb(ep->read_cb->cb_arg, success);
+ SECURE_ENDPOINT_UNREF(ep, "read");
}
-static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static int on_read(void *user_data, int success) {
unsigned i;
gpr_uint8 keep_looping = 0;
- size_t input_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)user_data;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
+ if (!success) {
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ return 0;
+ }
+
/* TODO(yangg) check error, maybe bail out early */
- for (i = 0; i < nslices; i++) {
- gpr_slice encrypted = slices[i];
+ for (i = 0; i < ep->source_buffer.count; i++) {
+ gpr_slice encrypted = ep->source_buffer.slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
size_t message_size = GPR_SLICE_LENGTH(encrypted);
@@ -161,7 +190,7 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
gpr_slice_buffer_add(
- &ep->input_buffer,
+ ep->read_buffer,
gpr_slice_split_head(
&ep->read_staging_buffer,
(size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
@@ -169,38 +198,53 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
/* TODO(yangg) experiment with moving this block after read_cb to see if it
helps latency */
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
+ gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
if (result != TSI_OK) {
- gpr_slice_buffer_reset_and_unref(&ep->input_buffer);
- call_read_cb(ep, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
- return;
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
+ return 0;
}
- /* The upper level will unref the slices. */
- input_buffer_count = ep->input_buffer.count;
- ep->input_buffer.count = 0;
- call_read_cb(ep, ep->input_buffer.slices, input_buffer_count, error);
+
+ return 1;
+}
+
+static void on_read_cb(void *user_data, int success) {
+ call_read_cb(user_data, on_read(user_data, success));
}
-static void endpoint_notify_on_read(grpc_endpoint *secure_ep,
- grpc_endpoint_read_cb cb, void *user_data) {
+static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ int immediate_read_success = -1;
ep->read_cb = cb;
- ep->read_user_data = user_data;
-
- secure_endpoint_ref(ep);
+ ep->read_buffer = slices;
+ gpr_slice_buffer_reset_and_unref(ep->read_buffer);
if (ep->leftover_bytes.count) {
- size_t leftover_nslices = ep->leftover_bytes.count;
- ep->leftover_bytes.count = 0;
- on_read(ep, ep->leftover_bytes.slices, leftover_nslices,
- GRPC_ENDPOINT_CB_OK);
- return;
+ gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
+ GPR_ASSERT(ep->leftover_bytes.count == 0);
+ return on_read(ep, 1) ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
- grpc_endpoint_notify_on_read(ep->wrapped_ep, on_read, ep);
+ SECURE_ENDPOINT_REF(ep, "read");
+
+ switch (
+ grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read)) {
+ case GRPC_ENDPOINT_DONE:
+ immediate_read_success = on_read(ep, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ return GRPC_ENDPOINT_PENDING;
+ case GRPC_ENDPOINT_ERROR:
+ immediate_read_success = on_read(ep, 0);
+ break;
+ }
+
+ GPR_ASSERT(immediate_read_success != -1);
+ SECURE_ENDPOINT_UNREF(ep, "read");
+
+ return immediate_read_success ? GRPC_ENDPOINT_DONE : GRPC_ENDPOINT_ERROR;
}
static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
@@ -211,36 +255,28 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
*end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
}
-static void on_write(void *data, grpc_endpoint_cb_status error) {
- secure_endpoint *ep = data;
- ep->write_cb(ep->write_user_data, error);
- secure_endpoint_unref(ep);
-}
-
-static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
- gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_write_cb cb,
- void *user_data) {
+static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
+ gpr_slice_buffer *slices,
+ grpc_iomgr_closure *cb) {
unsigned i;
- size_t output_buffer_count = 0;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
- grpc_endpoint_write_status status;
- GPR_ASSERT(ep->output_buffer.count == 0);
+
+ gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
if (grpc_trace_secure_endpoint) {
- for (i = 0; i < nslices; i++) {
- char *data = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ for (i = 0; i < slices->count; i++) {
+ char *data =
+ gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
gpr_free(data);
}
}
- for (i = 0; i < nslices; i++) {
- gpr_slice plain = slices[i];
+ for (i = 0; i < slices->count; i++) {
+ gpr_slice plain = slices->slices[i];
gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
size_t message_size = GPR_SLICE_LENGTH(plain);
while (message_size > 0) {
@@ -290,29 +326,13 @@ static grpc_endpoint_write_status endpoint_write(grpc_endpoint *secure_ep,
}
}
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
-
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- return GRPC_ENDPOINT_WRITE_ERROR;
+ return GRPC_ENDPOINT_ERROR;
}
- /* clear output_buffer and let the lower level handle its slices. */
- output_buffer_count = ep->output_buffer.count;
- ep->output_buffer.count = 0;
- ep->write_cb = cb;
- ep->write_user_data = user_data;
- /* Need to keep the endpoint alive across a transport */
- secure_endpoint_ref(ep);
- status = grpc_endpoint_write(ep->wrapped_ep, ep->output_buffer.slices,
- output_buffer_count, on_write, ep);
- if (status != GRPC_ENDPOINT_WRITE_PENDING) {
- secure_endpoint_unref(ep);
- }
- return status;
+ return grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb);
}
static void endpoint_shutdown(grpc_endpoint *secure_ep) {
@@ -320,9 +340,9 @@ static void endpoint_shutdown(grpc_endpoint *secure_ep) {
grpc_endpoint_shutdown(ep->wrapped_ep);
}
-static void endpoint_unref(grpc_endpoint *secure_ep) {
+static void endpoint_destroy(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
- secure_endpoint_unref(ep);
+ SECURE_ENDPOINT_UNREF(ep, "destroy");
}
static void endpoint_add_to_pollset(grpc_endpoint *secure_ep,
@@ -343,9 +363,9 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
}
static const grpc_endpoint_vtable vtable = {
- endpoint_notify_on_read, endpoint_write,
+ endpoint_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_unref,
+ endpoint_shutdown, endpoint_destroy,
endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
@@ -363,8 +383,10 @@ grpc_endpoint *grpc_secure_endpoint_create(
}
ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
- gpr_slice_buffer_init(&ep->input_buffer);
gpr_slice_buffer_init(&ep->output_buffer);
+ gpr_slice_buffer_init(&ep->source_buffer);
+ ep->read_buffer = NULL;
+ grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index 0c3572b53c..bf0079577e 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -50,16 +50,17 @@ typedef struct {
grpc_endpoint *wrapped_endpoint;
grpc_endpoint *secure_endpoint;
gpr_slice_buffer left_overs;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
+ grpc_iomgr_closure on_handshake_data_sent_to_peer;
+ grpc_iomgr_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
-static void on_handshake_data_received_from_peer(void *setup, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error);
+static void on_handshake_data_received_from_peer(void *setup, int success);
-static void on_handshake_data_sent_to_peer(void *setup,
- grpc_endpoint_cb_status error);
+static void on_handshake_data_sent_to_peer(void *setup, int success);
static void secure_transport_setup_done(grpc_secure_transport_setup *s,
int is_success) {
@@ -78,6 +79,8 @@ static void secure_transport_setup_done(grpc_secure_transport_setup *s,
if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker);
if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer);
gpr_slice_buffer_destroy(&s->left_overs);
+ gpr_slice_buffer_destroy(&s->outgoing);
+ gpr_slice_buffer_destroy(&s->incoming);
GRPC_SECURITY_CONNECTOR_UNREF(s->connector, "secure_transport_setup");
gpr_free(s);
}
@@ -102,6 +105,8 @@ static void on_peer_checked(void *user_data, grpc_security_status status) {
s->secure_endpoint =
grpc_secure_endpoint_create(protector, s->wrapped_endpoint,
s->left_overs.slices, s->left_overs.count);
+ s->left_overs.count = 0;
+ s->left_overs.length = 0;
secure_transport_setup_done(s, 1);
return;
}
@@ -132,7 +137,6 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
size_t offset = 0;
tsi_result result = TSI_OK;
gpr_slice to_send;
- grpc_endpoint_write_status write_status;
do {
size_t to_send_size = s->handshake_buffer_size - offset;
@@ -155,28 +159,25 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) {
to_send =
gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset);
+ gpr_slice_buffer_reset_and_unref(&s->outgoing);
+ gpr_slice_buffer_add(&s->outgoing, to_send);
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1,
- on_handshake_data_sent_to_peer, s);
- if (write_status == GRPC_ENDPOINT_WRITE_ERROR) {
- gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
- secure_transport_setup_done(s, 0);
- } else if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
- on_handshake_data_sent_to_peer(s, GRPC_ENDPOINT_CB_OK);
- }
-}
-
-static void cleanup_slices(gpr_slice *slices, size_t num_slices) {
- size_t i;
- for (i = 0; i < num_slices; i++) {
- gpr_slice_unref(slices[i]);
+ switch (grpc_endpoint_write(s->wrapped_endpoint, &s->outgoing,
+ &s->on_handshake_data_sent_to_peer)) {
+ case GRPC_ENDPOINT_ERROR:
+ gpr_log(GPR_ERROR, "Could not send handshake data to peer.");
+ secure_transport_setup_done(s, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_sent_to_peer(s, 1);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
}
}
-static void on_handshake_data_received_from_peer(
- void *setup, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void on_handshake_data_received_from_peer(void *setup, int success) {
grpc_secure_transport_setup *s = setup;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -184,32 +185,37 @@ static void on_handshake_data_received_from_peer(
size_t num_left_overs;
int has_left_overs_in_current_slice = 0;
- if (error != GRPC_ENDPOINT_CB_OK) {
+ if (!success) {
gpr_log(GPR_ERROR, "Read failed.");
- cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
- for (i = 0; i < nslices; i++) {
- consumed_slice_size = GPR_SLICE_LENGTH(slices[i]);
+ for (i = 0; i < s->incoming.count; i++) {
+ consumed_slice_size = GPR_SLICE_LENGTH(s->incoming.slices[i]);
result = tsi_handshaker_process_bytes_from_peer(
- s->handshaker, GPR_SLICE_START_PTR(slices[i]), &consumed_slice_size);
+ s->handshaker, GPR_SLICE_START_PTR(s->incoming.slices[i]),
+ &consumed_slice_size);
if (!tsi_handshaker_is_in_progress(s->handshaker)) break;
}
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* We may need more data. */
if (result == TSI_INCOMPLETE_DATA) {
- /* TODO(klempner,jboeuf): This should probably use the client setup
- deadline */
- grpc_endpoint_notify_on_read(s->wrapped_endpoint,
- on_handshake_data_received_from_peer, setup);
- cleanup_slices(slices, nslices);
+ switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer)) {
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_received_from_peer(s, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ on_handshake_data_received_from_peer(s, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
return;
} else {
send_handshake_bytes_to_peer(s);
- cleanup_slices(slices, nslices);
return;
}
}
@@ -217,42 +223,40 @@ static void on_handshake_data_received_from_peer(
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshake failed with error %s",
tsi_result_to_string(result));
- cleanup_slices(slices, nslices);
secure_transport_setup_done(s, 0);
return;
}
/* Handshake is done and successful this point. */
has_left_overs_in_current_slice =
- (consumed_slice_size < GPR_SLICE_LENGTH(slices[i]));
- num_left_overs = (has_left_overs_in_current_slice ? 1 : 0) + nslices - i - 1;
+ (consumed_slice_size < GPR_SLICE_LENGTH(s->incoming.slices[i]));
+ num_left_overs =
+ (has_left_overs_in_current_slice ? 1 : 0) + s->incoming.count - i - 1;
if (num_left_overs == 0) {
- cleanup_slices(slices, nslices);
check_peer(s);
return;
}
- cleanup_slices(slices, nslices - num_left_overs);
-
/* Put the leftovers in our buffer (ownership transfered). */
if (has_left_overs_in_current_slice) {
- gpr_slice_buffer_add(&s->left_overs,
- gpr_slice_split_tail(&slices[i], consumed_slice_size));
- gpr_slice_unref(slices[i]); /* split_tail above increments refcount. */
+ gpr_slice_buffer_add(
+ &s->left_overs,
+ gpr_slice_split_tail(&s->incoming.slices[i], consumed_slice_size));
+ gpr_slice_unref(
+ s->incoming.slices[i]); /* split_tail above increments refcount. */
}
gpr_slice_buffer_addn(
- &s->left_overs, &slices[i + 1],
+ &s->left_overs, &s->incoming.slices[i + 1],
num_left_overs - (size_t)has_left_overs_in_current_slice);
check_peer(s);
}
/* If setup is NULL, the setup is done. */
-static void on_handshake_data_sent_to_peer(void *setup,
- grpc_endpoint_cb_status error) {
+static void on_handshake_data_sent_to_peer(void *setup, int success) {
grpc_secure_transport_setup *s = setup;
/* Make sure that write is OK. */
- if (error != GRPC_ENDPOINT_CB_OK) {
- gpr_log(GPR_ERROR, "Write failed with error %d.", error);
+ if (!success) {
+ gpr_log(GPR_ERROR, "Write failed.");
if (setup != NULL) secure_transport_setup_done(s, 0);
return;
}
@@ -261,8 +265,17 @@ static void on_handshake_data_sent_to_peer(void *setup,
if (tsi_handshaker_is_in_progress(s->handshaker)) {
/* TODO(klempner,jboeuf): This should probably use the client setup
deadline */
- grpc_endpoint_notify_on_read(s->wrapped_endpoint,
- on_handshake_data_received_from_peer, setup);
+ switch (grpc_endpoint_read(s->wrapped_endpoint, &s->incoming,
+ &s->on_handshake_data_received_from_peer)) {
+ case GRPC_ENDPOINT_ERROR:
+ on_handshake_data_received_from_peer(s, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_DONE:
+ on_handshake_data_received_from_peer(s, 1);
+ break;
+ }
} else {
check_peer(s);
}
@@ -288,6 +301,12 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
+ grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
+ on_handshake_data_sent_to_peer, s);
+ grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
+ on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs);
+ gpr_slice_buffer_init(&s->outgoing);
+ gpr_slice_buffer_init(&s->incoming);
send_handshake_bytes_to_peer(s);
}
diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c
index 987d5cb9b5..6482ef9c9f 100644
--- a/src/core/support/slice_buffer.c
+++ b/src/core/support/slice_buffer.c
@@ -207,3 +207,25 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) {
src->count = 0;
src->length = 0;
}
+
+void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n) {
+ GPR_ASSERT(n <= sb->length);
+ sb->length -= n;
+ for (;;) {
+ size_t idx = sb->count - 1;
+ gpr_slice slice = sb->slices[idx];
+ size_t slice_len = GPR_SLICE_LENGTH(slice);
+ if (slice_len > n) {
+ sb->slices[idx] = gpr_slice_sub_no_ref(slice, 0, slice_len - n);
+ return;
+ } else if (slice_len == n) {
+ gpr_slice_unref(slice);
+ sb->count = idx;
+ return;
+ } else {
+ gpr_slice_unref(slice);
+ n -= slice_len;
+ sb->count = idx;
+ }
+ }
+}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 42cf0ecd5b..a1b773b1ca 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -214,6 +214,8 @@ typedef struct {
grpc_chttp2_hpack_compressor hpack_compressor;
/** is this a client? */
gpr_uint8 is_client;
+ /** callback for when writing is done */
+ grpc_iomgr_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@@ -329,8 +331,11 @@ struct grpc_chttp2_transport {
/** closure to execute writing */
grpc_iomgr_closure writing_action;
- /** closure to start reading from the endpoint */
- grpc_iomgr_closure reading_action;
+ /** closure to finish reading from the endpoint */
+ grpc_iomgr_closure recv_data;
+
+ /** incoming read bytes */
+ gpr_slice_buffer read_buffer;
/** address to place a newly accepted stream - set and unset by
grpc_chttp2_parsing_accept_stream; used by init_stream to
@@ -463,8 +468,7 @@ int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
grpc_chttp2_transport_writing *transport_writing, grpc_endpoint *endpoint);
-void grpc_chttp2_terminate_writing(
- grpc_chttp2_transport_writing *transport_writing, int success);
+void grpc_chttp2_terminate_writing(void *transport_writing, int success);
void grpc_chttp2_cleanup_writing(grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 123061b3fc..2c8c48f47b 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -37,7 +37,6 @@
#include <grpc/support/log.h>
static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing);
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status);
int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_global *transport_global,
@@ -165,16 +164,15 @@ void grpc_chttp2_perform_writes(
GPR_ASSERT(transport_writing->outbuf.count > 0);
GPR_ASSERT(endpoint);
- switch (grpc_endpoint_write(endpoint, transport_writing->outbuf.slices,
- transport_writing->outbuf.count, finish_write_cb,
- transport_writing)) {
- case GRPC_ENDPOINT_WRITE_DONE:
+ switch (grpc_endpoint_write(endpoint, &transport_writing->outbuf,
+ &transport_writing->done_cb)) {
+ case GRPC_ENDPOINT_DONE:
grpc_chttp2_terminate_writing(transport_writing, 1);
break;
- case GRPC_ENDPOINT_WRITE_ERROR:
+ case GRPC_ENDPOINT_ERROR:
grpc_chttp2_terminate_writing(transport_writing, 0);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
+ case GRPC_ENDPOINT_PENDING:
break;
}
}
@@ -209,12 +207,6 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
}
}
-static void finish_write_cb(void *tw, grpc_endpoint_cb_status write_status) {
- grpc_chttp2_transport_writing *transport_writing = tw;
- grpc_chttp2_terminate_writing(transport_writing,
- write_status == GRPC_ENDPOINT_CB_OK);
-}
-
void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
@@ -243,6 +235,5 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_list_add_read_write_state_changed(transport_global,
stream_global);
}
- transport_writing->outbuf.count = 0;
- transport_writing->outbuf.length = 0;
+ gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 1bbd210e46..8caa10c938 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -84,15 +84,13 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(void *t, int iomgr_success_ignored);
-static void reading_action(void *t, int iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
/** Endpoint callback to process incoming data */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error);
+static void recv_data(void *tp, int success);
/** Start disconnection chain */
static void drop_connection(grpc_chttp2_transport *t);
@@ -143,6 +141,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
gpr_slice_buffer_destroy(&t->parsing.qbuf);
+ gpr_slice_buffer_destroy(&t->read_buffer);
grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
@@ -249,12 +248,16 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
- grpc_iomgr_closure_init(&t->reading_action, reading_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
+ grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
+ grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
+ gpr_slice_buffer_init(&t->read_buffer);
+
if (is_client) {
gpr_slice_buffer_add(
&t->global.qbuf,
@@ -502,8 +505,8 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
}
-void grpc_chttp2_terminate_writing(
- grpc_chttp2_transport_writing *transport_writing, int success) {
+void grpc_chttp2_terminate_writing(void *transport_writing_ptr, int success) {
+ grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
lock(t);
@@ -1060,74 +1063,76 @@ static void read_error_locked(grpc_chttp2_transport *t) {
}
/* tcp read callback */
-static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
- grpc_chttp2_transport *t = tp;
+static int recv_data_loop(grpc_chttp2_transport *t, int *success) {
size_t i;
- int unref = 0;
+ int keep_reading = 0;
- switch (error) {
- case GRPC_ENDPOINT_CB_SHUTDOWN:
- case GRPC_ENDPOINT_CB_EOF:
- case GRPC_ENDPOINT_CB_ERROR:
- lock(t);
+ lock(t);
+ i = 0;
+ GPR_ASSERT(!t->parsing_active);
+ if (!t->closed) {
+ t->parsing_active = 1;
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
+ gpr_mu_unlock(&t->mu);
+ for (; i < t->read_buffer.count &&
+ grpc_chttp2_perform_read(&t->parsing, t->read_buffer.slices[i]);
+ i++)
+ ;
+ gpr_mu_lock(&t->mu);
+ if (i != t->read_buffer.count) {
drop_connection(t);
- read_error_locked(t);
- unlock(t);
- unref = 1;
- for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
- break;
- case GRPC_ENDPOINT_CB_OK:
- lock(t);
- i = 0;
- GPR_ASSERT(!t->parsing_active);
- if (!t->closed) {
- t->parsing_active = 1;
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
- gpr_mu_unlock(&t->mu);
- for (; i < nslices && grpc_chttp2_perform_read(&t->parsing, slices[i]);
- i++) {
- gpr_slice_unref(slices[i]);
- }
- gpr_mu_lock(&t->mu);
- if (i != nslices) {
- drop_connection(t);
- }
- /* merge stream lists */
- grpc_chttp2_stream_map_move_into(&t->new_stream_map,
- &t->parsing_stream_map);
- t->global.concurrent_stream_count =
- grpc_chttp2_stream_map_size(&t->parsing_stream_map);
- if (t->parsing.initial_window_update != 0) {
- grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
- update_global_window, t);
- t->parsing.initial_window_update = 0;
- }
- /* handle higher level things */
- grpc_chttp2_publish_reads(&t->global, &t->parsing);
- t->parsing_active = 0;
- }
- if (i == nslices) {
- grpc_chttp2_schedule_closure(&t->global, &t->reading_action, 1);
- } else {
- read_error_locked(t);
- unref = 1;
- }
- unlock(t);
- for (; i < nslices; i++) gpr_slice_unref(slices[i]);
- break;
+ }
+ /* merge stream lists */
+ grpc_chttp2_stream_map_move_into(&t->new_stream_map,
+ &t->parsing_stream_map);
+ t->global.concurrent_stream_count =
+ grpc_chttp2_stream_map_size(&t->parsing_stream_map);
+ if (t->parsing.initial_window_update != 0) {
+ grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
+ update_global_window, t);
+ t->parsing.initial_window_update = 0;
+ }
+ /* handle higher level things */
+ grpc_chttp2_publish_reads(&t->global, &t->parsing);
+ t->parsing_active = 0;
+ }
+ if (!*success || i != t->read_buffer.count) {
+ drop_connection(t);
+ read_error_locked(t);
+ } else {
+ keep_reading = 1;
}
- if (unref) {
+ gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ unlock(t);
+
+ if (keep_reading) {
+ switch (grpc_endpoint_read(t->ep, &t->read_buffer, &t->recv_data)) {
+ case GRPC_ENDPOINT_DONE:
+ *success = 1;
+ return 1;
+ case GRPC_ENDPOINT_ERROR:
+ *success = 0;
+ return 1;
+ case GRPC_ENDPOINT_PENDING:
+ return 0;
+ }
+ } else {
UNREF_TRANSPORT(t, "recv_data");
+ return 0;
}
+
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
}
-static void reading_action(void *pt, int iomgr_success_ignored) {
- grpc_chttp2_transport *t = pt;
- grpc_endpoint_notify_on_read(t->ep, recv_data, t);
+static void recv_data(void *tp, int success) {
+ grpc_chttp2_transport *t = tp;
+
+ while (recv_data_loop(t, &success))
+ ;
}
/*
@@ -1240,5 +1245,6 @@ void grpc_chttp2_transport_start_reading(grpc_transport *transport,
gpr_slice *slices, size_t nslices) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
- recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
+ gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
+ recv_data(t, 1);
}