diff options
57 files changed, 872 insertions, 246 deletions
@@ -1,5 +1,5 @@ # GRPC Bazel BUILD file. -# This currently builds C and C++ code. +# This currently builds C, C++ and Objective-C code. # This file has been automatically generated from a template file. # Please look at the templates directory instead. # This file can be regenerated from the template by running @@ -2675,6 +2675,21 @@ $(GENDIR)/examples/pubsub/pubsub.grpc.pb.cc: examples/pubsub/pubsub.proto $(PROT endif ifeq ($(NO_PROTOC),true) +$(GENDIR)/test/cpp/qps/perf_db.pb.cc: protoc_dep_error +$(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc: protoc_dep_error +else +$(GENDIR)/test/cpp/qps/perf_db.pb.cc: test/cpp/qps/perf_db.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[PROTOC] Generating protobuf CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) --cpp_out=$(GENDIR) $< + +$(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc: test/cpp/qps/perf_db.proto $(PROTOBUF_DEP) $(PROTOC_PLUGINS) + $(E) "[GRPC] Generating gRPC's protobuf service CC file from $<" + $(Q) mkdir -p `dirname $@` + $(Q) $(PROTOC) --grpc_out=$(GENDIR) --plugin=protoc-gen-grpc=$(BINDIR)/$(CONFIG)/grpc_cpp_plugin $< +endif + +ifeq ($(NO_PROTOC),true) $(GENDIR)/test/cpp/qps/qpstest.pb.cc: protoc_dep_error $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc: protoc_dep_error else @@ -4244,9 +4259,11 @@ $(OBJDIR)/$(CONFIG)/examples/pubsub/subscriber.o: $(GENDIR)/examples/pubsub/labe LIBQPS_SRC = \ $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc \ + $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc \ test/cpp/qps/client_async.cc \ test/cpp/qps/client_sync.cc \ test/cpp/qps/driver.cc \ + test/cpp/qps/perf_db_client.cc \ test/cpp/qps/qps_worker.cc \ test/cpp/qps/report.cc \ test/cpp/qps/server_async.cc \ @@ -4296,15 +4313,16 @@ ifneq ($(NO_DEPS),true) -include $(LIBQPS_OBJS:.o=.dep) endif endif -$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/driver.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_worker.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/report.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/qps/timer.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc -$(OBJDIR)/$(CONFIG)/test/cpp/util/benchmark_config.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/driver.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/perf_db_client.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/qps_worker.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/report.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_sync.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/qps/timer.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/util/benchmark_config.o: $(GENDIR)/test/cpp/qps/qpstest.pb.cc $(GENDIR)/test/cpp/qps/qpstest.grpc.pb.cc $(GENDIR)/test/cpp/qps/perf_db.pb.cc $(GENDIR)/test/cpp/qps/perf_db.grpc.pb.cc LIBGRPC_CSHARP_EXT_SRC = \ diff --git a/build.json b/build.json index c77b7340ba..19078ed726 100644 --- a/build.json +++ b/build.json @@ -768,6 +768,7 @@ "test/cpp/qps/driver.h", "test/cpp/qps/histogram.h", "test/cpp/qps/interarrival.h", + "test/cpp/qps/perf_db_client.h", "test/cpp/qps/qps_worker.h", "test/cpp/qps/report.h", "test/cpp/qps/server.h", @@ -777,9 +778,11 @@ ], "src": [ "test/cpp/qps/qpstest.proto", + "test/cpp/qps/perf_db.proto", "test/cpp/qps/client_async.cc", "test/cpp/qps/client_sync.cc", "test/cpp/qps/driver.cc", + "test/cpp/qps/perf_db_client.cc", "test/cpp/qps/qps_worker.cc", "test/cpp/qps/report.cc", "test/cpp/qps/server_async.cc", diff --git a/gRPC.podspec b/gRPC.podspec index d96e65c1ba..bdcbc3f612 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -524,9 +524,9 @@ Pod::Spec.new do |s| BAD_TIME="$DIR_TIME/time.h" GOOD_TIME="$DIR_TIME/grpc_time.h" grep -rl "$BAD_TIME" grpc src/core src/objective-c/GRPCClient | xargs sed -i '' -e s@$BAD_TIME@$GOOD_TIME@g - if [ -f "include/$BAD_TIME" ]; + if [ -f "$BAD_TIME" ]; then - mv -f "include/$BAD_TIME" "include/$GOOD_TIME" + mv -f "$BAD_TIME" "$GOOD_TIME" fi DIR_STRING="src/core/support" diff --git a/grpc.bzl b/grpc.bzl new file mode 100644 index 0000000000..9f2693126a --- /dev/null +++ b/grpc.bzl @@ -0,0 +1,128 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Bazel macros to declare gRPC libraries automatically generated from proto files. + +This file declares two macros: +- objc_proto_library +- objc_grpc_library +""" + +def _lower_underscore_to_upper_camel(str): + humps = [] + for hump in str.split('_'): + humps += [hump[0].upper() + hump[1:]] + return "".join(humps) + +def _file_to_upper_camel(src): + elements = src.rpartition('/') + upper_camel = _lower_underscore_to_upper_camel(elements[-1]) + return "".join(elements[:-1] + [upper_camel]) + +def _file_with_extension(src, ext): + elements = src.rpartition('/') + basename = elements[-1].partition('.')[0] + return "".join(elements[:-1] + [basename, ext]) + +def _protoc_invocation(srcs, flags): + """Returns a command line to invoke protoc from a genrule, on the given + sources, using the given flags. + """ + protoc_command = "$(location //external:protoc) -I . " + srcs_params = "" + for src in srcs: + srcs_params += " $(location %s)" % (src) + return protoc_command + flags + srcs_params + +def objc_proto_library(name, srcs, visibility=None): + """Declares an objc_library for the code generated by protoc from the given + proto sources. This generated code doesn't include proto services. + """ + h_files = [] + m_files = [] + for src in srcs: + src = _file_to_upper_camel(src) + h_files += [_file_with_extension(src, ".pbobjc.h")] + m_files += [_file_with_extension(src, ".pbobjc.m")] + + protoc_flags = "--objc_out=$(GENDIR)" + + native.genrule( + name = name + "_codegen", + srcs = srcs + ["//external:protoc"], + outs = h_files + m_files, + cmd = _protoc_invocation(srcs, protoc_flags), + ) + native.objc_library( + name = name, + hdrs = h_files, + includes = ["."], + non_arc_srcs = m_files, + deps = ["//external:protobuf_objc"], + visibility = visibility, + ) + +def objc_grpc_library(name, services, other_messages, visibility=None): + """Declares an objc_library for the code generated by gRPC and protoc from the + given proto sources (services and other_messages). The generated code doesn't + include proto services of the files passed as other_messages. + """ + objc_proto_library(name + "_messages", services + other_messages) + + h_files = [] + m_files = [] + for src in services: + src = _file_to_upper_camel(src) + h_files += [_file_with_extension(src, ".pbrpc.h")] + m_files += [_file_with_extension(src, ".pbrpc.m")] + + protoc_flags = ("--grpc_out=$(GENDIR) --plugin=" + + "protoc-gen-grpc=$(location //external:grpc_protoc_plugin_objc)") + + native.genrule( + name = name + "_codegen", + srcs = services + [ + "//external:grpc_protoc_plugin_objc", + "//external:protoc", + ], + outs = h_files + m_files, + cmd = _protoc_invocation(services, protoc_flags), + ) + native.objc_library( + name = name, + hdrs = h_files, + includes = ["."], + srcs = m_files, + deps = [ + ":" + name + "_messages", + "//external:proto_objc_rpc", + ], + visibility = visibility, + ) diff --git a/include/grpc/byte_buffer.h b/include/grpc/byte_buffer.h index a62054ac19..913e2a7697 100644 --- a/include/grpc/byte_buffer.h +++ b/include/grpc/byte_buffer.h @@ -102,6 +102,10 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader); int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, gpr_slice *slice); +/** Returns a RAW byte buffer instance from the output of \a reader. */ +grpc_byte_buffer *grpc_raw_byte_buffer_from_reader( + grpc_byte_buffer_reader *reader); + #ifdef __cplusplus } #endif diff --git a/src/core/client_config/README.md b/src/core/client_config/README.md index 7cb19cd130..d7aed27223 100644 --- a/src/core/client_config/README.md +++ b/src/core/client_config/README.md @@ -42,3 +42,19 @@ Their behavior is specified by a set of grpc channel filters defined at their construction. To customize this behavior, resolvers build grpc_subchannel_factory objects, which use the decorator pattern to customize construction arguments for concrete grpc_subchannel instances. + + +Naming for GRPC +=============== + +Names in GRPC are represented by a URI. + +The following schemes are currently supported: + +dns:///host:port - dns schemes are currently supported so long as authority is + empty (authority based dns resolution is expected in a future + release) + +unix:path - the unix scheme is used to create and connect to unix domain + sockets - the authority must be empty, and the path represents + the absolute or relative path to the desired socket diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 3d57e3136a..73da624aff 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -155,8 +155,6 @@ loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: p->selected = p->subchannels[p->checking_subchannel]; - GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == - GRPC_CHANNEL_READY); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; @@ -185,6 +183,7 @@ loop: GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -197,7 +196,6 @@ loop: p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); add_interested_parties_locked(p); goto loop; } diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h index e5262e2199..c067a0b8a3 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/alarm.h @@ -41,9 +41,9 @@ typedef struct grpc_alarm { gpr_timespec deadline; gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */ + int triggered; struct grpc_alarm *next; struct grpc_alarm *prev; - int triggered; grpc_iomgr_cb_func cb; void *cb_arg; } grpc_alarm; diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index e8c24c772a..6ad377ce1c 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -369,16 +369,17 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, watcher->fd = NULL; watcher->pollset = NULL; gpr_mu_unlock(&fd->watcher_mu); + GRPC_FD_UNREF(fd, "poll"); return 0; } /* if there is nobody polling for read, but we need to, then start doing so */ - if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { fd->read_watcher = watcher; mask |= read_mask; } /* if there is nobody polling for write, but we need to, then start doing so */ - if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { fd->write_watcher = watcher; mask |= write_mask; } diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index dcf08d379c..1900bbf9e1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; - - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (err < 0) { - /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ - if (errno != EEXIST) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, - strerror(errno)); + grpc_fd_watcher watcher; + + /* We pretend to be polling whilst adding an fd to keep the fd from being + closed during the add. This may result in a spurious wakeup being assigned + to this pollset whilst adding, but that should be benign. */ + GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); + if (watcher.fd != NULL) { + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = fd; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } } } + grpc_fd_end_poll(&watcher, 0, 0); } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 15ed8e75e6..12496440de 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -249,7 +249,8 @@ static void basic_do_promote(void *args, int success) { pollset->in_flight_cbs--; if (pollset->shutting_down) { /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && pollset->counter == 0) { + if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) { + pollset->called_shutdown = 1; do_shutdown_cb = 1; } } else if (grpc_fd_is_orphaned(fd)) { diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 6a99324da6..8a7ada07af 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -99,9 +99,10 @@ static void on_secure_transport_setup_done(void *statep, if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(state->server), secure_endpoint, NULL, 0, - mdctx, 0); + grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, + 0); setup_transport(state, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } else { /* We need to consume this here, because the server may already have gone * away. */ diff --git a/src/core/support/log_linux.c b/src/core/support/log_linux.c index 48349d2c83..7937466b79 100644 --- a/src/core/support/log_linux.c +++ b/src/core/support/log_linux.c @@ -43,7 +43,9 @@ #ifdef GPR_LINUX +#include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/time.h> #include <stdio.h> #include <stdarg.h> @@ -71,6 +73,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, void gpr_default_log(gpr_log_func_args *args) { char *final_slash; + char *prefix; const char *display_file; char time_buffer[64]; gpr_timespec now = gpr_now(); @@ -89,10 +92,12 @@ void gpr_default_log(gpr_log_func_args *args) { strcpy(time_buffer, "error:strftime"); } - fprintf(stderr, "%s%s.%09d %7ld %s:%d] %s\n", + gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]", gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), gettid(), display_file, args->line, - args->message); + (int)(now.tv_nsec), gettid(), display_file, args->line); + + fprintf(stderr, "%-60s %s\n", prefix, args->message); + gpr_free(prefix); } #endif diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index 4817e00454..a930949f2d 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -55,6 +55,20 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( return bb; } +grpc_byte_buffer *grpc_raw_byte_buffer_from_reader( + grpc_byte_buffer_reader *reader) { + grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer)); + gpr_slice slice; + bb->type = GRPC_BB_RAW; + bb->data.raw.compression = GRPC_COMPRESS_NONE; + gpr_slice_buffer_init(&bb->data.raw.slice_buffer); + + while (grpc_byte_buffer_reader_next(reader, &slice)) { + gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice); + } + return bb; +} + grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { switch (bb->type) { case GRPC_BB_RAW: diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ae1b215767..fc09137b67 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -76,14 +76,14 @@ typedef struct { typedef struct { /* Overall status of the operation: starts OK, may degrade to non-OK */ - int success; - /* Completion function to call at the end of the operation */ - grpc_ioreq_completion_func on_complete; - void *user_data; + gpr_uint8 success; /* a bit mask of which request ops are needed (1u << opid) */ gpr_uint16 need_mask; /* a bit mask of which request ops are now completed */ gpr_uint16 complete_mask; + /* Completion function to call at the end of the operation */ + grpc_ioreq_completion_func on_complete; + void *user_data; } reqinfo_master; /* Status data for a request can come from several sources; this diff --git a/src/core/surface/call.h b/src/core/surface/call.h index fb3662b50d..3b6f9c942e 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -78,8 +78,8 @@ typedef union { typedef struct { grpc_ioreq_op op; - grpc_ioreq_data data; gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ + grpc_ioreq_data data; } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 09b4fb782b..e205f0a9f8 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -72,7 +72,8 @@ static void connected(void *arg, grpc_endpoint *tcp) { grpc_iomgr_closure *notify; if (tcp != NULL) { c->result->transport = grpc_create_chttp2_transport( - c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1); + c->args.channel_args, tcp, c->args.metadata_context, 1); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); c->result->filters[0] = &grpc_http_client_filter; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 76fc862621..34ee3f8400 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -82,9 +82,9 @@ static void on_secure_transport_setup_done(void *arg, gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); memset(c->result, 0, sizeof(*c->result)); } else { - c->result->transport = - grpc_create_chttp2_transport(c->args.channel_args, secure_endpoint, - NULL, 0, c->args.metadata_context, 1); + c->result->transport = grpc_create_chttp2_transport( + c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); c->result->filters[0] = &grpc_client_auth_filter; c->result->filters[1] = &grpc_http_client_filter; diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 9c02c3ef29..78c53466b3 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -61,8 +61,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { */ grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_transport *transport = grpc_create_chttp2_transport( - grpc_server_get_channel_args(server), tcp, NULL, 0, mdctx, 0); + grpc_server_get_channel_args(server), tcp, mdctx, 0); setup_transport(server, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } /* Server callback: start listening on our ports */ diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c index a4b7174329..68e0912b9c 100644 --- a/src/core/transport/chttp2/incoming_metadata.c +++ b/src/core/transport/chttp2/incoming_metadata.c @@ -124,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb( sopb->ops[i].data.metadata.list.tail = (void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail); } + src->count = 0; } void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op( diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8f682e9017..4664a0895c 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads( transport_parsing->incoming_stream_id; } - /* TODO(ctiller): re-implement */ - GPR_ASSERT(transport_parsing->initial_window_update == 0); - /* copy parsing qbuf to global qbuf */ gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index fdcc300099..a78654334e 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -97,12 +97,8 @@ int grpc_chttp2_unlocking_check_writes( grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } - /* we should either exhaust window or have no ops left, but not both */ - if (stream_global->outgoing_sopb->nops == 0) { - stream_global->outgoing_sopb = NULL; - grpc_chttp2_schedule_closure(transport_global, - stream_global->send_done_closure, 1); - } else if (stream_global->outgoing_window > 0) { + if (stream_global->outgoing_window > 0 && + stream_global->outgoing_sopb->nops != 0) { grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } @@ -201,6 +197,11 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_global->outgoing_sopb->nops == 0) { + stream_global->outgoing_sopb = NULL; + grpc_chttp2_schedule_closure(transport_global, + stream_global->send_done_closure, 1); + } if (stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE; if (!transport_global->is_client) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0a7b8f5bf9..3483512ab8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -201,8 +201,8 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_chttp2_transport *t, const grpc_channel_args *channel_args, - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_mdctx *mdctx, int is_client) { + grpc_endpoint *ep, grpc_mdctx *mdctx, + int is_client) { size_t i; int j; @@ -311,9 +311,6 @@ static void init_transport(grpc_chttp2_transport *t, } } } - - REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ - recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); } static void destroy_transport(grpc_transport *gt) { @@ -687,7 +684,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_goaway_append( t->global.last_incoming_stream_id, grpc_chttp2_grpc_status_to_http2_error(op->goaway_status), - *op->goaway_message, &t->global.qbuf); + gpr_slice_ref(*op->goaway_message), &t->global.qbuf); } if (op->set_accept_stream != NULL) { @@ -933,6 +930,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (t->parsing.initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); + t->parsing.initial_window_update = 0; } /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); @@ -1042,9 +1040,16 @@ static const grpc_transport_vtable vtable = { perform_transport_op, destroy_stream, destroy_transport}; grpc_transport *grpc_create_chttp2_transport( - const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *mdctx, int is_client) { + const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, + int is_client) { grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); - init_transport(t, channel_args, ep, slices, nslices, mdctx, is_client); + init_transport(t, channel_args, ep, mdctx, is_client); return &t->base; } + +void grpc_chttp2_transport_start_reading(grpc_transport *transport, + gpr_slice *slices, size_t nslices) { + grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport; + REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */ + recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); +} diff --git a/src/core/transport/chttp2_transport.h b/src/core/transport/chttp2_transport.h index 1747792b95..fa0d6e4151 100644 --- a/src/core/transport/chttp2_transport.h +++ b/src/core/transport/chttp2_transport.h @@ -41,7 +41,10 @@ extern int grpc_http_trace; extern int grpc_flowctl_trace; grpc_transport *grpc_create_chttp2_transport( - const grpc_channel_args *channel_args, grpc_endpoint *ep, gpr_slice *slices, - size_t nslices, grpc_mdctx *metadata_context, int is_client); + const grpc_channel_args *channel_args, grpc_endpoint *ep, + grpc_mdctx *metadata_context, int is_client); + +void grpc_chttp2_transport_start_reading(grpc_transport *transport, + gpr_slice *slices, size_t nslices); #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_TRANSPORT_H */ diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 842fc932b9..964d39d14f 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -41,7 +41,7 @@ #include "src/core/transport/metadata.h" /* this many stream ops are inlined into a sopb before allocating */ -#define GRPC_SOPB_INLINE_ELEMENTS 16 +#define GRPC_SOPB_INLINE_ELEMENTS 4 /* Operations that can be performed on a stream. Used by grpc_stream_op. */ diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 33aae10747..cba53fa2f6 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -48,8 +48,6 @@ #import <Foundation/Foundation.h> #import <RxLibrary/GRXWriter.h> -@class GRPCMethodName; - // Key used in |NSError|'s |userInfo| dictionary to store the response metadata sent by the server. extern id const kGRPCStatusMetadataKey; @@ -90,7 +88,7 @@ extern id const kGRPCStatusMetadataKey; // the specific remote method called). // To finish a call right away, invoke cancel. - (instancetype)initWithHost:(NSString *)host - method:(GRPCMethodName *)method + path:(NSString *)path requestsWriter:(id<GRXWriter>)requestsWriter NS_DESIGNATED_INITIALIZER; // Finishes the request side of this call, notifies the server that the RPC diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 77eebeff76..4ac4e4d37f 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -36,11 +36,9 @@ #include <grpc/grpc.h> #include <grpc/support/time.h> -#import "GRPCMethodName.h" #import "private/GRPCChannel.h" #import "private/GRPCCompletionQueue.h" #import "private/GRPCDelegateWrapper.h" -#import "private/GRPCMethodName+HTTP2Encoding.h" #import "private/GRPCWrappedCall.h" #import "private/NSData+GRPC.h" #import "private/NSDictionary+GRPC.h" @@ -90,14 +88,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; @synthesize state = _state; - (instancetype)init { - return [self initWithHost:nil method:nil requestsWriter:nil]; + return [self initWithHost:nil path:nil requestsWriter:nil]; } // Designated initializer - (instancetype)initWithHost:(NSString *)host - method:(GRPCMethodName *)method + path:(NSString *)path requestsWriter:(id<GRXWriter>)requestWriter { - if (!host || !method) { + if (!host || !path) { [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."]; } if (requestWriter.state != GRXWriterStateNotStarted) { @@ -114,7 +112,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; _channel = [GRPCChannel channelToHost:host]; _wrappedCall = [[GRPCWrappedCall alloc] initWithChannel:_channel - method:method.HTTP2Path + path:path host:host]; // Serial queue to invoke the non-reentrant methods of the grpc_call object. diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h deleted file mode 100644 index 81c80f2a49..0000000000 --- a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#import <Foundation/Foundation.h> - -#import "GRPCClient/GRPCMethodName.h" - -@interface GRPCMethodName (HTTP2Encoding) -- (NSString *)HTTP2Path; -@end diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m deleted file mode 100644 index 3ad757fb29..0000000000 --- a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#import "GRPCMethodName+HTTP2Encoding.h" - -@implementation GRPCMethodName (HTTP2Encoding) -- (NSString *)HTTP2Path { - if (self.package) { - return [NSString stringWithFormat:@"/%@.%@/%@", self.package, self.interface, self.method]; - } else { - return [NSString stringWithFormat:@"/%@/%@", self.interface, self.method]; - } -} -@end diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index c08aefc6a8..18f8bb5531 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -84,7 +84,7 @@ @interface GRPCWrappedCall : NSObject - (instancetype)initWithChannel:(GRPCChannel *)channel - method:(NSString *)method + path:(NSString *)path host:(NSString *)host NS_DESIGNATED_INITIALIZER; - (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void(^)())errorHandler; diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index d94b25091e..45f10f5d63 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -225,13 +225,13 @@ } - (instancetype)init { - return [self initWithChannel:nil method:nil host:nil]; + return [self initWithChannel:nil path:nil host:nil]; } - (instancetype)initWithChannel:(GRPCChannel *)channel - method:(NSString *)method + path:(NSString *)path host:(NSString *)host { - if (!channel || !method || !host) { + if (!channel || !path || !host) { [NSException raise:NSInvalidArgumentException format:@"channel, method, and host cannot be nil."]; } @@ -247,7 +247,7 @@ return nil; } _call = grpc_channel_create_call(channel.unmanagedChannel, _queue.unmanagedQueue, - method.UTF8String, host.UTF8String, gpr_inf_future); + path.UTF8String, host.UTF8String, gpr_inf_future); if (_call == NULL) { return nil; } diff --git a/src/objective-c/GRPCClient/GRPCMethodName.h b/src/objective-c/ProtoRPC/ProtoMethod.h index fe153dd478..8f554a0483 100644 --- a/src/objective-c/GRPCClient/GRPCMethodName.h +++ b/src/objective-c/ProtoRPC/ProtoMethod.h @@ -33,17 +33,16 @@ #import <Foundation/Foundation.h> -// See the README file for an introduction to this library. - -// A fully-qualified gRPC method name. Full qualification is needed because a gRPC endpoint can -// implement multiple interfaces. -// TODO(jcanizales): Move to ProtoRPC package. -// TODO(jcanizales): Rename interface -> service. -@interface GRPCMethodName : NSObject +// A fully-qualified proto service method name. Full qualification is needed because a gRPC endpoint +// can implement multiple services. +@interface ProtoMethod : NSObject @property(nonatomic, readonly) NSString *package; -@property(nonatomic, readonly) NSString *interface; +@property(nonatomic, readonly) NSString *service; @property(nonatomic, readonly) NSString *method; + +@property(nonatomic, readonly) NSString *HTTPPath; + - (instancetype)initWithPackage:(NSString *)package - interface:(NSString *)interface + service:(NSString *)service method:(NSString *)method; @end diff --git a/src/objective-c/GRPCClient/GRPCMethodName.m b/src/objective-c/ProtoRPC/ProtoMethod.m index 96724073a5..1113b4fbaa 100644 --- a/src/objective-c/GRPCClient/GRPCMethodName.m +++ b/src/objective-c/ProtoRPC/ProtoMethod.m @@ -31,17 +31,25 @@ * */ -#import "GRPCMethodName.h" +#import "ProtoMethod.h" -@implementation GRPCMethodName +@implementation ProtoMethod - (instancetype)initWithPackage:(NSString *)package - interface:(NSString *)interface + service:(NSString *)service method:(NSString *)method { if ((self = [super init])) { _package = [package copy]; - _interface = [interface copy]; + _service = [service copy]; _method = [method copy]; } return self; } + +- (NSString *)HTTPPath { + if (_package) { + return [NSString stringWithFormat:@"/%@.%@/%@", _package, _service, _method]; + } else { + return [NSString stringWithFormat:@"/%@/%@", _service, _method]; + } +} @end diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h index a383310619..fcc0a507fe 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.h +++ b/src/objective-c/ProtoRPC/ProtoRPC.h @@ -34,10 +34,12 @@ #import <Foundation/Foundation.h> #import <GRPCClient/GRPCCall.h> +#import "ProtoMethod.h" + @interface ProtoRPC : GRPCCall - (instancetype)initWithHost:(NSString *)host - method:(GRPCMethodName *)method + method:(ProtoMethod *)method requestsWriter:(id<GRXWriter>)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id<GRXWriteable>)responsesWriteable NS_DESIGNATED_INITIALIZER; diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index 4da646d7b4..fe3ccf0541 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -42,19 +42,20 @@ id<GRXWriteable> _responseWriteable; } +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wobjc-designated-initializers" - (instancetype)initWithHost:(NSString *)host - method:(GRPCMethodName *)method + path:(NSString *)path requestsWriter:(id<GRXWriter>)requestsWriter { - return [self initWithHost:host - method:method - requestsWriter:requestsWriter - responseClass:nil - responsesWriteable:nil]; + [NSException raise:NSInvalidArgumentException + format:@"Please use ProtoRPC's designated initializer instead."]; + return nil; } +#pragma clang diagnostic pop // Designated initializer - (instancetype)initWithHost:(NSString *)host - method:(GRPCMethodName *)method + method:(ProtoMethod *)method requestsWriter:(id<GRXWriter>)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id<GRXWriteable>)responsesWriteable { @@ -70,7 +71,7 @@ // sending GPBMessages. return [proto data]; }]; - if ((self = [super initWithHost:host method:method requestsWriter:bytesWriter])) { + if ((self = [super initWithHost:host path:method.HTTPPath requestsWriter:bytesWriter])) { // A writeable that parses the proto messages received. _responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { [responsesWriteable writeValue:[responseClass parseFromData:value error:NULL]]; diff --git a/src/objective-c/ProtoRPC/ProtoService.m b/src/objective-c/ProtoRPC/ProtoService.m index 47bdb5dc6e..d7c5b6a850 100644 --- a/src/objective-c/ProtoRPC/ProtoService.m +++ b/src/objective-c/ProtoRPC/ProtoService.m @@ -33,10 +33,10 @@ #import "ProtoService.h" -#import <GRPCClient/GRPCMethodName.h> #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter.h> +#import "ProtoMethod.h" #import "ProtoRPC.h" @implementation ProtoService { @@ -69,9 +69,9 @@ requestsWriter:(id<GRXWriter>)requestsWriter responseClass:(Class)responseClass responsesWriteable:(id<GRXWriteable>)responsesWriteable { - GRPCMethodName *methodName = [[GRPCMethodName alloc] initWithPackage:_packageName - interface:_serviceName - method:method]; + ProtoMethod *methodName = [[ProtoMethod alloc] initWithPackage:_packageName + service:_serviceName + method:method]; return [[ProtoRPC alloc] initWithHost:_host method:methodName requestsWriter:requestsWriter diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index e421127ea1..f9c2d5d8d6 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -35,7 +35,7 @@ #import <XCTest/XCTest.h> #import <GRPCClient/GRPCCall.h> -#import <GRPCClient/GRPCMethodName.h> +#import <ProtoRPC/ProtoMethod.h> #import <RemoteTest/Messages.pbobjc.h> #import <RxLibrary/GRXWriteable.h> #import <RxLibrary/GRXWriter+Immediate.h> @@ -47,9 +47,9 @@ static NSString * const kHostAddress = @"grpc-test.sandbox.google.com"; static NSString * const kPackage = @"grpc.testing"; static NSString * const kService = @"TestService"; -static GRPCMethodName *kInexistentMethod; -static GRPCMethodName *kEmptyCallMethod; -static GRPCMethodName *kUnaryCallMethod; +static ProtoMethod *kInexistentMethod; +static ProtoMethod *kEmptyCallMethod; +static ProtoMethod *kUnaryCallMethod; @interface GRPCClientTests : XCTestCase @end @@ -58,22 +58,22 @@ static GRPCMethodName *kUnaryCallMethod; - (void)setUp { // This method isn't implemented by the remote server. - kInexistentMethod = [[GRPCMethodName alloc] initWithPackage:kPackage - interface:kService - method:@"Inexistent"]; - kEmptyCallMethod = [[GRPCMethodName alloc] initWithPackage:kPackage - interface:kService - method:@"EmptyCall"]; - kUnaryCallMethod = [[GRPCMethodName alloc] initWithPackage:kPackage - interface:kService - method:@"UnaryCall"]; + kInexistentMethod = [[ProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"Inexistent"]; + kEmptyCallMethod = [[ProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"EmptyCall"]; + kUnaryCallMethod = [[ProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"UnaryCall"]; } - (void)testConnectionToRemoteServer { __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Server reachable."]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress - method:kInexistentMethod + path:kInexistentMethod.HTTPPath requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { @@ -95,7 +95,7 @@ static GRPCMethodName *kUnaryCallMethod; __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress - method:kEmptyCallMethod + path:kEmptyCallMethod.HTTPPath requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { @@ -123,7 +123,7 @@ static GRPCMethodName *kUnaryCallMethod; id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[request data]]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress - method:kUnaryCallMethod + path:kUnaryCallMethod.HTTPPath requestsWriter:requestsWriter]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { @@ -153,7 +153,7 @@ static GRPCMethodName *kUnaryCallMethod; id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[request data]]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress - method:kUnaryCallMethod + path:kUnaryCallMethod.HTTPPath requestsWriter:requestsWriter]; call.requestMetadata[@"Authorization"] = @"Bearer bogusToken"; diff --git a/src/objective-c/tests/LocalClearTextTests.m b/src/objective-c/tests/LocalClearTextTests.m index 05cc10410a..10c9f13ea3 100644 --- a/src/objective-c/tests/LocalClearTextTests.m +++ b/src/objective-c/tests/LocalClearTextTests.m @@ -35,7 +35,7 @@ #import <XCTest/XCTest.h> #import <GRPCClient/GRPCCall.h> -#import <GRPCClient/GRPCMethodName.h> +#import <ProtoRPC/ProtoMethod.h> #import <RouteGuide/RouteGuide.pbobjc.h> #import <RouteGuide/RouteGuide.pbrpc.h> #import <RxLibrary/GRXWriteable.h> @@ -87,14 +87,14 @@ static NSString * const kService = @"RouteGuide"; __weak XCTestExpectation *response = [self expectationWithDescription:@"Empty response received."]; __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."]; - GRPCMethodName *method = [[GRPCMethodName alloc] initWithPackage:kPackage - interface:kService - method:@"RecordRoute"]; + ProtoMethod *method = [[ProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"RecordRoute"]; id<GRXWriter> requestsWriter = [GRXWriter emptyWriter]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kRouteGuideHost - method:method + path:method.HTTPPath requestsWriter:requestsWriter]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { @@ -115,9 +115,9 @@ static NSString * const kService = @"RouteGuide"; __weak XCTestExpectation *response = [self expectationWithDescription:@"Response received."]; __weak XCTestExpectation *completion = [self expectationWithDescription:@"RPC completed."]; - GRPCMethodName *method = [[GRPCMethodName alloc] initWithPackage:kPackage - interface:kService - method:@"GetFeature"]; + ProtoMethod *method = [[ProtoMethod alloc] initWithPackage:kPackage + service:kService + method:@"GetFeature"]; RGDPoint *point = [RGDPoint message]; point.latitude = 28E7; @@ -125,7 +125,7 @@ static NSString * const kService = @"RouteGuide"; id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[point data]]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kRouteGuideHost - method:method + path:method.HTTPPath requestsWriter:requestsWriter]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index 0ff8bb9aa7..6dd0234489 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -89,7 +89,7 @@ $CFLAGS << ' -Wno-return-type ' $CFLAGS << ' -Wall ' $CFLAGS << ' -pedantic ' -$LDFLAGS << ' -lgrpc -lgpr -ldl' +$LDFLAGS << ' -lgrpc -lgpr -lz -ldl' crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy') have_library('grpc', 'grpc_channel_destroy') diff --git a/templates/BUILD.template b/templates/BUILD.template index dffdc1dddd..4e9d8c376a 100644 --- a/templates/BUILD.template +++ b/templates/BUILD.template @@ -1,5 +1,5 @@ # GRPC Bazel BUILD file. -# This currently builds C and C++ code. +# This currently builds C, C++ and Objective-C code. # This file has been automatically generated from a template file. # Please look at the templates directory instead. # This file can be regenerated from the template by running diff --git a/templates/gRPC.podspec.template b/templates/gRPC.podspec.template index deea07cee3..495ea49c9c 100644 --- a/templates/gRPC.podspec.template +++ b/templates/gRPC.podspec.template @@ -111,9 +111,9 @@ Pod::Spec.new do |s| BAD_TIME="$DIR_TIME/time.h" GOOD_TIME="$DIR_TIME/grpc_time.h" grep -rl "$BAD_TIME" grpc src/core src/objective-c/GRPCClient | xargs sed -i '' -e s@$BAD_TIME@$GOOD_TIME@g - if [ -f "include/$BAD_TIME" ]; + if [ -f "$BAD_TIME" ]; then - mv -f "include/$BAD_TIME" "include/$GOOD_TIME" + mv -f "$BAD_TIME" "$GOOD_TIME" fi DIR_STRING="src/core/support" diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 9b6a246075..8ce666dcde 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -108,8 +108,9 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, a.validator = validator; grpc_server_register_completion_queue(a.server, a.cq); grpc_server_start(a.server); - transport = grpc_create_chttp2_transport(NULL, sfd.server, NULL, 0, mdctx, 0); + transport = grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0); server_setup_transport(&a, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); /* Bind everything into the same pollset */ grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq)); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index f42b9831c8..be523608d0 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -108,10 +108,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = - grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, @@ -123,9 +123,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = - grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index be520380a7..f875ca54a5 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -108,10 +108,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = - grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, @@ -123,9 +123,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = - grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index 037281c5ad..52c0e2ca8b 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -109,10 +109,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = - grpc_create_chttp2_transport(client_args, sfd->client, NULL, 0, mdctx, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, @@ -124,9 +124,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq); grpc_server_start(f->server); - transport = - grpc_create_chttp2_transport(server_args, sfd->server, NULL, 0, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); + grpc_chttp2_transport_start_reading(transport, NULL, 0); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c index 7c2cb9484a..d9c60e4212 100644 --- a/test/core/surface/byte_buffer_reader_test.c +++ b/test/core/surface/byte_buffer_reader_test.c @@ -160,6 +160,30 @@ static void test_read_deflate_compressed_slice(void) { read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE); } +static void test_byte_buffer_from_reader(void) { + gpr_slice slice; + grpc_byte_buffer *buffer, *buffer_from_reader; + grpc_byte_buffer_reader reader; + + LOG_TEST("test_byte_buffer_from_reader"); + slice = gpr_slice_malloc(4); + memcpy(GPR_SLICE_START_PTR(slice), "test", 4); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + grpc_byte_buffer_reader_init(&reader, buffer); + + buffer_from_reader = grpc_raw_byte_buffer_from_reader(&reader); + GPR_ASSERT(buffer->type == buffer_from_reader->type); + GPR_ASSERT(buffer_from_reader->data.raw.compression == GRPC_COMPRESS_NONE); + GPR_ASSERT(buffer_from_reader->data.raw.slice_buffer.count == 1); + GPR_ASSERT(memcmp(GPR_SLICE_START_PTR( + buffer_from_reader->data.raw.slice_buffer.slices[0]), + "test", 4) == 0); + + grpc_byte_buffer_destroy(buffer); + grpc_byte_buffer_destroy(buffer_from_reader); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); test_read_one_slice(); @@ -167,6 +191,7 @@ int main(int argc, char **argv) { test_read_none_compressed_slice(); test_read_gzip_compressed_slice(); test_read_deflate_compressed_slice(); + test_byte_buffer_from_reader(); return 0; } diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 8c8d927d15..e1e44f9ac0 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -199,6 +199,15 @@ class AsyncClient : public Client { delete ClientRpcContext::detag(got_tag); } } + // Now clear out all the pre-allocated idle contexts + for (int ch = 0; ch < channel_count_; ch++) { + while (!contexts_[ch].empty()) { + // Get an idle context from the front of the list + auto* ctx = *(contexts_[ch].begin()); + contexts_[ch].pop_front(); + delete ctx; + } + } } bool ThreadFunc(Histogram* histogram, diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto new file mode 100644 index 0000000000..60e038406a --- /dev/null +++ b/test/cpp/qps/perf_db.proto @@ -0,0 +1,71 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +import "test/cpp/qps/qpstest.proto"; + +package grpc.testing; + +service PerfDbTransfer { + // Sends client info + rpc RecordSingleClientData(SingleUserRecordRequest) + returns (SingleUserRecordReply) { + } +} + +// Metrics to be stored +message Metrics { + double qps = 1; + double qps_per_core = 2; + double perc_lat_50 = 3; + double perc_lat_90 = 4; + double perc_lat_95 = 5; + double perc_lat_99 = 6; + double perc_lat_99_point_9 = 7; + double server_system_time = 8; + double server_user_time = 9; + double client_system_time = 10; + double client_user_time = 11; +} + +// Request for storing a single user's data +message SingleUserRecordRequest { + string hashed_id = 1; + string test_name = 2; + string sys_info = 3; + string tag = 4; + Metrics metrics = 5; + ClientConfig client_config = 6; + ServerConfig server_config = 7; +} + +// Reply to request for storing single user's data +message SingleUserRecordReply { +} diff --git a/test/cpp/qps/perf_db_client.cc b/test/cpp/qps/perf_db_client.cc new file mode 100644 index 0000000000..08d20f0b8d --- /dev/null +++ b/test/cpp/qps/perf_db_client.cc @@ -0,0 +1,143 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/perf_db_client.h" + +namespace grpc { +namespace testing { + +// sets the client and server config information +void PerfDbClient::setConfigs(const ClientConfig& client_config, + const ServerConfig& server_config) { + client_config_ = client_config; + server_config_ = server_config; +} + +// sets the QPS +void PerfDbClient::setQps(double qps) { + qps_ = qps; +} + +// sets the QPS per core +void PerfDbClient::setQpsPerCore(double qps_per_core) { + qps_per_core_ = qps_per_core; +} + +// sets the 50th, 90th, 95th, 99th and 99.9th percentile latency +void PerfDbClient::setLatencies(double perc_lat_50, + double perc_lat_90, + double perc_lat_95, + double perc_lat_99, + double perc_lat_99_point_9) { + perc_lat_50_ = perc_lat_50; + perc_lat_90_ = perc_lat_90; + perc_lat_95_ = perc_lat_95; + perc_lat_99_ = perc_lat_99; + perc_lat_99_point_9_ = perc_lat_99_point_9; +} + +// sets the server and client, user and system times +void PerfDbClient::setTimes(double server_system_time, double server_user_time, + double client_system_time, double client_user_time) { + server_system_time_ = server_system_time; + server_user_time_ = server_user_time; + client_system_time_ = client_system_time; + client_user_time_ = client_user_time; +} + +// sends the data to the performance database server +bool PerfDbClient::sendData(std::string hashed_id, std::string test_name, + std::string sys_info, std::string tag) { + // Data record request object + SingleUserRecordRequest single_user_record_request; + + // setting access token, name of the test and the system information + single_user_record_request.set_hashed_id(hashed_id); + single_user_record_request.set_test_name(test_name); + single_user_record_request.set_sys_info(sys_info); + single_user_record_request.set_tag(tag); + + // setting configs + *(single_user_record_request.mutable_client_config()) = client_config_; + *(single_user_record_request.mutable_server_config()) = server_config_; + + Metrics* metrics = single_user_record_request.mutable_metrics(); + + // setting metrcs in data record request + if (qps_ != DBL_MIN) { + metrics->set_qps(qps_); + } + if (qps_per_core_ != DBL_MIN) { + metrics->set_qps_per_core(qps_per_core_); + } + if (perc_lat_50_ != DBL_MIN) { + metrics->set_perc_lat_50(perc_lat_50_); + } + if (perc_lat_90_ != DBL_MIN) { + metrics->set_perc_lat_90(perc_lat_90_); + } + if (perc_lat_95_ != DBL_MIN) { + metrics->set_perc_lat_95(perc_lat_95_); + } + if (perc_lat_99_ != DBL_MIN) { + metrics->set_perc_lat_99(perc_lat_99_); + } + if (perc_lat_99_point_9_ != DBL_MIN) { + metrics->set_perc_lat_99_point_9(perc_lat_99_point_9_); + } + if (server_system_time_ != DBL_MIN) { + metrics->set_server_system_time(server_system_time_); + } + if (server_user_time_ != DBL_MIN) { + metrics->set_server_user_time(server_user_time_); + } + if (client_system_time_ != DBL_MIN) { + metrics->set_client_system_time(client_system_time_); + } + if (client_user_time_ != DBL_MIN) { + metrics->set_client_user_time(client_user_time_); + } + + SingleUserRecordReply single_user_record_reply; + ClientContext context; + + Status status = stub_->RecordSingleClientData( + &context, single_user_record_request, &single_user_record_reply); + if (status.ok()) { + return true; // data sent to database successfully + } else { + return false; // error in data sending + } +} +} // testing +} // grpc diff --git a/test/cpp/qps/perf_db_client.h b/test/cpp/qps/perf_db_client.h new file mode 100644 index 0000000000..ce7a88bbff --- /dev/null +++ b/test/cpp/qps/perf_db_client.h @@ -0,0 +1,115 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <iostream> +#include <memory> +#include <string> +#include <cfloat> + +#include <grpc/grpc.h> +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/credentials.h> +#include <grpc++/status.h> +#include "test/cpp/qps/perf_db.grpc.pb.h" + +namespace grpc { +namespace testing { + +// Manages data sending to performance database server +class PerfDbClient { + public: + PerfDbClient() { + qps_ = DBL_MIN; + qps_per_core_ = DBL_MIN; + perc_lat_50_ = DBL_MIN; + perc_lat_90_ = DBL_MIN; + perc_lat_95_ = DBL_MIN; + perc_lat_99_ = DBL_MIN; + perc_lat_99_point_9_ = DBL_MIN; + server_system_time_ = DBL_MIN; + server_user_time_ = DBL_MIN; + client_system_time_ = DBL_MIN; + client_user_time_ = DBL_MIN; + } + + void init(std::shared_ptr<ChannelInterface> channel) { + stub_ = PerfDbTransfer::NewStub(channel); + } + + ~PerfDbClient() {} + + // sets the client and server config information + void setConfigs(const ClientConfig& client_config, + const ServerConfig& server_config); + + // sets the qps + void setQps(double qps); + + // sets the qps per core + void setQpsPerCore(double qps_per_core); + + // sets the 50th, 90th, 95th, 99th and 99.9th percentile latency + void setLatencies(double perc_lat_50, double perc_lat_90, + double perc_lat_95, double perc_lat_99, + double perc_lat_99_point_9); + + // sets the server and client, user and system times + void setTimes(double server_system_time, double server_user_time, + double client_system_time, double client_user_time); + + // sends the data to the performance database server + bool sendData(std::string hashed_id, std::string test_name, + std::string sys_info, std::string tag); + + private: + std::unique_ptr<PerfDbTransfer::Stub> stub_; + ClientConfig client_config_; + ServerConfig server_config_; + double qps_; + double qps_per_core_; + double perc_lat_50_; + double perc_lat_90_; + double perc_lat_95_; + double perc_lat_99_; + double perc_lat_99_point_9_; + double server_system_time_; + double server_user_time_; + double client_system_time_; + double client_user_time_; +}; + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/qps_test_openloop.cc b/test/cpp/qps/qps_test_openloop.cc index 52873b2987..96a9b4504c 100644 --- a/test/cpp/qps/qps_test_openloop.cc +++ b/test/cpp/qps/qps_test_openloop.cc @@ -60,7 +60,7 @@ static void RunQPS() { client_config.set_rpc_type(UNARY); client_config.set_load_type(POISSON); client_config.mutable_load_params()-> - mutable_poisson()->set_offered_load(10000.0); + mutable_poisson()->set_offered_load(1000.0); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index 94aacdbd1c..ff01ec1501 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -67,7 +67,6 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) { } } - void GprLogReporter::ReportQPS(const ScenarioResult& result) { gpr_log(GPR_INFO, "QPS: %.1f", result.latencies.Count() / @@ -76,10 +75,9 @@ void GprLogReporter::ReportQPS(const ScenarioResult& result) { } void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { - auto qps = - result.latencies.Count() / - average(result.client_resources, - [](ResourceUsage u) { return u.wall_time; }); + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, qps / result.server_config.threads()); @@ -118,5 +116,71 @@ void GprLogReporter::ReportTimes(const ScenarioResult& result) { [](ResourceUsage u) { return u.wall_time; })); } +void PerfDbReporter::ReportQPS(const ScenarioResult& result) { + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); + + perf_db_client_.setQps(qps); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { + auto qps = result.latencies.Count() / + average(result.client_resources, + [](ResourceUsage u) { return u.wall_time; }); + + auto qpsPerCore = qps / result.server_config.threads(); + + perf_db_client_.setQps(qps); + perf_db_client_.setQpsPerCore(qpsPerCore); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportLatency(const ScenarioResult& result) { + perf_db_client_.setLatencies(result.latencies.Percentile(50) / 1000, + result.latencies.Percentile(90) / 1000, + result.latencies.Percentile(95) / 1000, + result.latencies.Percentile(99) / 1000, + result.latencies.Percentile(99.9) / 1000); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::ReportTimes(const ScenarioResult& result) { + double server_system_time = + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); + double server_user_time = + 100.0 * sum(result.server_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; }); + double client_system_time = + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.system_time; }) / + sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); + double client_user_time = + 100.0 * sum(result.client_resources, + [](ResourceUsage u) { return u.user_time; }) / + sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; }); + + perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time, + client_user_time); + perf_db_client_.setConfigs(result.client_config, result.server_config); +} + +void PerfDbReporter::SendData() { + // send data to performance database + bool data_state = + perf_db_client_.sendData(hashed_id_, test_name_, sys_info_, tag_); + + // check state of data sending + if (data_state) { + gpr_log(GPR_INFO, "Data sent to performance database successfully"); + } else { + gpr_log(GPR_INFO, "Data could not be sent to performance database"); + } +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index b1cf83fc23..aec3cbe80a 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -41,6 +41,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/qpstest.grpc.pb.h" +#include "test/cpp/qps/perf_db_client.h" namespace grpc { namespace testing { @@ -103,6 +104,35 @@ class GprLogReporter : public Reporter { void ReportTimes(const ScenarioResult& result) GRPC_OVERRIDE; }; +/** Reporter for performance database tool */ +class PerfDbReporter : public Reporter { + public: + PerfDbReporter(const string& name, const string& hashed_id, + const string& test_name, const string& sys_info, + const string& server_address, const string& tag) + : Reporter(name), + hashed_id_(hashed_id), + test_name_(test_name), + sys_info_(sys_info), + tag_(tag) { + perf_db_client_.init(grpc::CreateChannel( + server_address, grpc::InsecureCredentials(), ChannelArguments())); + } + ~PerfDbReporter() GRPC_OVERRIDE { SendData(); }; + + private: + PerfDbClient perf_db_client_; + std::string hashed_id_; + std::string test_name_; + std::string sys_info_; + std::string tag_; + void ReportQPS(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportQPSPerCore(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportLatency(const ScenarioResult& result) GRPC_OVERRIDE; + void ReportTimes(const ScenarioResult& result) GRPC_OVERRIDE; + void SendData(); +}; + } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 210aef4fd6..f5251e961b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -64,7 +64,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -97,6 +97,9 @@ class AsyncQpsServerTest : public Server { } } for (int i = 0; i < config.threads(); i++) { + shutdown_state_.emplace_back(new PerThreadShutdownState()); + } + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); - std::unique_lock<std::mutex> g(shutdown_mutex_); - if (!shutdown_) { + if (!shutdown_state_[i]->shutdown()) { // this RPC context is done, so refresh it if (!still_going) { - g.unlock(); ctx->Reset(); } } else { @@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - { - std::lock_guard<std::mutex> g(shutdown_mutex_); - shutdown_ = true; + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + (*ss)->set_shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server { TestService::AsyncService async_service_; std::forward_list<ServerRpcContext *> contexts_; - std::mutex shutdown_mutex_; - bool shutdown_; + class PerThreadShutdownState { + public: + PerThreadShutdownState() : shutdown_(false) {} + + bool shutdown() const { + std::lock_guard<std::mutex> lock(mutex_); + return shutdown_; + } + + void set_shutdown() { + std::lock_guard<std::mutex> lock(mutex_); + shutdown_ = true; + } + + private: + mutable std::mutex mutex_; + bool shutdown_; + }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, diff --git a/test/cpp/util/benchmark_config.cc b/test/cpp/util/benchmark_config.cc index 5b3c1daf5d..91fbbf9677 100644 --- a/test/cpp/util/benchmark_config.cc +++ b/test/cpp/util/benchmark_config.cc @@ -37,6 +37,18 @@ DEFINE_bool(enable_log_reporter, true, "Enable reporting of benchmark results through GprLog"); +DEFINE_bool(report_metrics_db, false, "True if metrics to be reported to performance database"); + +DEFINE_string(hashed_id, "", "Hash of the user id"); + +DEFINE_string(test_name, "", "Name of the test being executed"); + +DEFINE_string(sys_info, "", "System information"); + +DEFINE_string(server_address, "localhost:50052", "Address of the performance database server"); + +DEFINE_string(tag, "", "Optional tag for the test"); + // In some distros, gflags is in the namespace google, and in some others, // in gflags. This hack is enabling us to find both. namespace google {} @@ -57,6 +69,12 @@ static std::shared_ptr<Reporter> InitBenchmarkReporters() { composite_reporter->add( std::unique_ptr<Reporter>(new GprLogReporter("LogReporter"))); } + if(FLAGS_report_metrics_db) { + composite_reporter->add( + std::unique_ptr<Reporter>(new PerfDbReporter("PerfDbReporter", FLAGS_hashed_id, FLAGS_test_name, + FLAGS_sys_info, FLAGS_server_address, FLAGS_tag))); + } + return std::shared_ptr<Reporter>(composite_reporter); } diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 3a858277fa..b228aaa66c 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -9881,6 +9881,9 @@ "test/cpp/qps/driver.h", "test/cpp/qps/histogram.h", "test/cpp/qps/interarrival.h", + "test/cpp/qps/perf_db.grpc.pb.h", + "test/cpp/qps/perf_db.pb.h", + "test/cpp/qps/perf_db_client.h", "test/cpp/qps/qps_worker.h", "test/cpp/qps/qpstest.grpc.pb.h", "test/cpp/qps/qpstest.pb.h", @@ -9900,6 +9903,8 @@ "test/cpp/qps/driver.h", "test/cpp/qps/histogram.h", "test/cpp/qps/interarrival.h", + "test/cpp/qps/perf_db_client.cc", + "test/cpp/qps/perf_db_client.h", "test/cpp/qps/qps_worker.cc", "test/cpp/qps/qps_worker.h", "test/cpp/qps/report.cc", |