diff options
author | Craig Tiller <ctiller@google.com> | 2015-04-29 13:46:17 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-04-29 13:46:17 -0700 |
commit | 455253f52cb184208b0fb6bba87f5774230fb7f3 (patch) | |
tree | eb873d6ec32c266f54208dc96353958c8b495a8e /src | |
parent | 27aa7afb728ad83a742f429dc47266900d6fd449 (diff) | |
parent | dba5e0f96be51cce1343a31b5bd3734342642a29 (diff) |
Merge branch 'one-read' into swappy
Diffstat (limited to 'src')
21 files changed, 582 insertions, 201 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 3825713aba..cef5e09760 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -375,7 +375,7 @@ static void grpc_tcp_continue_read(grpc_tcp *tcp) { slice_state_destroy(&read_state); grpc_tcp_unref(tcp); } else { - /* Spurious read event, consume it here */ + /* We've consumed the edge, request a new one */ slice_state_destroy(&read_state); grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } diff --git a/src/core/profiling/timers.c b/src/core/profiling/timers.c index 7cc79bd22b..bd1700ffd8 100644 --- a/src/core/profiling/timers.c +++ b/src/core/profiling/timers.c @@ -40,10 +40,12 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include <stdio.h> typedef struct grpc_timer_entry { grpc_precise_clock tm; + gpr_thd_id thd; const char* tag; void* id; const char* file; @@ -85,7 +87,7 @@ static void log_report_locked(grpc_timers_log* log) { grpc_timer_entry* entry = &(log->log[i]); fprintf(fp, "GRPC_LAT_PROF "); grpc_precise_clock_print(&entry->tm, fp); - fprintf(fp, " %s %p %s %d\n", entry->tag, entry->id, entry->file, + fprintf(fp, " %p %s %p %s %d\n", (void*)(gpr_intptr)entry->thd, entry->tag, entry->id, entry->file, entry->line); } @@ -121,6 +123,7 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id, entry->id = id; entry->file = file; entry->line = line; + entry->thd = gpr_thd_currentid(); gpr_mu_unlock(&log->mu); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index d2f46ddd07..0e4b9fc9d3 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -163,7 +163,7 @@ grpc_credentials *grpc_google_default_credentials_create(void) { gpr_mu_lock(&g_mu); if (default_credentials != NULL) { - result = default_credentials; + result = grpc_credentials_ref(default_credentials); serving_cached_credentials = 1; goto end; } diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c index a19a0141d4..d2ed82e771 100644 --- a/src/core/support/alloc.c +++ b/src/core/support/alloc.c @@ -55,7 +55,7 @@ void *gpr_realloc(void *p, size_t size) { } void *gpr_malloc_aligned(size_t size, size_t alignment_log) { - size_t alignment = 1 << alignment_log; + size_t alignment = ((size_t)1) << alignment_log; size_t extra = alignment - 1 + sizeof(void *); void *p = gpr_malloc(size + extra); void **ret = (void **)(((gpr_uintptr)p + extra) & ~(alignment - 1)); diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index 539470bccf..f4443b5c2d 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -64,7 +64,7 @@ void gpr_sleep_until(gpr_timespec until) { } delta = gpr_time_sub(until, now); - sleep_millis = delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; + sleep_millis = (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS; Sleep(sleep_millis); } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d21bfaa592..e7767cc22d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -34,6 +34,7 @@ #include "src/core/surface/call.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/channel.h" @@ -694,6 +695,7 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; size_t i; + GRPC_TIMER_MARK(CALL_ON_DONE_RECV_BEGIN, 0); lock(call); call->receiving = 0; if (success) { @@ -734,6 +736,7 @@ static void call_on_done_recv(void *pc, int success) { unlock(call); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); + GRPC_TIMER_MARK(CALL_ON_DONE_RECV_END, 0); } static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 65161bd1af..dd7c60710b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,6 +37,7 @@ #include <stdio.h> #include <string.h> +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" @@ -783,6 +784,8 @@ static void unlock(transport *t) { grpc_stream_op_buffer nuke_now; const grpc_transport_callbacks *cb = t->cb; + GRPC_TIMER_MARK(HTTP2_UNLOCK_BEGIN, 0); + grpc_sopb_init(&nuke_now); if (t->nuke_later_sopb.nops) { grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb); @@ -831,6 +834,8 @@ static void unlock(transport *t) { /* finally unlock */ gpr_mu_unlock(&t->mu); + GRPC_TIMER_MARK(HTTP2_UNLOCK_CLEANUP, 0); + /* perform some callbacks if necessary */ for (i = 0; i < num_goaways; i++) { cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug); @@ -861,6 +866,8 @@ static void unlock(transport *t) { grpc_sopb_destroy(&nuke_now); gpr_free(goaways); + + GRPC_TIMER_MARK(HTTP2_UNLOCK_END, 0); } /* @@ -907,8 +914,8 @@ static int prepare_write(transport *t) { window_delta = grpc_chttp2_preencode( s->outgoing_sopb->ops, &s->outgoing_sopb->nops, GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb); - FLOWCTL_TRACE(t, t, outgoing, 0, -window_delta); - FLOWCTL_TRACE(t, s, outgoing, s->id, -window_delta); + FLOWCTL_TRACE(t, t, outgoing, 0, -(gpr_int64)window_delta); + FLOWCTL_TRACE(t, s, outgoing, s->id, -(gpr_int64)window_delta); t->outgoing_window -= window_delta; s->outgoing_window -= window_delta; @@ -1270,8 +1277,8 @@ static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) { return GRPC_CHTTP2_CONNECTION_ERROR; } - FLOWCTL_TRACE(t, t, incoming, 0, -t->incoming_frame_size); - FLOWCTL_TRACE(t, s, incoming, s->id, -t->incoming_frame_size); + FLOWCTL_TRACE(t, t, incoming, 0, -(gpr_int64)t->incoming_frame_size); + FLOWCTL_TRACE(t, s, incoming, s->id, -(gpr_int64)t->incoming_frame_size); t->incoming_window -= t->incoming_frame_size; s->incoming_window -= t->incoming_frame_size; @@ -1411,7 +1418,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) { gpr_log(GPR_ERROR, "ignoring out of order new stream request on server; last stream " "id=%d, new stream id=%d", - t->last_incoming_stream_id, t->incoming_stream); + t->last_incoming_stream_id, t->incoming_stream_id); + return init_skip_frame(t, 1); + } else if ((t->incoming_stream_id & 1) == 0) { + gpr_log(GPR_ERROR, "ignoring stream with non-client generated index %d", t->incoming_stream_id); return init_skip_frame(t, 1); } t->incoming_stream = NULL; diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 04c1d79022..7bbe8276c3 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -41,6 +41,9 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> +/* These routines are here to facilitate debugging - they produce string + representations of various transport data structures */ + static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { gpr_strvec_add(b, gpr_strdup("key=")); gpr_strvec_add( diff --git a/src/objective-c/examples/Sample/Podfile.lock b/src/objective-c/examples/Sample/Podfile.lock deleted file mode 100644 index ccf5aa0f06..0000000000 --- a/src/objective-c/examples/Sample/Podfile.lock +++ /dev/null @@ -1,44 +0,0 @@ -PODS: - - gRPC (0.0.1): - - gRPC/C-Core (= 0.0.1) - - gRPC/GRPCClient (= 0.0.1) - - gRPC/ProtoRPC (= 0.0.1) - - gRPC/RxLibrary (= 0.0.1) - - gRPC/C-Core (0.0.1): - - OpenSSL (~> 1.0.200) - - gRPC/GRPCClient (0.0.1): - - gRPC/C-Core - - gRPC/RxLibrary - - gRPC/ProtoRPC (0.0.1): - - gRPC/GRPCClient - - gRPC/RxLibrary - - gRPC/RxLibrary (0.0.1) - - OpenSSL (1.0.201) - - ProtocolBuffers (1.9.8) - - RemoteTest (0.0.1): - - gRPC (~> 0.0) - - ProtocolBuffers (~> 1.9) - - Route_guide (0.0.1): - - ProtocolBuffers (~> 1.9) - -DEPENDENCIES: - - gRPC (from `../../../..`) - - RemoteTest (from `RemoteTestClient`) - - Route_guide (from `RouteGuideClient`) - -EXTERNAL SOURCES: - gRPC: - :path: ../../../.. - RemoteTest: - :path: RemoteTestClient - Route_guide: - :path: RouteGuideClient - -SPEC CHECKSUMS: - gRPC: f6c1bf5dde59ab543e4bd1d5e2ea56da4a9a0253 - OpenSSL: 4e990d04b14015c49c800c400b86ae44a4818a5c - ProtocolBuffers: 9a4a171c0c7cc8f21dd29aeca4f9ac775d84a880 - RemoteTest: 021a51c04d5795f286b379ca5ef14d0be5b2fb9b - Route_guide: a277da8eef182774abb050d7b81109f5878f8652 - -COCOAPODS: 0.36.0 diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m index 6c5de81277..2ef6a6e4ab 100644 --- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m +++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m @@ -34,6 +34,7 @@ #import <UIKit/UIKit.h> #import <XCTest/XCTest.h> +#import <gRPC/GRXWriter+Immediate.h> #import <RemoteTest/Messages.pb.h> #import <RemoteTest/Test.pb.h> @@ -48,43 +49,85 @@ _service = [[RMTTestService alloc] initWithHost:@"grpc-test.sandbox.google.com"]; } -- (void)testEmptyRPC { - __weak XCTestExpectation *noRPCError = [self expectationWithDescription:@"RPC succeeded."]; - __weak XCTestExpectation *responded = [self expectationWithDescription:@"Response received."]; +// Tests as described here: https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md - [_service emptyCallWithRequest:[RMTEmpty defaultInstance] - handler:^(RMTEmpty *response, NSError *error) { +- (void)testEmptyUnaryRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"]; + + RMTEmpty *request = [RMTEmpty defaultInstance]; + + [_service emptyCallWithRequest:request handler:^(RMTEmpty *response, NSError *error) { XCTAssertNil(error, @"Finished with unexpected error: %@", error); - [noRPCError fulfill]; - XCTAssertNotNil(response, @"nil response received."); - [responded fulfill]; + + id expectedResponse = [RMTEmpty defaultInstance]; + XCTAssertEqualObjects(response, expectedResponse); + + [expectation fulfill]; }]; [self waitForExpectationsWithTimeout:2. handler:nil]; } -- (void)testSimpleProtoRPC { - __weak XCTestExpectation *noRPCError = [self expectationWithDescription:@"RPC succeeded."]; - __weak XCTestExpectation *responded = [self expectationWithDescription:@"Response received."]; - __weak XCTestExpectation *validResponse = [self expectationWithDescription:@"Valid response."]; +- (void)testLargeUnaryRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"]; RMTSimpleRequest *request = [[[[[[RMTSimpleRequestBuilder alloc] init] - setResponseSize:100] - setFillUsername:YES] - setFillOauthScope:YES] + setResponseType:RMTPayloadTypeCompressable] + setResponseSize:314159] + setPayloadBuilder:[[[RMTPayloadBuilder alloc] init] + setBody:[NSMutableData dataWithLength:271828]]] build]; + [_service unaryCallWithRequest:request handler:^(RMTSimpleResponse *response, NSError *error) { XCTAssertNil(error, @"Finished with unexpected error: %@", error); - [noRPCError fulfill]; - XCTAssertNotNil(response, @"nil response received."); - [responded fulfill]; - // We expect empty strings, not nil: - XCTAssertNotNil(response.username, @"Response's username is nil."); - XCTAssertNotNil(response.oauthScope, @"Response's OAuth scope is nil."); - [validResponse fulfill]; + + id expectedResponse = [[[[RMTSimpleResponseBuilder alloc] init] + setPayloadBuilder:[[[[RMTPayloadBuilder alloc] init] + setType:RMTPayloadTypeCompressable] + setBody:[NSMutableData dataWithLength:314159]]] + build]; + XCTAssertEqualObjects(response, expectedResponse); + + [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:2. handler:nil]; + [self waitForExpectationsWithTimeout:4. handler:nil]; +} + +- (void)testClientStreamingRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"]; + + id request1 = [[[[RMTStreamingInputCallRequestBuilder alloc] init] + setPayloadBuilder:[[[RMTPayloadBuilder alloc] init] + setBody:[NSMutableData dataWithLength:27182]]] + build]; + id request2 = [[[[RMTStreamingInputCallRequestBuilder alloc] init] + setPayloadBuilder:[[[RMTPayloadBuilder alloc] init] + setBody:[NSMutableData dataWithLength:8]]] + build]; + id request3 = [[[[RMTStreamingInputCallRequestBuilder alloc] init] + setPayloadBuilder:[[[RMTPayloadBuilder alloc] init] + setBody:[NSMutableData dataWithLength:1828]]] + build]; + id request4 = [[[[RMTStreamingInputCallRequestBuilder alloc] init] + setPayloadBuilder:[[[RMTPayloadBuilder alloc] init] + setBody:[NSMutableData dataWithLength:45904]]] + build]; + id<GRXWriter> writer = [GRXWriter writerWithContainer:@[request1, request2, request3, request4]]; + + [_service streamingInputCallWithRequestsWriter:writer + handler:^(RMTStreamingInputCallResponse *response, NSError *error) { + XCTAssertNil(error, @"Finished with unexpected error: %@", error); + + id expectedResponse = [[[[RMTStreamingInputCallResponseBuilder alloc] init] + setAggregatedPayloadSize:74922] + build]; + XCTAssertEqualObjects(response, expectedResponse); + + [expectation fulfill]; + }]; + + [self waitForExpectationsWithTimeout:4. handler:nil]; } @end diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py index 437a6730cd..6e15adbda8 100644 --- a/src/python/src/grpc/_adapter/_c_test.py +++ b/src/python/src/grpc/_adapter/_c_test.py @@ -83,8 +83,11 @@ class _CTest(unittest.TestCase): _c.init() channel = _c.Channel('%s:%d' % (host, 12345), None) - call = _c.Call(channel, method, host, time.time() + _TIMEOUT) + completion_queue = _c.CompletionQueue() + call = _c.Call(channel, completion_queue, method, host, + time.time() + _TIMEOUT) del call + del completion_queue del channel _c.shut_down() diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c index bf96c1a3fa..d833268fc9 100644 --- a/src/python/src/grpc/_adapter/_call.c +++ b/src/python/src/grpc/_adapter/_call.c @@ -36,90 +36,166 @@ #include <math.h> #include <Python.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include "grpc/_adapter/_channel.h" #include "grpc/_adapter/_completion_queue.h" #include "grpc/_adapter/_error.h" +#include "grpc/_adapter/_tag.h" -static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) { - const PyObject *channel; +static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { + Call *self = (Call *)type->tp_alloc(type, 0); + Channel *channel; + CompletionQueue *completion_queue; const char *method; const char *host; double deadline; - static char *kwlist[] = {"channel", "method", "host", "deadline", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!ssd:Call", kwlist, - &pygrpc_ChannelType, &channel, &method, - &host, &deadline)) { - return -1; + static char *kwlist[] = {"channel", "completion_queue", + "method", "host", "deadline", NULL}; + + if (!PyArg_ParseTupleAndKeywords( + args, kwds, "O!O!ssd:Call", kwlist, + &pygrpc_ChannelType, &channel, + &pygrpc_CompletionQueueType, &completion_queue, + &method, &host, &deadline)) { + return NULL; } /* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own * function with its own test coverage. */ - self->c_call = grpc_channel_create_call_old( - ((Channel *)channel)->c_channel, method, host, + self->c_call = grpc_channel_create_call( + channel->c_channel, completion_queue->c_completion_queue, method, host, gpr_time_from_nanos(deadline * GPR_NS_PER_SEC)); - - return 0; + self->completion_queue = completion_queue; + Py_INCREF(self->completion_queue); + self->channel = channel; + Py_INCREF(self->channel); + grpc_call_details_init(&self->call_details); + grpc_metadata_array_init(&self->recv_metadata); + grpc_metadata_array_init(&self->recv_trailing_metadata); + self->send_metadata = NULL; + self->send_metadata_count = 0; + self->send_trailing_metadata = NULL; + self->send_trailing_metadata_count = 0; + self->send_message = NULL; + self->recv_message = NULL; + self->adding_to_trailing = 0; + + return (PyObject *)self; } static void pygrpc_call_dealloc(Call *self) { if (self->c_call != NULL) { grpc_call_destroy(self->c_call); } + Py_XDECREF(self->completion_queue); + Py_XDECREF(self->channel); + Py_XDECREF(self->server); + grpc_call_details_destroy(&self->call_details); + grpc_metadata_array_destroy(&self->recv_metadata); + grpc_metadata_array_destroy(&self->recv_trailing_metadata); + if (self->send_message) { + grpc_byte_buffer_destroy(self->send_message); + } + if (self->recv_message) { + grpc_byte_buffer_destroy(self->recv_message); + } + gpr_free(self->status_details); + gpr_free(self->send_metadata); + gpr_free(self->send_trailing_metadata); self->ob_type->tp_free((PyObject *)self); } static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) { - const PyObject *completion_queue; - const PyObject *metadata_tag; - const PyObject *finish_tag; + PyObject *completion_queue; + PyObject *metadata_tag; + PyObject *finish_tag; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_init_metadata_tag; + pygrpc_tag *c_metadata_tag; + pygrpc_tag *c_finish_tag; + grpc_op send_initial_metadata; + grpc_op recv_initial_metadata; + grpc_op recv_status_on_client; if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType, &completion_queue, &metadata_tag, &finish_tag))) { return NULL; } - - call_error = grpc_call_invoke_old( - self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, - (void *)metadata_tag, (void *)finish_tag, 0); - + send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA; + send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata; + send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count; + recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA; + recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata; + recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT; + recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata; + recv_status_on_client.data.recv_status_on_client.status = &self->status; + recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details; + recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity; + c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self); + c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self); + c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self); + + call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_init_metadata_tag); + pygrpc_tag_destroy(c_metadata_tag); + pygrpc_tag_destroy(c_finish_tag); + return result; + } + call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_metadata_tag); + pygrpc_tag_destroy(c_finish_tag); + return result; + } + call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(metadata_tag); - Py_INCREF(finish_tag); + if (result == NULL) { + pygrpc_tag_destroy(c_finish_tag); + return result; } + return result; } static const PyObject *pygrpc_call_write(Call *self, PyObject *args) { const char *bytes; int length; - const PyObject *tag; + PyObject *tag; gpr_slice slice; grpc_byte_buffer *byte_buffer; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) { return NULL; } + c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self); slice = gpr_slice_from_copied_buffer(bytes, length); byte_buffer = grpc_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); - call_error = - grpc_call_start_write_old(self->c_call, byte_buffer, (void *)tag, 0); + if (self->send_message) { + grpc_byte_buffer_destroy(self->send_message); + } + self->send_message = byte_buffer; + + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message = self->send_message; - grpc_byte_buffer_destroy(byte_buffer); + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -127,36 +203,42 @@ static const PyObject *pygrpc_call_write(Call *self, PyObject *args) { static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) { grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self); + grpc_op op; - call_error = grpc_call_writes_done_old(self->c_call, (void *)tag); + op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) { - const PyObject *completion_queue; - const PyObject *tag; + PyObject *completion_queue; + PyObject *tag; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType, &completion_queue, &tag))) { return NULL; } - call_error = grpc_call_server_accept_old( - self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, - (void *)tag); - result = pygrpc_translate_call_error(call_error); + op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op.data.recv_close_on_server.cancelled = &self->cancelled; + c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self); - if (result != NULL) { - Py_INCREF(tag); + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } - return result; } @@ -171,24 +253,52 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) { metadata.key = key; metadata.value = value; metadata.value_length = value_length; - return pygrpc_translate_call_error( - grpc_call_add_metadata_old(self->c_call, &metadata, 0)); + if (self->adding_to_trailing) { + self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata)); + self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata; + self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1; + } else { + self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata)); + self->send_metadata[self->send_metadata_count] = metadata; + self->send_metadata_count = self->send_metadata_count + 1; + } + return pygrpc_translate_call_error(GRPC_CALL_OK); } static const PyObject *pygrpc_call_premetadata(Call *self) { - return pygrpc_translate_call_error( - grpc_call_server_end_initial_metadata_old(self->c_call, 0)); + grpc_op op; + grpc_call_error call_error; + const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self); + op.op = GRPC_OP_SEND_INITIAL_METADATA; + op.data.send_initial_metadata.metadata = self->send_metadata; + op.data.send_initial_metadata.count = self->send_metadata_count; + self->adding_to_trailing = 1; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); + } + return result; } static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) { + grpc_op op; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self); - call_error = grpc_call_start_read_old(self->c_call, (void *)tag); - + op.op = GRPC_OP_RECV_MESSAGE; + if (self->recv_message) { + grpc_byte_buffer_destroy(self->recv_message); + self->recv_message = NULL; + } + op.data.recv_message = &self->recv_message; + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -197,15 +307,18 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) { PyObject *status; PyObject *code; PyObject *details; - const PyObject *tag; + PyObject *tag; grpc_status_code c_code; char *c_message; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) { return NULL; } + c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self); code = PyObject_GetAttrString(status, "code"); if (code == NULL) { @@ -227,13 +340,21 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) { if (c_message == NULL) { return NULL; } - - call_error = grpc_call_start_write_status_old(self->c_call, c_code, c_message, - (void *)tag); - + if (self->status_details) { + gpr_free(self->status_details); + } + self->status_details = gpr_malloc(strlen(c_message)+1); + strcpy(self->status_details, c_message); + op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count; + op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata; + op.data.send_status_from_server.status = c_code; + op.data.send_status_from_server.status_details = self->status_details; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -301,9 +422,9 @@ PyTypeObject pygrpc_CallType = { 0, /* tp_descr_get */ 0, /* tp_descr_set */ 0, /* tp_dictoffset */ - (initproc)pygrpc_call_init, /* tp_init */ + 0, /* tp_init */ 0, /* tp_alloc */ - PyType_GenericNew, /* tp_new */ + pygrpc_call_new, /* tp_new */ }; int pygrpc_add_call(PyObject *module) { diff --git a/src/python/src/grpc/_adapter/_call.h b/src/python/src/grpc/_adapter/_call.h index c04a2285f7..fb9160901b 100644 --- a/src/python/src/grpc/_adapter/_call.h +++ b/src/python/src/grpc/_adapter/_call.h @@ -37,8 +37,36 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_completion_queue.h" +#include "grpc/_adapter/_channel.h" +#include "grpc/_adapter/_server.h" + typedef struct { PyObject_HEAD + + CompletionQueue *completion_queue; + Channel *channel; + Server *server; + + /* Legacy state. */ + grpc_call_details call_details; + grpc_metadata_array recv_metadata; + grpc_metadata_array recv_trailing_metadata; + grpc_metadata *send_metadata; + size_t send_metadata_count; + grpc_metadata *send_trailing_metadata; + size_t send_trailing_metadata_count; + int adding_to_trailing; + + grpc_byte_buffer *send_message; + grpc_byte_buffer *recv_message; + + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + + int cancelled; + grpc_call *c_call; } Call; diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c index a639eff53e..5f1cb2a3a6 100644 --- a/src/python/src/grpc/_adapter/_completion_queue.c +++ b/src/python/src/grpc/_adapter/_completion_queue.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include "grpc/_adapter/_call.h" +#include "grpc/_adapter/_tag.h" static PyObject *status_class; static PyObject *service_acceptance_class; @@ -138,74 +139,70 @@ static PyObject *pygrpc_stop_event_args(grpc_event *c_event) { } static PyObject *pygrpc_write_event_args(grpc_event *c_event) { - PyObject *write_accepted = - c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + PyObject *write_accepted = Py_True; + return PyTuple_Pack(8, write_event_kind, user_tag, write_accepted, Py_None, Py_None, Py_None, Py_None, Py_None); } static PyObject *pygrpc_complete_event_args(grpc_event *c_event) { - PyObject *complete_accepted = - c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + PyObject *complete_accepted = Py_True; + return PyTuple_Pack(8, complete_event_kind, user_tag, Py_None, complete_accepted, Py_None, Py_None, Py_None, Py_None); } static PyObject *pygrpc_service_event_args(grpc_event *c_event) { - if (c_event->data.server_rpc_new.method == NULL) { + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + if (tag->call->call_details.method == NULL) { return PyTuple_Pack( - 8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None, + 8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, Py_None); } else { PyObject *method = NULL; PyObject *host = NULL; PyObject *service_deadline = NULL; - Call *call = NULL; PyObject *service_acceptance = NULL; PyObject *metadata = NULL; PyObject *event_args = NULL; - method = PyBytes_FromString(c_event->data.server_rpc_new.method); + method = PyBytes_FromString(tag->call->call_details.method); if (method == NULL) { goto error; } - host = PyBytes_FromString(c_event->data.server_rpc_new.host); + host = PyBytes_FromString(tag->call->call_details.host); if (host == NULL) { goto error; } service_deadline = - pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline); + pygrpc_as_py_time(&tag->call->call_details.deadline); if (service_deadline == NULL) { goto error; } - call = PyObject_New(Call, &pygrpc_CallType); - if (call == NULL) { - goto error; - } - call->c_call = c_event->call; - service_acceptance = - PyObject_CallFunctionObjArgs(service_acceptance_class, call, method, - host, service_deadline, NULL); + PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call, + method, host, service_deadline, NULL); if (service_acceptance == NULL) { goto error; } metadata = pygrpc_metadata_collection_get( - c_event->data.server_rpc_new.metadata_elements, - c_event->data.server_rpc_new.metadata_count); + tag->call->recv_metadata.metadata, + tag->call->recv_metadata.count); event_args = PyTuple_Pack(8, service_event_kind, - (PyObject *)c_event->tag, Py_None, Py_None, + user_tag, Py_None, Py_None, service_acceptance, Py_None, Py_None, metadata); Py_DECREF(service_acceptance); Py_DECREF(metadata); error: - Py_XDECREF(call); Py_XDECREF(method); Py_XDECREF(host); Py_XDECREF(service_deadline); @@ -215,8 +212,10 @@ error: } static PyObject *pygrpc_read_event_args(grpc_event *c_event) { - if (c_event->data.read == NULL) { - return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + if (tag->call->recv_message == NULL) { + return PyTuple_Pack(8, read_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, Py_None); } else { size_t length; @@ -227,8 +226,8 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { PyObject *bytes; PyObject *event_args; - length = grpc_byte_buffer_length(c_event->data.read); - reader = grpc_byte_buffer_reader_create(c_event->data.read); + length = grpc_byte_buffer_length(tag->call->recv_message); + reader = grpc_byte_buffer_reader_create(tag->call->recv_message); c_bytes = gpr_malloc(length); offset = 0; while (grpc_byte_buffer_reader_next(reader, &slice)) { @@ -242,7 +241,7 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { if (bytes == NULL) { return NULL; } - event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + event_args = PyTuple_Pack(8, read_event_kind, user_tag, Py_None, Py_None, Py_None, bytes, Py_None, Py_None); Py_DECREF(bytes); @@ -251,32 +250,65 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { } static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) { + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; PyObject *metadata = pygrpc_metadata_collection_get( - c_event->data.client_metadata_read.elements, - c_event->data.client_metadata_read.count); + tag->call->recv_metadata.metadata, + tag->call->recv_metadata.count); PyObject* result = PyTuple_Pack( - 8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None, + 8, metadata_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, metadata); Py_DECREF(metadata); return result; } -static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { +static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) { + PyObject *code; + PyObject *details; + PyObject *status; + PyObject *event_args; + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + + code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK); + if (code == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!"); + return NULL; + } + details = PyBytes_FromString(""); + if (details == NULL) { + return NULL; + } + status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL); + Py_DECREF(details); + if (status == NULL) { + return NULL; + } + event_args = PyTuple_Pack(8, finish_event_kind, user_tag, + Py_None, Py_None, Py_None, Py_None, status, + Py_None); + Py_DECREF(status); + return event_args; +} + +static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) { PyObject *code; PyObject *details; PyObject *status; PyObject *event_args; PyObject *metadata; + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; - code = pygrpc_status_code(c_event->data.finished.status); + code = pygrpc_status_code(tag->call->status); if (code == NULL) { PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!"); return NULL; } - if (c_event->data.finished.details == NULL) { + if (tag->call->status_details == NULL) { details = PyBytes_FromString(""); } else { - details = PyBytes_FromString(c_event->data.finished.details); + details = PyBytes_FromString(tag->call->status_details); } if (details == NULL) { return NULL; @@ -287,9 +319,9 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { return NULL; } metadata = pygrpc_metadata_collection_get( - c_event->data.finished.metadata_elements, - c_event->data.finished.metadata_count); - event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag, + tag->call->recv_trailing_metadata.metadata, + tag->call->recv_trailing_metadata.count); + event_args = PyTuple_Pack(8, finish_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, status, metadata); Py_DECREF(status); @@ -348,28 +380,51 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self, Py_RETURN_NONE; } + pygrpc_tag *tag = (pygrpc_tag *)c_event->tag; + switch (c_event->type) { case GRPC_QUEUE_SHUTDOWN: event_args = pygrpc_stop_event_args(c_event); break; - case GRPC_WRITE_ACCEPTED: - event_args = pygrpc_write_event_args(c_event); - break; - case GRPC_FINISH_ACCEPTED: - event_args = pygrpc_complete_event_args(c_event); - break; - case GRPC_SERVER_RPC_NEW: - event_args = pygrpc_service_event_args(c_event); - break; - case GRPC_READ: - event_args = pygrpc_read_event_args(c_event); - break; - case GRPC_CLIENT_METADATA_READ: - event_args = pygrpc_metadata_event_args(c_event); - break; - case GRPC_FINISHED: - event_args = pygrpc_finished_event_args(c_event); + case GRPC_OP_COMPLETE: { + if (!tag) { + PyErr_SetString(PyExc_Exception, "Unrecognized event type!"); + return NULL; + } + switch (tag->type) { + case PYGRPC_INITIAL_METADATA: + if (tag) { + pygrpc_tag_destroy(tag); + } + grpc_event_finish(c_event); + return pygrpc_completion_queue_get(self, args); + case PYGRPC_WRITE_ACCEPTED: + event_args = pygrpc_write_event_args(c_event); + break; + case PYGRPC_FINISH_ACCEPTED: + event_args = pygrpc_complete_event_args(c_event); + break; + case PYGRPC_SERVER_RPC_NEW: + event_args = pygrpc_service_event_args(c_event); + break; + case PYGRPC_READ: + event_args = pygrpc_read_event_args(c_event); + break; + case PYGRPC_CLIENT_METADATA_READ: + event_args = pygrpc_metadata_event_args(c_event); + break; + case PYGRPC_FINISHED_CLIENT: + event_args = pygrpc_finished_client_event_args(c_event); + break; + case PYGRPC_FINISHED_SERVER: + event_args = pygrpc_finished_server_event_args(c_event); + break; + default: + PyErr_SetString(PyExc_Exception, "Unrecognized op event type!"); + return NULL; + } break; + } default: PyErr_SetString(PyExc_Exception, "Unrecognized event type!"); return NULL; @@ -382,7 +437,9 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self, event = PyObject_CallObject(event_class, event_args); Py_DECREF(event_args); - Py_XDECREF((PyObject *)c_event->tag); + if (tag) { + pygrpc_tag_destroy(tag); + } grpc_event_finish(c_event); return event; diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 826c586a1e..09c4660a2b 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase): completion_queue = _low.CompletionQueue() channel = _low.Channel('%s:%d' % (host, port), None) - client_call = _low.Call(channel, method, host, deadline) + client_call = _low.Call(channel, completion_queue, method, host, deadline) client_call.invoke(completion_queue, metadata_tag, finish_tag) first_event = completion_queue.get(after_deadline) @@ -138,7 +138,8 @@ class EchoTest(unittest.TestCase): server_data = [] client_data = [] - client_call = _low.Call(self.channel, method, self.host, deadline) + client_call = _low.Call(self.channel, self.client_completion_queue, + method, self.host, deadline) client_call.add_metadata(client_metadata_key, client_metadata_value) client_call.add_metadata(client_binary_metadata_key, client_binary_metadata_value) @@ -335,7 +336,8 @@ class CancellationTest(unittest.TestCase): server_data = [] client_data = [] - client_call = _low.Call(self.channel, method, self.host, deadline) + client_call = _low.Call(self.channel, self.client_completion_queue, + method, self.host, deadline) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) diff --git a/src/python/src/grpc/_adapter/_server.c b/src/python/src/grpc/_adapter/_server.c index 181b6c21fc..e7c5917724 100644 --- a/src/python/src/grpc/_adapter/_server.c +++ b/src/python/src/grpc/_adapter/_server.c @@ -36,12 +36,14 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_call.h" #include "grpc/_adapter/_completion_queue.h" #include "grpc/_adapter/_error.h" #include "grpc/_adapter/_server_credentials.h" +#include "grpc/_adapter/_tag.h" static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) { - const PyObject *completion_queue; + CompletionQueue *completion_queue; static char *kwlist[] = {"completion_queue", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist, @@ -50,7 +52,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) { return -1; } self->c_server = grpc_server_create( - ((CompletionQueue *)completion_queue)->c_completion_queue, NULL); + completion_queue->c_completion_queue, NULL); + self->completion_queue = completion_queue; + Py_INCREF(completion_queue); return 0; } @@ -58,6 +62,7 @@ static void pygrpc_server_dealloc(Server *self) { if (self->c_server != NULL) { grpc_server_destroy(self->c_server); } + Py_XDECREF(self->completion_queue); self->ob_type->tp_free((PyObject *)self); } @@ -109,8 +114,15 @@ static PyObject *pygrpc_server_start(Server *self) { static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) { grpc_call_error call_error; const PyObject *result; - - call_error = grpc_server_request_call_old(self->c_server, (void *)tag); + pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag); + c_tag->call->completion_queue = self->completion_queue; + c_tag->call->server = self; + Py_INCREF(c_tag->call->completion_queue); + Py_INCREF(c_tag->call->server); + call_error = grpc_server_request_call( + self->c_server, &c_tag->call->c_call, &c_tag->call->call_details, + &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue, + c_tag); result = pygrpc_translate_call_error(call_error); if (result != NULL) { diff --git a/src/python/src/grpc/_adapter/_server.h b/src/python/src/grpc/_adapter/_server.h index 4836bb638c..d31d4e678b 100644 --- a/src/python/src/grpc/_adapter/_server.h +++ b/src/python/src/grpc/_adapter/_server.h @@ -37,8 +37,12 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_completion_queue.h" + typedef struct { PyObject_HEAD + + CompletionQueue *completion_queue; grpc_server *c_server; } Server; diff --git a/src/python/src/grpc/_adapter/_tag.c b/src/python/src/grpc/_adapter/_tag.c new file mode 100644 index 0000000000..9c6ee19d79 --- /dev/null +++ b/src/python/src/grpc/_adapter/_tag.c @@ -0,0 +1,65 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "grpc/_adapter/_tag.h" + +#include <Python.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag, + Call *call) { + pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag)); + memset(self, 0, sizeof(pygrpc_tag)); + if (user_tag == NULL) { + self->user_tag = Py_None; + } else { + self->user_tag = user_tag; + } + Py_INCREF(self->user_tag); + self->type = type; + self->call = call; + Py_INCREF(call); + return self; +} + +pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) { + return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag, + (Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0)); +} + +void pygrpc_tag_destroy(pygrpc_tag *self) { + Py_XDECREF(self->user_tag); + Py_XDECREF(self->call); + gpr_free(self); +} diff --git a/src/python/src/grpc/_adapter/_tag.h b/src/python/src/grpc/_adapter/_tag.h new file mode 100644 index 0000000000..82987ea102 --- /dev/null +++ b/src/python/src/grpc/_adapter/_tag.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__TAG_H_ +#define _ADAPTER__TAG_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +#include "grpc/_adapter/_call.h" +#include "grpc/_adapter/_completion_queue.h" + +/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial + replacement for its descriptive functionality until Python can move its whole + C and C adapter stack to more closely resemble the core batching API. */ +typedef enum { + PYGRPC_SERVER_RPC_NEW = 0, + PYGRPC_INITIAL_METADATA = 1, + PYGRPC_READ = 2, + PYGRPC_WRITE_ACCEPTED = 3, + PYGRPC_FINISH_ACCEPTED = 4, + PYGRPC_CLIENT_METADATA_READ = 5, + PYGRPC_FINISHED_CLIENT = 6, + PYGRPC_FINISHED_SERVER = 7, +} pygrpc_tag_type; + +typedef struct { + pygrpc_tag_type type; + PyObject *user_tag; + + Call *call; +} pygrpc_tag; + +pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag, + Call *call); +pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag); +void pygrpc_tag_destroy(pygrpc_tag *self); + +#endif /* _ADAPTER__TAG_H_ */ + diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index 2b93aa6331..dd0a486117 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -246,7 +246,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): timeout: A duration of time in seconds to allow for the RPC. """ request_serializer = self._request_serializers[name] - call = _low.Call(self._channel, name, self._host, time.time() + timeout) + call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout) if self._metadata_transformer is not None: metadata = self._metadata_transformer([]) for metadata_key, metadata_value in metadata: diff --git a/src/python/src/setup.py b/src/python/src/setup.py index 32ac41e285..7149a9eb85 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -42,6 +42,7 @@ _EXTENSION_SOURCES = ( 'grpc/_adapter/_server.c', 'grpc/_adapter/_client_credentials.c', 'grpc/_adapter/_server_credentials.c', + 'grpc/_adapter/_tag.c' ) _EXTENSION_INCLUDE_DIRECTORIES = ( |