aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/private/GRPCChannelPool.m
blob: a323f0490c8cd6cd0a09bd2897fabac33989846c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import <Foundation/Foundation.h>

#import "../internal/GRPCCallOptions+Internal.h"
#import "GRPCChannel.h"
#import "GRPCChannelFactory.h"
#import "GRPCChannelPool+Test.h"
#import "GRPCChannelPool.h"
#import "GRPCCompletionQueue.h"
#import "GRPCConnectivityMonitor.h"
#import "GRPCCronetChannelFactory.h"
#import "GRPCInsecureChannelFactory.h"
#import "GRPCSecureChannelFactory.h"
#import "GRPCWrappedCall.h"
#import "version.h"

#import <GRPCClient/GRPCCall+Cronet.h>
#include <grpc/support/log.h>

extern const char *kCFStreamVarName;

static GRPCChannelPool *gChannelPool;
static dispatch_once_t gInitChannelPool;

/** When all calls of a channel are destroyed, destroy the channel after this many seconds. */
static const NSTimeInterval kDefaultChannelDestroyDelay = 30;

/**
 * A pooled channel lazily creates a GRPCChannel on first use, tracks the GRPCWrappedCall objects
 * created on it, and drops the GRPCChannel again after a delay once no tracked call remains.
 * Mutable state below is accessed under @synchronized(self) by the methods of this class.
 */
@implementation GRPCPooledChannel {
  // Copy of the configuration passed to the initializer; used to create _wrappedChannel.
  GRPCChannelConfiguration *_channelConfiguration;
  // Delay, in seconds, between the last tracked call going away and the channel being dropped.
  NSTimeInterval _destroyDelay;

  // Weak-object hash table of the calls created through this pooled channel.
  NSHashTable<GRPCWrappedCall *> *_wrappedCalls;
  // The underlying channel; nil until a call is requested and after disconnect/timed destroy.
  GRPCChannel *_wrappedChannel;
  // Identity token of the most recently scheduled timed destroy; nil when none is pending.
  NSDate *_lastTimedDestroy;
  // Serial queue on which the delayed destroy blocks are scheduled.
  dispatch_queue_t _timerQueue;
}

/**
 * Convenience initializer: forwards to -initWithChannelConfiguration:destroyDelay: using
 * kDefaultChannelDestroyDelay as the destroy delay.
 */
- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
  return [self initWithChannelConfiguration:channelConfiguration
                               destroyDelay:kDefaultChannelDestroyDelay];
}

/**
 * Initializes a pooled channel for the given configuration.
 *
 * @param channelConfiguration Configuration of the channel; copied. Must not be nil (asserts, and
 *                             returns nil when assertions are disabled).
 * @param destroyDelay         Seconds to wait after the last call goes away before the underlying
 *                             channel is dropped.
 * @return The initialized object, or nil if channelConfiguration is nil or super's init fails.
 */
- (nullable instancetype)initWithChannelConfiguration:
                             (GRPCChannelConfiguration *)channelConfiguration
                                         destroyDelay:(NSTimeInterval)destroyDelay {
  NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
  if (channelConfiguration == nil) {
    return nil;
  }

  self = [super init];
  if (self == nil) {
    return nil;
  }

  _channelConfiguration = [channelConfiguration copy];
  _destroyDelay = destroyDelay;
  _wrappedCalls = [NSHashTable weakObjectsHashTable];
  _wrappedChannel = nil;
  _lastTimedDestroy = nil;

  // Start from a plain serial queue attribute and upgrade it to an explicit default-QoS
  // attribute only when building against an SDK that supports @available and the runtime
  // OS provides the QoS API.
  dispatch_queue_attr_t timerQueueAttributes = DISPATCH_QUEUE_SERIAL;
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
  if (@available(iOS 8.0, macOS 10.10, *)) {
    timerQueueAttributes =
        dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0);
  }
#endif
  _timerQueue = dispatch_queue_create(NULL, timerQueueAttributes);

  return self;
}

/**
 * Tells every call that is still tracked by this pooled channel that the channel is gone.
 */
- (void)dealloc {
  // allObjects yields only the calls that are still alive in the weak hash table; an empty
  // snapshot simply makes the loop a no-op.
  NSArray<GRPCWrappedCall *> *remainingCalls = _wrappedCalls.allObjects;
  for (GRPCWrappedCall *remainingCall in remainingCalls) {
    [remainingCall channelDisconnected];
  }
}

