Diffstat (limited to 'src')
27 files changed, 291 insertions, 181 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index fa72f9b0d9..a26eeb46b9 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -322,7 +322,7 @@ void PrintHeaderClientMethod(Printer *printer, const Method *method, printer->Print( *vars, "::grpc::Status $Method$(::grpc::ClientContext* context, " - "const $Request$& request, $Response$* response) GRPC_OVERRIDE;\n"); + "const $Request$& request, $Response$* response) override;\n"); printer->Print( *vars, "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " @@ -417,37 +417,34 @@ void PrintHeaderClientMethod(Printer *printer, const Method *method, "::grpc::ClientAsyncResponseReader< $Response$>* " "Async$Method$Raw(::grpc::ClientContext* context, " "const $Request$& request, " - "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n"); + "::grpc::CompletionQueue* cq) override;\n"); } else if (method->ClientOnlyStreaming()) { printer->Print(*vars, "::grpc::ClientWriter< $Request$>* $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) " - "GRPC_OVERRIDE;\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" - "::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); + "override;\n"); + printer->Print(*vars, + "::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::CompletionQueue* cq, void* tag) override;\n"); } else if (method->ServerOnlyStreaming()) { printer->Print(*vars, "::grpc::ClientReader< $Response$>* $Method$Raw(" "::grpc::ClientContext* context, const $Request$& request)" - " GRPC_OVERRIDE;\n"); + " override;\n"); printer->Print( *vars, "::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw(" "::grpc::ClientContext* context, const $Request$& request, " - "::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); + "::grpc::CompletionQueue* cq, void* tag) override;\n"); } else if (method->BidiStreaming()) { - printer->Print( - *vars, - "::grpc::ClientReaderWriter< $Request$, $Response$>* " - "$Method$Raw(::grpc::ClientContext* context) GRPC_OVERRIDE;\n"); - printer->Print( - *vars, - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); + printer->Print(*vars, + "::grpc::ClientReaderWriter< $Request$, $Response$>* " + "$Method$Raw(::grpc::ClientContext* context) override;\n"); + printer->Print(*vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "Async$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) override;\n"); } } } @@ -509,7 +506,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, " ::grpc::Service::MarkMethodAsync($Idx$);\n" "}\n"); printer->Print(*vars, - "~WithAsyncMethod_$Method$() GRPC_OVERRIDE {\n" + "~WithAsyncMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" "}\n"); if (method->NoStreaming()) { @@ -518,7 +515,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, "// disable synchronous version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "$Response$* response) final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -540,7 +537,7 @@ void 
PrintHeaderServerMethodAsync(Printer *printer, const Method *method, "::grpc::Status $Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReader< $Request$>* reader, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "$Response$* response) final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -561,7 +558,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, "// disable synchronous version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE " + "::grpc::ServerWriter< $Response$>* writer) final override " "{\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" @@ -585,7 +582,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, "::grpc::Status $Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) " - "GRPC_FINAL GRPC_OVERRIDE {\n" + "final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -632,7 +629,7 @@ void PrintHeaderServerMethodStreamedUnary( "std::placeholders::_2)));\n" "}\n"); printer->Print(*vars, - "~WithStreamedUnaryMethod_$Method$() GRPC_OVERRIDE {\n" + "~WithStreamedUnaryMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" "}\n"); printer->Print( @@ -640,7 +637,7 @@ void PrintHeaderServerMethodStreamedUnary( "// disable regular version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "$Response$* response) final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -683,7 +680,7 @@ void PrintHeaderServerMethodSplitStreaming( "std::placeholders::_2)));\n" "}\n"); printer->Print(*vars, - "~WithSplitStreamingMethod_$Method$() GRPC_OVERRIDE {\n" + "~WithSplitStreamingMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" "}\n"); printer->Print( @@ -691,7 +688,7 @@ void PrintHeaderServerMethodSplitStreaming( "// disable regular version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE " + "::grpc::ServerWriter< $Response$>* writer) final override " "{\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" @@ -727,7 +724,7 @@ void PrintHeaderServerMethodGeneric( " ::grpc::Service::MarkMethodGeneric($Idx$);\n" "}\n"); printer->Print(*vars, - "~WithGenericMethod_$Method$() GRPC_OVERRIDE {\n" + "~WithGenericMethod_$Method$() override {\n" " BaseClassMustBeDerivedFromService(this);\n" "}\n"); if (method->NoStreaming()) { @@ -736,7 +733,7 @@ void PrintHeaderServerMethodGeneric( "// disable synchronous version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "$Response$* response) final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -747,7 +744,7 @@ void PrintHeaderServerMethodGeneric( "::grpc::Status $Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReader< $Request$>* reader, " - "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n" + "$Response$* 
response) final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -757,7 +754,7 @@ void PrintHeaderServerMethodGeneric( "// disable synchronous version of this method\n" "::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE " + "::grpc::ServerWriter< $Response$>* writer) final override " "{\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" @@ -769,7 +766,7 @@ void PrintHeaderServerMethodGeneric( "::grpc::Status $Method$(" "::grpc::ServerContext* context, " "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) " - "GRPC_FINAL GRPC_OVERRIDE {\n" + "final override {\n" " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); @@ -784,7 +781,7 @@ void PrintHeaderService(Printer *printer, const Service *service, printer->Print(service->GetLeadingComments().c_str()); printer->Print(*vars, - "class $Service$ GRPC_FINAL {\n" + "class $Service$ final {\n" " public:\n"); printer->Indent(); @@ -810,7 +807,7 @@ void PrintHeaderService(Printer *printer, const Service *service, printer->Outdent(); printer->Print("};\n"); printer->Print( - "class Stub GRPC_FINAL : public StubInterface" + "class Stub final : public StubInterface" " {\n public:\n"); printer->Indent(); printer->Print( diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 734108a9db..30e412e358 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -402,7 +402,7 @@ static void parse_server(const grpc_grpclb_server *server, } else if (ip->size == 16) { addr->len = sizeof(struct sockaddr_in6); struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr; - addr6->sin6_family = AF_INET; + addr6->sin6_family = AF_INET6; memcpy(&addr6->sin6_addr, ip->bytes, ip->size); addr6->sin6_port = netorder_port; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index b0c461730b..427999aa6b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -678,8 +678,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->index = subchannel_idx; sd->subchannel = subchannel; sd->user_data_vtable = addresses->user_data_vtable; - sd->user_data = - sd->user_data_vtable->copy(addresses->addresses[i].user_data); + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 8180f78fc0..9702cb2c81 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -50,6 +50,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/bin_encoder.h" +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" @@ -1578,6 +1579,20 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { grpc_chttp2_maybe_complete_recv_initial_metadata, grpc_chttp2_maybe_complete_recv_trailing_metadata}; +static void 
force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, + grpc_error *error) { + grpc_chttp2_stream *s = sp; + grpc_chttp2_transport *t = s->t; + if (!s->write_closed) { + gpr_slice_buffer_add( + &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR, + &s->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream"); + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); + } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); +} + grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, @@ -1613,6 +1628,17 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, s->header_frames_received++; } if (parser->is_eof) { + if (t->is_client && !s->write_closed) { + /* server eof ==> complete closure; we may need to forcefully close + the stream. Wait until the combiner lock is ready to be released + however -- it might be that we receive a RST_STREAM following this + and can avoid the extra write */ + GRPC_CHTTP2_STREAM_REF(s, "final_rst"); + grpc_combiner_execute_finally( + exec_ctx, t->combiner, + grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE, + false); + } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index e39cf28e35..8a06443d58 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -715,3 +715,10 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, &slice_allocator->on_allocated); } + +gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size) { + grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL); + return ru_slice_create(resource_user, size); +} diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index 6dfac55f88..da68f21a2c 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -221,4 +221,9 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_slice_allocator *slice_allocator, size_t length, size_t count, gpr_slice_buffer *dest); +/* Allocate one slice of length \a size synchronously. 
*/ +gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size); + #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 6274667042..b07f9ceffa 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -54,9 +54,12 @@ typedef struct grpc_uv_tcp_connect { grpc_endpoint **endpoint; int refs; char *addr_name; + grpc_resource_quota *resource_quota; } grpc_uv_tcp_connect; -static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) { +static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, + grpc_uv_tcp_connect *connect) { + grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota); gpr_free(connect); } @@ -74,7 +77,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, } done = (--connect->refs == 0); if (done) { - uv_tcp_connect_cleanup(connect); + uv_tcp_connect_cleanup(exec_ctx, connect); } } @@ -86,8 +89,8 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { grpc_closure *closure = connect->closure; grpc_timer_cancel(&exec_ctx, &connect->alarm); if (status == 0) { - *connect->endpoint = - grpc_tcp_create(connect->tcp_handle, connect->addr_name); + *connect->endpoint = grpc_tcp_create( + connect->tcp_handle, connect->resource_quota, connect->addr_name); } else { error = GRPC_ERROR_CREATE("Failed to connect to remote host"); error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); @@ -105,7 +108,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { - uv_tcp_connect_cleanup(connect); + uv_tcp_connect_cleanup(&exec_ctx, connect); } grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL); grpc_exec_ctx_finish(&exec_ctx); @@ -114,16 +117,31 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const grpc_resolved_address *resolved_addr, gpr_timespec deadline) { grpc_uv_tcp_connect *connect; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + (void)channel_args; (void)interested_parties; + + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + connect = gpr_malloc(sizeof(grpc_uv_tcp_connect)); memset(connect, 0, sizeof(grpc_uv_tcp_connect)); connect->closure = closure; connect->endpoint = ep; connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; // TODO(murgatroid99): figure out what the return value here means @@ -138,16 +156,18 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, const grpc_resolved_address *addr, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, gpr_timespec deadline) = 
tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const grpc_resolved_address *addr, gpr_timespec deadline) { - grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, - deadline); + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + channel_args, addr, deadline); } #endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 73e4db3d65..b5b9b92a20 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -76,13 +76,30 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; + + grpc_resource_quota *resource_quota; }; -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); - (void)args; + s->resource_quota = grpc_resource_quota_create(NULL); + for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { + if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_POINTER) { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + s->resource_quota = + grpc_resource_quota_internal_ref(args->args[i].value.pointer.p); + } else { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA + " must be a pointer to a buffer pool"); + } + } + } gpr_ref_init(&s->refs, 1); s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; @@ -119,6 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp->handle); gpr_free(sp); } + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); } @@ -201,7 +219,7 @@ static void on_connect(uv_stream_t *server, int status) { } else { gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); } - ep = grpc_tcp_create(client, peer_name_string); + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, &acceptor); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 3860fe3e9b..8e74c9e863 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -54,6 +54,9 @@ typedef struct { grpc_endpoint base; gpr_refcount refcount; + uv_write_t write_req; + uv_shutdown_t shutdown_req; + uv_tcp_t *handle; grpc_closure *read_cb; @@ -64,14 +67,23 @@ typedef struct { gpr_slice_buffer *write_slices; uv_buf_t *write_buffers; + grpc_resource_user resource_user; + bool shutting_down; + bool resource_user_shutting_down; + char *peer_string; grpc_pollset *pollset; } grpc_tcp; static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } -static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); } +static void tcp_free(grpc_tcp *tcp) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user); + gpr_free(tcp); + grpc_exec_ctx_finish(&exec_ctx); +} /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG @@ -106,11 +118,14 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void alloc_uv_buf(uv_handle_t *handle, 
size_t suggested_size, uv_buf_t *buf) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = handle->data; (void)suggested_size; - tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + tcp->read_slice = grpc_resource_user_slice_malloc( + &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice); buf->len = GPR_SLICE_LENGTH(tcp->read_slice); + grpc_exec_ctx_finish(&exec_ctx); } static void read_callback(uv_stream_t *stream, ssize_t nread, @@ -198,7 +213,8 @@ static void write_callback(uv_write_t *req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - gpr_free(req); + grpc_resource_user_free(&exec_ctx, &tcp->resource_user, + sizeof(uv_buf_t) * tcp->write_slices->count); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_finish(&exec_ctx); } @@ -243,12 +259,15 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); + grpc_resource_user_alloc(exec_ctx, &tcp->resource_user, + sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice); buffers[i].len = GPR_SLICE_LENGTH(*slice); } - write_req = gpr_malloc(sizeof(uv_write_t)); + tcp->write_buffers = buffers; + write_req = &tcp->write_req; write_req->data = tcp; TCP_REF(tcp, "write"); // TODO(murgatroid99): figure out what the return value here means @@ -274,13 +293,29 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, (void)pollset; } -static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); } +static void shutdown_callback(uv_shutdown_t *req, int status) {} + +static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + TCP_UNREF(arg, "resource_user"); +} + +static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { + if (!tcp->resource_user_shutting_down) { + tcp->resource_user_shutting_down = true; + TCP_REF(tcp, "resource_user"); + grpc_resource_user_shutdown( + exec_ctx, &tcp->resource_user, + grpc_closure_create(resource_user_shutdown_done, tcp)); + } +} static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { tcp->shutting_down = true; - uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t)); + uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); } } @@ -289,6 +324,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); + uv_resource_user_maybe_shutdown(exec_ctx, tcp); TCP_UNREF(tcp, "destroy"); } @@ -297,18 +333,21 @@ static char *uv_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } +static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->resource_user; +} + static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } -static grpc_endpoint_vtable vtable = {uv_endpoint_read, - uv_endpoint_write, - uv_get_workqueue, - uv_add_to_pollset, - uv_add_to_pollset_set, - uv_endpoint_shutdown, - uv_destroy, - uv_get_peer}; +static 
grpc_endpoint_vtable vtable = { + uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, + uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, + uv_destroy, uv_get_resource_user, uv_get_peer}; -grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, + grpc_resource_quota *resource_quota, + char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); if (grpc_tcp_trace) { @@ -325,6 +364,8 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; + tcp->resource_user_shutting_down = false; + grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index eed41151ea..970fcafe4a 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -52,6 +52,8 @@ extern int grpc_tcp_trace; #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 -grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string); +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, + grpc_resource_quota *resource_quota, + char *peer_string); #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index 43b3875cb3..847c8c7dc0 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -106,11 +106,11 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) { } namespace { -class TagSaver GRPC_FINAL : public CompletionQueueTag { +class TagSaver final : public CompletionQueueTag { public: explicit TagSaver(void* tag) : tag_(tag) {} - ~TagSaver() GRPC_OVERRIDE {} - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + ~TagSaver() override {} + bool FinalizeResult(void** tag, bool* status) override { *tag = tag_; delete this; return true; diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index b6008f47b1..c073741dac 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -45,12 +45,12 @@ namespace grpc { -class DefaultGlobalClientCallbacks GRPC_FINAL +class DefaultGlobalClientCallbacks final : public ClientContext::GlobalCallbacks { public: - ~DefaultGlobalClientCallbacks() GRPC_OVERRIDE {} - void DefaultConstructor(ClientContext* context) GRPC_OVERRIDE {} - void Destructor(ClientContext* context) GRPC_OVERRIDE {} + ~DefaultGlobalClientCallbacks() override {} + void DefaultConstructor(ClientContext* context) override {} + void Destructor(ClientContext* context) override {} }; static DefaultGlobalClientCallbacks g_default_client_callbacks; @@ -93,7 +93,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key, void ClientContext::set_call(grpc_call* call, const std::shared_ptr<Channel>& channel) { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); GPR_ASSERT(call_ == nullptr); call_ = call; channel_ = channel; @@ -119,7 +119,7 @@ void ClientContext::set_compression_algorithm( } void ClientContext::TryCancel() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (call_) { grpc_call_cancel(call_, nullptr); } else { diff --git a/src/cpp/client/cronet_credentials.cc b/src/cpp/client/cronet_credentials.cc index 60cad097db..8e94cf0ad7 100644 --- 
a/src/cpp/client/cronet_credentials.cc +++ b/src/cpp/client/cronet_credentials.cc @@ -40,12 +40,12 @@ namespace grpc { -class CronetChannelCredentialsImpl GRPC_FINAL : public ChannelCredentials { +class CronetChannelCredentialsImpl final : public ChannelCredentials { public: CronetChannelCredentialsImpl(void* engine) : engine_(engine) {} std::shared_ptr<grpc::Channel> CreateChannel( - const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE { + const string& target, const grpc::ChannelArguments& args) override { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); return CreateChannelInternal( @@ -53,9 +53,7 @@ class CronetChannelCredentialsImpl GRPC_FINAL : public ChannelCredentials { &channel_args, nullptr)); } - SecureChannelCredentials* AsSecureCredentials() GRPC_OVERRIDE { - return nullptr; - } + SecureChannelCredentials* AsSecureCredentials() override { return nullptr; } private: void* engine_; diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc index 13019a7117..116f1dd4ad 100644 --- a/src/cpp/client/insecure_credentials.cc +++ b/src/cpp/client/insecure_credentials.cc @@ -43,10 +43,10 @@ namespace grpc { namespace { -class InsecureChannelCredentialsImpl GRPC_FINAL : public ChannelCredentials { +class InsecureChannelCredentialsImpl final : public ChannelCredentials { public: std::shared_ptr<grpc::Channel> CreateChannel( - const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE { + const string& target, const grpc::ChannelArguments& args) override { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); return CreateChannelInternal( @@ -54,9 +54,7 @@ class InsecureChannelCredentialsImpl GRPC_FINAL : public ChannelCredentials { grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)); } - SecureChannelCredentials* AsSecureCredentials() GRPC_OVERRIDE { - return nullptr; - } + SecureChannelCredentials* AsSecureCredentials() override { return nullptr; } }; } // namespace diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h index ae41ef8007..281db17e98 100644 --- a/src/cpp/client/secure_credentials.h +++ b/src/cpp/client/secure_credentials.h @@ -43,34 +43,34 @@ namespace grpc { -class SecureChannelCredentials GRPC_FINAL : public ChannelCredentials { +class SecureChannelCredentials final : public ChannelCredentials { public: explicit SecureChannelCredentials(grpc_channel_credentials* c_creds); ~SecureChannelCredentials() { grpc_channel_credentials_release(c_creds_); } grpc_channel_credentials* GetRawCreds() { return c_creds_; } std::shared_ptr<grpc::Channel> CreateChannel( - const string& target, const grpc::ChannelArguments& args) GRPC_OVERRIDE; - SecureChannelCredentials* AsSecureCredentials() GRPC_OVERRIDE { return this; } + const string& target, const grpc::ChannelArguments& args) override; + SecureChannelCredentials* AsSecureCredentials() override { return this; } private: grpc_channel_credentials* const c_creds_; }; -class SecureCallCredentials GRPC_FINAL : public CallCredentials { +class SecureCallCredentials final : public CallCredentials { public: explicit SecureCallCredentials(grpc_call_credentials* c_creds); ~SecureCallCredentials() { grpc_call_credentials_release(c_creds_); } grpc_call_credentials* GetRawCreds() { return c_creds_; } - bool ApplyToCall(grpc_call* call) GRPC_OVERRIDE; - SecureCallCredentials* AsSecureCredentials() GRPC_OVERRIDE { return this; } + bool ApplyToCall(grpc_call* call) override; + SecureCallCredentials* 
AsSecureCredentials() override { return this; } private: grpc_call_credentials* const c_creds_; }; -class MetadataCredentialsPluginWrapper GRPC_FINAL { +class MetadataCredentialsPluginWrapper final { public: static void Destroy(void* wrapper); static void GetMetadata(void* wrapper, grpc_auth_metadata_context context, diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index ae32e02f69..fc0deff3b3 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -268,7 +268,7 @@ namespace internal { // Members of this class correspond to the members of the C // grpc_channel_filter struct. template <typename ChannelDataType, typename CallDataType> -class ChannelFilter GRPC_FINAL { +class ChannelFilter final { public: static const size_t channel_data_size = sizeof(ChannelDataType); diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h index c9f1dad131..98f5f09e27 100644 --- a/src/cpp/common/secure_auth_context.h +++ b/src/cpp/common/secure_auth_context.h @@ -40,30 +40,29 @@ struct grpc_auth_context; namespace grpc { -class SecureAuthContext GRPC_FINAL : public AuthContext { +class SecureAuthContext final : public AuthContext { public: SecureAuthContext(grpc_auth_context* ctx, bool take_ownership); - ~SecureAuthContext() GRPC_OVERRIDE; + ~SecureAuthContext() override; - bool IsPeerAuthenticated() const GRPC_OVERRIDE; + bool IsPeerAuthenticated() const override; - std::vector<grpc::string_ref> GetPeerIdentity() const GRPC_OVERRIDE; + std::vector<grpc::string_ref> GetPeerIdentity() const override; - grpc::string GetPeerIdentityPropertyName() const GRPC_OVERRIDE; + grpc::string GetPeerIdentityPropertyName() const override; std::vector<grpc::string_ref> FindPropertyValues( - const grpc::string& name) const GRPC_OVERRIDE; + const grpc::string& name) const override; - AuthPropertyIterator begin() const GRPC_OVERRIDE; + AuthPropertyIterator begin() const override; - AuthPropertyIterator end() const GRPC_OVERRIDE; + AuthPropertyIterator end() const override; void AddProperty(const grpc::string& key, - const grpc::string_ref& value) GRPC_OVERRIDE; + const grpc::string_ref& value) override; - virtual bool SetPeerIdentityPropertyName(const grpc::string& name) - GRPC_OVERRIDE; + virtual bool SetPeerIdentityPropertyName(const grpc::string& name) override; private: grpc_auth_context* ctx_; diff --git a/src/cpp/ext/proto_server_reflection.h b/src/cpp/ext/proto_server_reflection.h index be5f062f9f..ca0ba97d88 100644 --- a/src/cpp/ext/proto_server_reflection.h +++ b/src/cpp/ext/proto_server_reflection.h @@ -42,7 +42,7 @@ namespace grpc { -class ProtoServerReflection GRPC_FINAL +class ProtoServerReflection final : public reflection::v1alpha::ServerReflection::Service { public: ProtoServerReflection(); @@ -56,7 +56,7 @@ class ProtoServerReflection GRPC_FINAL ServerContext* context, ServerReaderWriter<reflection::v1alpha::ServerReflectionResponse, reflection::v1alpha::ServerReflectionRequest>* stream) - GRPC_OVERRIDE; + override; private: Status ListService(ServerContext* context, diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index 4b226c2992..1fdc2edb25 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -31,16 +31,16 @@ * */ -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> +#include <mutex> +#include <thread> #include "src/cpp/server/dynamic_thread_pool.h" namespace grpc { 
DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool) : pool_(pool), - thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, - this)) {} + thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, + this)) {} DynamicThreadPool::DynamicThread::~DynamicThread() { thd_->join(); thd_.reset(); @@ -49,7 +49,7 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { void DynamicThreadPool::DynamicThread::ThreadFunc() { pool_->ThreadFunc(); // Now that we have killed ourselves, we should reduce the thread count - grpc::unique_lock<grpc::mutex> lock(pool_->mu_); + std::unique_lock<std::mutex> lock(pool_->mu_); pool_->nthreads_--; // Move ourselves to dead list pool_->dead_threads_.push_back(this); @@ -62,7 +62,7 @@ void DynamicThreadPool::DynamicThread::ThreadFunc() { void DynamicThreadPool::ThreadFunc() { for (;;) { // Wait until work is available or we are shutting down. - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (!shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread if (threads_waiting_ >= reserve_threads_) { @@ -91,7 +91,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) nthreads_(0), threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { - grpc::lock_guard<grpc::mutex> lock(mu_); + std::lock_guard<std::mutex> lock(mu_); nthreads_++; new DynamicThread(this); } @@ -104,7 +104,7 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) { } DynamicThreadPool::~DynamicThreadPool() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); shutdown_ = true; cv_.notify_all(); while (nthreads_ != 0) { @@ -114,7 +114,7 @@ DynamicThreadPool::~DynamicThreadPool() { } void DynamicThreadPool::Add(const std::function<void()>& callback) { - grpc::lock_guard<grpc::mutex> lock(mu_); + std::lock_guard<std::mutex> lock(mu_); // Add works to the callbacks list callbacks_.push(callback); // Increase pool size or notify as needed diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h index 5ba7533c05..4f8c4111cc 100644 --- a/src/cpp/server/dynamic_thread_pool.h +++ b/src/cpp/server/dynamic_thread_pool.h @@ -34,24 +34,25 @@ #ifndef GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H #define GRPC_INTERNAL_CPP_DYNAMIC_THREAD_POOL_H +#include <condition_variable> #include <list> #include <memory> +#include <mutex> #include <queue> +#include <thread> -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> #include <grpc++/support/config.h> #include "src/cpp/server/thread_pool_interface.h" namespace grpc { -class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { +class DynamicThreadPool final : public ThreadPoolInterface { public: explicit DynamicThreadPool(int reserve_threads); ~DynamicThreadPool(); - void Add(const std::function<void()>& callback) GRPC_OVERRIDE; + void Add(const std::function<void()>& callback) override; private: class DynamicThread { @@ -61,12 +62,12 @@ class DynamicThreadPool GRPC_FINAL : public ThreadPoolInterface { private: DynamicThreadPool* pool_; - std::unique_ptr<grpc::thread> thd_; + std::unique_ptr<std::thread> thd_; void ThreadFunc(); }; - grpc::mutex mu_; - grpc::condition_variable cv_; - grpc::condition_variable shutdown_cv_; + std::mutex mu_; + std::condition_variable cv_; + std::condition_variable shutdown_cv_; bool shutdown_; std::queue<std::function<void()>> callbacks_; int reserve_threads_; diff --git a/src/cpp/server/insecure_server_credentials.cc 
b/src/cpp/server/insecure_server_credentials.cc index ef3cae5fd7..eb5931b7b0 100644 --- a/src/cpp/server/insecure_server_credentials.cc +++ b/src/cpp/server/insecure_server_credentials.cc @@ -38,14 +38,13 @@ namespace grpc { namespace { -class InsecureServerCredentialsImpl GRPC_FINAL : public ServerCredentials { +class InsecureServerCredentialsImpl final : public ServerCredentials { public: - int AddPortToServer(const grpc::string& addr, - grpc_server* server) GRPC_OVERRIDE { + int AddPortToServer(const grpc::string& addr, grpc_server* server) override { return grpc_server_add_insecure_http2_port(server, addr.c_str()); } void SetAuthMetadataProcessor( - const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE { + const std::shared_ptr<AuthMetadataProcessor>& processor) override { (void)processor; GPR_ASSERT(0); // Should not be called on InsecureServerCredentials. } diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index 5460f4a02c..3a301e60c2 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -44,7 +44,7 @@ namespace grpc { -class AuthMetadataProcessorAyncWrapper GRPC_FINAL { +class AuthMetadataProcessorAyncWrapper final { public: static void Destroy(void* wrapper); @@ -64,19 +64,18 @@ class AuthMetadataProcessorAyncWrapper GRPC_FINAL { std::shared_ptr<AuthMetadataProcessor> processor_; }; -class SecureServerCredentials GRPC_FINAL : public ServerCredentials { +class SecureServerCredentials final : public ServerCredentials { public: explicit SecureServerCredentials(grpc_server_credentials* creds) : creds_(creds) {} - ~SecureServerCredentials() GRPC_OVERRIDE { + ~SecureServerCredentials() override { grpc_server_credentials_release(creds_); } - int AddPortToServer(const grpc::string& addr, - grpc_server* server) GRPC_OVERRIDE; + int AddPortToServer(const grpc::string& addr, grpc_server* server) override; void SetAuthMetadataProcessor( - const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE; + const std::shared_ptr<AuthMetadataProcessor>& processor) override; private: grpc_server_credentials* creds_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index d46942d257..b7cfd6dbf1 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -55,11 +55,11 @@ namespace grpc { -class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks { +class DefaultGlobalCallbacks final : public Server::GlobalCallbacks { public: - ~DefaultGlobalCallbacks() GRPC_OVERRIDE {} - void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} - void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} + ~DefaultGlobalCallbacks() override {} + void PreSynchronousRequest(ServerContext* context) override {} + void PostSynchronousRequest(ServerContext* context) override {} }; static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; @@ -79,7 +79,7 @@ class Server::UnimplementedAsyncRequestContext { GenericServerAsyncReaderWriter generic_stream_; }; -class Server::UnimplementedAsyncRequest GRPC_FINAL +class Server::UnimplementedAsyncRequest final : public UnimplementedAsyncRequestContext, public GenericAsyncRequest { public: @@ -89,7 +89,7 @@ class Server::UnimplementedAsyncRequest GRPC_FINAL server_(server), cq_(cq) {} - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + bool FinalizeResult(void** tag, bool* status) override; ServerContext* context() { return &server_context_; } 
GenericServerAsyncReaderWriter* stream() { return &generic_stream_; } @@ -101,13 +101,13 @@ class Server::UnimplementedAsyncRequest GRPC_FINAL typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> UnimplementedAsyncResponseOp; -class Server::UnimplementedAsyncResponse GRPC_FINAL +class Server::UnimplementedAsyncResponse final : public UnimplementedAsyncResponseOp { public: UnimplementedAsyncResponse(UnimplementedAsyncRequest* request); ~UnimplementedAsyncResponse() { delete request_; } - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool FinalizeResult(void** tag, bool* status) override { bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status); delete this; return r; @@ -122,7 +122,7 @@ class ShutdownTag : public CompletionQueueTag { bool FinalizeResult(void** tag, bool* status) { return false; } }; -class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { +class Server::SyncRequest final : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) : method_(method), @@ -170,7 +170,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } } - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool FinalizeResult(void** tag, bool* status) override { if (!*status) { grpc_completion_queue_destroy(cq_); } @@ -182,7 +182,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return true; } - class CallData GRPC_FINAL { + class CallData final { public: explicit CallData(Server* server, SyncRequest* mrd) : cq_(mrd->cq_), @@ -255,7 +255,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { cq_timeout_msec_(cq_timeout_msec), global_callbacks_(global_callbacks) {} - WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE { + WorkStatus PollForWork(void** tag, bool* ok) override { *tag = nullptr; gpr_timespec deadline = gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN); @@ -272,7 +272,7 @@ class Server::SyncRequestThreadManager : public ThreadManager { GPR_UNREACHABLE_CODE(return TIMEOUT); } - void DoWork(void* tag, bool ok) GRPC_OVERRIDE { + void DoWork(void* tag, bool ok) override { SyncRequest* sync_req = static_cast<SyncRequest*>(tag); if (!sync_req) { @@ -379,7 +379,7 @@ Server::Server( Server::~Server() { { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (started_ && !shutdown_) { lock.unlock(); Shutdown(); @@ -501,7 +501,7 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } void Server::ShutdownInternal(gpr_timespec deadline) { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; @@ -549,7 +549,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } void Server::Wait() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); while (started_ && !shutdown_notified_) { shutdown_cv_.wait(lock); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 1ca6a2b906..a66ec4ac84 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -33,9 +33,10 @@ #include <grpc++/server_context.h> +#include <mutex> + #include <grpc++/completion_queue.h> #include <grpc++/impl/call.h> -#include <grpc++/impl/sync.h> #include <grpc++/support/time.h> #include <grpc/compression.h> #include <grpc/grpc.h> @@ -48,7 +49,7 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { +class 
ServerContext::CompletionOp final : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq CompletionOp() @@ -58,8 +59,8 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { finalized_(false), cancelled_(0) {} - void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + void FillOps(grpc_op* ops, size_t* nops) override; + bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { cq->TryPluck(this); @@ -76,20 +77,20 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { private: bool CheckCancelledNoPluck() { - grpc::lock_guard<grpc::mutex> g(mu_); + std::lock_guard<std::mutex> g(mu_); return finalized_ ? (cancelled_ != 0) : false; } bool has_tag_; void* tag_; - grpc::mutex mu_; + std::mutex mu_; int refs_; bool finalized_; int cancelled_; }; void ServerContext::CompletionOp::Unref() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (--refs_ == 0) { lock.unlock(); delete this; @@ -105,7 +106,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); finalized_ = true; bool ret = false; if (has_tag_) { diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index caae4c457d..1450d009e4 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -31,12 +31,13 @@ * */ -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> -#include <grpc/support/log.h> +#include "src/cpp/thread_manager/thread_manager.h" + #include <climits> +#include <mutex> +#include <thread> -#include "src/cpp/thread_manager/thread_manager.h" +#include <grpc/support/log.h> namespace grpc { @@ -59,7 +60,7 @@ ThreadManager::ThreadManager(int min_pollers, int max_pollers) ThreadManager::~ThreadManager() { { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); GPR_ASSERT(num_threads_ == 0); } @@ -67,29 +68,29 @@ ThreadManager::~ThreadManager() { } void ThreadManager::Wait() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); while (num_threads_ != 0) { shutdown_cv_.wait(lock); } } void ThreadManager::Shutdown() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); shutdown_ = true; } bool ThreadManager::IsShutdown() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); return shutdown_; } void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { - std::unique_lock<grpc::mutex> list_lock(list_mu_); + std::unique_lock<std::mutex> list_lock(list_mu_); completed_threads_.push_back(thd); } - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); num_threads_--; if (num_threads_ == 0) { shutdown_cv_.notify_one(); @@ -97,7 +98,7 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) { } void ThreadManager::CleanupCompletedThreads() { - std::unique_lock<grpc::mutex> lock(list_mu_); + std::unique_lock<std::mutex> lock(list_mu_); for (auto thd = completed_threads_.begin(); thd != completed_threads_.end(); thd = completed_threads_.erase(thd)) { delete *thd; @@ -114,7 +115,7 @@ void ThreadManager::Initialize() { // less than max threshold (i.e max_pollers_) and the total number of 
threads is // below the maximum threshold, we can let the current thread continue as poller bool ThreadManager::MaybeContinueAsPoller() { - std::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (shutdown_ || num_pollers_ > max_pollers_) { return false; } @@ -127,7 +128,7 @@ bool ThreadManager::MaybeContinueAsPoller() { // threads currently blocked in PollForWork()) is below the threshold (i.e // min_pollers_) and the total number of threads is below the maximum threshold void ThreadManager::MaybeCreatePoller() { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); if (!shutdown_ && num_pollers_ < min_pollers_) { num_pollers_++; num_threads_++; @@ -156,7 +157,7 @@ void ThreadManager::MainWorkLoop() { WorkStatus work_status = PollForWork(&tag, &ok); { - grpc::unique_lock<grpc::mutex> lock(mu_); + std::unique_lock<std::mutex> lock(mu_); num_pollers_--; if (work_status == TIMEOUT && num_pollers_ > min_pollers_) { diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 9cfdb8af25..9c0569c62c 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -34,11 +34,12 @@ #ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H #define GRPC_INTERNAL_CPP_THREAD_MANAGER_H +#include <condition_variable> #include <list> #include <memory> +#include <mutex> +#include <thread> -#include <grpc++/impl/sync.h> -#include <grpc++/impl/thd.h> #include <grpc++/support/config.h> namespace grpc { @@ -115,7 +116,7 @@ class ThreadManager { void Run(); ThreadManager* thd_mgr_; - grpc::thread thd_; + std::thread thd_; }; // The main funtion in ThreadManager @@ -134,10 +135,10 @@ class ThreadManager { // Protects shutdown_, num_pollers_ and num_threads_ // TODO: sreek - Change num_pollers and num_threads_ to atomics - grpc::mutex mu_; + std::mutex mu_; bool shutdown_; - grpc::condition_variable shutdown_cv_; + std::condition_variable shutdown_cv_; // Number of threads doing polling int num_pollers_; @@ -150,7 +151,7 @@ class ThreadManager { // currently polling i.e num_pollers_) int num_threads_; - grpc::mutex list_mu_; + std::mutex list_mu_; std::list<WorkerThread*> completed_threads_; }; diff --git a/src/cpp/util/time_cc.cc b/src/cpp/util/time_cc.cc index c43d848cc6..cd59a19703 100644 --- a/src/cpp/util/time_cc.cc +++ b/src/cpp/util/time_cc.cc @@ -32,9 +32,6 @@ */ #include <grpc++/support/config.h> - -#ifndef GRPC_CXX0X_NO_CHRONO - #include <grpc++/support/time.h> #include <grpc/support/time.h> @@ -91,5 +88,3 @@ system_clock::time_point Timespec2Timepoint(gpr_timespec t) { } } // namespace grpc - -#endif // !GRPC_CXX0X_NO_CHRONO |
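
Most of the C++ churn in this diff is mechanical: the old GRPC_FINAL / GRPC_OVERRIDE portability macros become the C++11 `final` and `override` keywords, and the `grpc::mutex` / `grpc::thread` wrappers from `<grpc++/impl/sync.h>` and `<grpc++/impl/thd.h>` give way to `std::mutex`, `std::thread`, and friends. A minimal sketch of the resulting style, with illustrative class and member names that are not taken from the tree:

```cpp
#include <mutex>
#include <thread>

// Illustrative only: mirrors the post-change style used throughout src/cpp.
class WorkerBase {
 public:
  virtual ~WorkerBase() {}
  virtual void Run() = 0;
};

class Worker final : public WorkerBase {    // was: class Worker GRPC_FINAL
 public:
  void Run() override {                     // was: void Run() GRPC_OVERRIDE
    std::lock_guard<std::mutex> lock(mu_);  // was: grpc::lock_guard<grpc::mutex>
    ++runs_;
  }

 private:
  std::mutex mu_;                           // was: grpc::mutex mu_;
  int runs_ = 0;
};
```

The generated headers from cpp_generator.cc follow the same pattern, so stubs and service wrappers emitted by the protoc plugin now require a C++11 compiler rather than the macro-based fallbacks.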
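
The one-line grpclb.c change is a genuine bug fix: when the load balancer returns a 16-byte address, the populated sockaddr_in6 must carry `AF_INET6`, not `AF_INET`, or the address is later rejected as malformed. A hedged sketch of the corrected parsing step, with grpc_resolved_address and the server message reduced to simplified stand-in types so the snippet stays self-contained:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <netinet/in.h>

// Simplified stand-in for grpc_resolved_address in the diff.
struct resolved_address {
  char addr[128];
  size_t len;
};

// Populate a sockaddr_in6 from a 16-byte IPv6 address; the port is expected
// in network byte order, matching the diff's netorder_port variable.
static void parse_ipv6(const unsigned char *ip_bytes, uint16_t netorder_port,
                       resolved_address *addr) {
  addr->len = sizeof(struct sockaddr_in6);
  struct sockaddr_in6 *addr6 = reinterpret_cast<struct sockaddr_in6 *>(addr->addr);
  addr6->sin6_family = AF_INET6;  // the fix: previously set to AF_INET
  memcpy(&addr6->sin6_addr, ip_bytes, 16);
  addr6->sin6_port = netorder_port;
}
```

The round_robin.c hunk is a related robustness fix in the same policy layer: `user_data_vtable` may be NULL for address lists that carry no LB user data, so its `copy` hook is now only invoked after a NULL check.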