diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-05-01 13:21:57 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-05-01 13:21:57 -0700 |
commit | d6c16558b40b723d61039c2bfbf491f90adf3ecb (patch) | |
tree | b30695b00ac0a6c7554d98f2b974e8f7bb8fbbc5 /src/core | |
parent | 9805bb2ce86413105e706da41a1b3e6040264e34 (diff) | |
parent | 5ae895a5d06fad59a89ce6e8923b1145dea663bd (diff) |
Merge github.com:grpc/grpc into one-read
Conflicts:
src/core/iomgr/tcp_posix.c
src/core/profiling/basic_timers.c
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/tcp_windows.c | 2 | ||||
-rw-r--r-- | src/core/profiling/basic_timers.c (renamed from src/core/profiling/timers.c) | 42 | ||||
-rw-r--r-- | src/core/profiling/stap_probes.d | 6 | ||||
-rw-r--r-- | src/core/profiling/stap_timers.c | 57 | ||||
-rw-r--r-- | src/core/profiling/timers.h | 90 | ||||
-rw-r--r-- | src/core/security/google_default_credentials.c | 2 | ||||
-rw-r--r-- | src/core/support/cpu_windows.c | 12 | ||||
-rw-r--r-- | src/core/surface/call.c | 333 | ||||
-rw-r--r-- | src/core/surface/channel.c | 7 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 96 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 41 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 45 | ||||
-rw-r--r-- | src/core/surface/init.c | 4 | ||||
-rw-r--r-- | src/core/surface/server.c | 64 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_encoder.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 19 |
18 files changed, 216 insertions, 632 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 60d0dad6d8..4d1bcad9e2 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -411,7 +411,7 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); - GRPC_TIMER_MARK(POLL_FINISHED, r); + GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); grpc_fd_end_poll(&fd_watcher); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 3825713aba..ccd7833aa4 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -328,7 +328,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { size_t final_nslices; GPR_ASSERT(!tcp->finished_edge); - GRPC_TIMER_MARK(HANDLE_READ_BEGIN, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); @@ -409,7 +409,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { grpc_tcp_unref(tcp); } - GRPC_TIMER_MARK(HANDLE_READ_END, 0); + GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { @@ -458,12 +458,12 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_TIMER_MARK(SENDMSG_BEGIN, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0); do { /* TODO(klempner): Cork if this is a partial write */ sent_length = sendmsg(tcp->fd, &msg, 0); } while (sent_length < 0 && errno == EINTR); - GRPC_TIMER_MARK(SENDMSG_END, 0); + GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0); if (sent_length < 0) { if (errno == EAGAIN) { @@ -499,7 +499,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { return; } - GRPC_TIMER_MARK(CB_WRITE_BEGIN, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); @@ -515,7 +515,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { cb(tcp->write_user_data, cb_status); grpc_tcp_unref(tcp); } - GRPC_TIMER_MARK(CB_WRITE_END, 0); + GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, @@ -538,7 +538,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, } } - GRPC_TIMER_MARK(WRITE_BEGIN, 0); + GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0); GPR_ASSERT(tcp->write_cb == NULL); slice_state_init(&tcp->write_state, slices, nslices, nslices); @@ -552,7 +552,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); } - GRPC_TIMER_MARK(WRITE_END, 0); + GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); return status; } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 71534eaa3d..940cd5bcde 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -289,7 +289,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, return ret; } - memset(&socket->write_info, 0, sizeof(OVERLAPPED)); + memset(&socket->write_info.overlapped, 0, sizeof(OVERLAPPED)); status = WSASend(socket->socket, buffers, tcp->write_slices.count, &bytes_sent, 0, &socket->write_info.overlapped, NULL); if (allocated) gpr_free(allocated); diff --git a/src/core/profiling/timers.c b/src/core/profiling/basic_timers.c index bd1700ffd8..b36f4d3902 100644 --- a/src/core/profiling/timers.c +++ b/src/core/profiling/basic_timers.c @@ -31,7 +31,9 @@ * */ -#ifdef GRPC_LATENCY_PROFILER +#include <grpc/support/port_platform.h> + +#ifdef GRPC_BASIC_PROFILER #include "src/core/profiling/timers.h" #include "src/core/profiling/timers_preciseclock.h" @@ -46,7 +48,7 @@ typedef struct grpc_timer_entry { grpc_precise_clock tm; gpr_thd_id thd; - const char* tag; + int tag; void* id; const char* file; int line; @@ -63,7 +65,7 @@ struct grpc_timers_log { grpc_timers_log* grpc_timers_log_global = NULL; -grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) { +static grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump) { grpc_timers_log* log = gpr_malloc(sizeof(*log)); /* TODO (vpai): Allow allocation below limit */ @@ -87,15 +89,20 @@ static void log_report_locked(grpc_timers_log* log) { grpc_timer_entry* entry = &(log->log[i]); fprintf(fp, "GRPC_LAT_PROF "); grpc_precise_clock_print(&entry->tm, fp); +<<<<<<< HEAD:src/core/profiling/timers.c fprintf(fp, " %p %s %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, entry->id, entry->file, entry->line); +======= + fprintf(fp, " %p %d %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, + entry->id, entry->file, entry->line); +>>>>>>> 5ae895a5d06fad59a89ce6e8923b1145dea663bd:src/core/profiling/basic_timers.c } /* Now clear out the log */ log->num_entries = 0; } -void grpc_timers_log_destroy(grpc_timers_log* log) { +static void grpc_timers_log_destroy(grpc_timers_log* log) { gpr_mu_lock(&log->mu); log_report_locked(log); gpr_mu_unlock(&log->mu); @@ -106,8 +113,8 @@ void grpc_timers_log_destroy(grpc_timers_log* log) { gpr_free(log); } -void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id, - const char* file, int line) { +static void grpc_timers_log_add(grpc_timers_log* log, int tag, void* id, + const char* file, int line) { grpc_timer_entry* entry; /* TODO (vpai) : Improve concurrency */ @@ -128,14 +135,25 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id, gpr_mu_unlock(&log->mu); } -void grpc_timers_log_global_init(void) { +/* Latency profiler API implementation. */ +void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { + grpc_timers_log_add(grpc_timers_log_global, tag, id, file, line); +} + +void grpc_timer_begin(int tag, void* id, const char *file, int line) {} +void grpc_timer_end(int tag, void* id, const char *file, int line) {} + +/* Basic profiler specific API functions. */ +void grpc_timers_global_init(void) { grpc_timers_log_global = grpc_timers_log_create(100000, stdout); } -void grpc_timers_log_global_destroy(void) { +void grpc_timers_global_destroy(void) { grpc_timers_log_destroy(grpc_timers_log_global); } -#else /* !GRPC_LATENCY_PROFILER */ -void grpc_timers_log_global_init(void) {} -void grpc_timers_log_global_destroy(void) {} -#endif /* GRPC_LATENCY_PROFILER */ + + +#else /* !GRPC_BASIC_PROFILER */ +void grpc_timers_global_init(void) {} +void grpc_timers_global_destroy(void) {} +#endif /* GRPC_BASIC_PROFILER */ diff --git a/src/core/profiling/stap_probes.d b/src/core/profiling/stap_probes.d new file mode 100644 index 0000000000..374eeedd6c --- /dev/null +++ b/src/core/profiling/stap_probes.d @@ -0,0 +1,6 @@ +provider _stap { + probe add_mark(int tag); + probe timing_ns_begin(int tag); + probe timing_ns_end(int tag); +}; + diff --git a/src/core/profiling/stap_timers.c b/src/core/profiling/stap_timers.c new file mode 100644 index 0000000000..52fb58a279 --- /dev/null +++ b/src/core/profiling/stap_timers.c @@ -0,0 +1,57 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_STAP_PROFILER + +#include "src/core/profiling/timers.h" + +#include <sys/sdt.h> +/* Generated from src/core/profiling/stap_probes.d */ +#include "src/core/profiling/stap_probes.h" + +/* Latency profiler API implementation. */ +void grpc_timer_add_mark(int tag, void* id, const char *file, int line) { + _STAP_ADD_MARK(tag); +} + +void grpc_timer_begin(int tag, void* id, const char *file, int line) { + _STAP_TIMING_NS_BEGIN(tag); +} + +void grpc_timer_end(int tag, void* id, const char *file, int line) { + _STAP_TIMING_NS_END(tag); +} + +#endif /* GRPC_STAP_PROFILER */ diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index 1a6b9ffac9..d0b8286c03 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -34,35 +34,87 @@ #ifndef GRPC_CORE_PROFILING_TIMERS_H #define GRPC_CORE_PROFILING_TIMERS_H -#include <stdio.h> - #ifdef __cplusplus extern "C" { #endif -#ifdef GRPC_LATENCY_PROFILER +void grpc_timers_global_init(void); +void grpc_timers_global_destroy(void); -typedef struct grpc_timers_log grpc_timers_log; +void grpc_timer_add_mark(int tag, void* id, const char *file, int line); +void grpc_timer_begin(int tag, void* id, const char *file, int line); +void grpc_timer_end(int tag, void* id, const char *file, int line); -grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump); -void grpc_timers_log_add(grpc_timers_log*, const char* tag, void* id, - const char* file, int line); -void grpc_timers_log_destroy(grpc_timers_log *); +enum grpc_profiling_tags { + /* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */ + GRPC_PTAG_IGNORE_THRESHOLD = 1000000, -extern grpc_timers_log *grpc_timers_log_global; + /* Re. Protos. */ + GRPC_PTAG_PROTO_SERIALIZE = 100 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_PROTO_DESERIALIZE = 101 + GRPC_PTAG_IGNORE_THRESHOLD, + + /* Re. sockets. */ + GRPC_PTAG_HANDLE_READ = 200 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_SENDMSG = 201 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_RECVMSG = 202 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_POLL_FINISHED = 203 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_TCP_CB_WRITE = 204 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_TCP_WRITE = 205 + GRPC_PTAG_IGNORE_THRESHOLD, + + /* C++ */ + GRPC_PTAG_CPP_CALL_CREATED = 300 + GRPC_PTAG_IGNORE_THRESHOLD, + GRPC_PTAG_CPP_PERFORM_OPS = 301 + GRPC_PTAG_IGNORE_THRESHOLD, -#define GRPC_TIMER_MARK(x, s) \ - grpc_timers_log_add(grpc_timers_log_global, #x, ((void *)(gpr_intptr)(s)), \ - __FILE__, __LINE__) + /* > 1024 Unassigned reserved. For any miscellaneous use. + * Use addition to generate tags from this base or take advantage of the 10 + * zero'd bits for OR-ing. */ + GRPC_PTAG_OTHER_BASE = 1024 +}; -#else /* !GRPC_LATENCY_PROFILER */ -#define GRPC_TIMER_MARK(x, s) \ - do { \ - } while (0) -#endif /* GRPC_LATENCY_PROFILER */ +#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) +/* No profiling. No-op all the things. */ +#define GRPC_TIMER_MARK(tag, id) \ + do {} while(0) + +#define GRPC_TIMER_BEGIN(tag, id) \ + do {} while(0) + +#define GRPC_TIMER_END(tag, id) \ + do {} while(0) + +#else /* at least one profiler requested... */ +/* ... hopefully only one. */ +#if defined(GRPC_STAP_PROFILER) && defined(GRPC_BASIC_PROFILER) +#error "GRPC_STAP_PROFILER and GRPC_BASIC_PROFILER are mutually exclusive." +#endif + +/* Generic profiling interface. */ +#define GRPC_TIMER_MARK(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ + } + +#define GRPC_TIMER_BEGIN(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ + } + +#define GRPC_TIMER_END(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ + } + +#ifdef GRPC_STAP_PROFILER +/* Empty placeholder for now. */ +#endif /* GRPC_STAP_PROFILER */ + +#ifdef GRPC_BASIC_PROFILER +typedef struct grpc_timers_log grpc_timers_log; + +extern grpc_timers_log *grpc_timers_log_global; +#endif /* GRPC_BASIC_PROFILER */ -void grpc_timers_log_global_init(void); -void grpc_timers_log_global_destroy(void); +#endif /* at least one profiler requested. */ #ifdef __cplusplus } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index d2f46ddd07..0e4b9fc9d3 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -163,7 +163,7 @@ grpc_credentials *grpc_google_default_credentials_create(void) { gpr_mu_lock(&g_mu); if (default_credentials != NULL) { - result = default_credentials; + result = grpc_credentials_ref(default_credentials); serving_cached_credentials = 1; goto end; } diff --git a/src/core/support/cpu_windows.c b/src/core/support/cpu_windows.c index cb454ccd3b..f56bab3f8b 100644 --- a/src/core/support/cpu_windows.c +++ b/src/core/support/cpu_windows.c @@ -34,19 +34,17 @@ #include <grpc/support/port_platform.h> #ifdef GPR_WIN32 - +#include <windows.h> #include <grpc/support/log.h> unsigned gpr_cpu_num_cores(void) { - /* TODO(jtattermusch): implement */ - gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1"); - return 1; + SYSTEM_INFO si; + GetSystemInfo(&si); + return si.dwNumberOfProcessors; } unsigned gpr_cpu_current_cpu(void) { - /* TODO(jtattermusch): implement */ - gpr_log(GPR_ERROR, "Cannot determine current CPU"); - return 0; + return GetCurrentProcessorNumber(); } #endif /* GPR_WIN32 */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index fc07a19894..070be1b25a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -47,9 +47,6 @@ #include <stdlib.h> #include <string.h> -typedef struct legacy_state legacy_state; -static void destroy_legacy_state(legacy_state *ls); - typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -226,10 +223,6 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; - - /* Data that the legacy api needs to track. To be deleted at some point - soon */ - legacy_state *legacy_state; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -353,9 +346,6 @@ static void destroy_call(void *call, int ignored_success) { } grpc_sopb_destroy(&c->send_ops); grpc_sopb_destroy(&c->recv_ops); - if (c->legacy_state) { - destroy_legacy_state(c->legacy_state); - } grpc_bbq_destroy(&c->incoming_queue); gpr_slice_buffer_destroy(&c->incoming_message); gpr_free(c); @@ -404,12 +394,6 @@ static void set_status_details(grpc_call *call, status_source source, call->status[source].details = status; } -static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { - if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; - call->cq = cq; - return GRPC_CALL_OK; -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -729,6 +713,10 @@ static void call_on_done_recv(void *pc, int success) { if (call->recv_state == GRPC_STREAM_CLOSED) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; + if (call->have_alarm) { + grpc_alarm_cancel(&call->alarm); + call->have_alarm = 0; + } } finish_read_ops(call); } else { @@ -1157,7 +1145,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { - grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); + grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, @@ -1172,7 +1160,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (nops == 0) { grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); - grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); + grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); return GRPC_CALL_OK; } @@ -1268,312 +1256,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, tag); } - -/* - * LEGACY API IMPLEMENTATION - * All this code will disappear as soon as wrappings are updated - */ - -struct legacy_state { - gpr_uint8 md_out_buffer; - size_t md_out_count[2]; - size_t md_out_capacity[2]; - grpc_metadata *md_out[2]; - grpc_byte_buffer *msg_out; - - /* input buffers */ - grpc_metadata_array initial_md_in; - grpc_metadata_array trailing_md_in; - - size_t details_capacity; - char *details; - grpc_status_code status; - - char *send_details; - - size_t msg_in_read_idx; - grpc_byte_buffer *msg_in; - - void *finished_tag; -}; - -static legacy_state *get_legacy_state(grpc_call *call) { - if (call->legacy_state == NULL) { - call->legacy_state = gpr_malloc(sizeof(legacy_state)); - memset(call->legacy_state, 0, sizeof(legacy_state)); - } - return call->legacy_state; -} - -static void destroy_legacy_state(legacy_state *ls) { - size_t i, j; - for (i = 0; i < 2; i++) { - for (j = 0; j < ls->md_out_count[i]; j++) { - gpr_free((char *)ls->md_out[i][j].key); - gpr_free((char *)ls->md_out[i][j].value); - } - gpr_free(ls->md_out[i]); - } - gpr_free(ls->initial_md_in.metadata); - gpr_free(ls->trailing_md_in.metadata); - gpr_free(ls->details); - gpr_free(ls->send_details); - gpr_free(ls); -} - -grpc_call_error grpc_call_add_metadata_old(grpc_call *call, - grpc_metadata *metadata, - gpr_uint32 flags) { - legacy_state *ls; - grpc_metadata *mdout; - - lock(call); - ls = get_legacy_state(call); - - if (ls->md_out_count[ls->md_out_buffer] == - ls->md_out_capacity[ls->md_out_buffer]) { - ls->md_out_capacity[ls->md_out_buffer] = - GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, - ls->md_out_capacity[ls->md_out_buffer] + 8); - ls->md_out[ls->md_out_buffer] = gpr_realloc( - ls->md_out[ls->md_out_buffer], - sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); - } - mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; - mdout->key = gpr_strdup(metadata->key); - mdout->value = gpr_malloc(metadata->value_length); - mdout->value_length = metadata->value_length; - memcpy((char *)mdout->value, metadata->value, metadata->value_length); - - unlock(call); - - return GRPC_CALL_OK; -} - -static void finish_status(grpc_call *call, grpc_op_error status, - void *ignored) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - ls->status, ls->details, ls->trailing_md_in.metadata, - ls->trailing_md_in.count); - unlock(call); -} - -static void finish_recv_metadata(grpc_call *call, grpc_op_error status, - void *tag) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - if (status == GRPC_OP_OK) { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, - ls->initial_md_in.count, - ls->initial_md_in.metadata); - - } else { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, - NULL); - } - unlock(call); -} - -static void finish_send_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[4]; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) goto done; - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - ls->md_out_buffer++; - err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; - reqs[0].data.recv_metadata = &ls->initial_md_in; - err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; - reqs[0].data.recv_metadata = &ls->trailing_md_in; - reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status.user_data = &ls->status; - reqs[1].data.recv_status.set_value = set_status_value_directly; - reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; - reqs[2].data.recv_status_details.details = &ls->details; - reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; - reqs[3].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 4, finish_status, NULL); - if (err != GRPC_CALL_OK) goto done; - -done: - unlock(call); - return err; -} - -grpc_call_error grpc_call_server_accept_old(grpc_call *call, - grpc_completion_queue *cq, - void *finished_tag) { - grpc_ioreq reqs[2]; - grpc_call_error err; - legacy_state *ls; - - /* inform the completion queue of an incoming operation (corresponding to - finished_tag) */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) { - unlock(call); - return err; - } - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_RECV_STATUS; - reqs[0].data.recv_status.user_data = &ls->status; - reqs[0].data.recv_status.set_value = set_status_value_directly; - reqs[1].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 2, finish_status, NULL); - unlock(call); - return err; -} - -static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - grpc_ioreq req; - grpc_call_error err; - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; - req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL); - unlock(call); - - return err; -} - -static void finish_read_event(void *p, grpc_op_error error) { - if (p) grpc_byte_buffer_destroy(p); -} - -static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { - legacy_state *ls; - grpc_byte_buffer *msg; - lock(call); - ls = get_legacy_state(call); - msg = ls->msg_in; - grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); - unlock(call); -} - -grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) { - legacy_state *ls; - grpc_ioreq req; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_READ); - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_RECV_MESSAGE; - req.data.recv_message = &ls->msg_in; - err = start_ioreq(call, &req, 1, finish_read, tag); - unlock(call); - return err; -} - -static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { - lock(call); - grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out); - unlock(call); - grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_start_write_old(grpc_call *call, - grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - grpc_ioreq req; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - ls->msg_out = grpc_byte_buffer_copy(byte_buffer); - req.op = GRPC_IOREQ_SEND_MESSAGE; - req.data.send_message = ls->msg_out; - err = start_ioreq(call, &req, 1, finish_write, tag); - unlock(call); - - return err; -} - -static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { - grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) { - grpc_ioreq req; - grpc_call_error err; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - req.op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, &req, 1, finish_finish, tag); - unlock(call); - - return err; -} - -grpc_call_error grpc_call_start_write_status_old(grpc_call *call, - grpc_status_code status, - const char *details, - void *tag) { - grpc_ioreq reqs[3]; - grpc_call_error err; - legacy_state *ls; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - reqs[1].op = GRPC_IOREQ_SEND_STATUS; - reqs[1].data.send_status.code = status; - reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); - reqs[2].op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, reqs, 3, finish_finish, tag); - unlock(call); - - return err; -} diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 78f9144c19..be9da2b7f9 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -128,13 +128,6 @@ static grpc_call *grpc_channel_create_call_internal( GPR_ARRAY_SIZE(send_metadata), deadline); } -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - return grpc_channel_create_call(channel, NULL, method, host, - absolute_deadline); -} - grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, const char *method, const char *host, diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index c1c97af337..3e9031807e 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -183,111 +183,17 @@ void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_byte_buffer *read) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); - ev->base.data.read = read; - end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, - grpc_call *call, grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; - end_op_locked(cc, GRPC_OP_COMPLETE); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); - ev->base.data.write_accepted = error; + ev->base.data.op_complete = error; end_op_locked(cc, GRPC_OP_COMPLETE); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } -void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); - ev->base.data.finish_accepted = error; - end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, size_t count, - grpc_metadata *elements) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, - user_data); - ev->base.data.client_metadata_read.count = count; - ev->base.data.client_metadata_read.elements = elements; - end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_status_code status, const char *details, - grpc_metadata *metadata_elements, - size_t metadata_count) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); - ev->base.data.finished.status = status; - ev->base.data.finished.details = details; - ev->base.data.finished.metadata_count = metadata_count; - ev->base.data.finished.metadata_elements = metadata_elements; - end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - -void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - const char *method, const char *host, - gpr_timespec deadline, size_t metadata_count, - grpc_metadata *metadata_elements) { - event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); - ev->base.data.server_rpc_new.method = method; - ev->base.data.server_rpc_new.host = host; - ev->base.data.server_rpc_new.deadline = deadline; - ev->base.data.server_rpc_new.metadata_count = metadata_count; - ev->base.data.server_rpc_new.metadata_elements = metadata_elements; - end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ static event *create_shutdown_event(void) { event *ev = gpr_malloc(sizeof(event)); diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 41024cda14..a0d7eeaac6 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -62,48 +62,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, Other parameters match the data member of grpc_event */ -/* Queue a GRPC_READ operation */ -void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_byte_buffer *read); -/* Queue a GRPC_INVOKE_ACCEPTED operation */ -void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error); -/* Queue a GRPC_WRITE_ACCEPTED operation */ -void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error); -/* Queue a GRPC_FINISH_ACCEPTED operation */ -void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error); /* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op_complete(grpc_completion_queue *cc, void *tag, - grpc_call *call, grpc_event_finish_func on_finish, - void *user_data, grpc_op_error error); -/* Queue a GRPC_CLIENT_METADATA_READ operation */ -void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, - grpc_call *call, - grpc_event_finish_func on_finish, - void *user_data, size_t count, - grpc_metadata *elements); - -void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_status_code status, const char *details, - grpc_metadata *metadata_elements, - size_t metadata_count); - -void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - const char *method, const char *host, - gpr_timespec deadline, size_t metadata_count, - grpc_metadata *metadata_elements); - void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error); diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 0fa3f166e2..30bdff6b85 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -62,7 +62,6 @@ static void adderr(gpr_strvec *buf, grpc_op_error err) { char *grpc_event_string(grpc_event *ev) { char *out; - char *tmp; gpr_strvec buf; if (ev == NULL) return gpr_strdup("null"); @@ -76,55 +75,11 @@ char *grpc_event_string(grpc_event *ev) { case GRPC_QUEUE_SHUTDOWN: gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN")); break; - case GRPC_READ: - gpr_strvec_add(&buf, gpr_strdup("READ: ")); - addhdr(&buf, ev); - if (ev->data.read) { - gpr_asprintf(&tmp, " %d bytes", - (int)grpc_byte_buffer_length(ev->data.read)); - gpr_strvec_add(&buf, tmp); - } else { - gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); - } - break; case GRPC_OP_COMPLETE: gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); adderr(&buf, ev->data.op_complete); break; - case GRPC_WRITE_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); - addhdr(&buf, ev); - adderr(&buf, ev->data.write_accepted); - break; - case GRPC_FINISH_ACCEPTED: - gpr_strvec_add(&buf, gpr_strdup("FINISH_ACCEPTED: ")); - addhdr(&buf, ev); - adderr(&buf, ev->data.write_accepted); - break; - case GRPC_CLIENT_METADATA_READ: - gpr_strvec_add(&buf, gpr_strdup("CLIENT_METADATA_READ: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " %d elements", - (int)ev->data.client_metadata_read.count); - gpr_strvec_add(&buf, tmp); - break; - case GRPC_FINISHED: - gpr_strvec_add(&buf, gpr_strdup("FINISHED: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " status=%d details='%s' %d metadata elements", - ev->data.finished.status, ev->data.finished.details, - (int)ev->data.finished.metadata_count); - gpr_strvec_add(&buf, tmp); - break; - case GRPC_SERVER_RPC_NEW: - gpr_strvec_add(&buf, gpr_strdup("SERVER_RPC_NEW: ")); - addhdr(&buf, ev); - gpr_asprintf(&tmp, " method='%s' host='%s' %d metadata elements", - ev->data.server_rpc_new.method, ev->data.server_rpc_new.host, - (int)ev->data.server_rpc_new.metadata_count); - gpr_strvec_add(&buf, tmp); - break; case GRPC_COMPLETION_DO_NOT_USE: gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)")); addhdr(&buf, ev); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index a5d86d0071..d6eb9b2c24 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -65,7 +65,7 @@ void grpc_init(void) { grpc_iomgr_init(); grpc_tracer_init("GRPC_TRACE"); census_init(); - grpc_timers_log_global_init(); + grpc_timers_global_init(); } gpr_mu_unlock(&g_init_mu); } @@ -75,7 +75,7 @@ void grpc_shutdown(void) { if (--g_initializations == 0) { grpc_iomgr_shutdown(); census_shutdown(); - grpc_timers_log_global_destroy(); + grpc_timers_global_destroy(); } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 83caefcbc6..01644b4471 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -69,7 +69,7 @@ typedef struct { call_data *prev; } call_link; -typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type; +typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct { requested_call_type type; @@ -165,10 +165,6 @@ typedef enum { ZOMBIED } call_state; -typedef struct legacy_data { - grpc_metadata_array initial_metadata; -} legacy_data; - struct call_data { grpc_call *call; @@ -178,7 +174,6 @@ struct call_data { gpr_timespec deadline; int got_initial_metadata; - legacy_data *legacy; grpc_completion_queue *cq_new; grpc_stream_op_buffer *recv_ops; @@ -557,11 +552,6 @@ static void destroy_call_elem(grpc_call_element *elem) { grpc_mdstr_unref(calld->path); } - if (calld->legacy) { - gpr_free(calld->legacy->initial_metadata.metadata); - gpr_free(calld->legacy); - } - server_unref(chand->server); } @@ -998,7 +988,6 @@ static grpc_call_error queue_call_request(grpc_server *server, return GRPC_CALL_OK; } switch (rc->type) { - case LEGACY_CALL: case BATCH_CALL: calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START); @@ -1057,16 +1046,6 @@ grpc_call_error grpc_server_request_registered_call( return queue_call_request(server, &rc); } -grpc_call_error grpc_server_request_call_old(grpc_server *server, - void *tag_new) { - requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW); - rc.type = LEGACY_CALL; - rc.tag = tag_new; - return queue_call_request(server, &rc); -} - -static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag); static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag); static void publish_was_not_set(grpc_call *call, grpc_op_error status, @@ -1098,14 +1077,6 @@ static void begin_call(grpc_server *server, call_data *calld, an ioreq op, that should complete immediately. */ switch (rc->type) { - case LEGACY_CALL: - calld->legacy = gpr_malloc(sizeof(legacy_data)); - memset(calld->legacy, 0, sizeof(legacy_data)); - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = &calld->legacy->initial_metadata; - r++; - publish = publish_legacy; - break; case BATCH_CALL: cpstr(&rc->data.batch.details->host, &rc->data.batch.details->host_capacity, calld->host); @@ -1144,50 +1115,27 @@ static void begin_call(grpc_server *server, call_data *calld, static void fail_call(grpc_server *server, requested_call *rc) { switch (rc->type) { - case LEGACY_CALL: - grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, - NULL, NULL, NULL, gpr_inf_past, 0, NULL); - break; case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); + grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, + GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, - rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); + grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, + do_nothing, NULL, GRPC_OP_ERROR); break; } } -static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - grpc_server *server = chand->server; - - if (status == GRPC_OP_OK) { - grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL, - grpc_mdstr_as_c_string(calld->path), - grpc_mdstr_as_c_string(calld->host), calld->deadline, - calld->legacy->initial_metadata.count, - calld->legacy->initial_metadata.metadata); - } else { - gpr_log(GPR_ERROR, "should never reach here"); - abort(); - } -} - static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); call_data *calld = elem->call_data; - grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status); + grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index cf1e66bf8b..cf78ac50cc 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -122,6 +122,12 @@ static void begin_frame(framer_state *st, frame_type type) { st->output_length_at_start_of_frame = st->output->length; } +static void begin_new_frame(framer_state *st, frame_type type) { + finish_frame(st, 1, 0); + st->last_was_header = 0; + begin_frame(st, type); +} + /* make sure that the current frame is of the type desired, and has sufficient space to add at least about_to_add bytes -- finishes the current frame if needed */ @@ -571,6 +577,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, a metadata element that needs to be unreffed back into the metadata slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got updated). After this loop, we'll do a batch unref of elements. */ + begin_new_frame(&st, HEADER); need_unref |= op->data.metadata.garbage.head != NULL; grpc_metadata_batch_assert_ok(&op->data.metadata); for (l = op->data.metadata.list.head; l; l = l->next) { @@ -580,9 +587,6 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { deadline_enc(compressor, op->data.metadata.deadline, &st); } - ensure_frame_type(&st, HEADER, 0); - finish_frame(&st, 1, 0); - st.last_was_header = 0; /* force a new header frame */ curop++; break; case GRPC_OP_SLICE: diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c78d64d2dd..5807c62f53 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -625,17 +625,19 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, if (!server_data) { lock(t); s->id = 0; + s->outgoing_window = 0; + s->incoming_window = 0; } else { /* already locked */ s->id = (gpr_uint32)(gpr_uintptr)server_data; + s->outgoing_window = + t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->incoming_window = + t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); } - s->outgoing_window = - t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - s->incoming_window = - t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; s->incoming_deadline = gpr_inf_future; grpc_sopb_init(&s->writing_sopb); grpc_sopb_init(&s->callback_sopb); @@ -1041,6 +1043,10 @@ static void maybe_start_some_streams(transport *t) { GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; t->next_stream_id += 2; + s->outgoing_window = + t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; + s->incoming_window = + t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); stream_list_join(t, s, WRITABLE); } @@ -1418,7 +1424,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) { gpr_log(GPR_ERROR, "ignoring out of order new stream request on server; last stream " "id=%d, new stream id=%d", - t->last_incoming_stream_id, t->incoming_stream); + t->last_incoming_stream_id, t->incoming_stream_id); + return init_skip_frame(t, 1); + } else if ((t->incoming_stream_id & 1) == 0) { + gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); return init_skip_frame(t, 1); } t->incoming_stream = NULL; |