diff options
41 files changed, 913 insertions, 1593 deletions
@@ -391,7 +391,6 @@ qps_client: bins/$(CONFIG)/qps_client qps_server: bins/$(CONFIG)/qps_server ruby_plugin: bins/$(CONFIG)/ruby_plugin status_test: bins/$(CONFIG)/status_test -sync_client_async_server_test: bins/$(CONFIG)/sync_client_async_server_test thread_pool_test: bins/$(CONFIG)/thread_pool_test tips_client: bins/$(CONFIG)/tips_client tips_publisher_test: bins/$(CONFIG)/tips_publisher_test @@ -725,7 +724,7 @@ buildtests: buildtests_c buildtests_cxx buildtests_c: privatelibs_c bins/$(CONFIG)/alarm_heap_test bins/$(CONFIG)/alarm_list_test bins/$(CONFIG)/alarm_test bins/$(CONFIG)/alpn_test bins/$(CONFIG)/bin_encoder_test bins/$(CONFIG)/census_hash_table_test bins/$(CONFIG)/census_statistics_multiple_writers_circular_buffer_test bins/$(CONFIG)/census_statistics_multiple_writers_test bins/$(CONFIG)/census_statistics_performance_test bins/$(CONFIG)/census_statistics_quick_test bins/$(CONFIG)/census_statistics_small_log_test bins/$(CONFIG)/census_stub_test bins/$(CONFIG)/census_window_stats_test bins/$(CONFIG)/chttp2_status_conversion_test bins/$(CONFIG)/chttp2_stream_encoder_test bins/$(CONFIG)/chttp2_stream_map_test bins/$(CONFIG)/chttp2_transport_end2end_test bins/$(CONFIG)/dualstack_socket_test bins/$(CONFIG)/echo_client bins/$(CONFIG)/echo_server bins/$(CONFIG)/echo_test bins/$(CONFIG)/fd_posix_test bins/$(CONFIG)/fling_client bins/$(CONFIG)/fling_server bins/$(CONFIG)/fling_stream_test bins/$(CONFIG)/fling_test bins/$(CONFIG)/gpr_cancellable_test bins/$(CONFIG)/gpr_cmdline_test bins/$(CONFIG)/gpr_env_test bins/$(CONFIG)/gpr_file_test bins/$(CONFIG)/gpr_histogram_test bins/$(CONFIG)/gpr_host_port_test bins/$(CONFIG)/gpr_log_test bins/$(CONFIG)/gpr_slice_buffer_test bins/$(CONFIG)/gpr_slice_test bins/$(CONFIG)/gpr_string_test bins/$(CONFIG)/gpr_sync_test bins/$(CONFIG)/gpr_thd_test bins/$(CONFIG)/gpr_time_test bins/$(CONFIG)/gpr_useful_test bins/$(CONFIG)/grpc_base64_test bins/$(CONFIG)/grpc_byte_buffer_reader_test bins/$(CONFIG)/grpc_channel_stack_test bins/$(CONFIG)/grpc_completion_queue_test bins/$(CONFIG)/grpc_credentials_test bins/$(CONFIG)/grpc_json_token_test bins/$(CONFIG)/grpc_stream_op_test bins/$(CONFIG)/hpack_parser_test bins/$(CONFIG)/hpack_table_test bins/$(CONFIG)/httpcli_format_request_test bins/$(CONFIG)/httpcli_parser_test bins/$(CONFIG)/httpcli_test bins/$(CONFIG)/json_rewrite bins/$(CONFIG)/json_rewrite_test bins/$(CONFIG)/json_test bins/$(CONFIG)/lame_client_test bins/$(CONFIG)/message_compress_test bins/$(CONFIG)/metadata_buffer_test bins/$(CONFIG)/murmur_hash_test bins/$(CONFIG)/no_server_test bins/$(CONFIG)/poll_kick_posix_test bins/$(CONFIG)/resolve_address_test bins/$(CONFIG)/secure_endpoint_test bins/$(CONFIG)/sockaddr_utils_test bins/$(CONFIG)/tcp_client_posix_test bins/$(CONFIG)/tcp_posix_test bins/$(CONFIG)/tcp_server_posix_test bins/$(CONFIG)/time_averaged_stats_test bins/$(CONFIG)/time_test bins/$(CONFIG)/timeout_encoding_test bins/$(CONFIG)/transport_metadata_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_invoke_test bins/$(CONFIG)/chttp2_fake_security_cancel_before_invoke_test bins/$(CONFIG)/chttp2_fake_security_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_fake_security_census_simple_request_test bins/$(CONFIG)/chttp2_fake_security_disappearing_server_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_fake_security_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_fake_security_invoke_large_request_test bins/$(CONFIG)/chttp2_fake_security_max_concurrent_streams_test bins/$(CONFIG)/chttp2_fake_security_no_op_test bins/$(CONFIG)/chttp2_fake_security_ping_pong_streaming_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_payload_test bins/$(CONFIG)/chttp2_fake_security_request_with_large_metadata_test bins/$(CONFIG)/chttp2_fake_security_request_with_payload_test bins/$(CONFIG)/chttp2_fake_security_simple_delayed_request_test bins/$(CONFIG)/chttp2_fake_security_simple_request_test bins/$(CONFIG)/chttp2_fake_security_thread_stress_test bins/$(CONFIG)/chttp2_fake_security_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_fake_security_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_fake_security_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_fake_security_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_fake_security_no_op_legacy_test bins/$(CONFIG)/chttp2_fake_security_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_simple_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_thread_stress_legacy_test bins/$(CONFIG)/chttp2_fake_security_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_fullstack_no_op_test bins/$(CONFIG)/chttp2_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_fullstack_request_with_payload_test bins/$(CONFIG)/chttp2_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_fullstack_simple_request_test bins/$(CONFIG)/chttp2_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_no_op_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_no_op_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_invoke_test bins/$(CONFIG)/chttp2_socket_pair_cancel_before_invoke_test bins/$(CONFIG)/chttp2_socket_pair_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_socket_pair_census_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_disappearing_server_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_socket_pair_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_socket_pair_invoke_large_request_test bins/$(CONFIG)/chttp2_socket_pair_max_concurrent_streams_test bins/$(CONFIG)/chttp2_socket_pair_no_op_test bins/$(CONFIG)/chttp2_socket_pair_ping_pong_streaming_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_with_large_metadata_test bins/$(CONFIG)/chttp2_socket_pair_request_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_simple_delayed_request_test bins/$(CONFIG)/chttp2_socket_pair_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_thread_stress_test bins/$(CONFIG)/chttp2_socket_pair_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_socket_pair_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_socket_pair_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_socket_pair_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_socket_pair_no_op_legacy_test bins/$(CONFIG)/chttp2_socket_pair_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_thread_stress_legacy_test bins/$(CONFIG)/chttp2_socket_pair_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_invoke_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_before_invoke_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_census_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_disappearing_server_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_invoke_large_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_max_concurrent_streams_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_no_op_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_ping_pong_streaming_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_large_metadata_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_delayed_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_thread_stress_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_no_op_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_thread_stress_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_legacy_test -buildtests_cxx: privatelibs_cxx bins/$(CONFIG)/channel_arguments_test bins/$(CONFIG)/credentials_test bins/$(CONFIG)/end2end_test bins/$(CONFIG)/interop_client bins/$(CONFIG)/interop_server bins/$(CONFIG)/qps_client bins/$(CONFIG)/qps_server bins/$(CONFIG)/status_test bins/$(CONFIG)/sync_client_async_server_test bins/$(CONFIG)/thread_pool_test bins/$(CONFIG)/tips_client bins/$(CONFIG)/tips_publisher_test bins/$(CONFIG)/tips_subscriber_test +buildtests_cxx: privatelibs_cxx bins/$(CONFIG)/channel_arguments_test bins/$(CONFIG)/credentials_test bins/$(CONFIG)/end2end_test bins/$(CONFIG)/interop_client bins/$(CONFIG)/interop_server bins/$(CONFIG)/qps_client bins/$(CONFIG)/qps_server bins/$(CONFIG)/status_test bins/$(CONFIG)/thread_pool_test bins/$(CONFIG)/tips_client bins/$(CONFIG)/tips_publisher_test bins/$(CONFIG)/tips_subscriber_test test: test_c test_cxx @@ -1439,8 +1438,6 @@ test_cxx: buildtests_cxx $(Q) ./bins/$(CONFIG)/qps_server || ( echo test qps_server failed ; exit 1 ) $(E) "[RUN] Testing status_test" $(Q) ./bins/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 ) - $(E) "[RUN] Testing sync_client_async_server_test" - $(Q) ./bins/$(CONFIG)/sync_client_async_server_test || ( echo test sync_client_async_server_test failed ; exit 1 ) $(E) "[RUN] Testing thread_pool_test" $(Q) ./bins/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 ) $(E) "[RUN] Testing tips_publisher_test" @@ -2611,27 +2608,23 @@ LIBGRPC++_SRC = \ src/cpp/client/channel.cc \ src/cpp/client/channel_arguments.cc \ src/cpp/client/client_context.cc \ + src/cpp/client/client_unary_call.cc \ src/cpp/client/create_channel.cc \ src/cpp/client/credentials.cc \ src/cpp/client/internal_stub.cc \ + src/cpp/common/call.cc \ src/cpp/common/completion_queue.cc \ src/cpp/common/rpc_method.cc \ src/cpp/proto/proto_utils.cc \ - src/cpp/server/async_server.cc \ - src/cpp/server/async_server_context.cc \ src/cpp/server/server.cc \ src/cpp/server/server_builder.cc \ src/cpp/server/server_context_impl.cc \ src/cpp/server/server_credentials.cc \ - src/cpp/server/server_rpc_handler.cc \ src/cpp/server/thread_pool.cc \ - src/cpp/stream/stream_context.cc \ src/cpp/util/status.cc \ src/cpp/util/time.cc \ PUBLIC_HEADERS_CXX += \ - include/grpc++/async_server.h \ - include/grpc++/async_server_context.h \ include/grpc++/channel_arguments.h \ include/grpc++/channel_interface.h \ include/grpc++/client_context.h \ @@ -2639,6 +2632,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/config.h \ include/grpc++/create_channel.h \ include/grpc++/credentials.h \ + include/grpc++/impl/client_unary_call.h \ include/grpc++/impl/internal_stub.h \ include/grpc++/impl/rpc_method.h \ include/grpc++/impl/rpc_service_method.h \ @@ -2670,21 +2664,19 @@ ifneq ($(OPENSSL_DEP),) src/cpp/client/channel.cc: $(OPENSSL_DEP) src/cpp/client/channel_arguments.cc: $(OPENSSL_DEP) src/cpp/client/client_context.cc: $(OPENSSL_DEP) +src/cpp/client/client_unary_call.cc: $(OPENSSL_DEP) src/cpp/client/create_channel.cc: $(OPENSSL_DEP) src/cpp/client/credentials.cc: $(OPENSSL_DEP) src/cpp/client/internal_stub.cc: $(OPENSSL_DEP) +src/cpp/common/call.cc: $(OPENSSL_DEP) src/cpp/common/completion_queue.cc: $(OPENSSL_DEP) src/cpp/common/rpc_method.cc: $(OPENSSL_DEP) src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP) -src/cpp/server/async_server.cc: $(OPENSSL_DEP) -src/cpp/server/async_server_context.cc: $(OPENSSL_DEP) src/cpp/server/server.cc: $(OPENSSL_DEP) src/cpp/server/server_builder.cc: $(OPENSSL_DEP) src/cpp/server/server_context_impl.cc: $(OPENSSL_DEP) src/cpp/server/server_credentials.cc: $(OPENSSL_DEP) -src/cpp/server/server_rpc_handler.cc: $(OPENSSL_DEP) src/cpp/server/thread_pool.cc: $(OPENSSL_DEP) -src/cpp/stream/stream_context.cc: $(OPENSSL_DEP) src/cpp/util/status.cc: $(OPENSSL_DEP) src/cpp/util/time.cc: $(OPENSSL_DEP) endif @@ -2730,21 +2722,19 @@ endif objs/$(CONFIG)/src/cpp/client/channel.o: objs/$(CONFIG)/src/cpp/client/channel_arguments.o: objs/$(CONFIG)/src/cpp/client/client_context.o: +objs/$(CONFIG)/src/cpp/client/client_unary_call.o: objs/$(CONFIG)/src/cpp/client/create_channel.o: objs/$(CONFIG)/src/cpp/client/credentials.o: objs/$(CONFIG)/src/cpp/client/internal_stub.o: +objs/$(CONFIG)/src/cpp/common/call.o: objs/$(CONFIG)/src/cpp/common/completion_queue.o: objs/$(CONFIG)/src/cpp/common/rpc_method.o: objs/$(CONFIG)/src/cpp/proto/proto_utils.o: -objs/$(CONFIG)/src/cpp/server/async_server.o: -objs/$(CONFIG)/src/cpp/server/async_server_context.o: objs/$(CONFIG)/src/cpp/server/server.o: objs/$(CONFIG)/src/cpp/server/server_builder.o: objs/$(CONFIG)/src/cpp/server/server_context_impl.o: objs/$(CONFIG)/src/cpp/server/server_credentials.o: -objs/$(CONFIG)/src/cpp/server/server_rpc_handler.o: objs/$(CONFIG)/src/cpp/server/thread_pool.o: -objs/$(CONFIG)/src/cpp/stream/stream_context.o: objs/$(CONFIG)/src/cpp/util/status.o: objs/$(CONFIG)/src/cpp/util/time.o: @@ -2753,7 +2743,6 @@ LIBGRPC++_TEST_UTIL_SRC = \ gens/test/cpp/util/messages.pb.cc \ gens/test/cpp/util/echo.pb.cc \ gens/test/cpp/util/echo_duplicate.pb.cc \ - test/cpp/end2end/async_test_server.cc \ test/cpp/util/create_test_channel.cc \ @@ -2772,7 +2761,6 @@ ifneq ($(OPENSSL_DEP),) test/cpp/util/messages.proto: $(OPENSSL_DEP) test/cpp/util/echo.proto: $(OPENSSL_DEP) test/cpp/util/echo_duplicate.proto: $(OPENSSL_DEP) -test/cpp/end2end/async_test_server.cc: $(OPENSSL_DEP) test/cpp/util/create_test_channel.cc: $(OPENSSL_DEP) endif @@ -2800,7 +2788,6 @@ endif -objs/$(CONFIG)/test/cpp/end2end/async_test_server.o: gens/test/cpp/util/messages.pb.cc gens/test/cpp/util/echo.pb.cc gens/test/cpp/util/echo_duplicate.pb.cc objs/$(CONFIG)/test/cpp/util/create_test_channel.o: gens/test/cpp/util/messages.pb.cc gens/test/cpp/util/echo.pb.cc gens/test/cpp/util/echo_duplicate.pb.cc @@ -7088,37 +7075,6 @@ endif endif -SYNC_CLIENT_ASYNC_SERVER_TEST_SRC = \ - test/cpp/end2end/sync_client_async_server_test.cc \ - -SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS = $(addprefix objs/$(CONFIG)/, $(addsuffix .o, $(basename $(SYNC_CLIENT_ASYNC_SERVER_TEST_SRC)))) - -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL with ALPN. - -bins/$(CONFIG)/sync_client_async_server_test: openssl_dep_error - -else - -bins/$(CONFIG)/sync_client_async_server_test: $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS) libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LDXX) $(LDFLAGS) $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS) $(GTEST_LIB) libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/$(CONFIG)/sync_client_async_server_test - -endif - -objs/$(CONFIG)/test/cpp/end2end/sync_client_async_server_test.o: libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a - -deps_sync_client_async_server_test: $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS:.o=.dep) -endif -endif - - THREAD_POOL_TEST_SRC = \ test/cpp/server/thread_pool_test.cc \ diff --git a/build.json b/build.json index 68110e4702..fdb32ebf71 100644 --- a/build.json +++ b/build.json @@ -378,8 +378,6 @@ "build": "all", "language": "c++", "public_headers": [ - "include/grpc++/async_server.h", - "include/grpc++/async_server_context.h", "include/grpc++/channel_arguments.h", "include/grpc++/channel_interface.h", "include/grpc++/client_context.h", @@ -387,6 +385,7 @@ "include/grpc++/config.h", "include/grpc++/create_channel.h", "include/grpc++/credentials.h", + "include/grpc++/impl/client_unary_call.h", "include/grpc++/impl/internal_stub.h", "include/grpc++/impl/rpc_method.h", "include/grpc++/impl/rpc_service_method.h", @@ -401,30 +400,26 @@ "headers": [ "src/cpp/client/channel.h", "src/cpp/proto/proto_utils.h", - "src/cpp/server/server_rpc_handler.h", "src/cpp/server/thread_pool.h", - "src/cpp/stream/stream_context.h", "src/cpp/util/time.h" ], "src": [ "src/cpp/client/channel.cc", "src/cpp/client/channel_arguments.cc", "src/cpp/client/client_context.cc", + "src/cpp/client/client_unary_call.cc", "src/cpp/client/create_channel.cc", "src/cpp/client/credentials.cc", "src/cpp/client/internal_stub.cc", + "src/cpp/common/call.cc", "src/cpp/common/completion_queue.cc", "src/cpp/common/rpc_method.cc", "src/cpp/proto/proto_utils.cc", - "src/cpp/server/async_server.cc", - "src/cpp/server/async_server_context.cc", "src/cpp/server/server.cc", "src/cpp/server/server_builder.cc", "src/cpp/server/server_context_impl.cc", "src/cpp/server/server_credentials.cc", - "src/cpp/server/server_rpc_handler.cc", "src/cpp/server/thread_pool.cc", - "src/cpp/stream/stream_context.cc", "src/cpp/util/status.cc", "src/cpp/util/time.cc" ], @@ -442,7 +437,6 @@ "test/cpp/util/messages.proto", "test/cpp/util/echo.proto", "test/cpp/util/echo_duplicate.proto", - "test/cpp/end2end/async_test_server.cc", "test/cpp/util/create_test_channel.cc" ] }, @@ -1680,22 +1674,6 @@ ] }, { - "name": "sync_client_async_server_test", - "build": "test", - "language": "c++", - "src": [ - "test/cpp/end2end/sync_client_async_server_test.cc" - ], - "deps": [ - "grpc++_test_util", - "grpc_test_util", - "grpc++", - "grpc", - "gpr_test_util", - "gpr" - ] - }, - { "name": "thread_pool_test", "build": "test", "language": "c++", diff --git a/examples/tips/publisher_test.cc b/examples/tips/publisher_test.cc index 34737ae6ed..db3e3784da 100644 --- a/examples/tips/publisher_test.cc +++ b/examples/tips/publisher_test.cc @@ -107,7 +107,7 @@ class PublisherTest : public ::testing::Test { server_address_ << "localhost:" << port; ServerBuilder builder; builder.AddPort(server_address_.str()); - builder.RegisterService(service_.service()); + builder.RegisterService(&service_); server_ = builder.BuildAndStart(); channel_ = CreateChannel(server_address_.str(), ChannelArguments()); diff --git a/examples/tips/subscriber_test.cc b/examples/tips/subscriber_test.cc index fda8909a02..736e6da319 100644 --- a/examples/tips/subscriber_test.cc +++ b/examples/tips/subscriber_test.cc @@ -106,7 +106,7 @@ class SubscriberTest : public ::testing::Test { server_address_ << "localhost:" << port; ServerBuilder builder; builder.AddPort(server_address_.str()); - builder.RegisterService(service_.service()); + builder.RegisterService(&service_); server_ = builder.BuildAndStart(); channel_ = CreateChannel(server_address_.str(), ChannelArguments()); diff --git a/include/grpc++/async_server_context.h b/include/grpc++/async_server_context.h deleted file mode 100644 index c038286ac1..0000000000 --- a/include/grpc++/async_server_context.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__ -#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__ - -#include <chrono> - -#include <grpc++/config.h> - -struct grpc_byte_buffer; -struct grpc_call; -struct grpc_completion_queue; - -namespace google { -namespace protobuf { -class Message; -} -} - -using std::chrono::system_clock; - -namespace grpc { -class Status; - -// TODO(rocking): wrap grpc c structures. -class AsyncServerContext { - public: - AsyncServerContext(grpc_call* call, const grpc::string& method, - const grpc::string& host, - system_clock::time_point absolute_deadline); - ~AsyncServerContext(); - - // Accept this rpc, bind it to a completion queue. - void Accept(grpc_completion_queue* cq); - - // Read and write calls, all async. Return true for success. - bool StartRead(google::protobuf::Message* request); - bool StartWrite(const google::protobuf::Message& response, int flags); - bool StartWriteStatus(const Status& status); - - bool ParseRead(grpc_byte_buffer* read_buffer); - - grpc::string method() const { return method_; } - grpc::string host() const { return host_; } - system_clock::time_point absolute_deadline() { return absolute_deadline_; } - - grpc_call* call() { return call_; } - - private: - AsyncServerContext(const AsyncServerContext&); - AsyncServerContext& operator=(const AsyncServerContext&); - - // These properties may be moved to a ServerContext class. - const grpc::string method_; - const grpc::string host_; - system_clock::time_point absolute_deadline_; - - google::protobuf::Message* request_; // not owned - grpc_call* call_; // owned -}; - -} // namespace grpc - -#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__ diff --git a/test/cpp/end2end/async_test_server.h b/include/grpc++/call.h index a277061ace..de789febe6 100644 --- a/test/cpp/end2end/async_test_server.h +++ b/include/grpc++/call.h @@ -31,45 +31,73 @@ * */ -#ifndef __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ -#define __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ +#ifndef __GRPCPP_CALL_H__ +#define __GRPCPP_CALL_H__ -#include <condition_variable> -#include <mutex> -#include <string> - -#include <grpc++/async_server.h> +#include <grpc++/status.h> #include <grpc++/completion_queue.h> +#include <memory> +#include <vector> + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +struct grpc_call; +struct grpc_op; + namespace grpc { -namespace testing { +class ChannelInterface; + +class CallOpBuffer final : public CompletionQueueTag { + public: + CallOpBuffer() : return_tag_(this) {} + + void Reset(void *next_return_tag); + + void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata); + void AddSendMessage(const google::protobuf::Message &message); + void AddRecvMessage(google::protobuf::Message *message); + void AddClientSendClose(); + void AddClientRecvStatus(Status *status); -class AsyncTestServer { + // INTERNAL API: + + // Convert to an array of grpc_op elements + void FillOps(grpc_op *ops, size_t *nops); + + // Called by completion queue just prior to returning from Next() or Pluck() + void FinalizeResult(void *tag, bool *status) override; + + private: + void *return_tag_; +}; + +class CCallDeleter { public: - AsyncTestServer(); - virtual ~AsyncTestServer(); + void operator()(grpc_call *c); +}; - void AddPort(const grpc::string& addr); - void Start(); - void RequestOneRpc(); - virtual void MainLoop(); - void Shutdown(); +// Straightforward wrapping of the C call object +class Call final { + public: + Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq); - CompletionQueue* completion_queue() { return &cq_; } + void PerformOps(CallOpBuffer *buffer); - protected: - void HandleQueueClosed(); + grpc_call *call() { return call_.get(); } + CompletionQueue *cq() { return cq_; } private: - CompletionQueue cq_; - AsyncServer server_; - bool cq_drained_; - std::mutex cq_drained_mu_; - std::condition_variable cq_drained_cv_; + ChannelInterface *channel_; + CompletionQueue *cq_; + std::unique_ptr<grpc_call, CCallDeleter> call_; }; -} // namespace testing } // namespace grpc -#endif // __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ +#endif // __GRPCPP_CALL_INTERFACE_H__ diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h index 9ed35422b8..3631ea4d5d 100644 --- a/include/grpc++/channel_interface.h +++ b/include/grpc++/channel_interface.h @@ -39,28 +39,26 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google -namespace grpc { +struct grpc_call; +namespace grpc { +class Call; +class CallOpBuffer; class ClientContext; +class CompletionQueue; class RpcMethod; -class StreamContextInterface; +class CallInterface; class ChannelInterface { public: virtual ~ChannelInterface() {} - virtual Status StartBlockingRpc(const RpcMethod& method, - ClientContext* context, - const google::protobuf::Message& request, - google::protobuf::Message* result) = 0; - - virtual StreamContextInterface* CreateStream( - const RpcMethod& method, ClientContext* context, - const google::protobuf::Message* request, - google::protobuf::Message* result) = 0; + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) = 0; + virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0; }; } // namespace grpc diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 72f6253f8e..c976bd5b45 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -34,51 +34,66 @@ #ifndef __GRPCPP_COMPLETION_QUEUE_H__ #define __GRPCPP_COMPLETION_QUEUE_H__ +#include <grpc++/impl/client_unary_call.h> + struct grpc_completion_queue; namespace grpc { +template <class R> +class ClientReader; +template <class W> +class ClientWriter; +template <class R, class W> +class ClientReaderWriter; +template <class R> +class ServerReader; +template <class W> +class ServerWriter; +template <class R, class W> +class ServerReaderWriter; + +class CompletionQueue; + +class CompletionQueueTag { + public: + // Called prior to returning from Next(), return value + // is the status of the operation (return status is the default thing + // to do) + virtual void FinalizeResult(void *tag, bool *status) = 0; +}; + // grpc_completion_queue wrapper class class CompletionQueue { public: CompletionQueue(); ~CompletionQueue(); - enum CompletionType { - QUEUE_CLOSED = 0, // Shutting down. - RPC_END = 1, // An RPC finished. Either at client or server. - CLIENT_READ_OK = 2, // A client-side read has finished successfully. - CLIENT_READ_ERROR = 3, // A client-side read has finished with error. - CLIENT_WRITE_OK = 4, - CLIENT_WRITE_ERROR = 5, - SERVER_RPC_NEW = 6, // A new RPC just arrived at the server. - SERVER_READ_OK = 7, // A server-side read has finished successfully. - SERVER_READ_ERROR = 8, // A server-side read has finished with error. - SERVER_WRITE_OK = 9, - SERVER_WRITE_ERROR = 10, - // Client or server has sent half close successfully. - HALFCLOSE_OK = 11, - // New CompletionTypes may be added in the future, so user code should - // always - // handle the default case of a CompletionType that appears after such code - // was - // written. - DO_NOT_USE = 20, - }; - // Blocking read from queue. - // For QUEUE_CLOSED, *tag is not changed. - // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext. - // For others, *tag will be the AsyncServerContext of this rpc. - CompletionType Next(void** tag); + // Returns true if an event was received, false if the queue is ready + // for destruction. + bool Next(void **tag, bool *ok); // Shutdown has to be called, and the CompletionQueue can only be - // destructed when the QUEUE_CLOSED message has been read with Next(). + // destructed when false is returned from Next(). void Shutdown(); grpc_completion_queue* cq() { return cq_; } private: + template <class R> friend class ::grpc::ClientReader; + template <class W> friend class ::grpc::ClientWriter; + template <class R, class W> friend class ::grpc::ClientReaderWriter; + template <class R> friend class ::grpc::ServerReader; + template <class W> friend class ::grpc::ServerWriter; + template <class R, class W> friend class ::grpc::ServerReaderWriter; + friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + + bool Pluck(CompletionQueueTag *tag); + grpc_completion_queue* cq_; // owned }; diff --git a/include/grpc++/config.h b/include/grpc++/config.h index 52913fbf0f..1b4b463d35 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -39,6 +39,7 @@ namespace grpc { typedef std::string string; -} + +} // namespace grpc #endif // __GRPCPP_CONFIG_H__ diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h new file mode 100644 index 0000000000..091430b884 --- /dev/null +++ b/include/grpc++/impl/client_unary_call.h @@ -0,0 +1,67 @@ +/* +* +* Copyright 2014, Google Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are +* met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following disclaimer +* in the documentation and/or other materials provided with the +* distribution. +* * Neither the name of Google Inc. nor the names of its +* contributors may be used to endorse or promote products derived from +* this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__ +#define __GRPCPP_CLIENT_UNARY_CALL_H__ + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace grpc { + +class ChannelInterface; +class ClientContext; +class CompletionQueue; +class RpcMethod; +class Status; + +// Wrapper that begins an asynchronous unary call +void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result, Status *status, + CompletionQueue *cq, void *tag); + +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result); + +} // namespace grpc + +#endif + diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h index 75fec356dd..bb16e64c96 100644 --- a/include/grpc++/impl/rpc_method.h +++ b/include/grpc++/impl/rpc_method.h @@ -37,8 +37,8 @@ namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 620de5e67f..0fb4f79b59 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -55,25 +55,18 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(ServerContext* context, + HandlerParameter(Call *c, + ServerContext* context, const google::protobuf::Message* req, google::protobuf::Message* resp) - : server_context(context), + : call(c), + server_context(context), request(req), - response(resp), - stream_context(nullptr) {} - HandlerParameter(ServerContext* context, - const google::protobuf::Message* req, - google::protobuf::Message* resp, - StreamContextInterface* stream) - : server_context(context), - request(req), - response(resp), - stream_context(stream) {} + response(resp) {} + Call* call; ServerContext* server_context; const google::protobuf::Message* request; google::protobuf::Message* response; - StreamContextInterface* stream_context; }; virtual Status RunHandler(const HandlerParameter& param) = 0; }; @@ -114,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReader<RequestType> reader(param.stream_context); + ServerReader<RequestType> reader(param.call); return func_(service_, param.server_context, &reader, dynamic_cast<ResponseType*>(param.response)); } @@ -136,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerWriter<ResponseType> writer(param.stream_context); + ServerWriter<ResponseType> writer(param.call); return func_(service_, param.server_context, dynamic_cast<const RequestType*>(param.request), &writer); } @@ -159,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler { : func_(func), service_(service) {} Status RunHandler(const HandlerParameter& param) final { - ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context); + ServerReaderWriter<ResponseType, RequestType> stream(param.call); return func_(service_, param.server_context, &stream); } @@ -203,7 +196,7 @@ class RpcService { public: // Takes ownership. void AddMethod(RpcServiceMethod* method) { - methods_.push_back(std::unique_ptr<RpcServiceMethod>(method)); + methods_.emplace_back(method); } RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); } diff --git a/include/grpc++/async_server.h b/include/grpc++/impl/service_type.h index fe2c5d9367..0684f322d8 100644 --- a/include/grpc++/async_server.h +++ b/include/grpc++/impl/service_type.h @@ -31,40 +31,25 @@ * */ -#ifndef __GRPCPP_ASYNC_SERVER_H__ -#define __GRPCPP_ASYNC_SERVER_H__ - -#include <mutex> - -#include <grpc++/config.h> - -struct grpc_server; +#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__ +#define __GRPCPP_IMPL_SERVICE_TYPE_H__ namespace grpc { -class CompletionQueue; - -class AsyncServer { - public: - explicit AsyncServer(CompletionQueue* cc); - ~AsyncServer(); - void AddPort(const grpc::string& addr); +class RpcService; - void Start(); - - // The user has to call this to get one new rpc on the completion - // queue. - void RequestOneRpc(); - - void Shutdown(); +class SynchronousService { + public: + virtual ~SynchronousService() {} + virtual RpcService *service() = 0; +}; - private: - bool started_; - std::mutex shutdown_mu_; - bool shutdown_; - grpc_server* server_; +class AsynchronousService { + public: + virtual ~AsynchronousService() {} + virtual RpcService *service() = 0; }; } // namespace grpc -#endif // __GRPCPP_ASYNC_SERVER_H__ +#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__
\ No newline at end of file diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 5fa371ba62..670ffa7815 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -48,8 +48,8 @@ struct grpc_server; namespace google { namespace protobuf { class Message; -} -} +} // namespace protobuf +} // namespace google namespace grpc { class AsyncServerContext; @@ -70,17 +70,16 @@ class Server { friend class ServerBuilder; // ServerBuilder use only - Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds); + Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds); Server(); // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance. - void RegisterService(RpcService* service); + bool RegisterService(RpcService* service); // Add a listening port. Can be called multiple times. - void AddPort(const grpc::string& addr); + int AddPort(const grpc::string& addr); // Start the server. - void Start(); + bool Start(); - void AllowOneRpc(); void HandleQueueClosed(); void RunRpc(); void ScheduleCallback(); diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index cf27452010..8b4c81bc87 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -41,9 +41,11 @@ namespace grpc { +class AsynchronousService; class RpcService; class Server; class ServerCredentials; +class SynchronousService; class ThreadPoolInterface; class ServerBuilder { @@ -53,7 +55,9 @@ class ServerBuilder { // Register a service. This call does not take ownership of the service. // The service must exist for the lifetime of the Server instance returned by // BuildAndStart(). - void RegisterService(RpcService* service); + void RegisterService(SynchronousService* service); + + void RegisterAsyncService(AsynchronousService *service); // Add a listening port. Can be called multiple times. void AddPort(const grpc::string& addr); @@ -71,9 +75,10 @@ class ServerBuilder { private: std::vector<RpcService*> services_; + std::vector<AsynchronousService*> async_services_; std::vector<grpc::string> ports_; std::shared_ptr<ServerCredentials> creds_; - ThreadPoolInterface* thread_pool_; + ThreadPoolInterface* thread_pool_ = nullptr; }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 47fd6cf1c8..4af9fd6aaa 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -35,6 +35,9 @@ #define __GRPCPP_SERVER_CONTEXT_H_ #include <chrono> +#include <vector> + +#include "config.h" namespace grpc { @@ -43,7 +46,10 @@ class ServerContext { public: virtual ~ServerContext() {} - virtual std::chrono::system_clock::time_point absolute_deadline() const = 0; + std::chrono::system_clock::time_point absolute_deadline(); + + private: + std::vector<std::pair<grpc::string, grpc::string> > metadata_; }; } // namespace grpc diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index b8982f4d93..c30825a7a5 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -34,7 +34,9 @@ #ifndef __GRPCPP_STREAM_H__ #define __GRPCPP_STREAM_H__ -#include <grpc++/stream_context_interface.h> +#include <grpc++/call.h> +#include <grpc++/channel_interface.h> +#include <grpc++/completion_queue.h> #include <grpc++/status.h> #include <grpc/support/log.h> @@ -45,16 +47,12 @@ class ClientStreamingInterface { public: virtual ~ClientStreamingInterface() {} - // Try to cancel the stream. Wait() still needs to be called to get the final - // status. Cancelling after the stream has finished has no effects. - virtual void Cancel() = 0; - // Wait until the stream finishes, and return the final status. When the // client side declares it has no more message to send, either implicitly or // by calling WritesDone, it needs to make sure there is no more message to // be received from the server, either implicitly or by getting a false from // a Read(). Otherwise, this implicitly cancels the stream. - virtual const Status& Wait() = 0; + virtual Status Finish() = 0; }; // An interface that yields a sequence of R messages. @@ -82,147 +80,391 @@ class WriterInterface { }; template <class R> -class ClientReader : public ClientStreamingInterface, - public ReaderInterface<R> { +class ClientReader final : public ClientStreamingInterface, + public ReaderInterface<R> { public: // Blocking create a stream and write the first request out. - explicit ClientReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Write(context_->request(), true); + ClientReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request) + : call_(channel->CreateCall(method, context, &cq_)) { + CallOpBuffer buf; + buf.AddSendMessage(request); + buf.AddClientSendClose(); + call_.PerformOps(&buf); + cq_.Pluck(&buf); } - ~ClientReader() { delete context_; } + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - virtual const Status& Wait() { return context_->Wait(); } +template <class W> +class ClientWriter final : public ClientStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } + + // Read the final response and wait for the final status. + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddRecvMessage(response_); + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; }; -template <class W> -class ClientWriter : public ClientStreamingInterface, - public WriterInterface<W> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientReaderWriter final : public ClientStreamingInterface, + public WriterInterface<W>, + public ReaderInterface<R> { public: // Blocking create a stream. - explicit ClientWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + ClientReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual bool Read(R *msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - ~ClientWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual bool WritesDone() { + CallOpBuffer buf; + buf.AddClientSendClose(); + call_.PerformOps(&buf); + return cq_.Pluck(&buf); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual Status Finish() override { + CallOpBuffer buf; + Status status; + buf.AddClientRecvStatus(&status); + call_.PerformOps(&buf); + GPR_ASSERT(cq_.Pluck(&buf)); + return status; + } - virtual void Cancel() { context_->Cancel(); } + private: + CompletionQueue cq_; + Call call_; +}; - // Read the final response and wait for the final status. - virtual const Status& Wait() { - bool success = context_->Read(context_->response()); - if (!success) { - Cancel(); - } else { - success = context_->Read(nullptr); - if (success) { - Cancel(); - } - } - return context_->Wait(); +template <class R> +class ServerReader final : public ReaderInterface<R> { + public: + explicit ServerReader(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } private: - StreamContextInterface* const context_; + Call* call_; }; -// Client-side interface for bi-directional streaming. +template <class W> +class ServerWriter final : public WriterInterface<W> { + public: + explicit ServerWriter(Call* call) : call_(call) {} + + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + Call* call_; +}; + +// Server-side interface for bi-directional streaming. template <class W, class R> -class ClientReaderWriter : public ClientStreamingInterface, - public WriterInterface<W>, +class ServerReaderWriter final : public WriterInterface<W>, public ReaderInterface<R> { public: - // Blocking create a stream. - explicit ClientReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(false); + explicit ServerReaderWriter(Call* call) : call_(call) {} + + virtual bool Read(R* msg) override { + CallOpBuffer buf; + buf.AddRecvMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); } - ~ClientReaderWriter() { delete context_; } + virtual bool Write(const W& msg) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); + return call_->cq()->Pluck(&buf); + } + + private: + CompletionQueue* cq_; + Call* call_; +}; + +// Async interfaces +// Common interface for all client side streaming. +class ClientAsyncStreamingInterface { + public: + virtual ~ClientAsyncStreamingInterface() {} + + virtual void Finish(Status* status, void* tag) = 0; +}; + +// An interface that yields a sequence of R messages. +template <class R> +class AsyncReaderInterface { + public: + virtual ~AsyncReaderInterface() {} + + virtual void Read(R* msg, void* tag) = 0; +}; - virtual bool Read(R* msg) { return context_->Read(msg); } +// An interface that can be fed a sequence of W messages. +template <class W> +class AsyncWriterInterface { + public: + virtual ~AsyncWriterInterface() {} - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) = 0; +}; + +template <class R> +class ClientAsyncReader final : public ClientAsyncStreamingInterface, + public AsyncReaderInterface<R> { + public: + // Blocking create a stream and write the first request out. + ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, void* tag) + : call_(channel->CreateCall(method, context, &cq_)) { + init_buf_.Reset(tag); + init_buf_.AddSendMessage(request); + init_buf_.AddClientSendClose(); + call_.PerformOps(&init_buf_); } - virtual void WritesDone() { context_->Write(nullptr, true); } + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } - virtual void Cancel() { context_->Cancel(); } + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } - virtual const Status& Wait() { return context_->Wait(); } + private: + CompletionQueue cq_; + Call call_; + CallOpBuffer init_buf_; + CallOpBuffer read_buf_; + CallOpBuffer finish_buf_; +}; + +template <class W> +class ClientAsyncWriter final : public ClientAsyncStreamingInterface, + public WriterInterface<W> { + public: + // Blocking create a stream. + ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + google::protobuf::Message *response) + : response_(response), + call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); + } + + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddRecvMessage(response_); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; + google::protobuf::Message *const response_; + CompletionQueue cq_; + Call call_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; -template <class R> -class ServerReader : public ReaderInterface<R> { +// Client-side interface for bi-directional streaming. +template <class W, class R> +class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface, + public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReader(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); + ClientAsyncReaderWriter(ChannelInterface *channel, + const RpcMethod &method, ClientContext *context) + : call_(channel->CreateCall(method, context, &cq_)) {} + + virtual void Read(R *msg, void* tag) override { + read_buf_.Reset(tag); + read_buf_.AddRecvMessage(msg); + call_.PerformOps(&read_buf_); + } + + virtual void Write(const W& msg, void* tag) override { + write_buf_.Reset(tag); + write_buf_.AddSendMessage(msg); + call_.PerformOps(&write_buf_); } - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void WritesDone(void* tag) override { + writes_done_buf_.Reset(tag); + writes_done_buf_.AddClientSendClose(); + call_.PerformOps(&writes_done_buf_); + } + + virtual void Finish(Status* status, void* tag) override { + finish_buf_.Reset(tag); + finish_buf_.AddClientRecvStatus(status); + call_.PerformOps(&finish_buf_); + } private: - StreamContextInterface* const context_; // not owned + CompletionQueue cq_; + Call call_; + CallOpBuffer read_buf_; + CallOpBuffer write_buf_; + CallOpBuffer writes_done_buf_; + CallOpBuffer finish_buf_; }; +// TODO(yangg) Move out of stream.h template <class W> -class ServerWriter : public WriterInterface<W> { +class ServerAsyncResponseWriter final { public: - explicit ServerWriter(StreamContextInterface* context) : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - context_->Read(context_->request()); + explicit ServerAsyncResponseWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) override { + CallOpBuffer buf; + buf.AddSendMessage(msg); + call_->PerformOps(&buf); } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + private: + Call* call_; +}; + +template <class R> +class ServerAsyncReader : public AsyncReaderInterface<R> { + public: + explicit ServerAsyncReader(Call* call) : call_(call) {} + + virtual void Read(R* msg, void* tag) { + // TODO + } + + private: + Call* call_; +}; + +template <class W> +class ServerAsyncWriter : public AsyncWriterInterface<W> { + public: + explicit ServerAsyncWriter(Call* call) : call_(call) {} + + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; // Server-side interface for bi-directional streaming. template <class W, class R> -class ServerReaderWriter : public WriterInterface<W>, - public ReaderInterface<R> { +class ServerAsyncReaderWriter : public AsyncWriterInterface<W>, + public AsyncReaderInterface<R> { public: - explicit ServerReaderWriter(StreamContextInterface* context) - : context_(context) { - GPR_ASSERT(context_); - context_->Start(true); - } + explicit ServerAsyncReaderWriter(Call* call) : call_(call) {} - virtual bool Read(R* msg) { return context_->Read(msg); } + virtual void Read(R* msg, void* tag) { + // TODO + } - virtual bool Write(const W& msg) { - return context_->Write(const_cast<W*>(&msg), false); + virtual void Write(const W& msg, void* tag) { + // TODO } private: - StreamContextInterface* const context_; // not owned + Call* call_; }; } // namespace grpc diff --git a/src/core/support/cpu.h b/include/grpc/support/cpu.h index f8ec2c6522..9025f7c21f 100644 --- a/src/core/support/cpu.h +++ b/include/grpc/support/cpu.h @@ -34,6 +34,10 @@ #ifndef __GRPC_INTERNAL_SUPPORT_CPU_H__ #define __GRPC_INTERNAL_SUPPORT_CPU_H__ +#ifdef __cplusplus +extern "C" { +#endif + /* Interface providing CPU information for currently running system */ /* Return the number of CPU cores on the current system. Will return 0 if @@ -46,4 +50,8 @@ unsigned gpr_cpu_num_cores(void); [0, gpr_cpu_num_cores() - 1] */ unsigned gpr_cpu_current_cpu(void); +#ifdef __cplusplus +} // extern "C" +#endif + #endif /* __GRPC_INTERNAL_SUPPORT_CPU_H__ */ diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 8724f97e8b..d04c2efcc4 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -61,6 +61,17 @@ bool BidiStreaming(const google::protobuf::MethodDescriptor *method) { return method->client_streaming() && method->server_streaming(); } +bool HasUnaryCalls(const google::protobuf::FileDescriptor *file) { + for (int i = 0; i < file->service_count(); i++) { + for (int j = 0; j < file->service(i)->method_count(); j++) { + if (NoStreaming(file->service(i)->method(j))) { + return true; + } + } + } + return false; +} + bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor *file) { for (int i = 0; i < file->service_count(); i++) { for (int j = 0; j < file->service(i)->method_count(); j++) { @@ -97,20 +108,29 @@ bool HasBidiStreaming(const google::protobuf::FileDescriptor *file) { std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) { std::string temp = - "#include \"grpc++/impl/internal_stub.h\"\n" - "#include \"grpc++/status.h\"\n" + "#include <grpc++/impl/internal_stub.h>\n" + "#include <grpc++/impl/service_type.h>\n" + "#include <grpc++/status.h>\n" "\n" "namespace grpc {\n" "class ChannelInterface;\n" "class RpcService;\n" "class ServerContext;\n"; + if (HasUnaryCalls(file)) { + temp.append( + "template <class OutMessage> class ServerAsyncResponseWriter;\n"); + } if (HasClientOnlyStreaming(file)) { temp.append("template <class OutMessage> class ClientWriter;\n"); temp.append("template <class InMessage> class ServerReader;\n"); + temp.append("template <class OutMessage> class ClientAsyncWriter;\n"); + temp.append("template <class InMessage> class ServerAsyncReader;\n"); } if (HasServerOnlyStreaming(file)) { temp.append("template <class InMessage> class ClientReader;\n"); temp.append("template <class OutMessage> class ServerWriter;\n"); + temp.append("template <class OutMessage> class ClientAsyncReader;\n"); + temp.append("template <class InMessage> class ServerAsyncWriter;\n"); } if (HasBidiStreaming(file)) { temp.append( @@ -119,16 +139,24 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) { temp.append( "template <class OutMessage, class InMessage>\n" "class ServerReaderWriter;\n"); + temp.append( + "template <class OutMessage, class InMessage>\n" + "class ClientAsyncReaderWriter;\n"); + temp.append( + "template <class OutMessage, class InMessage>\n" + "class ServerAsyncReaderWriter;\n"); } temp.append("} // namespace grpc\n"); return temp; } std::string GetSourceIncludes() { - return "#include \"grpc++/channel_interface.h\"\n" - "#include \"grpc++/impl/rpc_method.h\"\n" - "#include \"grpc++/impl/rpc_service_method.h\"\n" - "#include \"grpc++/stream.h\"\n"; + return "#include <grpc++/channel_interface.h>\n" + "#include <grpc++/impl/client_unary_call.h>\n" + "#include <grpc++/impl/rpc_method.h>\n" + "#include <grpc++/impl/rpc_service_method.h>\n" + "#include <grpc++/impl/service_type.h>\n" + "#include <grpc++/stream.h>\n"; } void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, @@ -142,27 +170,45 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer, if (NoStreaming(method)) { printer->Print(*vars, "::grpc::Status $Method$(::grpc::ClientContext* context, " - "const $Request$& request, $Response$* response);\n\n"); + "const $Request$& request, $Response$* response);\n"); + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "const $Request$& request, $Response$* response, " + "::grpc::Status *status, " + "::grpc::CompletionQueue *cq, void *tag);\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print( - *vars, - "::grpc::ClientWriter< $Request$>* $Method$(" - "::grpc::ClientContext* context, $Response$* response);\n\n"); + printer->Print(*vars, + "::grpc::ClientWriter< $Request$>* $Method$(" + "::grpc::ClientContext* context, $Response$* response);\n"); + printer->Print(*vars, + "::grpc::ClientWriter< $Request$>* $Method$(" + "::grpc::ClientContext* context, $Response$* response, " + "::grpc::Status *status, " + "::grpc::CompletionQueue *cq, void *tag);\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, "::grpc::ClientReader< $Response$>* $Method$(" - "::grpc::ClientContext* context, const $Request$* request);\n\n"); + "::grpc::ClientContext* context, const $Request$* request);\n"); + printer->Print(*vars, + "::grpc::ClientReader< $Response$>* $Method$(" + "::grpc::ClientContext* context, const $Request$* request, " + "::grpc::CompletionQueue *cq, void *tag);\n"); } else if (BidiStreaming(method)) { printer->Print(*vars, "::grpc::ClientReaderWriter< $Request$, $Response$>* " - "$Method$(::grpc::ClientContext* context);\n\n"); + "$Method$(::grpc::ClientContext* context);\n"); + printer->Print(*vars, + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " + "$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue *cq, void *tag);\n"); } } -void PrintHeaderServerMethod(google::protobuf::io::Printer *printer, - const google::protobuf::MethodDescriptor *method, - std::map<std::string, std::string> *vars) { +void PrintHeaderServerMethodSync( + google::protobuf::io::Printer *printer, + const google::protobuf::MethodDescriptor *method, + std::map<std::string, std::string> *vars) { (*vars)["Method"] = method->name(); (*vars)["Request"] = grpc_cpp_generator::ClassName(method->input_type(), true); @@ -194,19 +240,56 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer *printer, } } +void PrintHeaderServerMethodAsync( + google::protobuf::io::Printer *printer, + const google::protobuf::MethodDescriptor *method, + std::map<std::string, std::string> *vars) { + (*vars)["Method"] = method->name(); + (*vars)["Request"] = + grpc_cpp_generator::ClassName(method->input_type(), true); + (*vars)["Response"] = + grpc_cpp_generator::ClassName(method->output_type(), true); + if (NoStreaming(method)) { + printer->Print(*vars, + "void $Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* cq, void *tag);\n"); + } else if (ClientOnlyStreaming(method)) { + printer->Print(*vars, + "void $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Request$>* reader, " + "::grpc::CompletionQueue* cq, void *tag);\n"); + } else if (ServerOnlyStreaming(method)) { + printer->Print(*vars, + "void $Method$(" + "::grpc::ServerContext* context, $Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* cq, void *tag);\n"); + } else if (BidiStreaming(method)) { + printer->Print( + *vars, + "void $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " + "::grpc::CompletionQueue* cq, void *tag);\n"); + } +} + void PrintHeaderService(google::protobuf::io::Printer *printer, const google::protobuf::ServiceDescriptor *service, std::map<std::string, std::string> *vars) { (*vars)["Service"] = service->name(); printer->Print(*vars, - "class $Service$ {\n" + "class $Service$ final {\n" " public:\n"); printer->Indent(); // Client side printer->Print( - "class Stub : public ::grpc::InternalStub {\n" + "class Stub final : public ::grpc::InternalStub {\n" " public:\n"); printer->Indent(); for (int i = 0; i < service->method_count(); ++i) { @@ -220,17 +303,34 @@ void PrintHeaderService(google::protobuf::io::Printer *printer, printer->Print("\n"); - // Server side + // Server side - Synchronous printer->Print( - "class Service {\n" + "class Service : public ::grpc::SynchronousService {\n" " public:\n"); printer->Indent(); printer->Print("Service() : service_(nullptr) {}\n"); printer->Print("virtual ~Service();\n"); for (int i = 0; i < service->method_count(); ++i) { - PrintHeaderServerMethod(printer, service->method(i), vars); + PrintHeaderServerMethodSync(printer, service->method(i), vars); + } + printer->Print("::grpc::RpcService* service() override final;\n"); + printer->Outdent(); + printer->Print( + " private:\n" + " ::grpc::RpcService* service_;\n"); + printer->Print("};\n"); + + // Server side - Asynchronous + printer->Print( + "class AsyncService final : public ::grpc::AsynchronousService {\n" + " public:\n"); + printer->Indent(); + printer->Print("AsyncService() : service_(nullptr) {}\n"); + printer->Print("~AsyncService();\n"); + for (int i = 0; i < service->method_count(); ++i) { + PrintHeaderServerMethodAsync(printer, service->method(i), vars); } - printer->Print("::grpc::RpcService* service();\n"); + printer->Print("::grpc::RpcService* service() override;\n"); printer->Outdent(); printer->Print( " private:\n" @@ -268,7 +368,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) {\n"); printer->Print(*vars, - " return channel()->StartBlockingRpc(" + "return ::grpc::BlockingUnaryCall(channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), " "context, request, response);\n" "}\n\n"); @@ -279,10 +379,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Print(*vars, " return new ::grpc::ClientWriter< $Request$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " - "context, nullptr, response));\n" + "context, response);\n" "}\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( @@ -291,10 +391,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, "::grpc::ClientContext* context, const $Request$* request) {\n"); printer->Print(*vars, " return new ::grpc::ClientReader< $Response$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " - "context, request, nullptr));\n" + "context, *request);\n" "}\n\n"); } else if (BidiStreaming(method)) { printer->Print( @@ -304,10 +404,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer, printer->Print( *vars, " return new ::grpc::ClientReaderWriter< $Request$, $Response$>(" - "channel()->CreateStream(" + "channel()," "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " - "context, nullptr, nullptr));\n" + "context);\n" "}\n\n"); } } @@ -362,6 +462,47 @@ void PrintSourceServerMethod(google::protobuf::io::Printer *printer, } } +void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer, + const google::protobuf::MethodDescriptor *method, + std::map<std::string, std::string> *vars) { + (*vars)["Method"] = method->name(); + (*vars)["Request"] = + grpc_cpp_generator::ClassName(method->input_type(), true); + (*vars)["Response"] = + grpc_cpp_generator::ClassName(method->output_type(), true); + if (NoStreaming(method)) { + printer->Print(*vars, + "void $Service$::AsyncService::$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncResponseWriter< $Response$>* response, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print("}\n\n"); + } else if (ClientOnlyStreaming(method)) { + printer->Print(*vars, + "void $Service$::AsyncService::$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReader< $Request$>* reader, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print("}\n\n"); + } else if (ServerOnlyStreaming(method)) { + printer->Print(*vars, + "void $Service$::AsyncService::$Method$(" + "::grpc::ServerContext* context, " + "$Request$* request, " + "::grpc::ServerAsyncWriter< $Response$>* writer, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print("}\n\n"); + } else if (BidiStreaming(method)) { + printer->Print(*vars, + "void $Service$::AsyncService::$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, " + "::grpc::CompletionQueue* cq, void *tag) {\n"); + printer->Print("}\n\n"); + } +} + void PrintSourceService(google::protobuf::io::Printer *printer, const google::protobuf::ServiceDescriptor *service, std::map<std::string, std::string> *vars) { @@ -384,6 +525,7 @@ void PrintSourceService(google::protobuf::io::Printer *printer, "}\n\n"); for (int i = 0; i < service->method_count(); ++i) { PrintSourceServerMethod(printer, service->method(i), vars); + PrintSourceServerAsyncMethod(printer, service->method(i), vars); } printer->Print(*vars, "::grpc::RpcService* $Service$::Service::service() {\n"); diff --git a/src/core/statistics/census_log.c b/src/core/statistics/census_log.c index aea0a33bad..1504c027de 100644 --- a/src/core/statistics/census_log.c +++ b/src/core/statistics/census_log.c @@ -91,9 +91,9 @@ */ #include "src/core/statistics/census_log.h" #include <string.h> -#include "src/core/support/cpu.h" #include <grpc/support/alloc.h> #include <grpc/support/atm.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/sync.h> diff --git a/src/core/support/cpu_linux.c b/src/core/support/cpu_linux.c index ad82174894..c8375e65b6 100644 --- a/src/core/support/cpu_linux.c +++ b/src/core/support/cpu_linux.c @@ -39,7 +39,7 @@ #ifdef GPR_CPU_LINUX -#include "src/core/support/cpu.h" +#include <grpc/support/cpu.h> #include <sched.h> #include <errno.h> diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 3f39364bda..a1539e4711 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -42,9 +42,10 @@ #include <grpc/support/slice.h> #include "src/cpp/proto/proto_utils.h" -#include "src/cpp/stream/stream_context.h" +#include <grpc++/call.h> #include <grpc++/channel_arguments.h> #include <grpc++/client_context.h> +#include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/credentials.h> #include <grpc++/impl/rpc_method.h> @@ -77,103 +78,23 @@ Channel::Channel(const grpc::string &target, Channel::~Channel() { grpc_channel_destroy(c_channel_); } -namespace { -// Pluck the finished event and set to status when it is not nullptr. -void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag, - Status *status) { - grpc_event *ev = - grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future); - if (status) { - StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status); - grpc::string details(ev->data.finished.details ? ev->data.finished.details - : ""); - *status = Status(error_code, details); - } - grpc_event_finish(ev); +Call Channel::CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) { + auto c_call = + grpc_channel_create_call(c_channel_, cq->cq(), method.name(), + target_.c_str(), context->RawDeadline()); + context->set_call(c_call); + return Call(c_call, this, cq); } -} // namespace -// TODO(yangg) more error handling -Status Channel::StartBlockingRpc(const RpcMethod &method, - ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) { - Status status; - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - - grpc_event *ev; - void *finished_tag = reinterpret_cast<char *>(call); - void *metadata_read_tag = reinterpret_cast<char *>(call) + 2; - void *write_tag = reinterpret_cast<char *>(call) + 3; - void *halfclose_tag = reinterpret_cast<char *>(call) + 4; - void *read_tag = reinterpret_cast<char *>(call) + 5; - - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - // add_metadata from context - // - // invoke - GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - // write request - grpc_byte_buffer *write_buffer = nullptr; - bool success = SerializeProto(request, &write_buffer); - if (!success) { - grpc_call_cancel(call); - status = - Status(StatusCode::DATA_LOSS, "Failed to serialize request proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - grpc_byte_buffer_destroy(write_buffer); - ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future); - - success = ev->data.write_accepted == GRPC_OP_OK; - grpc_event_finish(ev); - if (!success) { - GetFinalStatus(cq, finished_tag, &status); - return status; - } - // writes done - GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read metadata - // - ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future); - grpc_event_finish(ev); - // start read - GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future); - if (ev->data.read) { - if (!DeserializeProto(ev->data.read, result)) { - grpc_event_finish(ev); - status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto."); - GetFinalStatus(cq, finished_tag, nullptr); - return status; - } - } - grpc_event_finish(ev); - - // wait status - GetFinalStatus(cq, finished_tag, &status); - return status; -} - -StreamContextInterface *Channel::CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) { - grpc_call *call = grpc_channel_create_call_old( - c_channel_, method.name(), target_.c_str(), context->RawDeadline()); - context->set_call(call); - grpc_completion_queue *cq = grpc_completion_queue_create(); - context->set_cq(cq); - return new StreamContext(method, context, request, result); +void Channel::PerformOpsOnCall(CallOpBuffer *buf, Call *call) { + static const size_t MAX_OPS = 8; + size_t nops = MAX_OPS; + grpc_op ops[MAX_OPS]; + buf->FillOps(ops, &nops); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call->call(), ops, nops, + buf)); } } // namespace grpc diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index 67d18bf4c8..894f87698c 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -42,11 +42,14 @@ struct grpc_channel; namespace grpc { +class Call; +class CallOpBuffer; class ChannelArguments; +class CompletionQueue; class Credentials; class StreamContextInterface; -class Channel : public ChannelInterface { +class Channel final : public ChannelInterface { public: Channel(const grpc::string &target, const ChannelArguments &args); Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds, @@ -54,14 +57,10 @@ class Channel : public ChannelInterface { ~Channel() override; - Status StartBlockingRpc(const RpcMethod &method, ClientContext *context, - const google::protobuf::Message &request, - google::protobuf::Message *result) override; - - StreamContextInterface *CreateStream( - const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) override; + virtual Call CreateCall(const RpcMethod &method, ClientContext *context, + CompletionQueue *cq) override; + virtual void PerformOpsOnCall(CallOpBuffer *ops, + Call *call) override; private: const grpc::string target_; diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/client/client_unary_call.cc index a43e07dc5f..e652750e22 100644 --- a/src/cpp/server/server_rpc_handler.h +++ b/src/cpp/client/client_unary_call.cc @@ -31,36 +31,30 @@ * */ -#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ -#define __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ - -#include <memory> - +#include <grpc++/impl/client_unary_call.h> +#include <grpc++/call.h> +#include <grpc++/channel_interface.h> #include <grpc++/completion_queue.h> #include <grpc++/status.h> namespace grpc { -class AsyncServerContext; -class RpcServiceMethod; - -class ServerRpcHandler { - public: - // Takes ownership of async_server_context. - ServerRpcHandler(AsyncServerContext *async_server_context, - RpcServiceMethod *method); - - void StartRpc(); - - private: - CompletionQueue::CompletionType WaitForNextEvent(); - void FinishRpc(const Status &status); - - std::unique_ptr<AsyncServerContext> async_server_context_; - RpcServiceMethod *method_; - CompletionQueue cq_; -}; +// Wrapper that performs a blocking unary call +Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method, + ClientContext *context, + const google::protobuf::Message &request, + google::protobuf::Message *result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpBuffer buf; + Status status; + buf.AddSendMessage(request); + buf.AddRecvMessage(result); + buf.AddClientSendClose(); + buf.AddClientRecvStatus(&status); + call.PerformOps(&buf); + cq.Pluck(&buf); + return status; +} } // namespace grpc - -#endif // __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__ diff --git a/include/grpc++/stream_context_interface.h b/src/cpp/common/call.cc index a84119800b..d3a9de3620 100644 --- a/include/grpc++/stream_context_interface.h +++ b/src/cpp/common/call.cc @@ -31,34 +31,13 @@ * */ -#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ -#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ - -namespace google { -namespace protobuf { -class Message; -} -} +#include <include/grpc++/call.h> +#include <include/grpc++/channel_interface.h> namespace grpc { -class Status; - -// An interface to avoid dependency on internal implementation. -class StreamContextInterface { - public: - virtual ~StreamContextInterface() {} - virtual void Start(bool buffered) = 0; - - virtual bool Read(google::protobuf::Message* msg) = 0; - virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0; - virtual const Status& Wait() = 0; - virtual void Cancel() = 0; - - virtual google::protobuf::Message* request() = 0; - virtual google::protobuf::Message* response() = 0; -}; +void Call::PerformOps(CallOpBuffer* buffer) { + channel_->PerformOpsOnCall(buffer, this); +} } // namespace grpc - -#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__ diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index f06da9b04f..cbeda81a0b 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -1,5 +1,4 @@ /* - * * Copyright 2014, Google Inc. * All rights reserved. * @@ -33,11 +32,12 @@ #include <grpc++/completion_queue.h> +#include <memory> + #include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/cpp/util/time.h" -#include <grpc++/async_server_context.h> namespace grpc { @@ -47,66 +47,35 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } -CompletionQueue::CompletionType CompletionQueue::Next(void **tag) { - grpc_event *ev; - CompletionType return_type; - bool success; +// Helper class so we can declare a unique_ptr with grpc_event +class EventDeleter { + public: + void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); } +}; - ev = grpc_completion_queue_next(cq_, gpr_inf_future); - if (!ev) { - gpr_log(GPR_ERROR, "no next event in queue"); - abort(); - } - switch (ev->type) { - case GRPC_QUEUE_SHUTDOWN: - return_type = QUEUE_CLOSED; - break; - case GRPC_READ: - *tag = ev->tag; - if (ev->data.read) { - success = static_cast<AsyncServerContext *>(ev->tag) - ->ParseRead(ev->data.read); - return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR; - } else { - return_type = SERVER_READ_ERROR; - } - break; - case GRPC_WRITE_ACCEPTED: - *tag = ev->tag; - if (ev->data.write_accepted != GRPC_OP_ERROR) { - return_type = SERVER_WRITE_OK; - } else { - return_type = SERVER_WRITE_ERROR; - } - break; - case GRPC_SERVER_RPC_NEW: - GPR_ASSERT(!ev->tag); - // Finishing the pending new rpcs after the server has been shutdown. - if (!ev->call) { - *tag = nullptr; - } else { - *tag = new AsyncServerContext( - ev->call, ev->data.server_rpc_new.method, - ev->data.server_rpc_new.host, - Timespec2Timepoint(ev->data.server_rpc_new.deadline)); - } - return_type = SERVER_RPC_NEW; - break; - case GRPC_FINISHED: - *tag = ev->tag; - return_type = RPC_END; - break; - case GRPC_FINISH_ACCEPTED: - *tag = ev->tag; - return_type = HALFCLOSE_OK; - break; - default: - // We do not handle client side messages now - gpr_log(GPR_ERROR, "client-side messages aren't supported yet"); - abort(); +bool CompletionQueue::Next(void **tag, bool *ok) { + std::unique_ptr<grpc_event, EventDeleter> ev; + + ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); + if (ev->type == GRPC_QUEUE_SHUTDOWN) { + return false; } - grpc_event_finish(ev); - return return_type; + auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag); + *ok = ev->data.op_complete == GRPC_OP_OK; + *tag = cq_tag; + cq_tag->FinalizeResult(tag, ok); + return true; +} + +bool CompletionQueue::Pluck(CompletionQueueTag *tag) { + std::unique_ptr<grpc_event, EventDeleter> ev; + + ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future)); + bool ok = ev->data.op_complete == GRPC_OP_OK; + void *ignored = tag; + tag->FinalizeResult(&ignored, &ok); + GPR_ASSERT(ignored == tag); + return ok; } } // namespace grpc diff --git a/src/cpp/server/async_server.cc b/src/cpp/server/async_server.cc deleted file mode 100644 index 86faa07b31..0000000000 --- a/src/cpp/server/async_server.cc +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc++/async_server.h> - -#include <grpc/grpc.h> -#include <grpc/support/log.h> -#include <grpc++/completion_queue.h> - -namespace grpc { - -AsyncServer::AsyncServer(CompletionQueue *cc) - : started_(false), shutdown_(false) { - server_ = grpc_server_create(cc->cq(), nullptr); -} - -AsyncServer::~AsyncServer() { - std::unique_lock<std::mutex> lock(shutdown_mu_); - if (started_ && !shutdown_) { - lock.unlock(); - Shutdown(); - } - grpc_server_destroy(server_); -} - -void AsyncServer::AddPort(const grpc::string &addr) { - GPR_ASSERT(!started_); - int success = grpc_server_add_http2_port(server_, addr.c_str()); - GPR_ASSERT(success); -} - -void AsyncServer::Start() { - GPR_ASSERT(!started_); - started_ = true; - grpc_server_start(server_); -} - -void AsyncServer::RequestOneRpc() { - GPR_ASSERT(started_); - std::unique_lock<std::mutex> lock(shutdown_mu_); - if (shutdown_) { - return; - } - lock.unlock(); - grpc_call_error err = grpc_server_request_call_old(server_, nullptr); - GPR_ASSERT(err == GRPC_CALL_OK); -} - -void AsyncServer::Shutdown() { - std::unique_lock<std::mutex> lock(shutdown_mu_); - if (started_ && !shutdown_) { - shutdown_ = true; - lock.unlock(); - // TODO(yangg) should we shutdown without start? - grpc_server_shutdown(server_); - } -} - -} // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 1abdf702e2..5d44ab2ba4 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -37,25 +37,20 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/log.h> -#include "src/cpp/server/server_rpc_handler.h" -#include "src/cpp/server/thread_pool.h" -#include <grpc++/async_server_context.h> #include <grpc++/completion_queue.h> #include <grpc++/impl/rpc_service_method.h> +#include <grpc++/server_context.h> #include <grpc++/server_credentials.h> +#include <grpc++/thread_pool_interface.h> namespace grpc { -// TODO(rocking): consider a better default value like num of cores. -static const int kNumThreads = 4; - -Server::Server(ThreadPoolInterface *thread_pool, ServerCredentials *creds) +Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds) : started_(false), shutdown_(false), num_running_cb_(0), - thread_pool_(thread_pool == nullptr ? new ThreadPool(kNumThreads) - : thread_pool), - thread_pool_owned_(thread_pool == nullptr), + thread_pool_(thread_pool), + thread_pool_owned_(thread_pool_owned), secure_(creds != nullptr) { if (creds) { server_ = @@ -75,6 +70,8 @@ Server::~Server() { if (started_ && !shutdown_) { lock.unlock(); Shutdown(); + } else { + lock.unlock(); } grpc_server_destroy(server_); if (thread_pool_owned_) { @@ -82,37 +79,38 @@ Server::~Server() { } } -void Server::RegisterService(RpcService *service) { +bool Server::RegisterService(RpcService *service) { for (int i = 0; i < service->GetMethodCount(); ++i) { RpcServiceMethod *method = service->GetMethod(i); + if (method_map_.find(method->name()) != method_map_.end()) { + gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name()); + return false; + } method_map_.insert(std::make_pair(method->name(), method)); } + return true; } -void Server::AddPort(const grpc::string &addr) { +int Server::AddPort(const grpc::string &addr) { GPR_ASSERT(!started_); - int success; if (secure_) { - success = grpc_server_add_secure_http2_port(server_, addr.c_str()); + return grpc_server_add_secure_http2_port(server_, addr.c_str()); } else { - success = grpc_server_add_http2_port(server_, addr.c_str()); + return grpc_server_add_http2_port(server_, addr.c_str()); } - GPR_ASSERT(success); } -void Server::Start() { +bool Server::Start() { GPR_ASSERT(!started_); started_ = true; grpc_server_start(server_); // Start processing rpcs. - ScheduleCallback(); -} + if (thread_pool_) { + ScheduleCallback(); + } -void Server::AllowOneRpc() { - GPR_ASSERT(started_); - grpc_call_error err = grpc_server_request_call_old(server_, nullptr); - GPR_ASSERT(err == GRPC_CALL_OK); + return true; } void Server::Shutdown() { @@ -132,8 +130,8 @@ void Server::Shutdown() { // Shutdown the completion queue. cq_.Shutdown(); void *tag = nullptr; - CompletionQueue::CompletionType t = cq_.Next(&tag); - GPR_ASSERT(t == CompletionQueue::QUEUE_CLOSED); + bool ok = false; + GPR_ASSERT(false == cq_.Next(&tag, &ok)); } void Server::ScheduleCallback() { @@ -141,31 +139,40 @@ void Server::ScheduleCallback() { std::unique_lock<std::mutex> lock(mu_); num_running_cb_++; } - std::function<void()> callback = std::bind(&Server::RunRpc, this); - thread_pool_->ScheduleCallback(callback); + thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this)); } void Server::RunRpc() { // Wait for one more incoming rpc. void *tag = nullptr; - AllowOneRpc(); - CompletionQueue::CompletionType t = cq_.Next(&tag); - GPR_ASSERT(t == CompletionQueue::SERVER_RPC_NEW); - - AsyncServerContext *server_context = static_cast<AsyncServerContext *>(tag); - // server_context could be nullptr during server shutdown. - if (server_context != nullptr) { - // Schedule a new callback to handle more rpcs. + GPR_ASSERT(started_); + grpc_call *c_call = NULL; + grpc_call_details call_details; + grpc_call_details_init(&call_details); + grpc_metadata_array initial_metadata; + grpc_metadata_array_init(&initial_metadata); + CompletionQueue cq; + grpc_call_error err = grpc_server_request_call(server_, &c_call, &call_details, &initial_metadata, cq.cq(), nullptr); + GPR_ASSERT(err == GRPC_CALL_OK); + bool ok = false; + GPR_ASSERT(cq_.Next(&tag, &ok)); + if (ok) { + ServerContext context; + Call call(c_call, nullptr, &cq); ScheduleCallback(); - RpcServiceMethod *method = nullptr; - auto iter = method_map_.find(server_context->method()); + auto iter = method_map_.find(call_details.method); if (iter != method_map_.end()) { method = iter->second; } - ServerRpcHandler rpc_handler(server_context, method); - rpc_handler.StartRpc(); + // TODO(ctiller): allocate only if necessary + std::unique_ptr<google::protobuf::Message> request(method->AllocateRequestProto()); + std::unique_ptr<google::protobuf::Message> response(method->AllocateResponseProto()); + method->handler()->RunHandler(MethodHandler::HandlerParameter( + &call, &context, request.get(), response.get())); } + grpc_call_details_destroy(&call_details); + grpc_metadata_array_destroy(&initial_metadata); { std::unique_lock<std::mutex> lock(mu_); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index add22cc3d8..d6bcb9313a 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -33,15 +33,22 @@ #include <grpc++/server_builder.h> +#include <grpc/support/cpu.h> #include <grpc/support/log.h> +#include <grpc++/impl/service_type.h> #include <grpc++/server.h> +#include "src/cpp/server/thread_pool.h" namespace grpc { -ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {} +ServerBuilder::ServerBuilder() {} -void ServerBuilder::RegisterService(RpcService *service) { - services_.push_back(service); +void ServerBuilder::RegisterService(SynchronousService *service) { + services_.push_back(service->service()); +} + +void ServerBuilder::RegisterAsyncService(AsynchronousService *service) { + async_services_.push_back(service); } void ServerBuilder::AddPort(const grpc::string &addr) { @@ -59,14 +66,31 @@ void ServerBuilder::SetThreadPool(ThreadPoolInterface *thread_pool) { } std::unique_ptr<Server> ServerBuilder::BuildAndStart() { - std::unique_ptr<Server> server(new Server(thread_pool_, creds_.get())); + bool thread_pool_owned = false; + if (!async_services_.empty() && !services_.empty()) { + gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now"); + return nullptr; + } + if (!thread_pool_ && services_.size()) { + int cores = gpr_cpu_num_cores(); + if (!cores) cores = 4; + thread_pool_ = new ThreadPool(cores); + thread_pool_owned = true; + } + std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, creds_.get())); for (auto *service : services_) { - server->RegisterService(service); + if (!server->RegisterService(service)) { + return nullptr; + } } for (auto &port : ports_) { - server->AddPort(port); + if (!server->AddPort(port)) { + return nullptr; + } + } + if (!server->Start()) { + return nullptr; } - server->Start(); return server; } diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc deleted file mode 100644 index bf02de8b80..0000000000 --- a/src/cpp/server/server_rpc_handler.cc +++ /dev/null @@ -1,140 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/cpp/server/server_rpc_handler.h" - -#include <grpc/support/log.h> -#include "src/cpp/server/server_context_impl.h" -#include "src/cpp/stream/stream_context.h" -#include <grpc++/async_server_context.h> -#include <grpc++/impl/rpc_service_method.h> - -namespace grpc { - -ServerRpcHandler::ServerRpcHandler(AsyncServerContext *async_server_context, - RpcServiceMethod *method) - : async_server_context_(async_server_context), method_(method) {} - -void ServerRpcHandler::StartRpc() { - if (method_ == nullptr) { - // Method not supported, finish the rpc with error. - // TODO(rocking): do we need to call read to consume the request? - FinishRpc(Status(StatusCode::UNIMPLEMENTED, "No such method.")); - return; - } - - ServerContextImpl user_context(async_server_context_->absolute_deadline()); - - if (method_->method_type() == RpcMethod::NORMAL_RPC) { - // Start the rpc on this dedicated completion queue. - async_server_context_->Accept(cq_.cq()); - - // Allocate request and response. - std::unique_ptr<google::protobuf::Message> request( - method_->AllocateRequestProto()); - std::unique_ptr<google::protobuf::Message> response( - method_->AllocateResponseProto()); - - // Read request - async_server_context_->StartRead(request.get()); - auto type = WaitForNextEvent(); - GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK); - - // Run the application's rpc handler - MethodHandler *handler = method_->handler(); - Status status = handler->RunHandler(MethodHandler::HandlerParameter( - &user_context, request.get(), response.get())); - - if (status.IsOk()) { - // Send the response if we get an ok status. - async_server_context_->StartWrite(*response, GRPC_WRITE_BUFFER_HINT); - type = WaitForNextEvent(); - if (type != CompletionQueue::SERVER_WRITE_OK) { - status = Status(StatusCode::INTERNAL, "Error writing response."); - } - } - - FinishRpc(status); - } else { - // Allocate request and response. - // TODO(yangg) maybe not allocate both when not needed? - std::unique_ptr<google::protobuf::Message> request( - method_->AllocateRequestProto()); - std::unique_ptr<google::protobuf::Message> response( - method_->AllocateResponseProto()); - - StreamContext stream_context(*method_, async_server_context_->call(), - cq_.cq(), request.get(), response.get()); - - // Run the application's rpc handler - MethodHandler *handler = method_->handler(); - Status status = handler->RunHandler(MethodHandler::HandlerParameter( - &user_context, request.get(), response.get(), &stream_context)); - if (status.IsOk() && - method_->method_type() == RpcMethod::CLIENT_STREAMING) { - stream_context.Write(response.get(), false); - } - // TODO(yangg) Do we need to consider the status in stream_context? - FinishRpc(status); - } -} - -CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() { - void *tag = nullptr; - CompletionQueue::CompletionType type = cq_.Next(&tag); - if (type != CompletionQueue::QUEUE_CLOSED && - type != CompletionQueue::RPC_END) { - GPR_ASSERT(static_cast<AsyncServerContext *>(tag) == - async_server_context_.get()); - } - return type; -} - -void ServerRpcHandler::FinishRpc(const Status &status) { - async_server_context_->StartWriteStatus(status); - CompletionQueue::CompletionType type; - - // HALFCLOSE_OK and RPC_END events come in either order. - type = WaitForNextEvent(); - GPR_ASSERT(type == CompletionQueue::HALFCLOSE_OK || - type == CompletionQueue::RPC_END); - type = WaitForNextEvent(); - GPR_ASSERT(type == CompletionQueue::HALFCLOSE_OK || - type == CompletionQueue::RPC_END); - - cq_.Shutdown(); - type = WaitForNextEvent(); - GPR_ASSERT(type == CompletionQueue::QUEUE_CLOSED); -} - -} // namespace grpc diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h index c53f7a7517..8a28c87704 100644 --- a/src/cpp/server/thread_pool.h +++ b/src/cpp/server/thread_pool.h @@ -44,12 +44,12 @@ namespace grpc { -class ThreadPool : public ThreadPoolInterface { +class ThreadPool final : public ThreadPoolInterface { public: explicit ThreadPool(int num_threads); ~ThreadPool(); - void ScheduleCallback(const std::function<void()> &callback) final; + void ScheduleCallback(const std::function<void()> &callback) override; private: std::mutex mu_; diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc deleted file mode 100644 index e4f344dbb9..0000000000 --- a/src/cpp/stream/stream_context.cc +++ /dev/null @@ -1,179 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/cpp/stream/stream_context.h" - -#include <grpc/support/log.h> -#include "src/cpp/proto/proto_utils.h" -#include "src/cpp/util/time.h" -#include <grpc++/client_context.h> -#include <grpc++/config.h> -#include <grpc++/impl/rpc_method.h> -#include <google/protobuf/message.h> - -namespace grpc { - -// Client only ctor -StreamContext::StreamContext(const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result) - : is_client_(true), - method_(&method), - call_(context->call()), - cq_(context->cq()), - request_(const_cast<google::protobuf::Message *>(request)), - result_(result), - peer_halfclosed_(false), - self_halfclosed_(false) { - GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); -} - -// Server only ctor -StreamContext::StreamContext(const RpcMethod &method, grpc_call *call, - grpc_completion_queue *cq, - google::protobuf::Message *request, - google::protobuf::Message *result) - : is_client_(false), - method_(&method), - call_(call), - cq_(cq), - request_(request), - result_(result), - peer_halfclosed_(false), - self_halfclosed_(false) { - GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC); -} - -StreamContext::~StreamContext() {} - -void StreamContext::Start(bool buffered) { - if (is_client_) { - // TODO(yangg) handle metadata send path - int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0; - grpc_call_error error = grpc_call_invoke_old( - call(), cq(), client_metadata_read_tag(), finished_tag(), flag); - GPR_ASSERT(GRPC_CALL_OK == error); - } else { - // TODO(yangg) metadata needs to be added before accept - // TODO(yangg) correctly set flag to accept - GPR_ASSERT(grpc_call_server_accept_old(call(), cq(), finished_tag()) == - GRPC_CALL_OK); - GPR_ASSERT(grpc_call_server_end_initial_metadata_old(call(), 0) == - GRPC_CALL_OK); - } -} - -bool StreamContext::Read(google::protobuf::Message *msg) { - // TODO(yangg) check peer_halfclosed_ here for possible early return. - grpc_call_error err = grpc_call_start_read_old(call(), read_tag()); - GPR_ASSERT(err == GRPC_CALL_OK); - grpc_event *read_ev = - grpc_completion_queue_pluck(cq(), read_tag(), gpr_inf_future); - GPR_ASSERT(read_ev->type == GRPC_READ); - bool ret = true; - if (read_ev->data.read) { - if (!DeserializeProto(read_ev->data.read, msg)) { - ret = false; - grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, - "Failed to parse incoming proto"); - } - } else { - ret = false; - peer_halfclosed_ = true; - } - grpc_event_finish(read_ev); - return ret; -} - -bool StreamContext::Write(const google::protobuf::Message *msg, bool is_last) { - // TODO(yangg) check self_halfclosed_ for possible early return. - bool ret = true; - grpc_event *ev = nullptr; - - if (msg) { - grpc_byte_buffer *out_buf = nullptr; - if (!SerializeProto(*msg, &out_buf)) { - grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, - "Failed to serialize outgoing proto"); - return false; - } - int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; - grpc_call_error err = - grpc_call_start_write_old(call(), out_buf, write_tag(), flag); - grpc_byte_buffer_destroy(out_buf); - GPR_ASSERT(err == GRPC_CALL_OK); - - ev = grpc_completion_queue_pluck(cq(), write_tag(), gpr_inf_future); - GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED); - - ret = ev->data.write_accepted == GRPC_OP_OK; - grpc_event_finish(ev); - } - if (ret && is_last) { - grpc_call_error err = grpc_call_writes_done_old(call(), halfclose_tag()); - GPR_ASSERT(err == GRPC_CALL_OK); - ev = grpc_completion_queue_pluck(cq(), halfclose_tag(), gpr_inf_future); - GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED); - grpc_event_finish(ev); - - self_halfclosed_ = true; - } else if (!ret) { // Stream broken - self_halfclosed_ = true; - peer_halfclosed_ = true; - } - - return ret; -} - -const Status &StreamContext::Wait() { - // TODO(yangg) properly support metadata - grpc_event *metadata_ev = grpc_completion_queue_pluck( - cq(), client_metadata_read_tag(), gpr_inf_future); - grpc_event_finish(metadata_ev); - // TODO(yangg) protect states by a mutex, including other places. - if (!self_halfclosed_ || !peer_halfclosed_) { - Cancel(); - } - grpc_event *finish_ev = - grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); - GPR_ASSERT(finish_ev->type == GRPC_FINISHED); - final_status_ = Status( - static_cast<StatusCode>(finish_ev->data.finished.status), - finish_ev->data.finished.details ? finish_ev->data.finished.details : ""); - grpc_event_finish(finish_ev); - return final_status_; -} - -void StreamContext::Cancel() { grpc_call_cancel(call()); } - -} // namespace grpc diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h deleted file mode 100644 index 8def589841..0000000000 --- a/src/cpp/stream/stream_context.h +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ -#define __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ - -#include <grpc/grpc.h> -#include <grpc++/status.h> -#include <grpc++/stream_context_interface.h> - -namespace google { -namespace protobuf { -class Message; -} -} - -namespace grpc { -class ClientContext; -class RpcMethod; - -class StreamContext final : public StreamContextInterface { - public: - StreamContext(const RpcMethod &method, ClientContext *context, - const google::protobuf::Message *request, - google::protobuf::Message *result); - StreamContext(const RpcMethod &method, grpc_call *call, - grpc_completion_queue *cq, google::protobuf::Message *request, - google::protobuf::Message *result); - ~StreamContext(); - // Start the stream, if there is a final write following immediately, set - // buffered so that the messages can be sent in batch. - void Start(bool buffered) override; - bool Read(google::protobuf::Message *msg) override; - bool Write(const google::protobuf::Message *msg, bool is_last) override; - const Status &Wait() override; - void Cancel() override; - - google::protobuf::Message *request() override { return request_; } - google::protobuf::Message *response() override { return result_; } - - private: - // Unique tags for plucking events from the c layer. this pointer is casted - // to char* to create single byte step between tags. It implicitly relies on - // that StreamContext is large enough to contain all the pointers. - void *finished_tag() { return reinterpret_cast<char *>(this); } - void *read_tag() { return reinterpret_cast<char *>(this) + 1; } - void *write_tag() { return reinterpret_cast<char *>(this) + 2; } - void *halfclose_tag() { return reinterpret_cast<char *>(this) + 3; } - void *client_metadata_read_tag() { - return reinterpret_cast<char *>(this) + 5; - } - grpc_call *call() { return call_; } - grpc_completion_queue *cq() { return cq_; } - - bool is_client_; - const RpcMethod *method_; // not owned - grpc_call *call_; // not owned - grpc_completion_queue *cq_; // not owned - google::protobuf::Message *request_; // first request, not owned - google::protobuf::Message *result_; // last response, not owned - - bool peer_halfclosed_; - bool self_halfclosed_; - Status final_status_; -}; - -} // namespace grpc - -#endif // __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__ diff --git a/test/core/statistics/census_log_tests.c b/test/core/statistics/census_log_tests.c index c7b2b2e46d..e2ad78a6f2 100644 --- a/test/core/statistics/census_log_tests.c +++ b/test/core/statistics/census_log_tests.c @@ -35,7 +35,7 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include "src/core/support/cpu.h" +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/sync.h> diff --git a/test/cpp/end2end/async_test_server.cc b/test/cpp/end2end/async_test_server.cc deleted file mode 100644 index f18b6c00bc..0000000000 --- a/test/cpp/end2end/async_test_server.cc +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "test/cpp/end2end/async_test_server.h" - -#include <chrono> - -#include <grpc/support/log.h> -#include "src/cpp/proto/proto_utils.h" -#include "test/cpp/util/echo.pb.h" -#include <grpc++/async_server.h> -#include <grpc++/async_server_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/status.h> -#include <gtest/gtest.h> - -using grpc::cpp::test::util::EchoRequest; -using grpc::cpp::test::util::EchoResponse; - -using std::chrono::duration_cast; -using std::chrono::microseconds; -using std::chrono::seconds; -using std::chrono::system_clock; - -namespace grpc { -namespace testing { - -AsyncTestServer::AsyncTestServer() : server_(&cq_), cq_drained_(false) {} - -AsyncTestServer::~AsyncTestServer() {} - -void AsyncTestServer::AddPort(const grpc::string& addr) { - server_.AddPort(addr); -} - -void AsyncTestServer::Start() { server_.Start(); } - -// Return true if deadline actual is within 0.5s from expected. -bool DeadlineMatched(const system_clock::time_point& actual, - const system_clock::time_point& expected) { - microseconds diff_usecs = duration_cast<microseconds>(expected - actual); - gpr_log(GPR_INFO, "diff_usecs= %d", diff_usecs.count()); - return diff_usecs.count() < 500000 && diff_usecs.count() > -500000; -} - -void AsyncTestServer::RequestOneRpc() { server_.RequestOneRpc(); } - -void AsyncTestServer::MainLoop() { - EchoRequest request; - EchoResponse response; - void* tag = nullptr; - - RequestOneRpc(); - - while (true) { - CompletionQueue::CompletionType t = cq_.Next(&tag); - AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag); - switch (t) { - case CompletionQueue::SERVER_RPC_NEW: - gpr_log(GPR_INFO, "SERVER_RPC_NEW %p", server_context); - if (server_context) { - EXPECT_EQ(server_context->method(), "/foo"); - // TODO(ctiller): verify deadline - server_context->Accept(cq_.cq()); - // Handle only one rpc at a time. - RequestOneRpc(); - server_context->StartRead(&request); - } - break; - case CompletionQueue::RPC_END: - gpr_log(GPR_INFO, "RPC_END %p", server_context); - delete server_context; - break; - case CompletionQueue::SERVER_READ_OK: - gpr_log(GPR_INFO, "SERVER_READ_OK %p", server_context); - response.set_message(request.message()); - server_context->StartWrite(response, 0); - break; - case CompletionQueue::SERVER_READ_ERROR: - gpr_log(GPR_INFO, "SERVER_READ_ERROR %p", server_context); - server_context->StartWriteStatus(Status::OK); - break; - case CompletionQueue::HALFCLOSE_OK: - gpr_log(GPR_INFO, "HALFCLOSE_OK %p", server_context); - // Do nothing, just wait for RPC_END. - break; - case CompletionQueue::SERVER_WRITE_OK: - gpr_log(GPR_INFO, "SERVER_WRITE_OK %p", server_context); - server_context->StartRead(&request); - break; - case CompletionQueue::SERVER_WRITE_ERROR: - EXPECT_TRUE(0); - break; - case CompletionQueue::QUEUE_CLOSED: { - gpr_log(GPR_INFO, "QUEUE_CLOSED"); - HandleQueueClosed(); - return; - } - default: - EXPECT_TRUE(0); - break; - } - } -} - -void AsyncTestServer::HandleQueueClosed() { - std::unique_lock<std::mutex> lock(cq_drained_mu_); - cq_drained_ = true; - cq_drained_cv_.notify_all(); -} - -void AsyncTestServer::Shutdown() { - // The server need to be shut down before cq_ as grpc_server flushes all - // pending requested calls to the completion queue at shutdown. - server_.Shutdown(); - cq_.Shutdown(); - std::unique_lock<std::mutex> lock(cq_drained_mu_); - while (!cq_drained_) { - cq_drained_cv_.wait(lock); - } -} - -} // namespace testing -} // namespace grpc diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 4dea77ea81..52deb0f32d 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -147,8 +147,8 @@ class End2endTest : public ::testing::Test { // Setup server ServerBuilder builder; builder.AddPort(server_address_.str()); - builder.RegisterService(service_.service()); - builder.RegisterService(dup_pkg_service_.service()); + builder.RegisterService(&service_); + builder.RegisterService(&dup_pkg_service_); server_ = builder.BuildAndStart(); } @@ -290,7 +290,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) { request.set_message("hello"); EXPECT_TRUE(stream->Write(request)); stream->WritesDone(); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.IsOk()); @@ -308,7 +308,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(stream->Write(request)); EXPECT_TRUE(stream->Write(request)); stream->WritesDone(); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_EQ(response.message(), "hellohello"); EXPECT_TRUE(s.IsOk()); @@ -332,7 +332,7 @@ TEST_F(End2endTest, ResponseStream) { EXPECT_EQ(response.message(), request.message() + "2"); EXPECT_FALSE(stream->Read(&response)); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_TRUE(s.IsOk()); delete stream; @@ -366,7 +366,7 @@ TEST_F(End2endTest, BidiStream) { stream->WritesDone(); EXPECT_FALSE(stream->Read(&response)); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_TRUE(s.IsOk()); delete stream; @@ -422,7 +422,7 @@ TEST_F(End2endTest, BadCredentials) { ClientContext context2; ClientReaderWriter<EchoRequest, EchoResponse>* stream = stub->BidiStream(&context2); - s = stream->Wait(); + s = stream->Finish(); EXPECT_FALSE(s.IsOk()); EXPECT_EQ(StatusCode::UNKNOWN, s.code()); EXPECT_EQ("Rpc sent on a lame channel.", s.details()); diff --git a/test/cpp/end2end/sync_client_async_server_test.cc b/test/cpp/end2end/sync_client_async_server_test.cc deleted file mode 100644 index 9955eb306f..0000000000 --- a/test/cpp/end2end/sync_client_async_server_test.cc +++ /dev/null @@ -1,236 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <chrono> -#include <memory> -#include <sstream> -#include <string> - -#include <grpc/grpc.h> -#include <grpc/support/thd.h> -#include "test/cpp/util/echo.pb.h" -#include <grpc++/channel_arguments.h> -#include <grpc++/channel_interface.h> -#include <grpc++/client_context.h> -#include <grpc++/create_channel.h> -#include <grpc++/impl/internal_stub.h> -#include <grpc++/impl/rpc_method.h> -#include <grpc++/status.h> -#include <grpc++/stream.h> -#include "test/cpp/end2end/async_test_server.h" -#include "test/core/util/port.h" -#include <gtest/gtest.h> - -using grpc::cpp::test::util::EchoRequest; -using grpc::cpp::test::util::EchoResponse; - -using std::chrono::duration_cast; -using std::chrono::microseconds; -using std::chrono::seconds; -using std::chrono::system_clock; - -using grpc::testing::AsyncTestServer; - -namespace grpc { -namespace { - -void ServerLoop(void* s) { - AsyncTestServer* server = static_cast<AsyncTestServer*>(s); - server->MainLoop(); -} - -class End2endTest : public ::testing::Test { - protected: - void SetUp() override { - int port = grpc_pick_unused_port_or_die(); - // TODO(yangg) protobuf has a StringPrintf, maybe use that - std::ostringstream oss; - oss << "[::]:" << port; - // Setup server - server_.reset(new AsyncTestServer()); - server_->AddPort(oss.str()); - server_->Start(); - - RunServerThread(); - - // Setup client - oss.str(""); - oss << "127.0.0.1:" << port; - std::shared_ptr<ChannelInterface> channel = - CreateChannel(oss.str(), ChannelArguments()); - stub_.set_channel(channel); - } - - void RunServerThread() { - gpr_thd_id id; - EXPECT_TRUE(gpr_thd_new(&id, ServerLoop, server_.get(), NULL)); - } - - void TearDown() override { server_->Shutdown(); } - - std::unique_ptr<AsyncTestServer> server_; - InternalStub stub_; -}; - -TEST_F(End2endTest, NoOpTest) { EXPECT_TRUE(stub_.channel() != nullptr); } - -TEST_F(End2endTest, SimpleRpc) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo"); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - Status s = - stub_.channel()->StartBlockingRpc(method, &context, request, &result); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, KSequentialSimpleRpcs) { - int k = 3; - for (int i = 0; i < k; i++) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo"); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - Status s = - stub_.channel()->StartBlockingRpc(method, &context, request, &result); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); - } -} - -TEST_F(End2endTest, OnePingpongBidiStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, nullptr); - std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream( - new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - stream->WritesDone(); - EXPECT_FALSE(stream->Read(&result)); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, TwoPingpongBidiStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, nullptr); - std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream( - new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_EQ(result.message(), request.message()); - stream->WritesDone(); - EXPECT_FALSE(stream->Read(&result)); - Status s = stream->Wait(); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, OnePingpongClientStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::CLIENT_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, &result); - std::unique_ptr<ClientWriter<EchoRequest>> stream( - new ClientWriter<EchoRequest>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - stream->WritesDone(); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, OnePingpongServerStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::SERVER_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, &request, nullptr); - std::unique_ptr<ClientReader<EchoResponse>> stream( - new ClientReader<EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_FALSE(stream->Read(nullptr)); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -} // namespace -} // namespace grpc - -int main(int argc, char** argv) { - grpc_init(); - ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; -} diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 0fa76f0e02..29bbe4d931 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -248,7 +248,7 @@ void DoRequestStreaming() { aggregated_payload_size += request_stream_sizes[i]; } stream->WritesDone(); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); GPR_ASSERT(s.IsOk()); @@ -278,7 +278,7 @@ void DoResponseStreaming() { ++i; } GPR_ASSERT(response_stream_sizes.size() == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Response streaming done."); @@ -311,7 +311,7 @@ void DoResponseStreamingWithSlowConsumer() { ++i; } GPR_ASSERT(kNumResponseMessages == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Response streaming done."); @@ -345,7 +345,7 @@ void DoHalfDuplex() { ++i; } GPR_ASSERT(response_stream_sizes.size() == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Half-duplex streaming rpc done."); } @@ -378,7 +378,7 @@ void DoPingPong() { stream->WritesDone(); GPR_ASSERT(!stream->Read(&response)); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Ping pong streaming done."); } diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc index 8a6be57929..a8399779b9 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server.cc @@ -200,7 +200,7 @@ void RunServer() { ServerBuilder builder; builder.AddPort(server_address.str()); - builder.RegisterService(service.service()); + builder.RegisterService(&service); if (FLAGS_enable_ssl) { SslServerCredentialsOptions ssl_opts = { "", {{test_server1_key, test_server1_cert}}}; diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index c35d9ebdd8..4e1d2cab0e 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -125,7 +125,7 @@ static void RunServer() { ServerBuilder builder; builder.AddPort(server_address); - builder.RegisterService(service.service()); + builder.RegisterService(&service); std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s\n", server_address); diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 197dc3b2ba..2ab3f50bb2 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -283,10 +283,6 @@ }, { "language": "c++", - "name": "sync_client_async_server_test" - }, - { - "language": "c++", "name": "thread_pool_test" }, { |