/**
 * Creates a call on this pooled channel, creating the underlying GRPCChannel first if there is
 * none, and starts tracking the call in _wrappedCalls.
 *
 * @param path        Path of the call. Must not be empty.
 * @param queue       Completion queue the call is created on. Must not be nil.
 * @param callOptions Options of the call. Must not be nil.
 * @return The new wrapped call, or nil if an argument is invalid or the channel, the raw
 *         grpc_call, or the wrapper object could not be created. Each failure also trips an
 *         NSAssert when assertions are enabled.
 */
- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path
                         completionQueue:(GRPCCompletionQueue *)queue
                             callOptions:(GRPCCallOptions *)callOptions {
  NSAssert(path.length > 0, @"path must not be empty.");
  NSAssert(queue != nil, @"completionQueue must not be empty.");
  NSAssert(callOptions, @"callOptions must not be empty.");
  if (path.length == 0 || queue == nil || callOptions == nil) {
    return nil;
  }

  GRPCWrappedCall *call = nil;

  @synchronized(self) {
    // Lazily (re)create the underlying channel; it may have been dropped by -disconnect or by
    // the timed destroy.
    if (_wrappedChannel == nil) {
      _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration];
      if (_wrappedChannel == nil) {
        NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
        return nil;
      }
    }
    // A new call is about to use the channel: invalidate any pending timed destroy.
    _lastTimedDestroy = nil;

    // Create the call on the completion queue supplied by the caller rather than on a
    // hard-coded one, so the validated `queue` argument is actually honored.
    grpc_call *unmanagedCall = [_wrappedChannel unmanagedCallWithPath:path
                                                      completionQueue:queue
                                                          callOptions:callOptions];
    if (unmanagedCall == NULL) {
      NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object");
      return nil;
    }

    call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self];
    if (call == nil) {
      NSAssert(call != nil, @"Unable to create GRPCWrappedCall object");
      // The wrapper did not take the raw call, so release our reference here.
      grpc_call_unref(unmanagedCall);
      return nil;
    }

    [_wrappedCalls addObject:call];
  }
  return call;
}

/**
 * Notifies the pooled channel that a wrapped call is going away (presumably sent from
 * GRPCWrappedCall's dealloc; confirm against that class). If no tracked call remains, schedules
 * the underlying channel to be dropped after _destroyDelay seconds.
 *
 * @param wrappedCall The call that is going away. Must not be nil.
 */
- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall {
  NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty.");
  if (wrappedCall == nil) {
    return;
  }
  @synchronized(self) {
    // Detect if all objects weakly referenced in _wrappedCalls are (implicitly) removed.
    // _wrappedCalls.count does not work here since the hash table may include deallocated weak
    // references. _wrappedCalls.allObjects forces removal of those objects.
    if (_wrappedCalls.allObjects.count == 0) {
      // No more call has reference to this channel. We may start the timer for destroying the
      // channel now.
      NSDate *now = [NSDate date];
      NSAssert(now != nil, @"Unable to create NSDate object 'now'.");
      _lastTimedDestroy = now;
      // Multiply in floating point first and convert to nanoseconds afterwards; casting the
      // delay to an integer before the multiplication would truncate fractional seconds
      // (e.g. a 0.5 s delay would become 0).
      int64_t delayNanoseconds = (int64_t)(_destroyDelay * NSEC_PER_SEC);
      dispatch_after(dispatch_time(DISPATCH_TIME_NOW, delayNanoseconds), _timerQueue, ^{
        @synchronized(self) {
          // Check _lastTimedDestroy against now in case more calls are created (and
          // maybe destroyed) after this dispatch_after. In that case the current
          // dispatch_after block should be discarded; the channel should be
          // destroyed in a later dispatch_after block.
          if (now != nil && self->_lastTimedDestroy == now) {
            self->_wrappedChannel = nil;
            self->_lastTimedDestroy = nil;
          }
        }
      });
    }
  }
}

/**
 * Drops the underlying channel, if any, stops tracking its calls, and tells each of those calls
 * that the channel was disconnected. Does nothing when there is no underlying channel.
 */
- (void)disconnect {
  NSArray<GRPCWrappedCall *> *callsToNotify = nil;
  @synchronized(self) {
    if (_wrappedChannel == nil) {
      // Nothing to disconnect.
      return;
    }
    _wrappedChannel = nil;
    callsToNotify = _wrappedCalls.allObjects;
    [_wrappedCalls removeAllObjects];
  }
  // Notify outside of the lock, working from the snapshot taken above.
  for (GRPCWrappedCall *disconnectedCall in callsToNotify) {
    [disconnectedCall channelDisconnected];
  }
}

