aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/transport/transport.h
blob: 585b9dfae9196ac7a92827daa2ac110d22688822 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
#define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H

#include <grpc/support/port_platform.h>

#include <stddef.h>

#include "src/core/lib/channel/context.h"
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"

/* Minimum and maximum protocol accepted versions. */
#define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
#define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
#define GRPC_PROTOCOL_VERSION_MIN_MINOR 1

/* forward declarations */

typedef struct grpc_transport grpc_transport;

/* grpc_stream doesn't actually exist. It's used as a typesafe
   opaque pointer for whatever data the transport wants to track
   for a stream. */
typedef struct grpc_stream grpc_stream;

extern grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount;

typedef struct grpc_stream_refcount {
  gpr_refcount refs;
  grpc_closure destroy;
#ifndef NDEBUG
  const char* object_type;
#endif
  grpc_slice_refcount slice_refcount;
} grpc_stream_refcount;

#ifndef NDEBUG
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg,
                          const char* object_type);
void grpc_stream_ref(grpc_stream_refcount* refcount, const char* reason);
void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
#else
void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void* cb_arg);
void grpc_stream_ref(grpc_stream_refcount* refcount);
void grpc_stream_unref(grpc_stream_refcount* refcount);
#define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
  grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif

/* Wrap a buffer that is owned by some stream object into a slice that shares
   the same refcount */
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount* refcount,
                                               void* buffer, size_t length);

typedef struct {
  uint64_t framing_bytes;
  uint64_t data_bytes;
  uint64_t header_bytes;
} grpc_transport_one_way_stats;

typedef struct grpc_transport_stream_stats {
  grpc_transport_one_way_stats incoming;
  grpc_transport_one_way_stats outgoing;
} grpc_transport_stream_stats;

void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats* from,
                                       grpc_transport_one_way_stats* to);

void grpc_transport_move_stats(grpc_transport_stream_stats* from,
                               grpc_transport_stream_stats* to);

// This struct (which is present in both grpc_transport_stream_op_batch
// and grpc_transport_op_batch) is a convenience to allow filters or
// transports to schedule a closure related to a particular batch without
// having to allocate memory.  The general pattern is to initialize the
// closure with the callback arg set to the batch and extra_arg set to
// whatever state is associated with the handler (e.g., the call element
// or the transport stream object).
//
// Note that this can only be used by the current handler of a given
// batch on the way down the stack (i.e., whichever filter or transport is
// currently handling the batch).  Once a filter or transport passes control
// of the batch to the next handler, it cannot depend on the contents of
// this struct anymore, because the next handler may reuse it.
typedef struct {
  void* extra_arg;
  grpc_closure closure;
} grpc_handler_private_op_data;

typedef struct grpc_transport_stream_op_batch_payload
    grpc_transport_stream_op_batch_payload;

/* Transport stream op: a set of operations to perform on a transport
   against a single stream */
typedef struct grpc_transport_stream_op_batch {
  /** Should be scheduled when all of the non-recv operations in the batch
      are complete.

      The recv ops (recv_initial_metadata, recv_message, and
      recv_trailing_metadata) each have their own callbacks.  If a batch
      contains both recv ops and non-recv ops, on_complete should be
      scheduled as soon as the non-recv ops are complete, regardless of
      whether or not the recv ops are complete.  If a batch contains
      only recv ops, on_complete can be null. */
  grpc_closure* on_complete;

  /** Values for the stream op (fields set are determined by flags above) */
  grpc_transport_stream_op_batch_payload* payload;

  /** Send initial metadata to the peer, from the provided metadata batch. */
  bool send_initial_metadata : 1;

  /** Send trailing metadata to the peer, from the provided metadata batch. */
  bool send_trailing_metadata : 1;

  /** Send message data to the peer, from the provided byte stream. */
  bool send_message : 1;

  /** Receive initial metadata from the stream, into provided metadata batch. */
  bool recv_initial_metadata : 1;

  /** Receive message data from the stream, into provided byte stream. */
  bool recv_message : 1;

  /** Receive trailing metadata from the stream, into provided metadata batch.
   */
  bool recv_trailing_metadata : 1;

  /** Cancel this stream with the provided error */
  bool cancel_stream : 1;

  /***************************************************************************
   * remaining fields are initialized and used at the discretion of the
   * current handler of the op */

  grpc_handler_private_op_data handler_private;
} grpc_transport_stream_op_batch;

struct grpc_transport_stream_op_batch_payload {
  struct {
    grpc_metadata_batch* send_initial_metadata;
    /** Iff send_initial_metadata != NULL, flags associated with
        send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
    uint32_t send_initial_metadata_flags;
    // If non-NULL, will be set by the transport to the peer string (a char*).
    // The transport retains ownership of the string.
    // Note: This pointer may be used by the transport after the
    // send_initial_metadata op is completed.  It must remain valid
    // until the call is destroyed.
    gpr_atm* peer_string;
  } send_initial_metadata;

  struct {
    grpc_metadata_batch* send_trailing_metadata;
  } send_trailing_metadata;

  struct {
    // The transport (or a filter that decides to return a failure before
    // the op gets down to the transport) takes ownership.
    // The batch's on_complete will not be called until after the byte
    // stream is orphaned.
    grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
  } send_message;

  struct {
    grpc_metadata_batch* recv_initial_metadata;
    // Flags are used only on the server side.  If non-null, will be set to
    // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
    // indicate if the call is idempotent).
    uint32_t* recv_flags;
    /** Should be enqueued when initial metadata is ready to be processed. */
    grpc_closure* recv_initial_metadata_ready;
    // If not NULL, will be set to true if trailing metadata is
    // immediately available.  This may be a signal that we received a
    // Trailers-Only response.
    bool* trailing_metadata_available;
    // If non-NULL, will be set by the transport to the peer string (a char*).
    // The transport retains ownership of the string.
    // Note: This pointer may be used by the transport after the
    // recv_initial_metadata op is completed.  It must remain valid
    // until the call is destroyed.
    gpr_atm* peer_string;
  } recv_initial_metadata;

