diff options
Diffstat (limited to 'src')
28 files changed, 396 insertions, 309 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 349d8681d5..ade88da4cb 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -335,6 +335,9 @@ static void add_to_storage(struct stream_obj* s, /* add new op at the beginning of the linked list. The memory is freed in remove_from_storage */ op_and_state* new_op = grpc_core::New<op_and_state>(s, *op); + // Pontential fix to crash on GPR_ASSERT(!curr->done) + // TODO (mxyan): check if this is indeed necessary. + new_op->done = false; gpr_mu_lock(&s->mu); storage->head = new_op; storage->num_pending_ops++; @@ -391,7 +394,7 @@ static void execute_from_storage(stream_obj* s) { gpr_mu_lock(&s->mu); for (struct op_and_state* curr = s->storage.head; curr != nullptr;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); - GPR_ASSERT(curr->done == 0); + GPR_ASSERT(!curr->done); enum e_op_result result = execute_stream_op(curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); @@ -400,13 +403,12 @@ static void execute_from_storage(stream_obj* s) { struct op_and_state* next = curr->next; remove_from_storage(s, curr); curr = next; - } - /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */ - if (result == NO_ACTION_POSSIBLE) { + } else if (result == NO_ACTION_POSSIBLE) { curr = curr->next; } else if (result == ACTION_TAKEN_WITH_CALLBACK) { + /* wait for the callback */ break; - } + } /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */ } gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 7c0062eb4e..402f8904ea 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -29,6 +29,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/profiling/timers.h" grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); @@ -228,8 +229,14 @@ bool grpc_combiner_continue_exec_ctx() { grpc_core::ExecCtx::Get()->IsReadyToFinish(), lock->time_to_execute_final_list)); + // offload only if all the following conditions are true: + // 1. the combiner is contended and has more than one closure to execute + // 2. the current execution context needs to finish as soon as possible + // 3. the DEFAULT executor is threaded + // 4. the current thread is not a worker for any background poller if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - grpc_executor_is_threaded()) { + grpc_executor_is_threaded() && + !grpc_iomgr_is_any_background_poller_thread()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be // picked up on the executor diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 4b8c891e9b..9eb4c089d8 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1287,6 +1289,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7a4870db78..0a0891013a 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag, * Event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1644,6 +1646,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 67cbfbbd02..c479206410 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() { * event engine binding */ +static bool is_any_background_poller_thread(void) { return false; } + static void shutdown_background_closure(void) {} static void shutdown_engine(void) { @@ -1828,6 +1830,7 @@ static const grpc_event_engine_vtable vtable = { pollset_set_add_fd, pollset_set_del_fd, + is_any_background_poller_thread, shutdown_background_closure, shutdown_engine, }; diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 32d1b6c43e..fb2e70eee4 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -399,6 +399,10 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } +bool grpc_is_any_background_poller_thread(void) { + return g_event_engine->is_any_background_poller_thread(); +} + void grpc_shutdown_background_closure(void) { g_event_engine->shutdown_background_closure(); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 812c7a0f0f..94ac9fdba6 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -80,6 +80,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd); + bool (*is_any_background_poller_thread)(void); void (*shutdown_background_closure)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; @@ -181,6 +182,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); +/* Returns true if the caller is a worker thread for any background poller. */ +bool grpc_is_any_background_poller_thread(); + /* Shut down all the closures registered in the background poller. */ void grpc_shutdown_background_closure(); diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index eb29973514..a492146857 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -161,6 +161,10 @@ void grpc_iomgr_shutdown_background_closure() { grpc_iomgr_platform_shutdown_background_closure(); } +bool grpc_iomgr_is_any_background_poller_thread() { + return grpc_iomgr_platform_is_any_background_poller_thread(); +} + void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) { obj->name = gpr_strdup(name); gpr_mu_lock(&g_mu); diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 8ea9289e06..6261aa550c 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -39,6 +39,9 @@ void grpc_iomgr_shutdown(); * background poller. */ void grpc_iomgr_shutdown_background_closure(); +/** Returns true if the caller is a worker thread for any background poller. */ +bool grpc_iomgr_is_any_background_poller_thread(); + /* Exposed only for testing */ size_t grpc_iomgr_count_objects_for_testing(); diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index 4b112c9097..e1cd8f7310 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -41,10 +41,14 @@ static void iomgr_platform_init(void) { static void iomgr_platform_flush(void) {} static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_custom_iomgr_init(grpc_socket_vtable* socket, grpc_custom_resolver_vtable* resolver, diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc index b6c9211865..e68b1cf581 100644 --- a/src/core/lib/iomgr/iomgr_internal.cc +++ b/src/core/lib/iomgr/iomgr_internal.cc @@ -45,3 +45,7 @@ void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); } void grpc_iomgr_platform_shutdown_background_closure() { iomgr_platform_vtable->shutdown_background_closure(); } + +bool grpc_iomgr_platform_is_any_background_poller_thread() { + return iomgr_platform_vtable->is_any_background_poller_thread(); +} diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h index bca7409907..2250ad9a18 100644 --- a/src/core/lib/iomgr/iomgr_internal.h +++ b/src/core/lib/iomgr/iomgr_internal.h @@ -36,6 +36,7 @@ typedef struct grpc_iomgr_platform_vtable { void (*flush)(void); void (*shutdown)(void); void (*shutdown_background_closure)(void); + bool (*is_any_background_poller_thread)(void); } grpc_iomgr_platform_vtable; void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name); @@ -56,6 +57,9 @@ void grpc_iomgr_platform_shutdown(void); /** shut down all the closures registered in the background poller */ void grpc_iomgr_platform_shutdown_background_closure(void); +/** return true is the caller is a worker thread for any background poller */ +bool grpc_iomgr_platform_is_any_background_poller_thread(void); + bool grpc_iomgr_abort_on_leaks(void); #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 9386adf060..278c8de688 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -55,9 +55,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable); diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index 552ef4309c..462ac41fcd 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -58,9 +58,14 @@ static void iomgr_platform_shutdown_background_closure(void) { grpc_shutdown_background_closure(); } +static bool iomgr_platform_is_any_background_poller_thread(void) { + return grpc_is_any_background_poller_thread(); +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { char* enable_cfstream = getenv(grpc_cfstream_env_var); diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc index 24ef0dba7b..0579e16aa7 100644 --- a/src/core/lib/iomgr/iomgr_windows.cc +++ b/src/core/lib/iomgr/iomgr_windows.cc @@ -73,9 +73,14 @@ static void iomgr_platform_shutdown(void) { static void iomgr_platform_shutdown_background_closure(void) {} +static bool iomgr_platform_is_any_background_poller_thread(void) { + return false; +} + static grpc_iomgr_platform_vtable vtable = { iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown, - iomgr_platform_shutdown_background_closure}; + iomgr_platform_shutdown_background_closure, + iomgr_platform_is_any_background_poller_thread}; void grpc_set_default_iomgr_platform() { grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable); diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 83c6edc6e3..e8fae09a1f 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -56,7 +56,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Make them read-write. @property(atomic, strong) NSDictionary *responseHeaders; @property(atomic, strong) NSDictionary *responseTrailers; -@property(atomic) BOOL isWaitingForToken; - (instancetype)initWithHost:(NSString *)host path:(NSString *)path @@ -425,9 +424,6 @@ const char *kCFStreamVarName = "grpc_cfstream"; // queue dispatch_queue_t _responseQueue; - // Whether the call is finished. If it is, should not call finishWithError again. - BOOL _finished; - // The OAuth2 token fetched from a token provider. NSString *_fetchedOauth2AccessToken; } @@ -448,24 +444,28 @@ const char *kCFStreamVarName = "grpc_cfstream"; return; } NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - switch (callSafety) { - case GRPCCallSafetyDefault: - callFlags[hostAndPath] = @0; - break; - case GRPCCallSafetyIdempotentRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; - break; - case GRPCCallSafetyCacheableRequest: - callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; - break; - default: - break; + @synchronized(callFlags) { + switch (callSafety) { + case GRPCCallSafetyDefault: + callFlags[hostAndPath] = @0; + break; + case GRPCCallSafetyIdempotentRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + break; + case GRPCCallSafetyCacheableRequest: + callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST; + break; + default: + break; + } } } + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path { NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path]; - return [callFlags[hostAndPath] intValue]; + @synchronized(callFlags) { + return [callFlags[hostAndPath] intValue]; + } } // Designated initializer @@ -506,7 +506,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; _callOptions = [callOptions copy]; // Serial queue to invoke the non-reentrant methods of the grpc_call object. - _callQueue = dispatch_queue_create("io.grpc.call", NULL); + _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL); _requestWriter = requestWriter; @@ -523,66 +523,48 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)setResponseDispatchQueue:(dispatch_queue_t)queue { - if (_state != GRXWriterStateNotStarted) { - return; + @synchronized(self) { + if (_state != GRXWriterStateNotStarted) { + return; + } + _responseQueue = queue; } - _responseQueue = queue; } #pragma mark Finish +// This function should support being called within a @synchronized(self) block in another function +// Should not manipulate _requestWriter for deadlock prevention. - (void)finishWithError:(NSError *)errorOrNil { @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } _state = GRXWriterStateFinished; - } - - // If there were still request messages coming, stop them. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStateFinished; - } - - if (errorOrNil) { - [_responseWriteable cancelWithError:errorOrNil]; - } else { - [_responseWriteable enqueueSuccessfulCompletion]; - } - - [GRPCConnectivityMonitor unregisterObserver:self]; - // If the call isn't retained anywhere else, it can be deallocated now. - _retainSelf = nil; -} + if (errorOrNil) { + [_responseWriteable cancelWithError:errorOrNil]; + } else { + [_responseWriteable enqueueSuccessfulCompletion]; + } -- (void)cancelCall { - // Can be called from any thread, any number of times. - @synchronized(self) { - [_wrappedCall cancel]; + // If the call isn't retained anywhere else, it can be deallocated now. + _retainSelf = nil; } } - (void)cancel { @synchronized(self) { - [self cancelCall]; - self.isWaitingForToken = NO; - } - [self - maybeFinishWithError:[NSError - errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeCancelled - userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; -} - -- (void)maybeFinishWithError:(NSError *)errorOrNil { - BOOL toFinish = NO; - @synchronized(self) { - if (_finished == NO) { - _finished = YES; - toFinish = YES; + if (_state == GRXWriterStateFinished) { + return; } + [self finishWithError:[NSError + errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]]; + [_wrappedCall cancel]; } - if (toFinish == YES) { - [self finishWithError:errorOrNil]; - } + _requestWriter.state = GRXWriterStateFinished; } - (void)dealloc { @@ -609,21 +591,24 @@ const char *kCFStreamVarName = "grpc_cfstream"; // TODO(jcanizales): Rename to readResponseIfNotPaused. - (void)startNextRead { @synchronized(self) { - if (self.state == GRXWriterStatePaused) { + if (_state != GRXWriterStateStarted) { return; } } dispatch_async(_callQueue, ^{ __weak GRPCCall *weakSelf = self; - __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable; [self startReadWithHandler:^(grpc_byte_buffer *message) { - __strong GRPCCall *strongSelf = weakSelf; - __strong GRXConcurrentWriteable *strongWriteable = weakWriteable; + NSLog(@"message received"); if (message == NULL) { // No more messages from the server return; } + __strong GRPCCall *strongSelf = weakSelf; + if (strongSelf == nil) { + grpc_byte_buffer_destroy(message); + return; + } NSData *data = [NSData grpc_dataWithByteBuffer:message]; grpc_byte_buffer_destroy(message); if (!data) { @@ -631,21 +616,26 @@ const char *kCFStreamVarName = "grpc_cfstream"; // don't want to throw, because the app shouldn't crash for a behavior // that's on the hands of any server to have. Instead we finish and ask // the server to cancel. - [strongSelf cancelCall]; - [strongSelf - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeResourceExhausted - userInfo:@{ - NSLocalizedDescriptionKey : - @"Client does not have enough memory to " - @"hold the server response." - }]]; - return; + @synchronized(strongSelf) { + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeResourceExhausted + userInfo:@{ + NSLocalizedDescriptionKey : + @"Client does not have enough memory to " + @"hold the server response." + }]]; + [strongSelf->_wrappedCall cancel]; + } + strongSelf->_requestWriter.state = GRXWriterStateFinished; + } else { + @synchronized(strongSelf) { + [strongSelf->_responseWriteable enqueueValue:data + completionHandler:^{ + [strongSelf startNextRead]; + }]; + } } - [strongWriteable enqueueValue:data - completionHandler:^{ - [strongSelf startNextRead]; - }]; }]; }); } @@ -684,11 +674,13 @@ const char *kCFStreamVarName = "grpc_cfstream"; initWithMetadata:headers flags:callSafetyFlags handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA - if (!_unaryCall) { - [_wrappedCall startBatchWithOperations:@[ op ]]; - } else { - [_unaryOpBatch addObject:op]; - } + dispatch_async(_callQueue, ^{ + if (!self->_unaryCall) { + [self->_wrappedCall startBatchWithOperations:@[ op ]]; + } else { + [self->_unaryOpBatch addObject:op]; + } + }); } #pragma mark GRXWriteable implementation @@ -703,9 +695,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { - @synchronized(strongSelf->_requestWriter) { - strongSelf->_requestWriter.state = GRXWriterStateStarted; - } + strongSelf->_requestWriter.state = GRXWriterStateStarted; } }; @@ -721,13 +711,17 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)writeValue:(id)value { - // TODO(jcanizales): Throw/assert if value isn't NSData. + NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData"); + + @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } + } // Pause the input and only resume it when the C layer notifies us that writes // can proceed. - @synchronized(_requestWriter) { - _requestWriter.state = GRXWriterStatePaused; - } + _requestWriter.state = GRXWriterStatePaused; dispatch_async(_callQueue, ^{ // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT @@ -766,17 +760,20 @@ const char *kCFStreamVarName = "grpc_cfstream"; // The second one (completionHandler), whenever the RPC finishes for any reason. - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler { - // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; - [_wrappedCall - startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + dispatch_async(_callQueue, ^{ + // TODO(jcanizales): Add error handlers for async failures + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]]; + [self->_wrappedCall + startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]]; + }); } - (void)invokeCall { __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. + NSLog(@"response received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseHeaders = headers; @@ -784,6 +781,7 @@ const char *kCFStreamVarName = "grpc_cfstream"; } } completionHandler:^(NSError *error, NSDictionary *trailers) { + NSLog(@"completion received"); __strong GRPCCall *strongSelf = weakSelf; if (strongSelf) { strongSelf.responseTrailers = trailers; @@ -794,112 +792,114 @@ const char *kCFStreamVarName = "grpc_cfstream"; [userInfo addEntriesFromDictionary:error.userInfo]; } userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when - // this is done that we can provide a merged view of response headers and trailers in a - // thread-safe way. - if (strongSelf.responseHeaders) { - userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; - } + // Since gRPC core does not guarantee the headers block being called before this block, + // responseHeaders might be nil. + userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } - [strongSelf maybeFinishWithError:error]; + [strongSelf finishWithError:error]; + strongSelf->_requestWriter.state = GRXWriterStateFinished; } }]; - // Now that the RPC has been initiated, request writes can start. - @synchronized(_requestWriter) { - [_requestWriter startWithWriteable:self]; - } } #pragma mark GRXWriter implementation +// Lock acquired inside startWithWriteable: - (void)startCallWithWriteable:(id<GRXWriteable>)writeable { - _responseWriteable = - [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; - - GRPCPooledChannel *channel = - [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; - GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path - completionQueue:[GRPCCompletionQueue completionQueue] - callOptions:_callOptions]; - - if (wrappedCall == nil) { - [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : - @"Failed to create call or channel." - }]]; - return; - } - @synchronized(self) { - _wrappedCall = wrappedCall; - } + if (_state == GRXWriterStateFinished) { + return; + } + + _responseWriteable = + [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue]; + + GRPCPooledChannel *channel = + [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions]; + _wrappedCall = [channel wrappedCallWithPath:_path + completionQueue:[GRPCCompletionQueue completionQueue] + callOptions:_callOptions]; + + if (_wrappedCall == nil) { + [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : + @"Failed to create call or channel." + }]]; + return; + } - [self sendHeaders]; - [self invokeCall]; + [self sendHeaders]; + [self invokeCall]; - // Connectivity monitor is not required for CFStream - char *enableCFStream = getenv(kCFStreamVarName); - if (enableCFStream == nil || enableCFStream[0] != '1') { - [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + // Connectivity monitor is not required for CFStream + char *enableCFStream = getenv(kCFStreamVarName); + if (enableCFStream == nil || enableCFStream[0] != '1') { + [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; + } } + + // Now that the RPC has been initiated, request writes can start. + [_requestWriter startWithWriteable:self]; } - (void)startWithWriteable:(id<GRXWriteable>)writeable { + id<GRPCAuthorizationProtocol> tokenProvider = nil; @synchronized(self) { _state = GRXWriterStateStarted; - } - // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). - // This makes RPCs in which the call isn't externally retained possible (as long as it is started - // before being autoreleased). - // Care is taken not to retain self strongly in any of the blocks used in this implementation, so - // that the life of the instance is determined by this retain cycle. - _retainSelf = self; + // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled). + // This makes RPCs in which the call isn't externally retained possible (as long as it is + // started before being autoreleased). Care is taken not to retain self strongly in any of the + // blocks used in this implementation, so that the life of the instance is determined by this + // retain cycle. + _retainSelf = self; + + if (_callOptions == nil) { + GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; + if (_serverName.length != 0) { + callOptions.serverAuthority = _serverName; + } + if (_timeout > 0) { + callOptions.timeout = _timeout; + } + uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path]; + if (callFlags != 0) { + if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + _callSafety = GRPCCallSafetyIdempotentRequest; + } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { + _callSafety = GRPCCallSafetyCacheableRequest; + } + } - if (_callOptions == nil) { - GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy]; - if (_serverName.length != 0) { - callOptions.serverAuthority = _serverName; - } - if (_timeout > 0) { - callOptions.timeout = _timeout; - } - uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path]; - if (callFlags != 0) { - if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - _callSafety = GRPCCallSafetyIdempotentRequest; - } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) { - _callSafety = GRPCCallSafetyCacheableRequest; + id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider; + if (tokenProvider != nil) { + callOptions.authTokenProvider = tokenProvider; } + _callOptions = callOptions; } - id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider; - if (tokenProvider != nil) { - callOptions.authTokenProvider = tokenProvider; - } - _callOptions = callOptions; + NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, + @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); + + tokenProvider = _callOptions.authTokenProvider; } - NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil, - @"authTokenProvider and oauth2AccessToken cannot be set at the same time"); - if (_callOptions.authTokenProvider != nil) { - @synchronized(self) { - self.isWaitingForToken = YES; - } - [_callOptions.authTokenProvider getTokenWithHandler:^(NSString *token) { - @synchronized(self) { - if (self.isWaitingForToken) { - if (token) { - self->_fetchedOauth2AccessToken = [token copy]; + if (tokenProvider != nil) { + __weak typeof(self) weakSelf = self; + [tokenProvider getTokenWithHandler:^(NSString *token) { + __strong typeof(self) strongSelf = weakSelf; + if (strongSelf) { + @synchronized(strongSelf) { + if (strongSelf->_state == GRXWriterStateNotStarted) { + if (token) { + strongSelf->_fetchedOauth2AccessToken = [token copy]; + } } - [self startCallWithWriteable:writeable]; - self.isWaitingForToken = NO; } + [strongSelf startCallWithWriteable:writeable]; } }]; } else { @@ -938,16 +938,21 @@ const char *kCFStreamVarName = "grpc_cfstream"; } - (void)connectivityChanged:(NSNotification *)note { - // Cancel underlying call upon this notification + // Cancel underlying call upon this notification. + + // Retain because connectivity manager only keeps weak reference to GRPCCall. __strong GRPCCall *strongSelf = self; if (strongSelf) { - [self cancelCall]; - [self - maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeUnavailable - userInfo:@{ - NSLocalizedDescriptionKey : @"Connectivity lost." - }]]; + @synchronized(strongSelf) { + [_wrappedCall cancel]; + [strongSelf + finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{ + NSLocalizedDescriptionKey : @"Connectivity lost." + }]]; + } + strongSelf->_requestWriter.state = GRXWriterStateFinished; } } diff --git a/src/objective-c/GRPCClient/private/GRPCChannelPool.m b/src/objective-c/GRPCClient/private/GRPCChannelPool.m index a323f0490c..60a33eda82 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannelPool.m +++ b/src/objective-c/GRPCClient/private/GRPCChannelPool.m @@ -236,6 +236,12 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30; return nil; } + // remove trailing slash of hostname + NSURL *hostURL = [NSURL URLWithString:[@"https://" stringByAppendingString:host]]; + if (hostURL.host && hostURL.port == nil) { + host = [hostURL.host stringByAppendingString:@":443"]; + } + GRPCPooledChannel *pooledChannel = nil; GRPCChannelConfiguration *configuration = [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions]; diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index a871ea895a..ae08cc315b 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -36,8 +36,7 @@ * crash. If you want to react to flow control signals to prevent that, instead of using this class * you can implement an object that conforms to GRXWriter. * - * Thread-safety: - * The methods of an object of this class should not be called concurrently from different threads. + * Thread-safety: the methods of this class are thread-safe. */ @interface GRXBufferedPipe : GRXWriter<GRXWriteable> diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 546d46cba3..74e2f03da6 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -51,16 +51,22 @@ // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. value = [value copy]; } - __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^(void) { - [weakSelf.writeable writeValue:value]; + @synchronized(self) { + if (self->_state == GRXWriterStateFinished) { + return; + } + [self.writeable writeValue:value]; + } }); } - (void)writesFinishedWithError:(NSError *)errorOrNil { - __weak GRXBufferedPipe *weakSelf = self; dispatch_async(_writeQueue, ^{ - [weakSelf finishWithError:errorOrNil]; + if (self->_state == GRXWriterStateFinished) { + return; + } + [self finishWithError:errorOrNil]; }); } @@ -100,14 +106,15 @@ } - (void)startWithWriteable:(id<GRXWriteable>)writeable { - self.writeable = writeable; - _state = GRXWriterStateStarted; + @synchronized(self) { + self.writeable = writeable; + _state = GRXWriterStateStarted; + } dispatch_resume(_writeQueue); } - (void)finishWithError:(NSError *)errorOrNil { [self.writeable writesFinishedWithError:errorOrNil]; - self.state = GRXWriterStateFinished; } - (void)dealloc { diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index abb831e6fb..5beca9d41e 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -23,10 +23,10 @@ /** * This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a - * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last - * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from - * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g. - * by the app cancelling the writes), no further messages are sent to the writeable except + * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is + * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor + * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app + * cancelling the writes), no further messages are sent to the writeable except * writesFinishedWithError:. * * TODO(jcanizales): Let the user specify another queue for the writeable callbacks. @@ -43,21 +43,22 @@ - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable; /** - * Enqueues writeValue: to be sent to the writeable in the main thread. - * The passed handler is invoked from the main thread after writeValue: returns. + * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue. + * The passed handler is invoked from designated dispatch queue after writeValue: returns. */ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler; /** - * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that - * message is sent to the writeable, all other methods of this object are effectively noops. + * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch + * queue. After that message is sent to the writeable, all other methods of this object are + * effectively noops. */ - (void)enqueueSuccessfulCompletion; /** * If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one - * to be sent to it in the main thread, and cancel all other pending messages to the writeable - * enqueued by this object (both past and future). + * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the + * writeable enqueued by this object (both past and future). * The error argument cannot be nil. */ - (void)cancelWithError:(NSError *)error; diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 81ccc3fbce..115195463d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -27,8 +27,15 @@ @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; - // This ensures that writesFinishedWithError: is only sent once to the writeable. + + // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected + // by _writeableQueue. BOOL _alreadyFinished; + + // This ivar ensures that a cancelWithError: call prevents further values to be sent to + // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be + // protected by self lock. + BOOL _cancelled; } - (instancetype)init { @@ -41,6 +48,8 @@ if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; + _alreadyFinished = NO; + _cancelled = NO; } return self; } @@ -51,78 +60,63 @@ - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ - // We're racing a possible cancellation performed by another thread. To turn all already- - // enqueued messages into noops, cancellation nillifies the writeable property. If we get it - // before it's nil, we won the race. - id<GRXWriteable> writeable = self.writeable; - if (writeable) { - [writeable writeValue:value]; - handler(); + if (self->_alreadyFinished) { + return; + } + + @synchronized(self) { + if (self->_cancelled) { + return; + } } + + [self.writeable writeValue:value]; + handler(); }); } - (void)enqueueSuccessfulCompletion { - __weak typeof(self) weakSelf = self; dispatch_async(_writeableQueue, ^{ - typeof(self) strongSelf = weakSelf; - if (strongSelf) { - BOOL finished = NO; - @synchronized(strongSelf) { - if (!strongSelf->_alreadyFinished) { - strongSelf->_alreadyFinished = YES; - } else { - finished = YES; - } - } - if (!finished) { - // Cancellation is now impossible. None of the other three blocks can run concurrently with - // this one. - [strongSelf.writeable writesFinishedWithError:nil]; - // Skip any possible message to the wrapped writeable enqueued after this one. - strongSelf.writeable = nil; + if (self->_alreadyFinished) { + return; + } + @synchronized(self) { + if (self->_cancelled) { + return; } } + [self.writeable writesFinishedWithError:nil]; + + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; + self.writeable = nil; }); } - (void)cancelWithError:(NSError *)error { - NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); - BOOL finished = NO; + NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; - } + self->_cancelled = YES; } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. - id<GRXWriteable> writeable = self.writeable; - self.writeable = nil; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + // a cancel or a successful completion is already issued + return; + } + [self.writeable writesFinishedWithError:error]; - dispatch_async(_writeableQueue, ^{ - [writeable writesFinishedWithError:error]; - }); - } + // Skip any possible message to the wrapped writeable enqueued after this one. + self->_alreadyFinished = YES; + self.writeable = nil; + }); } - (void)cancelSilently { - BOOL finished = NO; - @synchronized(self) { - if (!_alreadyFinished) { - _alreadyFinished = YES; - } else { - finished = YES; + dispatch_async(_writeableQueue, ^{ + if (self->_alreadyFinished) { + return; } - } - if (!finished) { - // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to - // nillify writeable because we might be running concurrently with the blocks in - // _writeableQueue, and assignment with ARC isn't atomic. self.writeable = nil; - } + }); } @end diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h index 3814ff8831..00366b6416 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.h +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h @@ -25,11 +25,7 @@ * input writer, and for classes that represent objects with input and * output sequences of values, like an RPC. * - * Thread-safety: - * All messages sent to this object need to be serialized. When it is started, the writer it wraps - * is started in the same thread. Manual state changes are propagated to the wrapped writer in the - * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be - * serialized with any message sent to this object. + * Thread-safety: the methods of this class are thread safe. */ @interface GRXForwardingWriter : GRXWriter - (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER; diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 3e522ef24e..27ac0acdff 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -54,48 +54,65 @@ [writeable writesFinishedWithError:errorOrNil]; } -// This is used to stop the input writer. It nillifies our reference to it -// to release it. -- (void)finishInput { - GRXWriter *writer = _writer; - _writer = nil; - writer.state = GRXWriterStateFinished; -} - #pragma mark GRXWriteable implementation - (void)writeValue:(id)value { - [_writeable writeValue:value]; + @synchronized(self) { + [_writeable writeValue:value]; + } } - (void)writesFinishedWithError:(NSError *)errorOrNil { - _writer = nil; - [self finishOutputWithError:errorOrNil]; + @synchronized(self) { + _writer = nil; + [self finishOutputWithError:errorOrNil]; + } } #pragma mark GRXWriter implementation - (GRXWriterState)state { - return _writer ? _writer.state : GRXWriterStateFinished; + GRXWriter *copiedWriter; + @synchronized(self) { + copiedWriter = _writer; + } + return copiedWriter ? copiedWriter.state : GRXWriterStateFinished; } - (void)setState:(GRXWriterState)state { + GRXWriter *copiedWriter = nil; if (state == GRXWriterStateFinished) { - _writeable = nil; - [self finishInput]; + @synchronized(self) { + _writeable = nil; + copiedWriter = _writer; + _writer = nil; + } + copiedWriter.state = GRXWriterStateFinished; } else { - _writer.state = state; + @synchronized(self) { + copiedWriter = _writer; + } + copiedWriter.state = state; } } - (void)startWithWriteable:(id<GRXWriteable>)writeable { - _writeable = writeable; - [_writer startWithWriteable:self]; + GRXWriter *copiedWriter = nil; + @synchronized(self) { + _writeable = writeable; + copiedWriter = _writer; + } + [copiedWriter startWithWriteable:self]; } - (void)finishWithError:(NSError *)errorOrNil { - [self finishOutputWithError:errorOrNil]; - [self finishInput]; + GRXWriter *copiedWriter = nil; + @synchronized(self) { + [self finishOutputWithError:errorOrNil]; + copiedWriter = _writer; + _writer = nil; + } + copiedWriter.state = GRXWriterStateFinished; } @end diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h index 601abdc6b9..2fa38b3dce 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h @@ -23,6 +23,8 @@ /** * Utility to construct GRXWriter instances from values that are immediately available when * required. + * + * Thread safety: the methods of this class are thread safe. */ @interface GRXImmediateSingleWriter : GRXImmediateWriter diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m index 3126ae4bd1..079c11b94f 100644 --- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m @@ -20,7 +20,6 @@ @implementation GRXImmediateSingleWriter { id _value; - id<GRXWriteable> _writeable; } @synthesize state = _state; @@ -38,17 +37,16 @@ } - (void)startWithWriteable:(id<GRXWriteable>)writeable { - _state = GRXWriterStateStarted; - _writeable = writeable; - [writeable writeValue:_value]; - [self finish]; -} - -- (void)finish { - _state = GRXWriterStateFinished; - _value = nil; - id<GRXWriteable> writeable = _writeable; - _writeable = nil; + id copiedValue = nil; + @synchronized(self) { + if (_state != GRXWriterStateNotStarted) { + return; + } + copiedValue = _value; + _value = nil; + _state = GRXWriterStateFinished; + } + [writeable writeValue:copiedValue]; [writeable writesFinishedWithError:nil]; } @@ -65,9 +63,11 @@ // the original \a map function returns a new Writer of another type. So we // need to override this function here. - (GRXWriter *)map:(id (^)(id))map { - // Since _value is available when creating the object, we can simply - // apply the map and store the output. - _value = map(_value); + @synchronized(self) { + // Since _value is available when creating the object, we can simply + // apply the map and store the output. + _value = map(_value); + } return self; } diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index 5d99583a92..df4c80c28d 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { * This property can be used to query the current state of the writer, which determines how it might * currently use its writeable. Some state transitions can be triggered by setting this property to * the corresponding value, and that's useful for advanced use cases like pausing an writer. For - * more details, see the documentation of the enum further down. + * more details, see the documentation of the enum further down. The property is thread safe. */ -@property(nonatomic) GRXWriterState state; +@property GRXWriterState state; /** * Transition to the Started state, and start sending messages to the writeable (a reference to it diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 70d7618e05..8613bc501f 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -24,7 +24,7 @@ from grpc._cython import cygrpc as _cygrpc logging.getLogger(__name__).addHandler(logging.NullHandler()) try: - from ._grpcio_metadata import __version__ + from grpc._grpcio_metadata import __version__ except ImportError: __version__ = "dev0" diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index d72648a35d..ef74f61e04 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -149,6 +149,6 @@ cdef class Server: grpc_server_destroy(self.c_server) self.c_server = NULL - def __dealloc(self): + def __dealloc__(self): if self.c_server == NULL: grpc_shutdown() |