aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/core/lib/channel/channel_stack.c6
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/compress_filter.c36
-rw-r--r--src/core/lib/channel/compress_filter.h2
-rw-r--r--src/core/lib/channel/connected_channel.c7
-rw-r--r--src/core/lib/channel/http_client_filter.c4
-rw-r--r--src/core/lib/channel/http_server_filter.c17
-rw-r--r--src/core/lib/http/httpcli.c2
-rw-r--r--src/core/lib/iomgr/closure.c6
-rw-r--r--src/core/lib/iomgr/closure.h3
-rw-r--r--src/core/lib/iomgr/ev_posix.c1
-rw-r--r--src/core/lib/iomgr/exec_ctx.h2
-rw-r--r--src/core/lib/iomgr/iomgr.c6
-rw-r--r--src/core/lib/iomgr/resolve_address.h5
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c9
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c9
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c25
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c29
-rw-r--r--src/core/lib/iomgr/tcp_posix.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c28
-rw-r--r--src/core/lib/iomgr/tcp_windows.c16
-rw-r--r--src/core/lib/iomgr/timer.c15
-rw-r--r--src/core/lib/security/client_auth_filter.c4
-rw-r--r--src/core/lib/security/server_auth_filter.c4
-rw-r--r--src/core/lib/support/env_win32.c44
-rw-r--r--src/core/lib/support/log_linux.c4
-rw-r--r--src/core/lib/support/log_win32.c18
-rw-r--r--src/core/lib/support/string_util_win32.c94
-rw-r--r--src/core/lib/support/string_win32.c32
-rw-r--r--src/core/lib/support/time_posix.c10
-rw-r--r--src/core/lib/support/time_win32.c4
-rw-r--r--src/core/lib/support/tmpfile_msys.c73
-rw-r--r--src/core/lib/support/tmpfile_posix.c4
-rw-r--r--src/core/lib/support/tmpfile_win32.c4
-rw-r--r--src/core/lib/surface/call.c137
-rw-r--r--src/core/lib/surface/completion_queue.c4
-rw-r--r--src/core/lib/surface/init.c3
-rw-r--r--src/core/lib/surface/lame_client.c9
-rw-r--r--src/core/lib/surface/server.c4
-rw-r--r--src/core/lib/surface/validate_metadata.c2
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/core/lib/transport/metadata.c29
-rw-r--r--src/core/lib/transport/metadata.h5
-rw-r--r--src/core/lib/transport/metadata_batch.c9
-rw-r--r--src/core/lib/transport/metadata_batch.h3
-rw-r--r--src/core/lib/transport/transport.c5
-rw-r--r--src/core/lib/transport/transport.h12
-rw-r--r--src/core/lib/transport/transport_impl.h2
49 files changed, 533 insertions, 235 deletions
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 0a51780a14..23c7b7b897 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -56,10 +56,6 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
-/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
- * is specified in channel args, otherwise returns 0. */
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
-
/** Returns the compression algorithm set in \a a. */
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index e36066d863..ad182d1f69 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset) {}
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+ void *and_free_memory) {
grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
size_t count = stack->count;
size_t i;
/* destroy per-filter data */
for (i = 0; i < count; i++) {
- elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+ elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
+ i == count - 1 ? and_free_memory : NULL);
}
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 9e3a25a152..36c17cb467 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -104,8 +104,12 @@ typedef struct {
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset);
/* Destroy per call data.
- The filter does not need to do any chaining */
- void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+ The filter does not need to do any chaining.
+ The bottom filter of a stack will be passed a non-NULL pointer to
+ \a and_free_memory that should be passed to gpr_free when destruction
+ is complete. */
+ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory);
/* sizeof(per channel data) */
size_t sizeof_channel_data;
@@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
#endif
/* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+ void *and_free_memory);
/* Ignore set pollset - used by filters to implement the set_pollset method
if they don't care about pollsets at all. Does nothing. */
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 229fdb5ef6..5510c79b18 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -47,6 +47,8 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
+int grpc_compress_filter_trace = 0;
+
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
@@ -169,9 +171,29 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
did_compress =
grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
+ if (grpc_compress_filter_trace) {
+ char *algo_name;
+ const size_t before_size = calld->slices.length;
+ const size_t after_size = tmp.length;
+ const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG,
+ "Compressed[%s] %d bytes vs. %d bytes (%.2f%% savings)",
+ algo_name, before_size, after_size, 100 * savings_ratio);
+ }
gpr_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ } else {
+ if (grpc_compress_filter_trace) {
+ char *algo_name;
+ GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
+ &algo_name));
+ gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.",
+ algo_name);
+ }
}
+
gpr_slice_buffer_destroy(&tmp);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
@@ -246,8 +268,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
gpr_slice_buffer_destroy(&calld->slices);
@@ -268,8 +290,14 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args->channel_args);
/* Make sure the default isn't disabled. */
- GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
- &channeld->compression_options, channeld->default_compression_algorithm));
+ if (!grpc_compression_options_is_algorithm_enabled(
+ &channeld->compression_options,
+ channeld->default_compression_algorithm)) {
+ gpr_log(GPR_DEBUG,
+ "compression algorithm %d not enabled: switching to none",
+ channeld->default_compression_algorithm);
+ channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
+ }
channeld->compression_options.default_compression_algorithm =
channeld->default_compression_algorithm;
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
index 0d973329c4..cf5879d82e 100644
--- a/src/core/lib/channel/compress_filter.h
+++ b/src/core/lib/channel/compress_filter.h
@@ -38,6 +38,8 @@
#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request"
+extern int grpc_compress_filter_trace;
+
/** Compression filter for outgoing data.
*
* See <grpc/compression.h> for the available compression settings.
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index c1debab4c6..68a3a7d6fd 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_transport_destroy_stream(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ and_free_memory);
}
/* Constructor for channel_data */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 211f537c69..516e708d1f 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -155,8 +155,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
unsigned i;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index c140c61b8f..ba865416de 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -39,6 +39,9 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
+#define EXPECTED_CONTENT_TYPE "application/grpc"
+#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
+
typedef struct call_data {
uint8_t seen_path;
uint8_t seen_method;
@@ -92,8 +95,11 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
require */
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
- if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) ==
- 0) {
+ const char *value_str = grpc_mdstr_as_c_string(md->value);
+ if (strncmp(value_str, EXPECTED_CONTENT_TYPE,
+ EXPECTED_CONTENT_TYPE_LENGTH) == 0 &&
+ (value_str[EXPECTED_CONTENT_TYPE_LENGTH] == '+' ||
+ value_str[EXPECTED_CONTENT_TYPE_LENGTH] == ';')) {
/* Although the C implementation doesn't (currently) generate them,
any custom +-suffix is explicitly valid. */
/* TODO(klempner): We should consider preallocating common values such
@@ -102,8 +108,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
- gpr_log(GPR_INFO, "Unexpected content-type %s",
- grpc_mdstr_as_c_string(md->value));
+ gpr_log(GPR_INFO, "Unexpected content-type %s", value_str);
}
return NULL;
} else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD ||
@@ -220,8 +225,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index 76bd1b64dc..f22721ac8f 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -246,7 +246,7 @@ static void internal_request_begin(
grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
req->pollset);
- grpc_resolve_address(request->host, req->handshaker->default_port,
+ grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
on_resolved, req);
}
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index d6f073fc9d..27793c32e4 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -54,6 +54,12 @@ void grpc_closure_list_add(grpc_closure_list *closure_list,
closure_list->tail = closure;
}
+void grpc_closure_list_fail_all(grpc_closure_list *list) {
+ for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) {
+ c->final_data &= ~(uintptr_t)1;
+ }
+}
+
bool grpc_closure_list_empty(grpc_closure_list closure_list) {
return closure_list.head == NULL;
}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 8652b53a8b..fdc2daed9d 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -86,6 +86,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
bool success);
+/** force all success bits in \a list to false */
+void grpc_closure_list_fail_all(grpc_closure_list *list);
+
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 0eb95a2e09..7df1751352 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -44,7 +44,6 @@
static const grpc_event_engine_vtable *g_event_engine;
grpc_poll_function_type grpc_poll_function = poll;
-grpc_wakeup_fd grpc_global_wakeup_fd;
void grpc_event_engine_init(void) {
if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index e09ef02400..976cc40347 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -93,6 +93,8 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_global_init(void);
+
+void grpc_exec_ctx_global_init(void);
void grpc_exec_ctx_global_shutdown(void);
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 146663984d..60cef8ba77 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -166,8 +166,10 @@ bool grpc_iomgr_abort_on_leaks(void) {
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
+ bool should_we = false;
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) return true;
+ if (0 == strcmp(env, truthy[i])) should_we = true;
}
- return false;
+ gpr_free(env);
+ return should_we;
}
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index ecc06340a3..ef198fe0f6 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -59,8 +59,9 @@ typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg,
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
/* TODO(ctiller): add a timeout here */
-void grpc_resolve_address(const char *addr, const char *default_port,
- grpc_resolve_cb cb, void *arg);
+extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr,
+ const char *default_port,
+ grpc_resolve_cb cb, void *arg);
/* Destroy resolved addresses */
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses);
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index b9d3bbdb89..cae91eec20 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -164,8 +164,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
gpr_free(addrs);
}
-void grpc_resolve_address(const char *name, const char *default_port,
- grpc_resolve_cb cb, void *arg) {
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
@@ -175,4 +176,8 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_executor_enqueue(&r->request_closure, 1);
}
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) = resolve_address_impl;
+
#endif
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 82763d11f4..914736234d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -155,8 +155,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
gpr_free(addrs);
}
-void grpc_resolve_address(const char *name, const char *default_port,
- grpc_resolve_cb cb, void *arg) {
+static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) {
request *r = gpr_malloc(sizeof(request));
grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
@@ -166,4 +167,8 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_executor_enqueue(&r->request_closure, 1);
}
+void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
+ const char *default_port, grpc_resolve_cb cb,
+ void *arg) = resolve_address_impl;
+
#endif
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 6430cb629f..e93d5734a0 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -211,11 +211,11 @@ finish:
grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
}
-void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_endpoint **ep,
- grpc_pollset_set *interested_parties,
- const struct sockaddr *addr, size_t addr_len,
- gpr_timespec deadline) {
+static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr,
+ size_t addr_len, gpr_timespec deadline) {
int fd;
grpc_dualstack_mode dsmode;
int err;
@@ -303,4 +303,19 @@ done:
gpr_free(addr_str);
}
+// overridden by api_fuzzer.c
+void (*grpc_tcp_client_connect_impl)(
+ grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties, const struct sockaddr *addr,
+ size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl;
+
+void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_endpoint **ep,
+ grpc_pollset_set *interested_parties,
+ const struct sockaddr *addr, size_t addr_len,
+ gpr_timespec deadline) {
+ grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
+ addr_len, deadline);
+}
+
#endif
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 7d78beb15a..66f9ff7a46 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -63,39 +63,45 @@ typedef struct {
grpc_endpoint **endpoint;
} async_connect;
-static void async_connect_unlock_and_cleanup(async_connect *ac) {
+static void async_connect_unlock_and_cleanup(async_connect *ac,
+ grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
}
+ if (socket != NULL) grpc_winsocket_destroy(socket);
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occur, it got cancelled. */
- if (ac->socket != NULL && occured) {
+ if (ac->socket != NULL) {
grpc_winsocket_shutdown(ac->socket);
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, ac->socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
+ GPR_ASSERT(*ep == NULL);
grpc_winsocket_callback_info *info = &ac->socket->write_info;
grpc_closure *on_done = ac->on_done;
+ gpr_mu_lock(&ac->mu);
+ grpc_winsocket *socket = ac->socket;
+ ac->socket = NULL;
+ gpr_mu_unlock(&ac->mu);
+
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (from_iocp) {
+ if (from_iocp && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -107,12 +113,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
ac->addr_name, utf8_message);
gpr_free(utf8_message);
} else {
- *ep = grpc_tcp_create(ac->socket, ac->addr_name);
- ac->socket = NULL;
+ *ep = grpc_tcp_create(socket, ac->addr_name);
+ socket = NULL;
}
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL);
@@ -138,6 +144,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
const char *message = NULL;
char *utf8_message;
grpc_winsocket_callback_info *info;
+ int last_error;
*endpoint = NULL;
@@ -208,8 +215,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
return;
failure:
- utf8_message = gpr_format_message(WSAGetLastError());
+ last_error = WSAGetLastError();
+ utf8_message = gpr_format_message(last_error);
gpr_log(GPR_ERROR, message, utf8_message);
+ gpr_log(GPR_ERROR, "last error = %d", last_error);
gpr_free(utf8_message);
if (socket != NULL) {
grpc_winsocket_destroy(socket);
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 7210aef5d5..e2869224f1 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -164,7 +164,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
}
@@ -398,7 +398,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (i = 0; i < buf->count; i++) {
char *data =
gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
}
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 6940dec7b0..125f521d87 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -508,34 +508,6 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
}
-unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s,
- unsigned port_index) {
- grpc_tcp_listener *sp;
- for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
- ;
- if (sp) {
- return 1;
- } else {
- return 0;
- }
-}
-
-int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
- unsigned fd_index) {
- grpc_tcp_listener *sp;
- if (fd_index != 0) {
- /* Windows implementation has only one fd per port_index. */
- return -1;
- }
- for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
- ;
- if (sp) {
- return _open_osfhandle((intptr_t)sp->socket->socket, 0);
- } else {
- return -1;
- }
-}
-
void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
grpc_pollset **pollset, size_t pollset_count,
grpc_tcp_server_cb on_accept_cb,
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 7ee689a7e4..551149e1a6 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -35,6 +35,8 @@
#ifdef GPR_WINSOCK_SOCKET
+#include <limits.h>
+
#include "src/core/lib/iomgr/sockaddr_win32.h"
#include <grpc/support/alloc.h>
@@ -51,12 +53,20 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
+#if defined(__MSYS__) && defined(GPR_ARCH_64)
+/* Nasty workaround for nasty bug when using the 64 bits msys compiler
+ in conjunction with Microsoft Windows headers. */
+#define GRPC_FIONBIO _IOW('f', 126, uint32_t)
+#else
+#define GRPC_FIONBIO FIONBIO
+#endif
+
static int set_non_block(SOCKET sock) {
int status;
- unsigned long param = 1;
+ uint32_t param = 1;
DWORD ret;
- status =
- WSAIoctl(sock, FIONBIO, &param, sizeof(param), NULL, 0, &ret, NULL, NULL);
+ status = WSAIoctl(sock, GRPC_FIONBIO, &param, sizeof(param), NULL, 0, &ret,
+ NULL, NULL);
return status == 0;
}
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c
index 713f15b69e..acb5b26c87 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer.c
@@ -70,6 +70,7 @@ static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
+static bool g_initialized = false;
static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
gpr_timespec *next, int success);
@@ -83,6 +84,7 @@ static gpr_timespec compute_min_deadline(shard_type *shard) {
void grpc_timer_list_init(gpr_timespec now) {
uint32_t i;
+ g_initialized = true;
gpr_mu_init(&g_mu);
gpr_mu_init(&g_checker_mu);
g_clock_type = now.clock_type;
@@ -111,6 +113,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
}
gpr_mu_destroy(&g_mu);
gpr_mu_destroy(&g_checker_mu);
+ g_initialized = false;
}
/* This is a cheap, but good enough, pointer hash for sharding the tasks: */
@@ -180,6 +183,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->deadline = deadline;
timer->triggered = 0;
+ if (!g_initialized) {
+ timer->triggered = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
+ return;
+ }
+
+ if (gpr_time_cmp(deadline, now) <= 0) {
+ timer->triggered = 1;
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL);
+ return;
+ }
+
/* TODO(ctiller): check deadline expired */
gpr_mu_lock(&shard->mu);
diff --git a/src/core/lib/security/client_auth_filter.c b/src/core/lib/security/client_auth_filter.c
index 943b1da85c..8b58cb86bf 100644
--- a/src/core/lib/security/client_auth_filter.c
+++ b/src/core/lib/security/client_auth_filter.c
@@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
call_data *calld = elem->call_data;
grpc_call_credentials_unref(calld->creds);
if (calld->host != NULL) {
diff --git a/src/core/lib/security/server_auth_filter.c b/src/core/lib/security/server_auth_filter.c
index 7844dc87cb..3320497d21 100644
--- a/src/core/lib/security/server_auth_filter.c
+++ b/src/core/lib/security/server_auth_filter.c
@@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_pollset *pollset) {}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/support/env_win32.c b/src/core/lib/support/env_win32.c
index ef84c941df..e670e1e8d0 100644
--- a/src/core/lib/support/env_win32.c
+++ b/src/core/lib/support/env_win32.c
@@ -33,41 +33,47 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_ENV
+
+#include <windows.h>
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
-
-#ifdef __MINGW32__
-errno_t getenv_s(size_t *size_needed, char *buffer, size_t size,
- const char *varname);
-#else
-#include <stdlib.h>
-#endif
+#include "src/core/lib/support/string_win32.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
char *gpr_getenv(const char *name) {
- size_t size;
char *result = NULL;
- errno_t err;
+ DWORD size;
+ LPTSTR tresult = NULL;
+ LPTSTR tname = gpr_char_to_tchar(name);
+ DWORD ret;
- err = getenv_s(&size, NULL, 0, name);
- if (err || (size == 0)) return NULL;
- result = gpr_malloc(size);
- err = getenv_s(&size, result, size, name);
- if (err) {
- gpr_free(result);
+ ret = GetEnvironmentVariable(tname, NULL, 0);
+ if (ret == 0) return NULL;
+ size = ret * (DWORD)sizeof(TCHAR);
+ tresult = gpr_malloc(size);
+ ret = GetEnvironmentVariable(tname, tresult, size);
+ gpr_free(tname);
+ if (ret == 0) {
+ gpr_free(tresult);
return NULL;
}
+ result = gpr_tchar_to_char(tresult);
+ gpr_free(tresult);
return result;
}
void gpr_setenv(const char *name, const char *value) {
- errno_t res = _putenv_s(name, value);
- GPR_ASSERT(res == 0);
+ LPTSTR tname = gpr_char_to_tchar(name);
+ LPTSTR tvalue = gpr_char_to_tchar(value);
+ BOOL res = SetEnvironmentVariable(tname, tvalue);
+ gpr_free(tname);
+ gpr_free(tvalue);
+ GPR_ASSERT(res);
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_ENV */
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index ff3febb38e..ca04c022e3 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -41,7 +41,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_LINUX
+#ifdef GPR_LINUX_LOG
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -103,4 +103,4 @@ void gpr_default_log(gpr_log_func_args *args) {
gpr_free(prefix);
}
-#endif
+#endif /* GPR_LINUX_LOG */
diff --git a/src/core/lib/support/log_win32.c b/src/core/lib/support/log_win32.c
index ba78497a0a..29735bd18c 100644
--- a/src/core/lib/support/log_win32.c
+++ b/src/core/lib/support/log_win32.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_LOG
#include <stdarg.h>
#include <stdio.h>
@@ -109,18 +109,4 @@ void gpr_default_log(gpr_log_func_args *args) {
fflush(stderr);
}
-char *gpr_format_message(int messageid) {
- LPTSTR tmessage;
- char *message;
- DWORD status = FormatMessage(
- FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
- FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPTSTR)(&tmessage), 0, NULL);
- if (status == 0) return gpr_strdup("Unable to retrieve error string");
- message = gpr_tchar_to_char(tmessage);
- LocalFree(tmessage);
- return message;
-}
-
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_LOG */
diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c
new file mode 100644
index 0000000000..f3cb0c050f
--- /dev/null
+++ b/src/core/lib/support/string_util_win32.c
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Posix code for gpr snprintf support. */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WIN32
+
+/* Some platforms (namely msys) need wchar to be included BEFORE
+ anything else, especially strsafe.h. */
+#include <wchar.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+#include <strsafe.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/string.h"
+
+#if defined UNICODE || defined _UNICODE
+LPTSTR
+gpr_char_to_tchar(LPCSTR input) {
+ LPTSTR ret;
+ int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
+ MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
+ return ret;
+}
+
+LPSTR
+gpr_tchar_to_char(LPCTSTR input) {
+ LPSTR ret;
+ int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed);
+ WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
+ return ret;
+}
+#else
+char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); }
+
+char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); }
+#endif
+
+char *gpr_format_message(int messageid) {
+ LPTSTR tmessage;
+ char *message;
+ DWORD status = FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR)(&tmessage), 0, NULL);
+ if (status == 0) return gpr_strdup("Unable to retrieve error string");
+ message = gpr_tchar_to_char(tmessage);
+ LocalFree(tmessage);
+ return message;
+}
+
+#endif /* GPR_WIN32 */
diff --git a/src/core/lib/support/string_win32.c b/src/core/lib/support/string_win32.c
index a2f9857356..6b92f79253 100644
--- a/src/core/lib/support/string_win32.c
+++ b/src/core/lib/support/string_win32.c
@@ -31,11 +31,11 @@
*
*/
-/* Posix code for gpr snprintf support. */
+/* Windows code for gpr snprintf support. */
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_STRING
#include <stdarg.h>
#include <stdio.h>
@@ -80,30 +80,4 @@ int gpr_asprintf(char **strp, const char *format, ...) {
return -1;
}
-#if defined UNICODE || defined _UNICODE
-LPTSTR
-gpr_char_to_tchar(LPCSTR input) {
- LPTSTR ret;
- int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
- if (needed <= 0) return NULL;
- ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
- MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
- return ret;
-}
-
-LPSTR
-gpr_tchar_to_char(LPCTSTR input) {
- LPSTR ret;
- int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
- if (needed <= 0) return NULL;
- ret = gpr_malloc((unsigned)needed);
- WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
- return ret;
-}
-#else
-char *gpr_tchar_to_char(LPTSTR input) { return gpr_strdup(input); }
-
-char *gpr_char_to_tchar(LPTSTR input) { return gpr_strdup(input); }
-#endif
-
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_STRING */
diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c
index f5f62dadc6..11542072fe 100644
--- a/src/core/lib/support/time_posix.c
+++ b/src/core/lib/support/time_posix.c
@@ -78,7 +78,7 @@ static const clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC,
void gpr_time_init(void) { gpr_precise_clock_init(); }
-gpr_timespec gpr_now(gpr_clock_type clock_type) {
+static gpr_timespec now_impl(gpr_clock_type clock_type) {
struct timespec now;
GPR_ASSERT(clock_type != GPR_TIMESPAN);
if (clock_type == GPR_CLOCK_PRECISE) {
@@ -114,7 +114,7 @@ void gpr_time_init(void) {
g_time_start = mach_absolute_time();
}
-gpr_timespec gpr_now(gpr_clock_type clock) {
+static gpr_timespec now_impl(gpr_clock_type clock) {
gpr_timespec now;
struct timeval now_tv;
double now_dbl;
@@ -142,6 +142,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
}
#endif
+gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl;
+
+gpr_timespec gpr_now(gpr_clock_type clock_type) {
+ return gpr_now_impl(clock_type);
+}
+
void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
diff --git a/src/core/lib/support/time_win32.c b/src/core/lib/support/time_win32.c
index f7acbd14a6..9e924ab3f4 100644
--- a/src/core/lib/support/time_win32.c
+++ b/src/core/lib/support/time_win32.c
@@ -35,7 +35,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_TIME
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -107,4 +107,4 @@ void gpr_sleep_until(gpr_timespec until) {
}
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_TIME */
diff --git a/src/core/lib/support/tmpfile_msys.c b/src/core/lib/support/tmpfile_msys.c
new file mode 100644
index 0000000000..2fdc89a64f
--- /dev/null
+++ b/src/core/lib/support/tmpfile_msys.c
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_MSYS_TMPFILE
+
+#include <io.h>
+#include <stdio.h>
+#include <string.h>
+#include <tchar.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/support/string_win32.h"
+#include "src/core/lib/support/tmpfile.h"
+
+FILE *gpr_tmpfile(const char *prefix, char **tmp_filename_out) {
+ FILE *result = NULL;
+ char tmp_filename[MAX_PATH];
+ UINT success;
+
+ if (tmp_filename_out != NULL) *tmp_filename_out = NULL;
+
+ /* Generate a unique filename with our template + temporary path. */
+ success = GetTempFileNameA(".", prefix, 0, tmp_filename);
+ fprintf(stderr, "success = %d\n", success);
+
+ if (success) {
+ /* Open a file there. */
+ result = fopen(tmp_filename, "wb+");
+ fprintf(stderr, "result = %p\n", result);
+ }
+ if (result != NULL && tmp_filename_out) {
+ *tmp_filename_out = gpr_strdup(tmp_filename);
+ }
+
+ return result;
+}
+
+#endif /* GPR_MSYS_TMPFILE */
diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c
index 9e0e7ad808..0cd4bb6fc3 100644
--- a/src/core/lib/support/tmpfile_posix.c
+++ b/src/core/lib/support/tmpfile_posix.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_POSIX_FILE
+#ifdef GPR_POSIX_TMPFILE
#include "src/core/lib/support/tmpfile.h"
@@ -82,4 +82,4 @@ end:
return result;
}
-#endif /* GPR_POSIX_FILE */
+#endif /* GPR_POSIX_TMPFILE */
diff --git a/src/core/lib/support/tmpfile_win32.c b/src/core/lib/support/tmpfile_win32.c
index 0cb2904f8d..9ac73128c3 100644
--- a/src/core/lib/support/tmpfile_win32.c
+++ b/src/core/lib/support/tmpfile_win32.c
@@ -33,7 +33,7 @@
#include <grpc/support/port_platform.h>
-#ifdef GPR_WIN32
+#ifdef GPR_WIN32_TMPFILE
#include <io.h>
#include <stdio.h>
@@ -81,4 +81,4 @@ end:
return result;
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32_TMPFILE */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 6581bbd3d1..9b2b94eedf 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -142,22 +142,23 @@ struct grpc_call {
gpr_mu mu;
/* client or server call */
- uint8_t is_client;
+ bool is_client;
/* is the alarm set */
- uint8_t have_alarm;
+ bool have_alarm;
/** has grpc_call_destroy been called */
- uint8_t destroy_called;
+ bool destroy_called;
/** flag indicating that cancellation is inherited */
- uint8_t cancellation_is_inherited;
+ bool cancellation_is_inherited;
/** bitmask of live batches */
uint8_t used_batches;
/** which ops are in-flight */
- uint8_t sent_initial_metadata;
- uint8_t sending_message;
- uint8_t sent_final_op;
- uint8_t received_initial_metadata;
- uint8_t receiving_message;
- uint8_t received_final_op;
+ bool sent_initial_metadata;
+ bool sending_message;
+ bool sent_final_op;
+ bool received_initial_metadata;
+ bool receiving_message;
+ bool requested_final_op;
+ bool received_final_op;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -220,10 +221,7 @@ struct grpc_call {
} server;
} final_op;
- struct {
- void *bctlp;
- bool success;
- } saved_receiving_stream_ready_ctx;
+ void *saved_receiving_stream_ready_bctlp;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -375,8 +373,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
}
- grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
gpr_mu_destroy(&c->mu);
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (c->status[i].details) {
@@ -394,7 +390,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
if (c->cq) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- gpr_free(c);
+ grpc_channel *channel = c->channel;
+ grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
GPR_TIMER_END("destroy_call", 0);
}
@@ -554,21 +552,6 @@ static int prepare_application_metadata(grpc_call *call, int count,
int i;
grpc_metadata_batch *batch =
&call->metadata_batch[0 /* is_receiving */][is_trailing];
- if (prepend_extra_metadata) {
- if (call->send_extra_metadata_count == 0) {
- prepend_extra_metadata = 0;
- } else {
- for (i = 0; i < call->send_extra_metadata_count; i++) {
- GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
- }
- for (i = 1; i < call->send_extra_metadata_count; i++) {
- call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
- }
- for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
- call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
- }
- }
- }
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
@@ -579,14 +562,37 @@ static int prepare_application_metadata(grpc_call *call, int count,
GRPC_MDSTR_LENGTH(l->md->key))) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
grpc_mdstr_as_c_string(l->md->key));
- return 0;
+ break;
} else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key),
GRPC_MDSTR_LENGTH(l->md->key)) &&
!grpc_header_nonbin_value_is_legal(
grpc_mdstr_as_c_string(l->md->value),
GRPC_MDSTR_LENGTH(l->md->value))) {
gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
- return 0;
+ break;
+ }
+ }
+ if (i != count) {
+ for (int j = 0; j <= i; j++) {
+ grpc_metadata *md = &metadata[j];
+ grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ GRPC_MDELEM_UNREF(l->md);
+ }
+ return 0;
+ }
+ if (prepend_extra_metadata) {
+ if (call->send_extra_metadata_count == 0) {
+ prepend_extra_metadata = 0;
+ } else {
+ for (i = 0; i < call->send_extra_metadata_count; i++) {
+ GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
+ }
+ for (i = 1; i < call->send_extra_metadata_count; i++) {
+ call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
+ }
+ for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
+ call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
+ }
}
}
for (i = 1; i < count; i++) {
@@ -1057,12 +1063,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
- if (bctl->call->has_initial_md_been_received) {
+ if (bctl->call->has_initial_md_been_received || !success ||
+ call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
process_data_after_md(exec_ctx, bctlp, success);
} else {
- call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
- call->saved_receiving_stream_ready_ctx.success = success;
+ call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);
}
}
@@ -1091,13 +1097,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+ if (call->saved_receiving_stream_ready_bctlp != NULL) {
grpc_closure *saved_rsr_closure = grpc_closure_create(
- receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
- grpc_exec_ctx_enqueue(
- exec_ctx, saved_rsr_closure,
- call->saved_receiving_stream_ready_ctx.success && success, NULL);
- call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+ receiving_stream_ready, call->saved_receiving_stream_ready_bctlp);
+ call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL);
}
gpr_mu_unlock(&call->mu);
@@ -1133,6 +1137,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
+ call->received_final_op = true;
if (call->have_alarm) {
grpc_timer_cancel(exec_ctx, &call->alarm);
}
@@ -1377,11 +1382,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_NOT_ON_SERVER;
goto done_with_error;
}
- if (call->received_final_op) {
+ if (call->requested_final_op) {
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->received_final_op = 1;
+ call->requested_final_op = 1;
call->buffered_metadata[1] =
op->data.recv_status_on_client.trailing_metadata;
call->final_op.client.status = op->data.recv_status_on_client.status;
@@ -1404,11 +1409,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
goto done_with_error;
}
- if (call->received_final_op) {
+ if (call->requested_final_op) {
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
- call->received_final_op = 1;
+ call->requested_final_op = 1;
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
bctl->recv_final_op = 1;
@@ -1457,7 +1462,7 @@ done_with_error:
call->receiving_message = 0;
}
if (bctl->recv_final_op) {
- call->received_final_op = 0;
+ call->requested_final_op = 0;
}
gpr_mu_unlock(&call->mu);
goto done;
@@ -1512,3 +1517,39 @@ grpc_compression_algorithm grpc_call_compression_for_level(
gpr_mu_unlock(&call->mu);
return grpc_compression_algorithm_for_level(level, accepted_encodings);
}
+
+const char *grpc_call_error_to_string(grpc_call_error error) {
+ switch (error) {
+ case GRPC_CALL_ERROR:
+ return "GRPC_CALL_ERROR";
+ case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
+ return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
+ case GRPC_CALL_ERROR_ALREADY_FINISHED:
+ return "GRPC_CALL_ERROR_ALREADY_FINISHED";
+ case GRPC_CALL_ERROR_ALREADY_INVOKED:
+ return "GRPC_CALL_ERROR_ALREADY_INVOKED";
+ case GRPC_CALL_ERROR_BATCH_TOO_BIG:
+ return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
+ case GRPC_CALL_ERROR_INVALID_FLAGS:
+ return "GRPC_CALL_ERROR_INVALID_FLAGS";
+ case GRPC_CALL_ERROR_INVALID_MESSAGE:
+ return "GRPC_CALL_ERROR_INVALID_MESSAGE";
+ case GRPC_CALL_ERROR_INVALID_METADATA:
+ return "GRPC_CALL_ERROR_INVALID_METADATA";
+ case GRPC_CALL_ERROR_NOT_INVOKED:
+ return "GRPC_CALL_ERROR_NOT_INVOKED";
+ case GRPC_CALL_ERROR_NOT_ON_CLIENT:
+ return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
+ case GRPC_CALL_ERROR_NOT_ON_SERVER:
+ return "GRPC_CALL_ERROR_NOT_ON_SERVER";
+ case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
+ return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
+ case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
+ return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
+ case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
+ return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_OK:
+ return "GRPC_CALL_OK";
+ }
+ GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
+}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 5ec8808b50..1f82c3bad2 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
#endif
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+ GRPC_API_TRACE(
+ "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, "
+ "done_arg=%p, storage=%p)",
+ 7, (exec_ctx, cc, tag, success, done, done_arg, storage));
storage->tag = tag;
storage->done = done;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 03f379aba8..57c6897626 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -164,10 +164,10 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
+ grpc_register_tracer("compression", &grpc_compress_filter_trace);
grpc_security_pre_init();
grpc_iomgr_init();
grpc_executor_init();
- grpc_tracer_init("GRPC_TRACE");
gpr_timers_global_init();
grpc_cq_global_init();
for (i = 0; i < g_number_of_plugins; i++) {
@@ -179,6 +179,7 @@ void grpc_init(void) {
* at the appropriate time */
grpc_register_security_filters();
register_builtin_channel_init();
+ grpc_tracer_init("GRPC_TRACE");
/* no more changes to channel init pipelines */
grpc_channel_init_finalize();
}
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index c1f6812c4e..f50ec54cea 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -99,13 +99,18 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->on_consumed != NULL) {
op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1);
}
+ if (op->send_ping != NULL) {
+ op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0);
+ }
}
static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {}
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *and_free_memory) {
+ gpr_free(and_free_memory);
+}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index ad8ee8c7a9..2db95b9cdf 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -820,8 +820,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_ref(chand->server);
}
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ void *ignored) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c
index bf4126867f..84f0a083bc 100644
--- a/src/core/lib/surface/validate_metadata.c
+++ b/src/core/lib/surface/validate_metadata.c
@@ -40,7 +40,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) {
const char *p = s;
const char *e = s + len;
for (; p != e; p++) {
- int idx = *p;
+ int idx = (uint8_t)*p;
int byte = idx / 8;
int bit = idx % 8;
if ((legal_bits[byte] & (1 << bit)) == 0) return 0;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index fe954cbefb..aca76d2bb7 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.14.0-dev"; }
+const char *grpc_version_string(void) { return "0.15.0-dev"; }
diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c
index 779efbb97d..5847ec9053 100644
--- a/src/core/lib/transport/metadata.c
+++ b/src/core/lib/transport/metadata.c
@@ -386,10 +386,18 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
for (s = shard->strs[idx]; s; s = s->bucket_next) {
if (s->hash == hash && GPR_SLICE_LENGTH(s->slice) == length &&
0 == memcmp(buf, GPR_SLICE_START_PTR(s->slice), length)) {
- GRPC_MDSTR_REF((grpc_mdstr *)s);
- gpr_mu_unlock(&shard->mu);
- GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
- return (grpc_mdstr *)s;
+ if (gpr_atm_full_fetch_add(&s->refcnt, 1) == 0) {
+ /* If we get here, we've added a ref to something that was about to
+ * die - drop it immediately.
+ * The *only* possible path here (given the shard mutex) should be to
+ * drop from one ref back to zero - assert that with a CAS */
+ GPR_ASSERT(gpr_atm_rel_cas(&s->refcnt, 1, 0));
+ /* and treat this as if we were never here... sshhh */
+ } else {
+ gpr_mu_unlock(&shard->mu);
+ GPR_TIMER_END("grpc_mdstr_from_buffer", 0);
+ return (grpc_mdstr *)s;
+ }
}
}
@@ -397,7 +405,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
if (length + 1 < GPR_SLICE_INLINED_SIZE) {
/* string data goes directly into the slice */
s = gpr_malloc(sizeof(internal_string));
- gpr_atm_rel_store(&s->refcnt, 2);
+ gpr_atm_rel_store(&s->refcnt, 1);
s->slice.refcount = NULL;
memcpy(s->slice.data.inlined.bytes, buf, length);
s->slice.data.inlined.bytes[length] = 0;
@@ -406,7 +414,7 @@ grpc_mdstr *grpc_mdstr_from_buffer(const uint8_t *buf, size_t length) {
/* string data goes after the internal_string header, and we +1 for null
terminator */
s = gpr_malloc(sizeof(internal_string) + length + 1);
- gpr_atm_rel_store(&s->refcnt, 2);
+ gpr_atm_rel_store(&s->refcnt, 1);
s->refcount.ref = slice_ref;
s->refcount.unref = slice_unref;
s->slice.refcount = &s->refcount;
@@ -675,20 +683,19 @@ const char *grpc_mdstr_as_c_string(grpc_mdstr *s) {
grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) {
internal_string *s = (internal_string *)gs;
if (is_mdstr_static(gs)) return gs;
- GPR_ASSERT(gpr_atm_full_fetch_add(&s->refcnt, 1) != 0);
+ GPR_ASSERT(gpr_atm_full_fetch_add(&s->refcnt, 1) > 0);
return gs;
}
void grpc_mdstr_unref(grpc_mdstr *gs DEBUG_ARGS) {
internal_string *s = (internal_string *)gs;
if (is_mdstr_static(gs)) return;
- if (2 == gpr_atm_full_fetch_add(&s->refcnt, -1)) {
+ if (1 == gpr_atm_full_fetch_add(&s->refcnt, -1)) {
strtab_shard *shard =
&g_strtab_shard[SHARD_IDX(s->hash, LOG2_STRTAB_SHARD_COUNT)];
gpr_mu_lock(&shard->mu);
- if (1 == gpr_atm_no_barrier_load(&s->refcnt)) {
- internal_destroy_string(shard, s);
- }
+ GPR_ASSERT(0 == gpr_atm_no_barrier_load(&s->refcnt));
+ internal_destroy_string(shard, s);
gpr_mu_unlock(&shard->mu);
}
}
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index e29e8df2c9..6d82f4d681 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -120,6 +120,7 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *),
void *user_data);
/* Reference counting */
+//#define GRPC_METADATA_REFCOUNT_DEBUG
#ifdef GRPC_METADATA_REFCOUNT_DEBUG
#define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s), __FILE__, __LINE__)
#define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s), __FILE__, __LINE__)
@@ -146,6 +147,10 @@ const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
#define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice))
+/* We add 32 bytes of padding as per RFC-7540 section 6.5.2. */
+#define GRPC_MDELEM_LENGTH(e) \
+ (GRPC_MDSTR_LENGTH((e)->key) + GRPC_MDSTR_LENGTH((e)->value) + 32)
+
int grpc_mdstr_is_legal_header(grpc_mdstr *s);
int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s);
int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s);
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 4567221a48..e4398abeb7 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -192,3 +192,12 @@ int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) {
gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type),
batch->deadline) == 0;
}
+
+size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) {
+ size_t size = 0;
+ for (grpc_linked_mdelem *elem = batch->list.head; elem != NULL;
+ elem = elem->next) {
+ size += GRPC_MDELEM_LENGTH(elem->md);
+ }
+ return size;
+}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index b62668876e..7af823f7ca 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -66,6 +66,9 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch);
void grpc_metadata_batch_clear(grpc_metadata_batch *batch);
int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch);
+/* Returns the transport size of the batch. */
+size_t grpc_metadata_batch_size(grpc_metadata_batch *batch);
+
/** Moves the metadata information from \a src to \a dst. Upon return, \a src is
* zeroed. */
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 53c634adca..e6d524abe6 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream) {
- transport->vtable->destroy_stream(exec_ctx, transport, stream);
+ grpc_stream *stream, void *and_free_memory) {
+ transport->vtable->destroy_stream(exec_ctx, transport, stream,
+ and_free_memory);
}
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 1eb446312b..482a9d1791 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -98,6 +98,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from,
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
+ /** Should be enqueued when all requested operations (excluding recv_message
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
+ grpc_closure *on_complete;
+
/** Send initial metadata to the peer, from the provided metadata batch.
idempotent_request MUST be set if this is non-null */
grpc_metadata_batch *send_initial_metadata;
@@ -129,11 +134,6 @@ typedef struct grpc_transport_stream_op {
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
- /** Should be enqueued when all requested operations (excluding recv_message
- and recv_initial_metadata which have their own closures) in a given batch
- have been completed. */
- grpc_closure *on_complete;
-
/** If != GRPC_STATUS_OK, cancel this stream */
grpc_status_code cancel_with_status;
@@ -213,7 +213,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- grpc_stream *stream);
+ grpc_stream *stream, void *and_free_memory);
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 2ff67073af..956155eec8 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy_stream */
void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream);
+ grpc_stream *stream, void *and_free_memory);
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);