From d79b4865ce124d12b2394c2755df8f6539418e93 Mon Sep 17 00:00:00 2001 From: ctiller Date: Wed, 17 Dec 2014 16:36:59 -0800 Subject: Introduce the (outside-of-iomgr) pollset API. This CL introduces the public side of this interface. There will need to be an iomgr-private API also, but this will be a per-implementation API and so is not covered here. I've taken care of wiring the interface through the codebase in the manner that I expect it will be used. Change on 2014/12/17 by ctiller ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82376987 --- src/core/channel/call_op_string.c | 2 +- src/core/channel/channel_stack.h | 3 +++ src/core/channel/client_channel.c | 3 +++ src/core/channel/connected_channel.c | 1 + src/core/iomgr/endpoint.c | 4 +++ src/core/iomgr/endpoint.h | 6 +++++ src/core/iomgr/pollset.c | 37 +++++++++++++++++++++++++ src/core/iomgr/pollset.h | 51 +++++++++++++++++++++++++++++++++++ src/core/iomgr/tcp_posix.c | 11 +++++--- src/core/security/secure_endpoint.c | 9 ++++++- src/core/surface/call.c | 2 ++ src/core/surface/completion_queue.c | 8 ++++++ src/core/surface/completion_queue.h | 3 +++ src/core/surface/server.c | 2 ++ src/core/transport/chttp2_transport.c | 13 +++++++-- src/core/transport/transport.c | 5 ++++ src/core/transport/transport.h | 12 ++++++--- src/core/transport/transport_impl.h | 3 +++ 18 files changed, 164 insertions(+), 11 deletions(-) create mode 100644 src/core/iomgr/pollset.c create mode 100644 src/core/iomgr/pollset.h (limited to 'src/core') diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 4a98cbfbbb..9a7838ce2f 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -107,7 +107,7 @@ char *grpc_call_op_string(grpc_call_op *op) { op->data.deadline.tv_nsec); break; case GRPC_SEND_START: - bprintf(&b, "SEND_START"); + bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset); break; case GRPC_SEND_MESSAGE: bprintf(&b, "SEND_MESSAGE"); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index b837caf97b..14e972f539 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -108,6 +108,9 @@ typedef struct { /* Argument data, matching up with grpc_call_op_type names */ union { + struct { + grpc_pollset *pollset; + } start; grpc_byte_buffer *message; grpc_mdelem *metadata; gpr_timespec deadline; diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a146a7b6a0..6329932330 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -98,6 +98,7 @@ struct call_data { void (*on_complete)(void *user_data, grpc_op_error error); void *on_complete_user_data; gpr_uint32 start_flags; + grpc_pollset *pollset; } waiting; } s; }; @@ -186,6 +187,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { calld->s.waiting.on_complete = op->done_cb; calld->s.waiting.on_complete_user_data = op->user_data; calld->s.waiting.start_flags = op->flags; + calld->s.waiting.pollset = op->data.start.pollset; chand->waiting_children[chand->waiting_child_count++] = calld; gpr_mu_unlock(&chand->mu); @@ -523,6 +525,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; call_ops[i].user_data = waiting_children[i]->s.waiting.on_complete_user_data; + call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 8581fb41d6..5faa03c2f4 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -132,6 +132,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, op->user_data); break; case GRPC_SEND_START: + grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); end_bufferable_op(op, chand, calld, 0); break; diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 259c948720..f1944bf672 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -44,6 +44,10 @@ grpc_endpoint_write_status grpc_endpoint_write( return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); } +void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { + ep->vtable->add_to_pollset(ep, pollset); +} + void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 88949fb03f..bbd800bea8 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -34,6 +34,7 @@ #ifndef __GRPC_INTERNAL_IOMGR_ENDPOINT_H__ #define __GRPC_INTERNAL_IOMGR_ENDPOINT_H__ +#include "src/core/iomgr/pollset.h" #include #include @@ -69,6 +70,7 @@ struct grpc_endpoint_vtable { grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); + void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); }; @@ -92,6 +94,10 @@ grpc_endpoint_write_status grpc_endpoint_write( void grpc_endpoint_shutdown(grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_endpoint *ep); +/* Add an endpoint to a pollset, so that when the pollset is polled, events from + this endpoint are considered */ +void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); + struct grpc_endpoint { const grpc_endpoint_vtable *vtable; }; diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/pollset.c new file mode 100644 index 0000000000..62a0019eb3 --- /dev/null +++ b/src/core/iomgr/pollset.c @@ -0,0 +1,37 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset.h" + +void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; } +void grpc_pollset_destroy(grpc_pollset *pollset) {} diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h new file mode 100644 index 0000000000..ba1a9d5429 --- /dev/null +++ b/src/core/iomgr/pollset.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_H_ + +/* A grpc_pollset is a set of file descriptors that a higher level item is + interested in. For example: + - a server will typically keep a pollset containing all connected channels, + so that it can find new calls to service + - a completion queue might keep a pollset with an entry for each transport + that is servicing a call that it's tracking */ +/* Eventually different implementations of iomgr will provide their own + grpc_pollset structs. As this is just a dummy wrapper to get the API in, + we just define a simple type here. */ +typedef struct { char unused; } grpc_pollset; + +void grpc_pollset_init(grpc_pollset *pollset); +void grpc_pollset_destroy(grpc_pollset *pollset); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b59e916e13..bc3ce69e47 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -538,9 +538,14 @@ static grpc_endpoint_write_status grpc_tcp_write( return status; } -static const grpc_endpoint_vtable vtable = {grpc_tcp_notify_on_read, - grpc_tcp_write, grpc_tcp_shutdown, - grpc_tcp_destroy}; +static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { + /* tickle the pollset so we crash if things aren't wired correctly */ + pollset->unused++; +} + +static const grpc_endpoint_vtable vtable = { + grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, + grpc_tcp_shutdown, grpc_tcp_destroy}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 34be4256a9..cab09ca49d 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -325,8 +325,15 @@ static void endpoint_unref(grpc_endpoint *secure_ep) { secure_endpoint_unref(ep); } +static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, + grpc_pollset *pollset) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); +} + static const grpc_endpoint_vtable vtable = { - endpoint_notify_on_read, endpoint_write, endpoint_shutdown, endpoint_unref}; + endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset, + endpoint_shutdown, endpoint_unref}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ba4c806602..6270ce640d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -409,6 +409,7 @@ grpc_call_error grpc_call_start_invoke(grpc_call *call, op.dir = GRPC_CALL_DOWN; op.flags = flags; op.done_cb = done_invoke; + op.data.start.pollset = grpc_cq_pollset(cq); op.user_data = call; elem = CALL_ELEM_FROM_CALL(call, 0); @@ -480,6 +481,7 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, op.dir = GRPC_CALL_DOWN; op.flags = flags; op.done_cb = do_nothing; + op.data.start.pollset = grpc_cq_pollset(call->cq); op.user_data = NULL; elem = CALL_ELEM_FROM_CALL(call, 0); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b890f9dab7..4837f5b978 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -66,6 +66,8 @@ struct grpc_completion_queue { /* When refs drops to zero, we are in shutdown mode, and will be destroyable once all queued events are drained */ gpr_refcount refs; + /* the set of low level i/o things that concern this cq */ + grpc_pollset pollset; /* 0 initially, 1 once we've begun shutting down */ int shutdown; /* Head of a linked list of queued events (prev points to the last element) */ @@ -87,6 +89,7 @@ grpc_completion_queue *grpc_completion_queue_create() { memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->refs, 1); + grpc_pollset_init(&cc->pollset); cc->allow_polling = 1; return cc; } @@ -367,6 +370,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GPR_ASSERT(cc->queue == NULL); + grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } @@ -392,3 +396,7 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) { gpr_log(GPR_INFO, "pending ops:%s", tmp); #endif } + +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { + return &cc->pollset; +} diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 8f3d34ab31..2e752a3fe0 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -36,6 +36,7 @@ /* Internal API for completion channels */ +#include "src/core/iomgr/pollset.h" #include /* A finish func is executed whenever the event consumer calls @@ -101,4 +102,6 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); void grpc_cq_dump_pending_ops(grpc_completion_queue *cc); +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); + #endif /* __GRPC_INTERNAL_SURFACE_COMPLETION_QUEUE_H__ */ diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 9eaa3c0602..3829e7aa8f 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -491,6 +491,8 @@ grpc_transport_setup_result grpc_server_setup_transport( } filters[i] = &grpc_connected_channel_filter; + grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq)); + channel = grpc_channel_create_from_filters(filters, num_filters, s->channel_args, mdctx, 0); chand = (channel_data *)grpc_channel_stack_element( diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0570439813..a8ae8cc5bc 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1696,14 +1696,23 @@ static void run_callbacks(transport *t) { } } +static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { + transport *t = (transport *)gt; + lock(t); + if (t->ep) { + grpc_endpoint_add_to_pollset(t->ep, pollset); + } + unlock(t); +} + /* * INTEGRATION GLUE */ static const grpc_transport_vtable vtable = { sizeof(stream), init_stream, send_batch, set_allow_window_updates, - destroy_stream, abort_stream, goaway, close_transport, send_ping, - destroy_transport}; + add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, + send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 1c44abcadf..0ca67acb92 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -66,6 +66,11 @@ void grpc_transport_set_allow_window_updates(grpc_transport *transport, transport->vtable->set_allow_window_updates(transport, stream, allow); } +void grpc_transport_add_to_pollset(grpc_transport *transport, + grpc_pollset *pollset) { + transport->vtable->add_to_pollset(transport, pollset); +} + void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream) { transport->vtable->destroy_stream(transport, stream); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 6a089a2a15..00dacbf5b9 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -36,6 +36,7 @@ #include +#include "src/core/iomgr/pollset.h" #include "src/core/transport/stream_op.h" /* forward declarations */ @@ -202,15 +203,18 @@ void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, grpc_status_code status); +void grpc_transport_add_to_pollset(grpc_transport *transport, + grpc_pollset *pollset); + /* Advise peer of pending connection termination. */ -void grpc_transport_goaway(struct grpc_transport *transport, - grpc_status_code status, gpr_slice debug_data); +void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, + gpr_slice debug_data); /* Close a transport. Aborts all open streams. */ -void grpc_transport_close(struct grpc_transport *transport); +void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ -void grpc_transport_destroy(struct grpc_transport *transport); +void grpc_transport_destroy(grpc_transport *transport); /* Return type for grpc_transport_setup_callback */ typedef struct grpc_transport_setup_result { diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 328ead2872..9f497b9cba 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -53,6 +53,9 @@ typedef struct grpc_transport_vtable { void (*set_allow_window_updates)(grpc_transport *self, grpc_stream *stream, int allow); + /* implementation of grpc_transport_add_to_pollset */ + void (*add_to_pollset)(grpc_transport *self, grpc_pollset *pollset); + /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_transport *self, grpc_stream *stream); -- cgit v1.2.3