/**
 * Returns the current underlying channel, read under the object's lock; nil if there is none.
 */
- (GRPCChannel *)wrappedChannel {
  @synchronized(self) {
    return _wrappedChannel;
  }
}

@end

/** Private interface of GRPCChannelPool. */
@interface GRPCChannelPool ()

/**
 * Designated initializer, kept private to this file. Used by +sharedInstance and by the Test
 * category's -initTestPool.
 */
- (instancetype)initPrivate NS_DESIGNATED_INITIALIZER;

@end

/**
 * Pool that hands out one GRPCPooledChannel per distinct channel configuration.
 */
@implementation GRPCChannelPool {
  // Maps a channel configuration to its pooled channel; accessed under @synchronized(self).
  NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool;
}

/**
 * Returns the process-wide channel pool, creating it on first use.
 */
+ (instancetype)sharedInstance {
  dispatch_once(&gInitChannelPool, ^{
    GRPCChannelPool *sharedPool = [[GRPCChannelPool alloc] initPrivate];
    NSAssert(sharedPool != nil, @"Cannot initialize global channel pool.");
    gChannelPool = sharedPool;
  });
  return gChannelPool;
}

/**
 * Creates an empty pool and, unless the environment variable named by kCFStreamVarName starts
 * with '1', registers the pool with GRPCConnectivityMonitor.
 */
- (instancetype)initPrivate {
  self = [super init];
  if (self == nil) {
    return nil;
  }

  _channelPool = [NSMutableDictionary dictionary];

  // Connectivity monitor is not required for CFStream
  const char *cfStreamSetting = getenv(kCFStreamVarName);
  BOOL cfStreamEnabled = (cfStreamSetting != NULL && cfStreamSetting[0] == '1');
  if (!cfStreamEnabled) {
    [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)];
  }

  return self;
}

/**
 * Unregisters the pool from GRPCConnectivityMonitor. This is sent unconditionally, including
 * when -initPrivate skipped the registration.
 */
- (void)dealloc {
  [GRPCConnectivityMonitor unregisterObserver:self];
}

/**
 * Returns the pooled channel for the configuration derived from host and callOptions, creating
 * and caching one if the pool does not have it yet.
 *
 * @param host        Host of the channel. Must not be empty.
 * @param callOptions Call options the channel configuration is built from. Must not be nil.
 * @return The pooled channel, or nil if an argument is invalid or the configuration or the
 *         pooled channel could not be created.
 */
- (GRPCPooledChannel *)channelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions {
  NSAssert(host.length > 0, @"Host must not be empty.");
  NSAssert(callOptions != nil, @"callOptions must not be empty.");
  if (host.length == 0 || callOptions == nil) {
    return nil;
  }

  GRPCChannelConfiguration *configuration =
      [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
  NSAssert(configuration != nil, @"Unable to create channel configuration.");
  if (configuration == nil) {
    // A nil configuration cannot be used as a dictionary key.
    return nil;
  }

  GRPCPooledChannel *pooledChannel = nil;
  @synchronized(self) {
    pooledChannel = _channelPool[configuration];
    if (pooledChannel == nil) {
      pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration];
      // The pooled channel's initializer is nullable; never store a nil value in the pool.
      if (pooledChannel != nil) {
        _channelPool[configuration] = pooledChannel;
      }
    }
  }
  return pooledChannel;
}

/**
 * Disconnects every pooled channel currently held by the pool. The channels stay in the pool.
 */
- (void)disconnectAllChannels {
  NSArray<GRPCPooledChannel *> *channelsSnapshot;
  @synchronized(self) {
    // Snapshot under the lock so the channels can be disconnected without holding it.
    channelsSnapshot = _channelPool.allValues;
  }

  [channelsSnapshot makeObjectsPerformSelector:@selector(disconnect)];
}

/**
 * Selector registered with GRPCConnectivityMonitor in -initPrivate. Disconnects all pooled
 * channels; the notification itself is not inspected.
 */
- (void)connectivityChange:(NSNotification *)note {
  [self disconnectAllChannels];
}

@end

/** Test-only additions to GRPCChannelPool. */
@implementation GRPCChannelPool (Test)

/**
 * Initializes a pool through the private initializer -initPrivate, giving the caller an
 * instance that is separate from the one returned by +sharedInstance.
 */
- (instancetype)initTestPool {
  return [self initPrivate];
}

@end