aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel_index.cc
blob: 0ae7898c5a475b5ccf4f976cbe1784de8e949de0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/subchannel_index.h"

#include <stdbool.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/avl/avl.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/tls.h"

// a map of subchannel_key --> subchannel, used for detecting connections
// to the same destination in order to share them
static grpc_avl g_subchannel_index;

static gpr_mu g_mu;

static gpr_refcount g_refcount;

struct grpc_subchannel_key {
  grpc_subchannel_args args;
};

static bool g_force_creation = false;

static grpc_subchannel_key* create_key(
    const grpc_subchannel_args* args,
    grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
  grpc_subchannel_key* k =
      static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
  k->args.args = copy_channel_args(args->args);
  return k;
}

grpc_subchannel_key* grpc_subchannel_key_create(
    const grpc_subchannel_args* args) {
  return create_key(args, grpc_channel_args_normalize);
}

static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
  return create_key(&k->args, grpc_channel_args_copy);
}

int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
                                const grpc_subchannel_key* b) {
  // To pretend the keys are different, return a non-zero value.
  if (GPR_UNLIKELY(g_force_creation)) return 1;
  return grpc_channel_args_compare(a->args.args, b->args.args);
}

void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
  grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
  gpr_free(k);
}

static void sck_avl_destroy(void* p, void* unused) {
  grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p));
}

static void* sck_avl_copy(void* p, void* unused) {
  return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p));
}

static long sck_avl_compare(void* a, void* b, void* unused) {
  return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a),
                                     static_cast<grpc_subchannel_key*>(b));
}

static void scv_avl_destroy(void* p, void* unused) {
  GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index");
}

static void* scv_avl_copy(void* p, void* unused) {
  GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index");
  return p;
}

static const grpc_avl_vtable subchannel_avl_vtable = {
    sck_avl_destroy,  // destroy_key
    sck_avl_copy,     // copy_key
    sck_avl_compare,  // compare_keys
    scv_avl_destroy,  // destroy_value
    scv_avl_copy      // copy_value
};

void grpc_subchannel_index_init(void) {
  g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable);
  gpr_mu_init(&g_mu);
  gpr_ref_init(&g_refcount, 1);
}

void grpc_subchannel_index_shutdown(void) {
  // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
  // To solve that, we should force polling to flush any pending callbacks, then
  // shutdown safely.
  grpc_subchannel_index_unref();
}

void grpc_subchannel_index_unref(void) {
  if (gpr_unref(&g_refcount)) {
    gpr_mu_destroy(&g_mu);
    grpc_avl_unref(g_subchannel_index, nullptr);
  }
}

void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }

grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
  // Lock, and take a reference to the subchannel index.
  // We don't need to do the search under a lock as avl's are immutable.
  gpr_mu_lock(&g_mu);
  grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
  gpr_mu_unlock(&g_mu);

  grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
      (grpc_subchannel*)grpc_avl_get(index, key, nullptr), "index_find");
  grpc_avl_unref(index, nullptr);

  return c;
}

grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
                                                grpc_subchannel* constructed) {
  grpc_subchannel* c = nullptr;
  bool need_to_unref_constructed = false;

  while (c == nullptr) {
    need_to_unref_constructed = false;

    // Compare and swap loop:
    // - take a reference to the current index
    gpr_mu_lock(&g_mu);
    grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
    gpr_mu_unlock(&g_mu);

    // - Check to see if a subchannel already exists
    c = static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr));
    if (c != nullptr) {
      c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
    }
    if (c != nullptr) {
      // yes -> we're done
      need_to_unref_constructed = true;
    } else {
      // no -> update the avl and compare/swap
      grpc_avl updated = grpc_avl_add(
          grpc_avl_ref(index, nullptr), subchannel_key_copy(key),
          GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), nullptr);

      // it may happen (but it's expected to be unlikely)
      // that some other thread has changed the index:
      // compare/swap here to check that, and retry as necessary
      gpr_mu_lock(&g_mu);
      if (index.root == g_subchannel_index.root) {
        GPR_SWAP(grpc_avl, updated, g_subchannel_index);
        c = constructed;
      }
      gpr_mu_unlock(&g_mu);

      grpc_avl_unref(updated, nullptr);
    }
    grpc_avl_unref(index, nullptr);
  }

  if (need_to_unref_constructed) {
    GRPC_SUBCHANNEL_UNREF(constructed, "index_register");
  }

  return c;
}

void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
                                      grpc_subchannel* constructed) {
  bool done = false;
  while (!done) {
    // Compare and swap loop:
    // - take a reference to the current index
    gpr_mu_lock(&g_mu);
    grpc_avl index = grpc_avl_ref(g_subchannel_index, nullptr);
    gpr_mu_unlock(&g_mu);

    // Check to see if this key still refers to the previously
    // registered subchannel
    grpc_subchannel* c =
        static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr));
    if (c != constructed) {
      grpc_avl_unref(index, nullptr);
      break;
    }

    // compare and swap the update (some other thread may have
    // mutated the index behind us)
    grpc_avl updated =
        grpc_avl_remove(grpc_avl_ref(index, nullptr), key, nullptr);

    gpr_mu_lock(&g_mu);
    if (index.root == g_subchannel_index.root) {
      GPR_SWAP(grpc_avl, updated, g_subchannel_index);
      done = true;
    }
    gpr_mu_unlock(&g_mu);

    grpc_avl_unref(updated, nullptr);
    grpc_avl_unref(index, nullptr);
  }
}

void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
  g_force_creation = force_creation;
}