aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/http/parser.c.orig357
-rw-r--r--src/core/lib/iomgr/endpoint.c4
-rw-r--r--src/core/lib/iomgr/endpoint.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c277
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c33
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/exec_ctx.c10
-rw-r--r--src/core/lib/iomgr/exec_ctx.h6
-rw-r--r--src/core/lib/iomgr/iomgr.c3
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c24
-rw-r--r--src/core/lib/iomgr/network_status_tracker.h4
-rw-r--r--src/core/lib/iomgr/tcp_posix.c18
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_windows.c13
-rw-r--r--src/core/lib/iomgr/workqueue.h39
-rw-r--r--src/core/lib/iomgr/workqueue_posix.c8
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h5
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c22
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c4
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c18
-rw-r--r--src/core/lib/support/time.c122
-rw-r--r--src/core/lib/surface/byte_buffer_reader.c10
-rw-r--r--src/core/lib/surface/call.c9
-rw-r--r--src/core/lib/surface/channel.c2
-rw-r--r--src/core/lib/surface/server.c76
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/core/lib/transport/connectivity_state.c3
29 files changed, 436 insertions, 658 deletions
diff --git a/src/core/lib/http/parser.c.orig b/src/core/lib/http/parser.c.orig
deleted file mode 100644
index 74d90fd8bf..0000000000
--- a/src/core/lib/http/parser.c.orig
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/http/parser.h"
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-int grpc_http1_trace = 0;
-
-static char *buf2str(void *buffer, size_t length) {
- char *out = gpr_malloc(length + 1);
- memcpy(out, buffer, length);
- out[length] = 0;
- return out;
-}
-
-static grpc_error *handle_response_line(grpc_http_parser *parser) {
- uint8_t *beg = parser->cur_line;
- uint8_t *cur = beg;
- uint8_t *end = beg + parser->cur_line_length;
-
- if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'");
- if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'");
- if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'");
- if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'");
- if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'");
- if (cur == end || *cur++ != '1') return GRPC_ERROR_CREATE("Expected '1'");
- if (cur == end || *cur++ != '.') return GRPC_ERROR_CREATE("Expected '.'");
- if (cur == end || *cur < '0' || *cur++ > '1') {
- return GRPC_ERROR_CREATE("Expected HTTP/1.0 or HTTP/1.1");
- }
- if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '");
- if (cur == end || *cur < '1' || *cur++ > '9')
- return GRPC_ERROR_CREATE("Expected status code");
- if (cur == end || *cur < '0' || *cur++ > '9')
- return GRPC_ERROR_CREATE("Expected status code");
- if (cur == end || *cur < '0' || *cur++ > '9')
- return GRPC_ERROR_CREATE("Expected status code");
- parser->http.response->status =
- (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0');
- if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '");
-
- /* we don't really care about the status code message */
-
- return GRPC_ERROR_NONE;
-}
-
-static grpc_error *handle_request_line(grpc_http_parser *parser) {
- uint8_t *beg = parser->cur_line;
- uint8_t *cur = beg;
- uint8_t *end = beg + parser->cur_line_length;
- uint8_t vers_major = 0;
- uint8_t vers_minor = 0;
-
- while (cur != end && *cur++ != ' ')
- ;
- if (cur == end) return GRPC_ERROR_CREATE("No method on HTTP request line");
- parser->http.request->method = buf2str(beg, (size_t)(cur - beg - 1));
-
- beg = cur;
- while (cur != end && *cur++ != ' ')
- ;
- if (cur == end) return GRPC_ERROR_CREATE("No path on HTTP request line");
- parser->http.request->path = buf2str(beg, (size_t)(cur - beg - 1));
-
- if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'");
- if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'");
- if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'");
- if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'");
- if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'");
- vers_major = (uint8_t)(*cur++ - '1' + 1);
- ++cur;
- if (cur == end)
- return GRPC_ERROR_CREATE("End of line in HTTP version string");
- vers_minor = (uint8_t)(*cur++ - '1' + 1);
-
- if (vers_major == 1) {
- if (vers_minor == 0) {
- parser->http.request->version = GRPC_HTTP_HTTP10;
- } else if (vers_minor == 1) {
- parser->http.request->version = GRPC_HTTP_HTTP11;
- } else {
- return GRPC_ERROR_CREATE(
- "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0");
- }
- } else if (vers_major == 2) {
- if (vers_minor == 0) {
- parser->http.request->version = GRPC_HTTP_HTTP20;
- } else {
- return GRPC_ERROR_CREATE(
- "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0");
- }
- } else {
- return GRPC_ERROR_CREATE("Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0");
- }
-
- return GRPC_ERROR_NONE;
-}
-
-static grpc_error *handle_first_line(grpc_http_parser *parser) {
- switch (parser->type) {
- case GRPC_HTTP_REQUEST:
- return handle_request_line(parser);
- case GRPC_HTTP_RESPONSE:
- return handle_response_line(parser);
- }
- GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
-}
-
-static grpc_error *add_header(grpc_http_parser *parser) {
- uint8_t *beg = parser->cur_line;
- uint8_t *cur = beg;
- uint8_t *end = beg + parser->cur_line_length;
- size_t *hdr_count = NULL;
- grpc_http_header **hdrs = NULL;
- grpc_http_header hdr = {NULL, NULL};
- grpc_error *error = GRPC_ERROR_NONE;
-
- GPR_ASSERT(cur != end);
-
- if (*cur == ' ' || *cur == '\t') {
- error = GRPC_ERROR_CREATE("Continued header lines not supported yet");
- goto done;
- }
-
- while (cur != end && *cur != ':') {
- cur++;
- }
- if (cur == end) {
-<<<<<<< HEAD
- error = GRPC_ERROR_CREATE("Didn't find ':' in header string");
- goto done;
-=======
- if (grpc_http1_trace) {
- gpr_log(GPR_ERROR, "Didn't find ':' in header string");
- }
- goto error;
->>>>>>> a709afe241d8b264a1c326315f757b4a8d330207
- }
- GPR_ASSERT(cur >= beg);
- hdr.key = buf2str(beg, (size_t)(cur - beg));
- cur++; /* skip : */
-
- while (cur != end && (*cur == ' ' || *cur == '\t')) {
- cur++;
- }
- GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length);
- hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length);
-
- switch (parser->type) {
- case GRPC_HTTP_RESPONSE:
- hdr_count = &parser->http.response->hdr_count;
- hdrs = &parser->http.response->hdrs;
- break;
- case GRPC_HTTP_REQUEST:
- hdr_count = &parser->http.request->hdr_count;
- hdrs = &parser->http.request->hdrs;
- break;
- }
-
- if (*hdr_count == parser->hdr_capacity) {
- parser->hdr_capacity =
- GPR_MAX(parser->hdr_capacity + 1, parser->hdr_capacity * 3 / 2);
- *hdrs = gpr_realloc(*hdrs, parser->hdr_capacity * sizeof(**hdrs));
- }
- (*hdrs)[(*hdr_count)++] = hdr;
-
-done:
- if (error != GRPC_ERROR_NONE) {
- gpr_free(hdr.key);
- gpr_free(hdr.value);
- }
- return error;
-}
-
-static grpc_error *finish_line(grpc_http_parser *parser) {
- grpc_error *err;
- switch (parser->state) {
- case GRPC_HTTP_FIRST_LINE:
- err = handle_first_line(parser);
- if (err != GRPC_ERROR_NONE) return err;
- parser->state = GRPC_HTTP_HEADERS;
- break;
- case GRPC_HTTP_HEADERS:
- if (parser->cur_line_length == parser->cur_line_end_length) {
- parser->state = GRPC_HTTP_BODY;
- break;
- }
- err = add_header(parser);
- if (err != GRPC_ERROR_NONE) {
- return err;
- }
- break;
- case GRPC_HTTP_BODY:
- GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
- }
-
- parser->cur_line_length = 0;
- return GRPC_ERROR_NONE;
-}
-
-static grpc_error *addbyte_body(grpc_http_parser *parser, uint8_t byte) {
- size_t *body_length = NULL;
- char **body = NULL;
-
- if (parser->type == GRPC_HTTP_RESPONSE) {
- body_length = &parser->http.response->body_length;
- body = &parser->http.response->body;
- } else if (parser->type == GRPC_HTTP_REQUEST) {
- body_length = &parser->http.request->body_length;
- body = &parser->http.request->body;
- } else {
- GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here"));
- }
-
- if (*body_length == parser->body_capacity) {
- parser->body_capacity = GPR_MAX(8, parser->body_capacity * 3 / 2);
- *body = gpr_realloc((void *)*body, parser->body_capacity);
- }
- (*body)[*body_length] = (char)byte;
- (*body_length)++;
-
- return GRPC_ERROR_NONE;
-}
-
-static bool check_line(grpc_http_parser *parser) {
- if (parser->cur_line_length >= 2 &&
- parser->cur_line[parser->cur_line_length - 2] == '\r' &&
- parser->cur_line[parser->cur_line_length - 1] == '\n') {
- return true;
- }
-
- // HTTP request with \n\r line termiantors.
- else if (parser->cur_line_length >= 2 &&
- parser->cur_line[parser->cur_line_length - 2] == '\n' &&
- parser->cur_line[parser->cur_line_length - 1] == '\r') {
- return true;
- }
-
- // HTTP request with only \n line terminators.
- else if (parser->cur_line_length >= 1 &&
- parser->cur_line[parser->cur_line_length - 1] == '\n') {
- parser->cur_line_end_length = 1;
- return true;
- }
-
- return false;
-}
-
-static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) {
- switch (parser->state) {
- case GRPC_HTTP_FIRST_LINE:
- case GRPC_HTTP_HEADERS:
- if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) {
- if (grpc_http1_trace)
- gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded",
- GRPC_HTTP_PARSER_MAX_HEADER_LENGTH);
- return 0;
- }
- parser->cur_line[parser->cur_line_length] = byte;
- parser->cur_line_length++;
- if (check_line(parser)) {
- return finish_line(parser);
- } else {
- return GRPC_ERROR_NONE;
- }
- GPR_UNREACHABLE_CODE(return 0);
- case GRPC_HTTP_BODY:
- return addbyte_body(parser, byte);
- }
- GPR_UNREACHABLE_CODE(return 0);
-}
-
-void grpc_http_parser_init(grpc_http_parser *parser, grpc_http_type type,
- void *request_or_response) {
- memset(parser, 0, sizeof(*parser));
- parser->state = GRPC_HTTP_FIRST_LINE;
- parser->type = type;
- parser->http.request_or_response = request_or_response;
- parser->cur_line_end_length = 2;
-}
-
-void grpc_http_parser_destroy(grpc_http_parser *parser) {}
-
-void grpc_http_request_destroy(grpc_http_request *request) {
- size_t i;
- gpr_free(request->body);
- for (i = 0; i < request->hdr_count; i++) {
- gpr_free(request->hdrs[i].key);
- gpr_free(request->hdrs[i].value);
- }
- gpr_free(request->hdrs);
- gpr_free(request->method);
- gpr_free(request->path);
-}
-
-void grpc_http_response_destroy(grpc_http_response *response) {
- size_t i;
- gpr_free(response->body);
- for (i = 0; i < response->hdr_count; i++) {
- gpr_free(response->hdrs[i].key);
- gpr_free(response->hdrs[i].value);
- }
- gpr_free(response->hdrs);
-}
-
-grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice) {
- size_t i;
-
- for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) {
- grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i]);
- if (err != GRPC_ERROR_NONE) return err;
- }
-
- return GRPC_ERROR_NONE;
-}
-
-grpc_error *grpc_http_parser_eof(grpc_http_parser *parser) {
- if (parser->state != GRPC_HTTP_BODY) {
- return GRPC_ERROR_CREATE("Did not finish headers");
- }
- return GRPC_ERROR_NONE;
-}
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index 1ab3733d38..f901fcf962 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
+
+grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
+ return ep->vtable->get_workqueue(ep);
+}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index f9808bbda1..894efc0b23 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -51,6 +51,7 @@ struct grpc_endpoint_vtable {
gpr_slice_buffer *slices, grpc_closure *cb);
void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
+ grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep);
void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+/* Retrieve a reference to the workqueue associated with this endpoint */
+grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
+
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index cf0fe736a0..6a63c4d1d1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -57,6 +57,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -113,9 +114,7 @@ struct grpc_fd {
grpc_closure *read_closure;
grpc_closure *write_closure;
- /* The polling island to which this fd belongs to and the mutex protecting the
- the field */
- gpr_mu pi_mu;
+ /* The polling island to which this fd belongs to (protected by mu) */
struct polling_island *polling_island;
struct grpc_fd *freelist_next;
@@ -152,16 +151,17 @@ static void fd_global_shutdown(void);
* Polling island Declarations
*/
-// #define GRPC_PI_REF_COUNT_DEBUG
+//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)
+#define PI_UNREF(exec_ctx, p, r) \
+ pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
#define PI_ADD_REF(p, r) pi_add_ref((p))
-#define PI_UNREF(p, r) pi_unref((p))
+#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
@@ -172,7 +172,7 @@ typedef struct polling_island {
Once the ref count becomes zero, this structure is destroyed which means
we should ensure that there is never a scenario where a PI_ADD_REF() is
racing with a PI_UNREF() that just made the ref_count zero. */
- gpr_refcount ref_count;
+ gpr_atm ref_count;
/* Pointer to the polling_island this merged into.
* merged_to value is only set once in polling_island's lifetime (and that too
@@ -184,6 +184,9 @@ typedef struct polling_island {
* (except mu and ref_count) are invalid and must be ignored. */
gpr_atm merged_to;
+ /* The workqueue associated with this polling island */
+ grpc_workqueue *workqueue;
+
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -191,11 +194,6 @@ typedef struct polling_island {
size_t fd_cnt;
size_t fd_capacity;
grpc_fd **fds;
-
- /* Polling islands that are no longer needed are kept in a freelist so that
- they can be reused. This field points to the next polling island in the
- free list */
- struct polling_island *next_free;
} polling_island;
/*******************************************************************************
@@ -253,13 +251,14 @@ struct grpc_pollset_set {
* Common helpers
*/
-static void append_error(grpc_error **composite, grpc_error *error,
+static bool append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
- if (error == GRPC_ERROR_NONE) return;
+ if (error == GRPC_ERROR_NONE) return true;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
+ return false;
}
/*******************************************************************************
@@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error,
threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
-/* Polling island freelist */
-static gpr_mu g_pi_freelist_mu;
-static polling_island *g_pi_freelist = NULL;
-
-static void polling_island_delete(); /* Forward declaration */
+/* Forward declaration */
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -293,28 +289,35 @@ gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
-void pi_add_ref(polling_island *pi);
-void pi_unref(polling_island *pi);
+static void pi_add_ref(polling_island *pi);
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
+static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
+ int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
pi_add_ref(pi);
gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}
-void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
- pi_unref(pi);
+static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
+ char *reason, char *file, int line) {
+ long old_cnt = gpr_atm_acq_load(&pi->ref_count);
+ pi_unref(exec_ctx, pi);
gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
(void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
-void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
+static void pi_add_ref(polling_island *pi) {
+ gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
+}
-void pi_unref(polling_island *pi) {
- /* If ref count went to zero, delete the polling island.
+static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
+ /* If ref count went to one, we're back to just the workqueue owning a ref.
+ Unref the workqueue to break the loop.
+
+ If ref count went to zero, delete the polling island.
Note that this deletion not be done under a lock. Once the ref count goes
to zero, we are guaranteed that no one else holds a reference to the
polling island (and that there is no racing pi_add_ref() call either).
@@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) {
Also, if we are deleting the polling island and the merged_to field is
non-empty, we should remove a ref to the merged_to polling island
*/
- if (gpr_unref(&pi->ref_count)) {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(pi);
- if (next != NULL) {
- PI_UNREF(next, "pi_delete"); /* Recursive call */
+ switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+ case 2: /* last external ref: the only one now owned is by the workqueue */
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ break;
+ case 1: {
+ polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+ polling_island_delete(exec_ctx, pi);
+ if (next != NULL) {
+ PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
+ }
+ break;
}
+ case 0:
+ GPR_UNREACHABLE_CODE(return );
}
}
@@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
}
/* Might return NULL in case of an error */
-static polling_island *polling_island_create(grpc_fd *initial_fd,
+static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
+ grpc_fd *initial_fd,
grpc_error **error) {
polling_island *pi = NULL;
- char *err_msg;
const char *err_desc = "polling_island_create";
- /* Try to get one from the polling island freelist */
- gpr_mu_lock(&g_pi_freelist_mu);
- if (g_pi_freelist != NULL) {
- pi = g_pi_freelist;
- g_pi_freelist = g_pi_freelist->next_free;
- pi->next_free = NULL;
- }
- gpr_mu_unlock(&g_pi_freelist_mu);
+ *error = GRPC_ERROR_NONE;
- /* Create new polling island if we could not get one from the free list */
- if (pi == NULL) {
- pi = gpr_malloc(sizeof(*pi));
- gpr_mu_init(&pi->mu);
- pi->fd_cnt = 0;
- pi->fd_capacity = 0;
- pi->fds = NULL;
- }
+ pi = gpr_malloc(sizeof(*pi));
+ gpr_mu_init(&pi->mu);
+ pi->fd_cnt = 0;
+ pi->fd_capacity = 0;
+ pi->fds = NULL;
+ pi->epoll_fd = -1;
+ pi->workqueue = NULL;
- gpr_ref_init(&pi->ref_count, 0);
+ gpr_atm_rel_store(&pi->ref_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
- gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
- strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- } else {
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
- pi->next_free = NULL;
+ append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
+ goto done;
+ }
- if (initial_fd != NULL) {
- /* Lock the polling island here just in case we got this structure from
- the freelist and the polling island lock was not released yet (by the
- code that adds the polling island to the freelist) */
- gpr_mu_lock(&pi->mu);
- polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- gpr_mu_unlock(&pi->mu);
- }
+ polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+
+ if (initial_fd != NULL) {
+ polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
+ }
+
+ if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
+ err_desc) &&
+ *error == GRPC_ERROR_NONE) {
+ polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
+ error);
+ GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
+ pi->workqueue->wakeup_read_fd->polling_island = pi;
+ PI_ADD_REF(pi, "fd");
}
+done:
+ if (*error != GRPC_ERROR_NONE) {
+ if (pi->workqueue != NULL) {
+ GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
+ }
+ polling_island_delete(exec_ctx, pi);
+ pi = NULL;
+ }
return pi;
}
-static void polling_island_delete(polling_island *pi) {
+static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
GPR_ASSERT(pi->fd_cnt == 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
-
- close(pi->epoll_fd);
- pi->epoll_fd = -1;
-
- gpr_mu_lock(&g_pi_freelist_mu);
- pi->next_free = g_pi_freelist;
- g_pi_freelist = pi;
- gpr_mu_unlock(&g_pi_freelist_mu);
+ if (pi->epoll_fd >= 0) {
+ close(pi->epoll_fd);
+ }
+ gpr_mu_destroy(&pi->mu);
+ gpr_free(pi->fds);
+ gpr_free(pi);
}
/* Attempts to gets the last polling island in the linked list (liked by the
@@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p,
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_init(&g_pi_freelist_mu);
- g_pi_freelist = NULL;
-
error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
if (error == GRPC_ERROR_NONE) {
error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
@@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() {
}
static void polling_island_global_shutdown() {
- polling_island *next;
- gpr_mu_lock(&g_pi_freelist_mu);
- gpr_mu_unlock(&g_pi_freelist_mu);
- while (g_pi_freelist != NULL) {
- next = g_pi_freelist->next_free;
- gpr_mu_destroy(&g_pi_freelist->mu);
- gpr_free(g_pi_freelist->fds);
- gpr_free(g_pi_freelist);
- g_pi_freelist = next;
- }
- gpr_mu_destroy(&g_pi_freelist_mu);
-
grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
@@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) {
if (new_fd == NULL) {
new_fd = gpr_malloc(sizeof(grpc_fd));
gpr_mu_init(&new_fd->mu);
- gpr_mu_init(&new_fd->pi_mu);
}
/* Note: It is not really needed to get the new_fd->mu lock here. If this is a
@@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
+ polling_island *unref_pi = NULL;
gpr_mu_lock(&fd->mu);
fd->on_done_closure = on_done;
@@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- Unlock the latest polling island
- Set fd->polling_island to NULL (but remove the ref on the polling island
before doing this.) */
- gpr_mu_lock(&fd->pi_mu);
if (fd->polling_island != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
- PI_UNREF(fd->polling_island, "fd_orphan");
+ unref_pi = fd->polling_island;
fd->polling_island = NULL;
}
- gpr_mu_unlock(&fd->pi_mu);
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
+ if (unref_pi != NULL) {
+ /* Unref stale polling island here, outside the fd lock above.
+ The polling island owns a workqueue which owns an fd, and unreffing
+ inside the lock can cause an eventual lock loop that makes TSAN very
+ unhappy. */
+ PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
+ }
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
@@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ grpc_workqueue *workqueue = NULL;
+ if (fd->polling_island != NULL) {
+ workqueue =
+ GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
+ }
+ gpr_mu_unlock(&fd->mu);
+ return workqueue;
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_unlock(&fd->mu);
}
-static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
+static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *ps, char *reason) {
if (ps->polling_island != NULL) {
- PI_UNREF(ps->polling_island, reason);
+ PI_UNREF(exec_ctx, ps->polling_island, reason);
}
ps->polling_island = NULL;
}
@@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
pollset->finish_shutdown_called = true;
/* Release the ref and set pollset->polling_island to NULL */
- pollset_release_polling_island(pollset, "ps_shutdown");
+ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
@@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset->finish_shutdown_called = false;
pollset->kicked_without_pollers = false;
pollset->shutdown_done = NULL;
- pollset_release_polling_island(pollset, "ps_reset");
+ GPR_ASSERT(pollset->polling_island == NULL);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
@@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(NULL, error);
+ pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
if (pollset->polling_island == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
@@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
polling island to be deleted */
PI_ADD_REF(pi, "ps");
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
pollset->polling_island = pi;
}
@@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
that we got before releasing the polling island lock). This is because
pollset->polling_island pointer might get udpated in other parts of the
code when there is an island merge while we are doing epoll_wait() above */
- PI_UNREF(pi, "ps_work");
+ PI_UNREF(exec_ctx, pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
}
@@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu);
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
polling_island *pi_new = NULL;
+retry:
/* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
* equal, do nothing.
* 2) If fd->polling_island and pollset->polling_island are both NULL, create
@@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* polling_island fields in both fd and pollset to point to the merged
* polling island.
*/
+
+ if (fd->orphaned) {
+ gpr_mu_unlock(&fd->mu);
+ gpr_mu_unlock(&pollset->mu);
+ /* early out */
+ return;
+ }
+
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
- pi_new = polling_island_create(fd, &error);
-
- GRPC_POLLING_TRACE(
- "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
- "pollset: %p)",
- (void *)pi_new, fd->fd, (void *)pollset);
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point where
+ that loop possibility disappears), but the advantages of keeping TSAN
+ happy outweigh any performance advantage we might have by keeping the
+ lock held. */
+ gpr_mu_unlock(&fd->mu);
+ pi_new = polling_island_create(exec_ctx, fd, &error);
+ gpr_mu_lock(&fd->mu);
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
+ if (fd->polling_island != NULL) {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Raced creating new polling island. pi_new: %p "
+ "(fd: %d, pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ PI_ADD_REF(pi_new, "dance_of_destruction");
+ PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
+ goto retry;
+ } else {
+ GRPC_POLLING_TRACE(
+ "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
+ "pollset: %p)",
+ (void *)pi_new, fd->fd, (void *)pollset);
+ }
}
} else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island);
@@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
- PI_UNREF(fd->polling_island, "fd");
+ PI_UNREF(exec_ctx, fd->polling_island, "fd");
}
fd->polling_island = pi_new;
}
@@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (pollset->polling_island != pi_new) {
PI_ADD_REF(pi_new, "ps");
if (pollset->polling_island != NULL) {
- PI_UNREF(pollset->polling_island, "ps");
+ PI_UNREF(exec_ctx, pollset->polling_island, "ps");
}
pollset->polling_island = pi_new;
}
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
gpr_mu_unlock(&pollset->mu);
+
+ GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
/*******************************************************************************
@@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
void *grpc_fd_get_polling_island(grpc_fd *fd) {
polling_island *pi;
- gpr_mu_lock(&fd->pi_mu);
+ gpr_mu_lock(&fd->mu);
pi = fd->polling_island;
- gpr_mu_unlock(&fd->pi_mu);
+ gpr_mu_unlock(&fd->mu);
return pi;
}
@@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 9e306af5fa..c2107e5e39 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 45c0a5e954..16a5e3083e 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
GRPC_FD_UNREF(fd, "poll");
}
+static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
+
/*******************************************************************************
* pollset_posix.c
*/
@@ -842,6 +844,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
*worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
+ /* Avoid malloc for small number of elements. */
+ enum { inline_elements = 96 };
+ struct pollfd pollfd_space[inline_elements];
+ struct grpc_fd_watcher watcher_space[inline_elements];
+
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
@@ -897,15 +904,23 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int r;
size_t i, fd_count;
nfds_t pfd_count;
- /* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
struct pollfd *pfds;
timeout = poll_deadline_to_millis_timeout(deadline, now);
- /* TODO(ctiller): perform just one malloc here if we exceed the inline
- * case */
- pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
- watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+
+ if (pollset->fd_count + 2 <= inline_elements) {
+ pfds = pollfd_space;
+ watchers = watcher_space;
+ } else {
+ /* Allocate one buffer to hold both pfds and watchers arrays */
+ const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
+ const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
+ void *buf = gpr_malloc(pfd_size + watch_size);
+ pfds = buf;
+ watchers = (void *)((char *)buf + pfd_size);
+ }
+
fd_count = 0;
pfd_count = 2;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
@@ -972,8 +987,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
- gpr_free(pfds);
- gpr_free(watchers);
+ if (pfds != pollfd_space) {
+ /* pfds and watchers are in the same memory block pointed to by pfds */
+ gpr_free(pfds);
+ }
+
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
} else {
@@ -1234,6 +1252,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
+ .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index a3c1e9db9a..6536672685 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
}
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) {
+ return g_event_engine->fd_get_workqueue(fd);
+}
+
int grpc_fd_wrapped_fd(grpc_fd *fd) {
return g_event_engine->fd_wrapped_fd(fd);
}
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 579c84ef70..c2aa1756ea 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable {
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
bool (*fd_is_shutdown)(grpc_fd *fd);
+ grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name();
This takes ownership of closing fd. */
grpc_fd *grpc_fd_create(int fd, const char *name);
+/* Get a workqueue that's associated with this fd */
+grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd);
+
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index c44aafcddf..ac7785ec13 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -37,6 +37,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
@@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ if (offload_target_or_null == NULL) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+ } else {
+ grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
+ GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
+ }
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list,
grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 38f27d9b13..917f332f03 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed at the next flush/finish point */
+/** Add a closure to be executed in the future.
+ If \a offload_target_or_null is NULL, the closure will be executed at the
+ next exec_ctx.{finish,flush} point.
+ If \a offload_target_or_null is non-NULL, the closure will be scheduled
+ against the workqueue, and a reference to the workqueue will be consumed. */
void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error,
grpc_workqueue *offload_target_or_null);
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 89292a153e..d67d388b8c 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
@@ -62,6 +63,7 @@ void grpc_iomgr_init(void) {
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
+ grpc_network_status_init();
grpc_iomgr_platform_init();
}
@@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_platform_shutdown();
grpc_exec_ctx_global_shutdown();
+ grpc_network_status_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index 38a1c9b7d4..90c074b007 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -42,27 +42,21 @@ typedef struct endpoint_ll_node {
static endpoint_ll_node *head = NULL;
static gpr_mu g_endpoint_mutex;
-static bool g_init_done = false;
-void grpc_initialize_network_status_monitor() {
- g_init_done = true;
- gpr_mu_init(&g_endpoint_mutex);
- // TODO(makarandd): Install callback with OS to monitor network status.
-}
-
-void grpc_destroy_network_status_monitor() {
- for (endpoint_ll_node *curr = head; curr != NULL;) {
- endpoint_ll_node *next = curr->next;
- gpr_free(curr);
- curr = next;
+void grpc_network_status_shutdown(void) {
+ if (head != NULL) {
+ gpr_log(GPR_ERROR,
+ "Memory leaked as all network endpoints were not shut down");
}
gpr_mu_destroy(&g_endpoint_mutex);
}
+void grpc_network_status_init(void) {
+ gpr_mu_init(&g_endpoint_mutex);
+ // TODO(makarandd): Install callback with OS to monitor network status.
+}
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
- if (!g_init_done) {
- grpc_initialize_network_status_monitor();
- }
gpr_mu_lock(&g_endpoint_mutex);
if (head == NULL) {
head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h
index 74a1aa8135..67cb645f44 100644
--- a/src/core/lib/iomgr/network_status_tracker.h
+++ b/src/core/lib/iomgr/network_status_tracker.h
@@ -35,7 +35,11 @@
#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
#include "src/core/lib/iomgr/endpoint.h"
+void grpc_network_status_init(void);
+void grpc_network_status_shutdown(void);
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep);
void grpc_network_status_unregister_endpoint(grpc_endpoint *ep);
void grpc_network_status_shutdown_all_endpoints();
+
#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2ab45e33ce..ec21e03944 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* returns true if done, false if pending; if returning true, *error is set */
-#define MAX_WRITE_IOVEC 16
+#define MAX_WRITE_IOVEC 1024
static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
@@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static const grpc_endpoint_vtable vtable = {
- tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
- tcp_shutdown, tcp_destroy, tcp_get_peer};
+static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return grpc_fd_get_workqueue(tcp->em_fd);
+}
+
+static const grpc_endpoint_vtable vtable = {tcp_read,
+ tcp_write,
+ tcp_get_workqueue,
+ tcp_add_to_pollset,
+ tcp_add_to_pollset_set,
+ tcp_shutdown,
+ tcp_destroy,
+ tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index a1a463550a..cb2ff782d6 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -134,7 +134,7 @@ struct grpc_tcp_server {
size_t pollset_count;
/* next pollset to assign a channel to */
- size_t next_pollset_to_assign;
+ gpr_atm next_pollset_to_assign;
};
static gpr_once check_init = GPR_ONCE_INIT;
@@ -181,7 +181,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
s->head = NULL;
s->tail = NULL;
s->nports = 0;
- s->next_pollset_to_assign = 0;
+ gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
*server = s;
return GRPC_ERROR_NONE;
}
@@ -369,7 +369,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
}
read_notifier_pollset =
- sp->server->pollsets[(sp->server->next_pollset_to_assign++) %
+ sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
+ &sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
/* loop until accept4 returns EAGAIN, and then re-arm notification */
@@ -490,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) {
}
for (unsigned i = 0; i < count; i++) {
- int fd, port;
+ int fd = -1;
+ int port = -1;
grpc_dualstack_mode dsmode;
err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0,
&dsmode, &fd);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 37ab59021e..35054c42b5 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
-static grpc_endpoint_vtable vtable = {
- win_read, win_write, win_add_to_pollset, win_add_to_pollset_set,
- win_shutdown, win_destroy, win_get_peer};
+static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
+static grpc_endpoint_vtable vtable = {win_read,
+ win_write,
+ win_get_workqueue,
+ win_add_to_pollset,
+ win_add_to_pollset_set,
+ win_shutdown,
+ win_destroy,
+ win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 5cc40eea50..7156e490d7 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -38,6 +38,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/workqueue_posix.h"
@@ -49,35 +50,45 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
-/** Create a work queue */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
- grpc_workqueue **workqueue);
-
+/* Deprecated: do not use.
+ This has *already* been removed in a future commit. */
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
-#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
+/* Reference counting functions. Use the macro's always
+ (GRPC_WORKQUEUE_{REF,UNREF}).
+
+ Pass in a descriptive reason string for reffing/unreffing as the last
+ argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
+ string will be printed alongside the refcount. When it is not defined, the
+ string will be discarded at compilation time. */
+
+//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
- grpc_workqueue_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) \
- grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r))
+ (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
+#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
+ grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
const char *file, int line, const char *reason);
#else
-#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p))
+#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p))
#define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p))
void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Bind this workqueue to a pollset */
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset);
+/** Add a work item to a workqueue. Items added to a work queue will be started
+ in approximately the order they were enqueued, on some thread that may or
+ may not be the current thread. Successive closures enqueued onto a workqueue
+ MAY be executed concurrently.
+
+ It is generally more expensive to add a closure to a workqueue than to the
+ execution context, both in terms of CPU work and in execution latency.
-/** Add a work item to a workqueue */
+ Use work queues when it's important that other threads be given a chance to
+ tackle some workload. */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
index 45e0f6063b..e0d6dac230 100644
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ b/src/core/lib/iomgr/workqueue_posix.c
@@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {
- GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list));
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
}
@@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
}
-void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue,
- grpc_pollset *pollset) {
- grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd);
-}
-
void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_lock(&workqueue->mu);
grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index dcb47e7b59..0f26ba58e2 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -50,4 +50,9 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
+/** Create a work queue. Returns an error if creation fails. If creation
+ succeeds, sets *workqueue to point to it. */
+grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
+ grpc_workqueue **workqueue);
+
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 275f040b1c..23e2dea185 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -37,4 +37,26 @@
#include "src/core/lib/iomgr/workqueue.h"
+// Minimal implementation of grpc_workqueue for Windows
+// Works by directly enqueuing workqueue items onto the current execution
+// context, which is at least correct, if not performant or in the spirit of
+// workqueues.
+
+void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
+ const char *reason) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ const char *file, int line, const char *reason) {}
+#else
+void grpc_workqueue_ref(grpc_workqueue *workqueue) {}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+ grpc_closure *closure, grpc_error *error) {
+ grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+}
+
#endif /* GPR_WINDOWS */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 14ccf72dc9..ed7929aa27 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -176,7 +176,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
calld->creds = grpc_composite_call_credentials_create(channel_call_creds,
ctx->creds, NULL);
if (calld->creds == NULL) {
- bubble_up_error(exec_ctx, elem, GRPC_STATUS_INTERNAL,
+ bubble_up_error(exec_ctx, elem, GRPC_STATUS_UNAUTHENTICATED,
"Incompatible credentials set on channel and call.");
return;
}
@@ -205,7 +205,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data,
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
- bubble_up_error(exec_ctx, elem, GRPC_STATUS_INTERNAL, error_msg);
+ bubble_up_error(exec_ctx, elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
gpr_free(error_msg);
}
}
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index 7650d68e89..bc50f9d1b0 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
-static const grpc_endpoint_vtable vtable = {
- endpoint_read, endpoint_write,
- endpoint_add_to_pollset, endpoint_add_to_pollset_set,
- endpoint_shutdown, endpoint_destroy,
- endpoint_get_peer};
+static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
+ secure_endpoint *ep = (secure_endpoint *)secure_ep;
+ return grpc_endpoint_get_workqueue(ep->wrapped_ep);
+}
+
+static const grpc_endpoint_vtable vtable = {endpoint_read,
+ endpoint_write,
+ endpoint_get_workqueue,
+ endpoint_add_to_pollset,
+ endpoint_add_to_pollset_set,
+ endpoint_shutdown,
+ endpoint_destroy,
+ endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/lib/support/time.c b/src/core/lib/support/time.c
index 57f8331194..5a7d043aed 100644
--- a/src/core/lib/support/time.c
+++ b/src/core/lib/support/time.c
@@ -80,103 +80,67 @@ gpr_timespec gpr_inf_past(gpr_clock_type type) {
return out;
}
-/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single
- function for maintainability. Similarly for _seconds, _minutes, and _hours */
-
-gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (ns == INT64_MAX) {
- result = gpr_inf_future(type);
- } else if (ns == INT64_MIN) {
- result = gpr_inf_past(type);
- } else if (ns >= 0) {
- result.tv_sec = ns / GPR_NS_PER_SEC;
- result.tv_nsec = (int32_t)(ns - result.tv_sec * GPR_NS_PER_SEC);
+static gpr_timespec to_seconds_from_sub_second_time(int64_t time_in_units,
+ int64_t units_per_sec,
+ gpr_clock_type type) {
+ gpr_timespec out;
+ if (time_in_units == INT64_MAX) {
+ out = gpr_inf_future(type);
+ } else if (time_in_units == INT64_MIN) {
+ out = gpr_inf_past(type);
} else {
- /* Calculation carefully formulated to avoid any possible under/overflow. */
- result.tv_sec = (-(999999999 - (ns + GPR_NS_PER_SEC)) / GPR_NS_PER_SEC) - 1;
- result.tv_nsec = (int32_t)(ns - result.tv_sec * GPR_NS_PER_SEC);
+ if (time_in_units >= 0) {
+ out.tv_sec = time_in_units / units_per_sec;
+ } else {
+ out.tv_sec = (-((units_per_sec - 1) - (time_in_units + units_per_sec)) /
+ units_per_sec) -
+ 1;
+ }
+ out.tv_nsec = (int32_t)((time_in_units - out.tv_sec * units_per_sec) *
+ GPR_NS_PER_SEC / units_per_sec);
+ out.clock_type = type;
}
- return result;
+ return out;
}
-gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (us == INT64_MAX) {
- result = gpr_inf_future(type);
- } else if (us == INT64_MIN) {
- result = gpr_inf_past(type);
- } else if (us >= 0) {
- result.tv_sec = us / 1000000;
- result.tv_nsec = (int32_t)((us - result.tv_sec * 1000000) * 1000);
+static gpr_timespec to_seconds_from_above_second_time(int64_t time_in_units,
+ int64_t secs_per_unit,
+ gpr_clock_type type) {
+ gpr_timespec out;
+ if (time_in_units >= INT64_MAX / secs_per_unit) {
+ out = gpr_inf_future(type);
+ } else if (time_in_units <= INT64_MIN / secs_per_unit) {
+ out = gpr_inf_past(type);
} else {
- /* Calculation carefully formulated to avoid any possible under/overflow. */
- result.tv_sec = (-(999999 - (us + 1000000)) / 1000000) - 1;
- result.tv_nsec = (int32_t)((us - result.tv_sec * 1000000) * 1000);
+ out.tv_sec = time_in_units * secs_per_unit;
+ out.tv_nsec = 0;
+ out.clock_type = type;
}
- return result;
+ return out;
+}
+
+gpr_timespec gpr_time_from_nanos(int64_t ns, gpr_clock_type type) {
+ return to_seconds_from_sub_second_time(ns, GPR_NS_PER_SEC, type);
+}
+
+gpr_timespec gpr_time_from_micros(int64_t us, gpr_clock_type type) {
+ return to_seconds_from_sub_second_time(us, GPR_US_PER_SEC, type);
}
gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (ms == INT64_MAX) {
- result = gpr_inf_future(type);
- } else if (ms == INT64_MIN) {
- result = gpr_inf_past(type);
- } else if (ms >= 0) {
- result.tv_sec = ms / 1000;
- result.tv_nsec = (int32_t)((ms - result.tv_sec * 1000) * 1000000);
- } else {
- /* Calculation carefully formulated to avoid any possible under/overflow. */
- result.tv_sec = (-(999 - (ms + 1000)) / 1000) - 1;
- result.tv_nsec = (int32_t)((ms - result.tv_sec * 1000) * 1000000);
- }
- return result;
+ return to_seconds_from_sub_second_time(ms, GPR_MS_PER_SEC, type);
}
gpr_timespec gpr_time_from_seconds(int64_t s, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (s == INT64_MAX) {
- result = gpr_inf_future(type);
- } else if (s == INT64_MIN) {
- result = gpr_inf_past(type);
- } else {
- result.tv_sec = s;
- result.tv_nsec = 0;
- }
- return result;
+ return to_seconds_from_sub_second_time(s, 1, type);
}
gpr_timespec gpr_time_from_minutes(int64_t m, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (m >= INT64_MAX / 60) {
- result = gpr_inf_future(type);
- } else if (m <= INT64_MIN / 60) {
- result = gpr_inf_past(type);
- } else {
- result.tv_sec = m * 60;
- result.tv_nsec = 0;
- }
- return result;
+ return to_seconds_from_above_second_time(m, 60, type);
}
gpr_timespec gpr_time_from_hours(int64_t h, gpr_clock_type type) {
- gpr_timespec result;
- result.clock_type = type;
- if (h >= INT64_MAX / 3600) {
- result = gpr_inf_future(type);
- } else if (h <= INT64_MIN / 3600) {
- result = gpr_inf_past(type);
- } else {
- result.tv_sec = h * 3600;
- result.tv_nsec = 0;
- }
- return result;
+ return to_seconds_from_above_second_time(h, 3600, type);
}
gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) {
diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c
index c97079f638..310bacb2c9 100644
--- a/src/core/lib/surface/byte_buffer_reader.c
+++ b/src/core/lib/surface/byte_buffer_reader.c
@@ -54,8 +54,8 @@ static int is_compressed(grpc_byte_buffer *buffer) {
return 1 /* GPR_TRUE */;
}
-void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer) {
+int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+ grpc_byte_buffer *buffer) {
gpr_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
@@ -67,9 +67,10 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
&decompressed_slices_buffer) == 0) {
gpr_log(GPR_ERROR,
"Unexpected error decompressing data for algorithm with enum "
- "value '%d'. Reading data as if it were uncompressed.",
+ "value '%d'.",
reader->buffer_in->data.raw.compression);
- reader->buffer_out = reader->buffer_in;
+ memset(reader, 0, sizeof(*reader));
+ return 0;
} else { /* all fine */
reader->buffer_out =
grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
@@ -82,6 +83,7 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
reader->current.index = 0;
break;
}
+ return 1;
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 708ea3502a..fc9df76dc1 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -259,7 +259,8 @@ grpc_call *grpc_call_create(
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
- call->send_deadline = send_deadline;
+ call->send_deadline =
+ gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
@@ -1206,7 +1207,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- char *algo_name;
+ char *algo_name = NULL;
grpc_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
@@ -1225,7 +1226,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
call->incoming_compression_algorithm)) {
extern int grpc_compression_trace;
if (grpc_compression_trace) {
- char *algo_name;
+ char *algo_name = NULL;
grpc_compression_algorithm_name(call->incoming_compression_algorithm,
&algo_name);
gpr_log(GPR_ERROR,
@@ -1426,7 +1427,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_compression_algorithm calgo =
compression_algorithm_for_level_locked(
call, effective_compression_level);
- char *calgo_name;
+ char *calgo_name = NULL;
grpc_compression_algorithm_name(calgo, &calgo_name);
// the following will be picked up by the compress filter and used as
// the call's compression algorithm.
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 2cf6d8890a..6d2b1c4935 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -81,7 +81,7 @@ struct grpc_channel {
CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
/* the protobuf library will (by default) start warning at 100megs */
-#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b..2f108af48a 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type;
+ size_t cq_idx;
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
@@ -206,11 +207,11 @@ struct grpc_server {
registered_method *registered_methods;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher;
- /** free list of available requested_calls indices */
- gpr_stack_lockfree *request_freelist;
+ /** free list of available requested_calls_per_cq indices */
+ gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */
- requested_call *requested_calls;
- size_t max_requested_calls;
+ requested_call **requested_calls_per_cq;
+ int max_requested_calls_per_cq;
gpr_atm shutdown_flag;
uint8_t shutdown_published;
@@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < server->cq_count; i++) {
while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-1) {
- fail_call(exec_ctx, server, i, &server->requested_calls[request_id],
+ fail_call(exec_ctx, server, i,
+ &server->requested_calls_per_cq[i][request_id],
GRPC_ERROR_REF(error));
}
}
@@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ if (server->started) {
+ gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
+ gpr_free(server->requested_calls_per_cq[i]);
+ }
}
- gpr_stack_lockfree_destroy(server->request_freelist);
+ gpr_free(server->request_freelist_per_cq);
+ gpr_free(server->requested_calls_per_cq);
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
- gpr_free(server->requested_calls);
gpr_free(server);
}
@@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
requested_call *rc = req;
grpc_server *server = rc->server;
- if (rc >= server->requested_calls &&
- rc < server->requested_calls + server->max_requested_calls) {
- GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
- gpr_stack_lockfree_push(server->request_freelist,
- (int)(rc - server->requested_calls));
+ if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
+ rc < server->requested_calls_per_cq[rc->cq_idx] +
+ server->max_requested_calls_per_cq) {
+ GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
+ gpr_stack_lockfree_push(
+ server->request_freelist_per_cq[rc->cq_idx],
+ (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
} else {
gpr_free(req);
}
@@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
return; /* early out */
}
}
@@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue(
}
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- size_t i;
-
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
&server->root_channel_data;
/* TODO(ctiller): expose a channel_arg for this */
- server->max_requested_calls = 32768;
- server->request_freelist =
- gpr_stack_lockfree_create(server->max_requested_calls);
- for (i = 0; i < (size_t)server->max_requested_calls; i++) {
- gpr_stack_lockfree_push(server->request_freelist, (int)i);
- }
- server->requested_calls = gpr_malloc(server->max_requested_calls *
- sizeof(*server->requested_calls));
-
+ server->max_requested_calls_per_cq = 32768;
server->channel_args = grpc_channel_args_copy(args);
return server;
@@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) {
server->started = true;
size_t pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->request_freelist_per_cq =
+ gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
+ server->requested_calls_per_cq =
+ gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
}
+ server->request_freelist_per_cq[i] =
+ gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
+ for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
+ gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
+ }
+ server->requested_calls_per_cq[i] =
+ gpr_malloc((size_t)server->max_requested_calls_per_cq *
+ sizeof(*server->requested_calls_per_cq[i]));
}
request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
+ (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_init(&rm->request_matcher, server->max_requested_calls,
- server);
+ request_matcher_init(&rm->request_matcher,
+ (size_t)server->max_requested_calls_per_cq, server);
}
for (l = server->listeners; l; l = l->next) {
@@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_CREATE("Server Shutdown"));
return GRPC_CALL_OK;
}
- request_id = gpr_stack_lockfree_pop(server->request_freelist);
+ request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
if (request_id == -1) {
/* out of request ids: just fail this one */
fail_call(exec_ctx, server, cq_idx, rc,
- GRPC_ERROR_CREATE("Server Shutdown"));
+ grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"),
+ GRPC_ERROR_INT_LIMIT,
+ server->max_requested_calls_per_cq));
return GRPC_CALL_OK;
}
switch (rc->type) {
@@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &rc->data.registered.registered_method->request_matcher;
break;
}
- server->requested_calls[request_id] = *rc;
+ server->requested_calls_per_cq[cq_idx][request_id] = *rc;
gpr_free(rc);
if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
/* this was the first queued request: we need to lock and start
@@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
publish_call(exec_ctx, server, calld, cq_idx,
- &server->requested_calls[request_id]);
+ &server->requested_calls_per_cq[cq_idx][request_id]);
}
gpr_mu_lock(&server->mu_call);
}
@@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op(cq_for_notification, tag);
details->reserved = NULL;
+ rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
rc->server = server;
rc->tag = tag;
@@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call(
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
+ rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
rc->tag = tag;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 53f3c43854..1942075054 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.16.0-dev"; }
+const char *grpc_version_string(void) { return "1.1.0-dev"; }
diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c
index 054f112127..68d05e3a85 100644
--- a/src/core/lib/transport/connectivity_state.c
+++ b/src/core/lib/transport/connectivity_state.c
@@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
+ if (grpc_connectivity_state_trace) {
+ gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify);
+ }
grpc_exec_ctx_sched(exec_ctx, w->notify,
GRPC_ERROR_REF(tracker->current_error), NULL);
gpr_free(w);