  struct {
    // Will be set by the transport to point to the byte stream
    // containing a received message.
    // Will be NULL if trailing metadata is received instead of a message.
    grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
    /** Should be enqueued when one message is ready to be processed. */
    grpc_closure* recv_message_ready;
  } recv_message;

  struct {
    grpc_metadata_batch* recv_trailing_metadata;
    grpc_transport_stream_stats* collect_stats;
    /** Should be enqueued when initial metadata is ready to be processed. */
    grpc_closure* recv_trailing_metadata_ready;
  } recv_trailing_metadata;

  /** Forcefully close this stream.
      The HTTP2 semantics should be:
      - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
        trailing metadata has not been sent, send trailing metadata with status
        and message from cancel_error (use grpc_error_get_status) followed by
        a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
      - at all other times: use grpc_error_get_status to get a status code, and
        convert to a HTTP2 error code using
        grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
        error. */
  struct {
    // Error contract: the transport that gets this op must cause cancel_error
    //                 to be unref'ed after processing it
    grpc_error* cancel_error;
  } cancel_stream;

  /* Indexes correspond to grpc_context_index enum values */
  grpc_call_context_element* context;
};

/** Transport op: a set of operations to perform on a transport as a whole */
typedef struct grpc_transport_op {
  /** Called when processing of this op is done. */
  grpc_closure* on_consumed;
  /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
  grpc_closure* on_connectivity_state_change;
  grpc_connectivity_state* connectivity_state;
  /** should the transport be disconnected
   * Error contract: the transport that gets this op must cause
   *                 disconnect_with_error to be unref'ed after processing it */
  grpc_error* disconnect_with_error;
  /** what should the goaway contain?
   * Error contract: the transport that gets this op must cause
   *                 goaway_error to be unref'ed after processing it */
  grpc_error* goaway_error;
  /** set the callback for accepting new streams;
      this is a permanent callback, unlike the other one-shot closures.
      If true, the callback is set to set_accept_stream_fn, with its
      user_data argument set to set_accept_stream_user_data */
  bool set_accept_stream;
  void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
                               const void* server_data);
  void* set_accept_stream_user_data;
  /** add this transport to a pollset */
  grpc_pollset* bind_pollset;
  /** add this transport to a pollset_set */
  grpc_pollset_set* bind_pollset_set;
  /** send a ping, if either on_initiate or on_ack is not NULL */
  struct {
    /** Ping may be delayed by the transport, on_initiate callback will be
        called when the ping is actually being sent. */
    grpc_closure* on_initiate;
    /** Called when the ping ack is received */
    grpc_closure* on_ack;
  } send_ping;

  /***************************************************************************
   * remaining fields are initialized and used at the discretion of the
   * transport implementation */

  grpc_handler_private_op_data handler_private;
} grpc_transport_op;

/* Returns the amount of memory required to store a grpc_stream for this
   transport */
size_t grpc_transport_stream_size(grpc_transport* transport);

/* Initialize transport data for a stream.

   Returns 0 on success, any other (transport-defined) value for failure.
   May assume that stream contains all-zeros.

   Arguments:
     transport   - the transport on which to create this stream
     stream      - a pointer to uninitialized memory to initialize
     server_data - either NULL for a client initiated stream, or a pointer
                   supplied from the accept_stream callback function */
int grpc_transport_init_stream(grpc_transport* transport, grpc_stream* stream,
                               grpc_stream_refcount* refcount,
                               const void* server_data, gpr_arena* arena);

void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
                             grpc_polling_entity* pollent);

/* Destroy transport data for a stream.

   Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
   received by the up-layer. Must not be called in the same call stack as
   recv_frame.

   Arguments:
     transport - the transport on which to create this stream
     stream    - the grpc_stream to destroy (memory is still owned by the
                 caller, but any child memory must be cleaned up) */
void grpc_transport_destroy_stream(grpc_transport* transport,
                                   grpc_stream* stream,
                                   grpc_closure* then_schedule_closure);

void grpc_transport_stream_op_batch_finish_with_failure(
    grpc_transport_stream_op_batch* op, grpc_error* error,
    grpc_call_combiner* call_combiner);

char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op);
char* grpc_transport_op_string(grpc_transport_op* op);

/* Send a batch of operations on a transport

   Takes ownership of any objects contained in ops.

   Arguments:
     transport - the transport on which to initiate the stream
     stream    - the stream on which to send the operations. This must be
                 non-NULL and previously initialized by the same transport.
     op        - a grpc_transport_stream_op_batch specifying the op to perform
   */
void grpc_transport_perform_stream_op(grpc_transport* transport,
                                      grpc_stream* stream,
                                      grpc_transport_stream_op_batch* op);

void grpc_transport_perform_op(grpc_transport* transport,
                               grpc_transport_op* op);

/* Send a ping on a transport

   Calls cb with user data when a response is received. */
void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);

/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
                           grpc_slice debug_data);

/* Destroy the transport */
void grpc_transport_destroy(grpc_transport* transport);

/* Get the endpoint used by \a transport */
grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport);

/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
   \a on_consumed and then delete the returned transport op */
grpc_transport_op* grpc_make_transport_op(grpc_closure* on_consumed);
/* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_consumed
   closure
   to \a on_consumed and then delete the returned transport op */
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
    grpc_closure* on_consumed);

#endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */