aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-01 13:21:57 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-01 13:21:57 -0700
commitd6c16558b40b723d61039c2bfbf491f90adf3ecb (patch)
treeb30695b00ac0a6c7554d98f2b974e8f7bb8fbbc5 /src/core
parent9805bb2ce86413105e706da41a1b3e6040264e34 (diff)
parent5ae895a5d06fad59a89ce6e8923b1145dea663bd (diff)
Merge github.com:grpc/grpc into one-read
Conflicts: src/core/iomgr/tcp_posix.c src/core/profiling/basic_timers.c
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/pollset_posix.c2
-rw-r--r--src/core/iomgr/tcp_posix.c16
-rw-r--r--src/core/iomgr/tcp_windows.c2
-rw-r--r--src/core/profiling/basic_timers.c (renamed from src/core/profiling/timers.c)42
-rw-r--r--src/core/profiling/stap_probes.d6
-rw-r--r--src/core/profiling/stap_timers.c57
-rw-r--r--src/core/profiling/timers.h90
-rw-r--r--src/core/security/google_default_credentials.c2
-rw-r--r--src/core/support/cpu_windows.c12
-rw-r--r--src/core/surface/call.c333
-rw-r--r--src/core/surface/channel.c7
-rw-r--r--src/core/surface/completion_queue.c96
-rw-r--r--src/core/surface/completion_queue.h41
-rw-r--r--src/core/surface/event_string.c45
-rw-r--r--src/core/surface/init.c4
-rw-r--r--src/core/surface/server.c64
-rw-r--r--src/core/transport/chttp2/stream_encoder.c10
-rw-r--r--src/core/transport/chttp2_transport.c19
18 files changed, 216 insertions, 632 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 60d0dad6d8..4d1bcad9e2 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -411,7 +411,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
- GRPC_TIMER_MARK(POLL_FINISHED, r);
+ GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
grpc_fd_end_poll(&fd_watcher);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 3825713aba..ccd7833aa4 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -328,7 +328,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
size_t final_nslices;
GPR_ASSERT(!tcp->finished_edge);
- GRPC_TIMER_MARK(HANDLE_READ_BEGIN, 0);
+ GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
@@ -409,7 +409,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) {
grpc_tcp_unref(tcp);
}
- GRPC_TIMER_MARK(HANDLE_READ_END, 0);
+ GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
}
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
@@ -458,12 +458,12 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
- GRPC_TIMER_MARK(SENDMSG_BEGIN, 0);
+ GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
do {
/* TODO(klempner): Cork if this is a partial write */
sent_length = sendmsg(tcp->fd, &msg, 0);
} while (sent_length < 0 && errno == EINTR);
- GRPC_TIMER_MARK(SENDMSG_END, 0);
+ GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0);
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -499,7 +499,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
return;
}
- GRPC_TIMER_MARK(CB_WRITE_BEGIN, 0);
+ GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
@@ -515,7 +515,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
cb(tcp->write_user_data, cb_status);
grpc_tcp_unref(tcp);
}
- GRPC_TIMER_MARK(CB_WRITE_END, 0);
+ GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
}
static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
@@ -538,7 +538,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
}
}
- GRPC_TIMER_MARK(WRITE_BEGIN, 0);
+ GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
GPR_ASSERT(tcp->write_cb == NULL);
slice_state_init(&tcp->write_state, slices, nslices, nslices);
@@ -552,7 +552,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}
- GRPC_TIMER_MARK(WRITE_END, 0);
+ GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
return status;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 71534eaa3d..940cd5bcde 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -289,7 +289,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
return ret;
}
- memset(&socket->write_info, 0, sizeof(OVERLAPPED));
+ memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED));
status = WSASend(socket->socket, buffers, tcp->write_slices.count,
&bytes_sent, 0, &socket->write_info.overlapped, NULL);
if (allocated) gpr_free(allocated);
diff --git a/src/core/profiling/timers.c b/src/core/profiling/basic_timers.c
index bd1700ffd8..b36f4d3902 100644
--- a/src/core/profiling/timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -31,7 +31,9 @@
*
*/
-#ifdef GRPC_LATENCY_PROFILER
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_BASIC_PROFILER
#include "src/core/profiling/timers.h"
#include "src/core/profiling/timers_preciseclock.h"
@@ -46,7 +48,7 @@
typedef struct grpc_timer_entry {
grpc_precise_clock tm;
gpr_thd_id thd;
- const char* tag;
+ int tag;
void* id;
const char* file;
int line;
@@ -63,7 +65,7 @@ struct grpc_timers_log {
grpc_timers_log* grpc_timers_log_global = NULL;
-grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) {
+static grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) {
grpc_timers_log* log = gpr_malloc(sizeof(*log));
/* TODO (vpai): Allow allocation below limit */
@@ -87,15 +89,20 @@ static void log_report_locked(grpc_timers_log* log) {
grpc_timer_entry* entry = &(log->log[i]);
fprintf(fp, "GRPC_LAT_PROF ");
grpc_precise_clock_print(&entry->tm, fp);
+<<<<<<< HEAD:src/core/profiling/timers.c
fprintf(fp, " %p %s %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, entry->id, entry->file,
entry->line);
+=======
+ fprintf(fp, " %p %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag,
+ entry->id, entry->file, entry->line);
+>>>>>>> 5ae895a5d06fad59a89ce6e8923b1145dea663bd:src/core/profiling/basic_timers.c
}
/* Now clear out the log */
log->num_entries = 0;
}
-void grpc_timers_log_destroy(grpc_timers_log* log) {
+static void grpc_timers_log_destroy(grpc_timers_log* log) {
gpr_mu_lock(&log->mu);
log_report_locked(log);
gpr_mu_unlock(&log->mu);
@@ -106,8 +113,8 @@ void grpc_timers_log_destroy(grpc_timers_log* log) {
gpr_free(log);
}
-void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id,
- const char* file, int line) {
+static void grpc_timers_log_add(grpc_timers_log* log, int tag, void* id,
+ const char* file, int line) {
grpc_timer_entry* entry;
/* TODO (vpai) : Improve concurrency */
@@ -128,14 +135,25 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id,
gpr_mu_unlock(&log->mu);
}
-void grpc_timers_log_global_init(void) {
+/* Latency profiler API implementation. */
+void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
+ grpc_timers_log_add(grpc_timers_log_global, tag, id, file, line);
+}
+
+void grpc_timer_begin(int tag, void* id, const char *file, int line) {}
+void grpc_timer_end(int tag, void* id, const char *file, int line) {}
+
+/* Basic profiler specific API functions. */
+void grpc_timers_global_init(void) {
grpc_timers_log_global = grpc_timers_log_create(100000, stdout);
}
-void grpc_timers_log_global_destroy(void) {
+void grpc_timers_global_destroy(void) {
grpc_timers_log_destroy(grpc_timers_log_global);
}
-#else /* !GRPC_LATENCY_PROFILER */
-void grpc_timers_log_global_init(void) {}
-void grpc_timers_log_global_destroy(void) {}
-#endif /* GRPC_LATENCY_PROFILER */
+
+
+#else /* !GRPC_BASIC_PROFILER */
+void grpc_timers_global_init(void) {}
+void grpc_timers_global_destroy(void) {}
+#endif /* GRPC_BASIC_PROFILER */
diff --git a/src/core/profiling/stap_probes.d b/src/core/profiling/stap_probes.d
new file mode 100644
index 0000000000..374eeedd6c
--- /dev/null
+++ b/src/core/profiling/stap_probes.d
@@ -0,0 +1,6 @@
+provider _stap {
+ probe add_mark(int tag);
+ probe timing_ns_begin(int tag);
+ probe timing_ns_end(int tag);
+};
+
diff --git a/src/core/profiling/stap_timers.c b/src/core/profiling/stap_timers.c
new file mode 100644
index 0000000000..52fb58a279
--- /dev/null
+++ b/src/core/profiling/stap_timers.c
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_STAP_PROFILER
+
+#include "src/core/profiling/timers.h"
+
+#include <sys/sdt.h>
+/* Generated from src/core/profiling/stap_probes.d */
+#include "src/core/profiling/stap_probes.h"
+
+/* Latency profiler API implementation. */
+void grpc_timer_add_mark(int tag, void* id, const char *file, int line) {
+ _STAP_ADD_MARK(tag);
+}
+
+void grpc_timer_begin(int tag, void* id, const char *file, int line) {
+ _STAP_TIMING_NS_BEGIN(tag);
+}
+
+void grpc_timer_end(int tag, void* id, const char *file, int line) {
+ _STAP_TIMING_NS_END(tag);
+}
+
+#endif /* GRPC_STAP_PROFILER */
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 1a6b9ffac9..d0b8286c03 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -34,35 +34,87 @@
#ifndef GRPC_CORE_PROFILING_TIMERS_H
#define GRPC_CORE_PROFILING_TIMERS_H
-#include <stdio.h>
-
#ifdef __cplusplus
extern "C" {
#endif
-#ifdef GRPC_LATENCY_PROFILER
+void grpc_timers_global_init(void);
+void grpc_timers_global_destroy(void);
-typedef struct grpc_timers_log grpc_timers_log;
+void grpc_timer_add_mark(int tag, void* id, const char *file, int line);
+void grpc_timer_begin(int tag, void* id, const char *file, int line);
+void grpc_timer_end(int tag, void* id, const char *file, int line);
-grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump);
-void grpc_timers_log_add(grpc_timers_log*, const char* tag, void* id,
- const char* file, int line);
-void grpc_timers_log_destroy(grpc_timers_log *);
+enum grpc_profiling_tags {
+ /* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */
+ GRPC_PTAG_IGNORE_THRESHOLD = 1000000,
-extern grpc_timers_log *grpc_timers_log_global;
+ /* Re. Protos. */
+ GRPC_PTAG_PROTO_SERIALIZE = 100 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_PROTO_DESERIALIZE = 101 + GRPC_PTAG_IGNORE_THRESHOLD,
+
+ /* Re. sockets. */
+ GRPC_PTAG_HANDLE_READ = 200 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_SENDMSG = 201 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_RECVMSG = 202 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_POLL_FINISHED = 203 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_TCP_CB_WRITE = 204 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_TCP_WRITE = 205 + GRPC_PTAG_IGNORE_THRESHOLD,
+
+ /* C++ */
+ GRPC_PTAG_CPP_CALL_CREATED = 300 + GRPC_PTAG_IGNORE_THRESHOLD,
+ GRPC_PTAG_CPP_PERFORM_OPS = 301 + GRPC_PTAG_IGNORE_THRESHOLD,
-#define GRPC_TIMER_MARK(x, s) \
- grpc_timers_log_add(grpc_timers_log_global, #x, ((void *)(gpr_intptr)(s)), \
- __FILE__, __LINE__)
+ /* > 1024 Unassigned reserved. For any miscellaneous use.
+ * Use addition to generate tags from this base or take advantage of the 10
+ * zero'd bits for OR-ing. */
+ GRPC_PTAG_OTHER_BASE = 1024
+};
-#else /* !GRPC_LATENCY_PROFILER */
-#define GRPC_TIMER_MARK(x, s) \
- do { \
- } while (0)
-#endif /* GRPC_LATENCY_PROFILER */
+#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
+/* No profiling. No-op all the things. */
+#define GRPC_TIMER_MARK(tag, id) \
+ do {} while(0)
+
+#define GRPC_TIMER_BEGIN(tag, id) \
+ do {} while(0)
+
+#define GRPC_TIMER_END(tag, id) \
+ do {} while(0)
+
+#else /* at least one profiler requested... */
+/* ... hopefully only one. */
+#if defined(GRPC_STAP_PROFILER) && defined(GRPC_BASIC_PROFILER)
+#error "GRPC_STAP_PROFILER and GRPC_BASIC_PROFILER are mutually exclusive."
+#endif
+
+/* Generic profiling interface. */
+#define GRPC_TIMER_MARK(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+ }
+
+#define GRPC_TIMER_BEGIN(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+ }
+
+#define GRPC_TIMER_END(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+ }
+
+#ifdef GRPC_STAP_PROFILER
+/* Empty placeholder for now. */
+#endif /* GRPC_STAP_PROFILER */
+
+#ifdef GRPC_BASIC_PROFILER
+typedef struct grpc_timers_log grpc_timers_log;
+
+extern grpc_timers_log *grpc_timers_log_global;
+#endif /* GRPC_BASIC_PROFILER */
-void grpc_timers_log_global_init(void);
-void grpc_timers_log_global_destroy(void);
+#endif /* at least one profiler requested. */
#ifdef __cplusplus
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d2f46ddd07..0e4b9fc9d3 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -163,7 +163,7 @@ grpc_credentials *grpc_google_default_credentials_create(void) {
gpr_mu_lock(&g_mu);
if (default_credentials != NULL) {
- result = default_credentials;
+ result = grpc_credentials_ref(default_credentials);
serving_cached_credentials = 1;
goto end;
}
diff --git a/src/core/support/cpu_windows.c b/src/core/support/cpu_windows.c
index cb454ccd3b..f56bab3f8b 100644
--- a/src/core/support/cpu_windows.c
+++ b/src/core/support/cpu_windows.c
@@ -34,19 +34,17 @@
#include <grpc/support/port_platform.h>
#ifdef GPR_WIN32
-
+#include <windows.h>
#include <grpc/support/log.h>
unsigned gpr_cpu_num_cores(void) {
- /* TODO(jtattermusch): implement */
- gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1");
- return 1;
+ SYSTEM_INFO si;
+ GetSystemInfo(&si);
+ return si.dwNumberOfProcessors;
}
unsigned gpr_cpu_current_cpu(void) {
- /* TODO(jtattermusch): implement */
- gpr_log(GPR_ERROR, "Cannot determine current CPU");
- return 0;
+ return GetCurrentProcessorNumber();
}
#endif /* GPR_WIN32 */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fc07a19894..070be1b25a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -47,9 +47,6 @@
#include <stdlib.h>
#include <string.h>
-typedef struct legacy_state legacy_state;
-static void destroy_legacy_state(legacy_state *ls);
-
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@@ -226,10 +223,6 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
-
- /* Data that the legacy api needs to track. To be deleted at some point
- soon */
- legacy_state *legacy_state;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -353,9 +346,6 @@ static void destroy_call(void *call, int ignored_success) {
}
grpc_sopb_destroy(&c->send_ops);
grpc_sopb_destroy(&c->recv_ops);
- if (c->legacy_state) {
- destroy_legacy_state(c->legacy_state);
- }
grpc_bbq_destroy(&c->incoming_queue);
gpr_slice_buffer_destroy(&c->incoming_message);
gpr_free(c);
@@ -404,12 +394,6 @@ static void set_status_details(grpc_call *call, status_source source,
call->status[source].details = status;
}
-static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
- if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
- call->cq = cq;
- return GRPC_CALL_OK;
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -729,6 +713,10 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
+ if (call->have_alarm) {
+ grpc_alarm_cancel(&call->alarm);
+ call->have_alarm = 0;
+ }
}
finish_read_ops(call);
} else {
@@ -1157,7 +1145,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
- grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+ grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
@@ -1172,7 +1160,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
if (nops == 0) {
grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
- grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+ grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
return GRPC_CALL_OK;
}
@@ -1268,312 +1256,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
tag);
}
-
-/*
- * LEGACY API IMPLEMENTATION
- * All this code will disappear as soon as wrappings are updated
- */
-
-struct legacy_state {
- gpr_uint8 md_out_buffer;
- size_t md_out_count[2];
- size_t md_out_capacity[2];
- grpc_metadata *md_out[2];
- grpc_byte_buffer *msg_out;
-
- /* input buffers */
- grpc_metadata_array initial_md_in;
- grpc_metadata_array trailing_md_in;
-
- size_t details_capacity;
- char *details;
- grpc_status_code status;
-
- char *send_details;
-
- size_t msg_in_read_idx;
- grpc_byte_buffer *msg_in;
-
- void *finished_tag;
-};
-
-static legacy_state *get_legacy_state(grpc_call *call) {
- if (call->legacy_state == NULL) {
- call->legacy_state = gpr_malloc(sizeof(legacy_state));
- memset(call->legacy_state, 0, sizeof(legacy_state));
- }
- return call->legacy_state;
-}
-
-static void destroy_legacy_state(legacy_state *ls) {
- size_t i, j;
- for (i = 0; i < 2; i++) {
- for (j = 0; j < ls->md_out_count[i]; j++) {
- gpr_free((char *)ls->md_out[i][j].key);
- gpr_free((char *)ls->md_out[i][j].value);
- }
- gpr_free(ls->md_out[i]);
- }
- gpr_free(ls->initial_md_in.metadata);
- gpr_free(ls->trailing_md_in.metadata);
- gpr_free(ls->details);
- gpr_free(ls->send_details);
- gpr_free(ls);
-}
-
-grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
- grpc_metadata *metadata,
- gpr_uint32 flags) {
- legacy_state *ls;
- grpc_metadata *mdout;
-
- lock(call);
- ls = get_legacy_state(call);
-
- if (ls->md_out_count[ls->md_out_buffer] ==
- ls->md_out_capacity[ls->md_out_buffer]) {
- ls->md_out_capacity[ls->md_out_buffer] =
- GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
- ls->md_out_capacity[ls->md_out_buffer] + 8);
- ls->md_out[ls->md_out_buffer] = gpr_realloc(
- ls->md_out[ls->md_out_buffer],
- sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
- }
- mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
- mdout->key = gpr_strdup(metadata->key);
- mdout->value = gpr_malloc(metadata->value_length);
- mdout->value_length = metadata->value_length;
- memcpy((char *)mdout->value, metadata->value, metadata->value_length);
-
- unlock(call);
-
- return GRPC_CALL_OK;
-}
-
-static void finish_status(grpc_call *call, grpc_op_error status,
- void *ignored) {
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
- ls->status, ls->details, ls->trailing_md_in.metadata,
- ls->trailing_md_in.count);
- unlock(call);
-}
-
-static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- if (status == GRPC_OP_OK) {
- grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
- ls->initial_md_in.count,
- ls->initial_md_in.metadata);
-
- } else {
- grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
- NULL);
- }
- unlock(call);
-}
-
-static void finish_send_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {}
-
-grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
- grpc_ioreq reqs[4];
- legacy_state *ls;
- grpc_call_error err;
-
- grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-
- lock(call);
- ls = get_legacy_state(call);
- err = bind_cq(call, cq);
- if (err != GRPC_CALL_OK) goto done;
-
- ls->finished_tag = finished_tag;
-
- reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
- reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- ls->md_out_buffer++;
- err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
- if (err != GRPC_CALL_OK) goto done;
-
- reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
- reqs[0].data.recv_metadata = &ls->initial_md_in;
- err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
- if (err != GRPC_CALL_OK) goto done;
-
- reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
- reqs[0].data.recv_metadata = &ls->trailing_md_in;
- reqs[1].op = GRPC_IOREQ_RECV_STATUS;
- reqs[1].data.recv_status.user_data = &ls->status;
- reqs[1].data.recv_status.set_value = set_status_value_directly;
- reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS;
- reqs[2].data.recv_status_details.details = &ls->details;
- reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity;
- reqs[3].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 4, finish_status, NULL);
- if (err != GRPC_CALL_OK) goto done;
-
-done:
- unlock(call);
- return err;
-}
-
-grpc_call_error grpc_call_server_accept_old(grpc_call *call,
- grpc_completion_queue *cq,
- void *finished_tag) {
- grpc_ioreq reqs[2];
- grpc_call_error err;
- legacy_state *ls;
-
- /* inform the completion queue of an incoming operation (corresponding to
- finished_tag) */
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-
- lock(call);
- ls = get_legacy_state(call);
-
- err = bind_cq(call, cq);
- if (err != GRPC_CALL_OK) {
- unlock(call);
- return err;
- }
-
- ls->finished_tag = finished_tag;
-
- reqs[0].op = GRPC_IOREQ_RECV_STATUS;
- reqs[0].data.recv_status.user_data = &ls->status;
- reqs[0].data.recv_status.set_value = set_status_value_directly;
- reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 2, finish_status, NULL);
- unlock(call);
- return err;
-}
-
-static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {}
-
-grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
- gpr_uint32 flags) {
- grpc_ioreq req;
- grpc_call_error err;
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
- req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
- unlock(call);
-
- return err;
-}
-
-static void finish_read_event(void *p, grpc_op_error error) {
- if (p) grpc_byte_buffer_destroy(p);
-}
-
-static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
- legacy_state *ls;
- grpc_byte_buffer *msg;
- lock(call);
- ls = get_legacy_state(call);
- msg = ls->msg_in;
- grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
- unlock(call);
-}
-
-grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
- legacy_state *ls;
- grpc_ioreq req;
- grpc_call_error err;
-
- grpc_cq_begin_op(call->cq, call, GRPC_READ);
-
- lock(call);
- ls = get_legacy_state(call);
- req.op = GRPC_IOREQ_RECV_MESSAGE;
- req.data.recv_message = &ls->msg_in;
- err = start_ioreq(call, &req, 1, finish_read, tag);
- unlock(call);
- return err;
-}
-
-static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
- lock(call);
- grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
- unlock(call);
- grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
-}
-
-grpc_call_error grpc_call_start_write_old(grpc_call *call,
- grpc_byte_buffer *byte_buffer,
- void *tag, gpr_uint32 flags) {
- grpc_ioreq req;
- legacy_state *ls;
- grpc_call_error err;
-
- grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
-
- lock(call);
- ls = get_legacy_state(call);
- ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
- req.op = GRPC_IOREQ_SEND_MESSAGE;
- req.data.send_message = ls->msg_out;
- err = start_ioreq(call, &req, 1, finish_write, tag);
- unlock(call);
-
- return err;
-}
-
-static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
- grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
-}
-
-grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
- grpc_ioreq req;
- grpc_call_error err;
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- lock(call);
- req.op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, &req, 1, finish_finish, tag);
- unlock(call);
-
- return err;
-}
-
-grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
- grpc_status_code status,
- const char *details,
- void *tag) {
- grpc_ioreq reqs[3];
- grpc_call_error err;
- legacy_state *ls;
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- lock(call);
- ls = get_legacy_state(call);
- reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
- reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- reqs[1].op = GRPC_IOREQ_SEND_STATUS;
- reqs[1].data.send_status.code = status;
- reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details);
- reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, reqs, 3, finish_finish, tag);
- unlock(call);
-
- return err;
-}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 78f9144c19..be9da2b7f9 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -128,13 +128,6 @@ static grpc_call *grpc_channel_create_call_internal(
GPR_ARRAY_SIZE(send_metadata), deadline);
}
-grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
- const char *method, const char *host,
- gpr_timespec absolute_deadline) {
- return grpc_channel_create_call(channel, NULL, method, host,
- absolute_deadline);
-}
-
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_completion_queue *cq,
const char *method, const char *host,
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c1c97af337..3e9031807e 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -183,111 +183,17 @@ void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
-void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_byte_buffer *read) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
- ev->base.data.read = read;
- end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
-void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
- ev->base.data.write_accepted = error;
- end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
-void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
- grpc_call *call, grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
- ev->base.data.write_accepted = error;
- end_op_locked(cc, GRPC_OP_COMPLETE);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
- ev->base.data.write_accepted = error;
+ ev->base.data.op_complete = error;
end_op_locked(cc, GRPC_OP_COMPLETE);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
-void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
- ev->base.data.finish_accepted = error;
- end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
-void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, size_t count,
- grpc_metadata *elements) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
- user_data);
- ev->base.data.client_metadata_read.count = count;
- ev->base.data.client_metadata_read.elements = elements;
- end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
-void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_status_code status, const char *details,
- grpc_metadata *metadata_elements,
- size_t metadata_count) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
- ev->base.data.finished.status = status;
- ev->base.data.finished.details = details;
- ev->base.data.finished.metadata_count = metadata_count;
- ev->base.data.finished.metadata_elements = metadata_elements;
- end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
-void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- const char *method, const char *host,
- gpr_timespec deadline, size_t metadata_count,
- grpc_metadata *metadata_elements) {
- event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
- ev->base.data.server_rpc_new.method = method;
- ev->base.data.server_rpc_new.host = host;
- ev->base.data.server_rpc_new.deadline = deadline;
- ev->base.data.server_rpc_new.metadata_count = metadata_count;
- ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
- end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 41024cda14..a0d7eeaac6 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -62,48 +62,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
Other parameters match the data member of grpc_event */
-/* Queue a GRPC_READ operation */
-void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_byte_buffer *read);
-/* Queue a GRPC_INVOKE_ACCEPTED operation */
-void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error);
-/* Queue a GRPC_WRITE_ACCEPTED operation */
-void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error);
-/* Queue a GRPC_FINISH_ACCEPTED operation */
-void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error);
/* Queue a GRPC_OP_COMPLETED operation */
-void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag,
- grpc_call *call, grpc_event_finish_func on_finish,
- void *user_data, grpc_op_error error);
-/* Queue a GRPC_CLIENT_METADATA_READ operation */
-void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
- grpc_call *call,
- grpc_event_finish_func on_finish,
- void *user_data, size_t count,
- grpc_metadata *elements);
-
-void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_status_code status, const char *details,
- grpc_metadata *metadata_elements,
- size_t metadata_count);
-
-void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- const char *method, const char *host,
- gpr_timespec deadline, size_t metadata_count,
- grpc_metadata *metadata_elements);
-
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error);
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 0fa3f166e2..30bdff6b85 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -62,7 +62,6 @@ static void adderr(gpr_strvec *buf, grpc_op_error err) {
char *grpc_event_string(grpc_event *ev) {
char *out;
- char *tmp;
gpr_strvec buf;
if (ev == NULL) return gpr_strdup("null");
@@ -76,55 +75,11 @@ char *grpc_event_string(grpc_event *ev) {
case GRPC_QUEUE_SHUTDOWN:
gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN"));
break;
- case GRPC_READ:
- gpr_strvec_add(&buf, gpr_strdup("READ: "));
- addhdr(&buf, ev);
- if (ev->data.read) {
- gpr_asprintf(&tmp, " %d bytes",
- (int)grpc_byte_buffer_length(ev->data.read));
- gpr_strvec_add(&buf, tmp);
- } else {
- gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
- }
- break;
case GRPC_OP_COMPLETE:
gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: "));
addhdr(&buf, ev);
adderr(&buf, ev->data.op_complete);
break;
- case GRPC_WRITE_ACCEPTED:
- gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));
- addhdr(&buf, ev);
- adderr(&buf, ev->data.write_accepted);
- break;
- case GRPC_FINISH_ACCEPTED:
- gpr_strvec_add(&buf, gpr_strdup("FINISH_ACCEPTED: "));
- addhdr(&buf, ev);
- adderr(&buf, ev->data.write_accepted);
- break;
- case GRPC_CLIENT_METADATA_READ:
- gpr_strvec_add(&buf, gpr_strdup("CLIENT_METADATA_READ: "));
- addhdr(&buf, ev);
- gpr_asprintf(&tmp, " %d elements",
- (int)ev->data.client_metadata_read.count);
- gpr_strvec_add(&buf, tmp);
- break;
- case GRPC_FINISHED:
- gpr_strvec_add(&buf, gpr_strdup("FINISHED: "));
- addhdr(&buf, ev);
- gpr_asprintf(&tmp, " status=%d details='%s' %d metadata elements",
- ev->data.finished.status, ev->data.finished.details,
- (int)ev->data.finished.metadata_count);
- gpr_strvec_add(&buf, tmp);
- break;
- case GRPC_SERVER_RPC_NEW:
- gpr_strvec_add(&buf, gpr_strdup("SERVER_RPC_NEW: "));
- addhdr(&buf, ev);
- gpr_asprintf(&tmp, " method='%s' host='%s' %d metadata elements",
- ev->data.server_rpc_new.method, ev->data.server_rpc_new.host,
- (int)ev->data.server_rpc_new.metadata_count);
- gpr_strvec_add(&buf, tmp);
- break;
case GRPC_COMPLETION_DO_NOT_USE:
gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)"));
addhdr(&buf, ev);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index a5d86d0071..d6eb9b2c24 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -65,7 +65,7 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_tracer_init("GRPC_TRACE");
census_init();
- grpc_timers_log_global_init();
+ grpc_timers_global_init();
}
gpr_mu_unlock(&g_init_mu);
}
@@ -75,7 +75,7 @@ void grpc_shutdown(void) {
if (--g_initializations == 0) {
grpc_iomgr_shutdown();
census_shutdown();
- grpc_timers_log_global_destroy();
+ grpc_timers_global_destroy();
}
gpr_mu_unlock(&g_init_mu);
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 83caefcbc6..01644b4471 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -69,7 +69,7 @@ typedef struct {
call_data *prev;
} call_link;
-typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
+typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct {
requested_call_type type;
@@ -165,10 +165,6 @@ typedef enum {
ZOMBIED
} call_state;
-typedef struct legacy_data {
- grpc_metadata_array initial_metadata;
-} legacy_data;
-
struct call_data {
grpc_call *call;
@@ -178,7 +174,6 @@ struct call_data {
gpr_timespec deadline;
int got_initial_metadata;
- legacy_data *legacy;
grpc_completion_queue *cq_new;
grpc_stream_op_buffer *recv_ops;
@@ -557,11 +552,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
grpc_mdstr_unref(calld->path);
}
- if (calld->legacy) {
- gpr_free(calld->legacy->initial_metadata.metadata);
- gpr_free(calld->legacy);
- }
-
server_unref(chand->server);
}
@@ -998,7 +988,6 @@ static grpc_call_error queue_call_request(grpc_server *server,
return GRPC_CALL_OK;
}
switch (rc->type) {
- case LEGACY_CALL:
case BATCH_CALL:
calld =
call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
@@ -1057,16 +1046,6 @@ grpc_call_error grpc_server_request_registered_call(
return queue_call_request(server, &rc);
}
-grpc_call_error grpc_server_request_call_old(grpc_server *server,
- void *tag_new) {
- requested_call rc;
- grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
- rc.type = LEGACY_CALL;
- rc.tag = tag_new;
- return queue_call_request(server, &rc);
-}
-
-static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status,
@@ -1098,14 +1077,6 @@ static void begin_call(grpc_server *server, call_data *calld,
an ioreq op, that should complete immediately. */
switch (rc->type) {
- case LEGACY_CALL:
- calld->legacy = gpr_malloc(sizeof(legacy_data));
- memset(calld->legacy, 0, sizeof(legacy_data));
- r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
- r->data.recv_metadata = &calld->legacy->initial_metadata;
- r++;
- publish = publish_legacy;
- break;
case BATCH_CALL:
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
@@ -1144,50 +1115,27 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
- case LEGACY_CALL:
- grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
- NULL, NULL, NULL, gpr_inf_past, 0, NULL);
- break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
- do_nothing, NULL, GRPC_OP_ERROR);
+ grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
+ GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
- rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
+ grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
+ do_nothing, NULL, GRPC_OP_ERROR);
break;
}
}
-static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
- grpc_call_element *elem =
- grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
-
- if (status == GRPC_OP_OK) {
- grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
- grpc_mdstr_as_c_string(calld->path),
- grpc_mdstr_as_c_string(calld->host), calld->deadline,
- calld->legacy->initial_metadata.count,
- calld->legacy->initial_metadata.metadata);
- } else {
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
- }
-}
-
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
- grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
+ grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c
index cf1e66bf8b..cf78ac50cc 100644
--- a/src/core/transport/chttp2/stream_encoder.c
+++ b/src/core/transport/chttp2/stream_encoder.c
@@ -122,6 +122,12 @@ static void begin_frame(framer_state *st, frame_type type) {
st->output_length_at_start_of_frame = st->output->length;
}
+static void begin_new_frame(framer_state *st, frame_type type) {
+ finish_frame(st, 1, 0);
+ st->last_was_header = 0;
+ begin_frame(st, type);
+}
+
/* make sure that the current frame is of the type desired, and has sufficient
space to add at least about_to_add bytes -- finishes the current frame if
needed */
@@ -571,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
a metadata element that needs to be unreffed back into the metadata
slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
updated). After this loop, we'll do a batch unref of elements. */
+ begin_new_frame(&st, HEADER);
need_unref |= op->data.metadata.garbage.head != NULL;
grpc_metadata_batch_assert_ok(&op->data.metadata);
for (l = op->data.metadata.list.head; l; l = l->next) {
@@ -580,9 +587,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
deadline_enc(compressor, op->data.metadata.deadline, &st);
}
- ensure_frame_type(&st, HEADER, 0);
- finish_frame(&st, 1, 0);
- st.last_was_header = 0; /* force a new header frame */
curop++;
break;
case GRPC_OP_SLICE:
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c78d64d2dd..5807c62f53 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -625,17 +625,19 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
if (!server_data) {
lock(t);
s->id = 0;
+ s->outgoing_window = 0;
+ s->incoming_window = 0;
} else {
/* already locked */
s->id = (gpr_uint32)(gpr_uintptr)server_data;
+ s->outgoing_window =
+ t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->incoming_window =
+ t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
t->incoming_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
}
- s->outgoing_window =
- t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window =
- t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
s->incoming_deadline = gpr_inf_future;
grpc_sopb_init(&s->writing_sopb);
grpc_sopb_init(&s->callback_sopb);
@@ -1041,6 +1043,10 @@ static void maybe_start_some_streams(transport *t) {
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
t->next_stream_id += 2;
+ s->outgoing_window =
+ t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ s->incoming_window =
+ t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
stream_list_join(t, s, WRITABLE);
}
@@ -1418,7 +1424,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
gpr_log(GPR_ERROR,
"ignoring out of order new stream request on server; last stream "
"id=%d, new stream id=%d",
- t->last_incoming_stream_id, t->incoming_stream);
+ t->last_incoming_stream_id, t->incoming_stream_id);
+ return init_skip_frame(t, 1);
+ } else if ((t->incoming_stream_id & 1) == 0) {
+ gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id);
return init_skip_frame(t, 1);
}
t->incoming_stream = NULL;