All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_node.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2022 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_atomic.h>
20 #include <aerospike/as_config.h>
21 #include <aerospike/as_conn_pool.h>
22 #include <aerospike/as_error.h>
23 #include <aerospike/as_event.h>
24 #include <aerospike/as_socket.h>
25 #include <aerospike/as_partition.h>
26 #include <aerospike/as_queue.h>
27 #include <aerospike/as_vector.h>
28 
29 #if !defined(_MSC_VER)
30 #include <netinet/in.h>
31 #include <sys/uio.h>
32 #endif
33 
34 #ifdef __cplusplus
35 extern "C" {
36 #endif
37 
38 /******************************************************************************
39  * MACROS
40  *****************************************************************************/
41 
42 /**
43  * Maximum size (including NULL byte) of a hostname.
44  */
45 #define AS_HOSTNAME_SIZE 256
46 
47 /**
48  * Maximum size of node name
49  */
50 #define AS_NODE_NAME_SIZE 20
51 
52 // Leave this is in for backwards compatibility.
53 #define AS_NODE_NAME_MAX_SIZE AS_NODE_NAME_SIZE
54 
55 #define AS_FEATURES_PARTITION_SCAN (1 << 0)
56 #define AS_FEATURES_QUERY_SHOW (1 << 1)
57 #define AS_FEATURES_BATCH_ANY (1 << 2)
58 #define AS_FEATURES_PARTITION_QUERY (1 << 3)
59 
60 #define AS_ADDRESS4_MAX 4
61 #define AS_ADDRESS6_MAX 8
62 
63 /******************************************************************************
64  * TYPES
65  *****************************************************************************/
66 
67 /**
68  * Socket address information.
69  */
70 typedef struct as_address_s {
71  /**
72  * Socket IP address.
73  */
74  struct sockaddr_storage addr;
75 
76  /**
77  * Socket IP address string representation including port.
78  */
79  char name[AS_IP_ADDRESS_SIZE];
80 
81 } as_address;
82 
83 /**
84  * @private
85  * Host address alias information.
86  */
87 typedef struct as_alias_s {
88  /**
89  * Hostname or IP address string representation.
90  */
91  char name[AS_HOSTNAME_SIZE];
92 
93  /**
94  * Socket IP port.
95  */
96  uint16_t port;
97 
98 } as_alias;
99 
100 /**
101  * @private
102  * Rack.
103  */
104 typedef struct as_rack_s {
105  /**
106  * Namespace
107  */
109 
110  /**
111  * Rack ID
112  */
113  int rack_id;
114 
115 } as_rack;
116 
117 /**
118  * @private
119  * Racks.
120  */
121 typedef struct as_racks_s {
122  /**
123  * Reference count of racks array.
124  */
125  uint32_t ref_count;
126 
127  /**
128  * Rack ID when all namespaces use same rack.
129  */
130  int rack_id;
131 
132  /**
133  * Length of racks array.
134  */
135  uint32_t size;
136 
137  /**
138  * Pad to 8 byte boundary.
139  */
140  uint32_t pad;
141 
142  /**
143  * Racks array.
144  */
145  as_rack racks[];
146 
147 } as_racks;
148 
149 /**
150  * @private
151  * Session info.
152  */
153 typedef struct as_session_s {
154  /**
155  * Reference count of session.
156  */
157  uint32_t ref_count;
158 
159  /**
160  * Session token length.
161  */
162  uint32_t token_length;
163 
164  /**
165  * Session expiration for this node.
166  */
167  uint64_t expiration;
168 
169  /**
170  * Session token for this node.
171  */
172  uint8_t token[];
173 
174 } as_session;
175 
176 /**
177  * @private
178  * Async connection pool.
179  */
180 typedef struct as_async_conn_pool_s {
181  /**
182  * Async connection queue.
183  */
185 
186  /**
187  * Min connections allowed for this pool.
188  */
189  uint32_t min_size;
190 
191  /**
192  * Max connections allowed for this pool.
193  */
194  uint32_t limit;
195 
196  /**
197  * Total async connections opened.
198  */
199  uint32_t opened;
200 
201  /**
202  * Total async connections closed.
203  */
204  uint32_t closed;
205 
207 
208 struct as_cluster_s;
209 
210 /**
211  * Server node representation.
212  */
213 typedef struct as_node_s {
214  /**
215  * Reference count of node.
216  */
217  uint32_t ref_count;
218 
219  /**
220  * Reference count of node in partition maps.
221  */
223 
224  /**
225  * Server's generation count for partition management.
226  */
228 
229  /**
230  * Features supported by server. Stored in bitmap.
231  */
232  uint32_t features;
233 
234  /**
235  * TLS certificate name (needed for TLS only, NULL otherwise).
236  */
237  char* tls_name;
238 
239  /**
240  * The name of the node.
241  */
242  char name[AS_NODE_NAME_SIZE];
243 
244  /**
245  * Primary address index into addresses array.
246  */
247  uint32_t address_index;
248 
249  /**
250  * Number of IPv4 addresses.
251  */
252  uint32_t address4_size;
253 
254  /**
255  * Number of IPv6 addresses.
256  */
257  uint32_t address6_size;
258 
259  /**
260  * Array of IP addresses. Not thread-safe.
261  */
263 
264  /**
265  * Array of hostnames aliases. Not thread-safe.
266  */
267  as_vector /* <as_alias> */ aliases;
268 
269  /**
270  * Cluster from which this node resides.
271  */
272  struct as_cluster_s* cluster;
273 
274  /**
275  * Pools of current, cached sockets.
276  */
278 
279  /**
280  * Array of connection pools used in async commands. There is one pool per node/event loop.
281  * Only used by event loop threads. Not thread-safe.
282  */
284 
285  /**
286  * Pool of connections used in pipelined async commands. Also not thread-safe.
287  */
289 
290  /**
291  * Authentication session.
292  */
294 
295  /**
296  * Racks data.
297  */
299 
300  /**
301  * Socket used exclusively for cluster tend thread info requests.
302  */
304 
305  /**
306  * Connection queue iterator. Not atomic by design.
307  */
308  uint32_t conn_iter;
309 
310  /**
311  * Total sync connections opened.
312  */
314 
315  /**
316  * Total sync connections closed.
317  */
319 
320  /**
321  * Error count for this node's error_rate_window.
322  */
323  uint32_t error_count;
324 
325  /**
326  * Server's generation count for peers.
327  */
329 
330  /**
331  * Number of peers returned by server node.
332  */
333  uint32_t peers_count;
334 
335  /**
336  * Server's generation count for partition rebalancing.
337  */
339 
340  /**
341  * Number of other nodes that consider this node a member of the cluster.
342  */
343  uint32_t friends;
344 
345  /**
346  * Number of consecutive info request failures.
347  */
348  uint32_t failures;
349 
350  /**
351  * Shared memory node array index.
352  */
353  uint32_t index;
354 
355  /**
356  * Should user login to avoid session expiration.
357  */
358  uint8_t perform_login;
359 
360  /**
361  * Is node currently active.
362  */
363  uint8_t active;
364 
365  /**
366  * Did partition change in current cluster tend.
367  */
369 
370  /**
371  * Did rebalance generation change in current cluster tend.
372  */
374 
375 } as_node;
376 
377 /**
378  * @private
379  * Node discovery information.
380  */
381 typedef struct as_node_info_s {
382  /**
383  * Node name.
384  */
385  char name[AS_NODE_NAME_SIZE];
386 
387  /**
388  * Features supported by server. Stored in bitmap.
389  */
390  uint32_t features;
391 
392  /**
393  * Host.
394  */
396 
397  /**
398  * Validated socket.
399  */
401 
402  /**
403  * Socket address.
404  */
405  struct sockaddr_storage addr;
406 
407  /**
408  * Authentication session.
409  */
411 
412 } as_node_info;
413 
414 /******************************************************************************
415  * FUNCTIONS
416  ******************************************************************************/
417 
418 /**
419  * @private
420  * Create new cluster node.
421  */
422 as_node*
423 as_node_create(struct as_cluster_s* cluster, as_node_info* node_info);
424 
425 /**
426  * @private
427  * Close all connections in pool and free resources.
428  */
429 AS_EXTERN void
430 as_node_destroy(as_node* node);
431 
432 /**
433  * @private
434  * Create configured minimum number of connections.
435  */
436 void
438 
439 /**
440  * @private
441  * Check if node is active from a transaction thread.
442  */
443 static inline bool
445 {
446  return (bool)as_load_uint8_acq(&node->active);
447 }
448 
449 /**
450  * @private
451  * Set node to inactive.
452  */
453 static inline void
455 {
456  // Make volatile write so changes are reflected in other threads.
457  as_store_uint8_rls(&node->active, false);
458 }
459 
460 /**
461  * @private
462  * Read volatile node.
463  */
464 static inline as_node*
466 {
467  return (as_node*)as_load_ptr((void* const*)node);
468 }
469 
470 /**
471  * @private
472  * Reserve existing cluster node.
473  */
474 static inline void
476 {
477  as_incr_uint32(&node->ref_count);
478 }
479 
480 /**
481  * @private
482  * Set volatile node.
483  */
484 static inline void
486 {
487  as_store_ptr_rls((void**)trg, src);
488 }
489 
490 /**
491  * @private
492  * Release existing cluster node.
493  */
494 static inline void
496 {
497  if (as_aaf_uint32_rls(&node->ref_count, -1) == 0) {
498  as_fence_acq();
499  as_node_destroy(node);
500  }
501 }
502 
503 /**
504  * @private
505  * Release node on next cluster tend iteration.
506  */
507 void
509 
510 /**
511  * @private
512  * Add socket address to node addresses.
513  */
514 void
515 as_node_add_address(as_node* node, struct sockaddr* addr);
516 
517 /**
518  * @private
519  * Add hostname to node aliases.
520  */
521 void
522 as_node_add_alias(as_node* node, const char* hostname, uint16_t port);
523 
524 /**
525  * Get primary socket address.
526  */
527 static inline as_address*
529 {
530  return &node->addresses[node->address_index];
531 }
532 
533 /**
534  * Get socket address as a string.
535  */
536 static inline const char*
538 {
539  return node->addresses[node->address_index].name;
540 }
541 
542 /**
543  * @private
544  * Attempt to authenticate given current cluster's user and password.
545  */
546 as_status
547 as_node_authenticate_connection(struct as_cluster_s* cluster, uint64_t deadline_ms);
548 
549 /**
550  * @private
551  * Get a connection to the given node from pool and validate. Return 0 on success.
552  */
553 as_status
554 as_node_get_connection(as_error* err, as_node* node, uint32_t socket_timeout, uint64_t deadline_ms, as_socket* sock);
555 
556 /**
557  * @private
558  * Close a node's connection and update node/pool statistics.
559  */
560 static inline void
562 {
563  as_socket_close(sock);
565  as_conn_pool_decr(pool);
566 }
567 
568 /**
569  * @private
570  * Close a node's connection and update node statistics.
571  */
572 static inline void
574 {
575  as_socket_close(sock);
577 }
578 
579 /**
580  * @private
581  * Put connection back into pool.
582  */
583 static inline void
585 {
586  // Save pool.
587  as_conn_pool* pool = sock->pool;
588 
589  // Update last used timestamp.
590  sock->last_used = cf_getns();
591 
592  // Put into pool.
593  if (! as_conn_pool_push_head(pool, sock)) {
594  as_node_close_connection(node, sock, pool);
595  }
596 }
597 
598 /**
599  * @private
600  * Balance sync connections.
601  */
602 void
604 
605 /**
606  * @private
607  * Are hosts equal.
608  */
609 static inline bool
611 {
612  return strcmp(h1->name, h2->name) == 0 && h1->port == h2->port;
613 }
614 
615 /**
616  * @private
617  * Destroy node_info contents.
618  */
619 static inline void
621 {
622  as_socket_close(&node_info->socket);
623  cf_free(node_info->session);
624 }
625 
626 /**
627  * @private
628  * Tell tend thread to perform another node login.
629  */
630 void
632 
633 /**
634  * @private
635  * Does node contain rack.
636  */
637 bool
638 as_node_has_rack(as_node* node, const char* ns, int rack_id);
639 
640 /**
641  * @private
642  * Volatile read session pointer.
643  */
644 static inline as_session*
646 {
647  return (as_session*)as_load_ptr((void* const*)session);
648 }
649 
650 /**
651  * @private
652  * Release existing session.
653  */
654 static inline void
656 {
657  if (as_aaf_uint32_rls(&session->ref_count, -1) == 0) {
658  cf_free(session);
659  }
660 }
661 
662 #ifdef __cplusplus
663 } // end extern "C"
664 #endif
uint32_t min_size
Definition: as_node.h:189
#define as_aaf_uint32_rls(_target, _value)
static void as_node_put_connection(as_node *node, as_socket *sock)
Definition: as_node.h:584
static void as_node_deactivate(as_node *node)
Definition: as_node.h:454
static as_session * as_session_load(as_session **session)
Definition: as_node.h:645
as_host host
Definition: as_node.h:395
as_conn_pool * sync_conn_pools
Definition: as_node.h:277
void as_node_balance_connections(as_node *node)
static void as_conn_pool_decr(as_conn_pool *pool)
Definition: as_conn_pool.h:136
uint8_t active
Definition: as_node.h:363
as_namespace ns
Definition: as_scan.h:267
static void as_node_close_connection(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition: as_node.h:561
#define AS_IP_ADDRESS_SIZE
Definition: as_address.h:32
char name[AS_IP_ADDRESS_SIZE]
Definition: as_node.h:79
#define AS_HOSTNAME_SIZE
Definition: as_node.h:45
uint32_t rebalance_generation
Definition: as_node.h:338
as_status
Definition: as_status.h:30
bool rebalance_changed
Definition: as_node.h:373
struct as_cluster_s * cluster
Definition: as_node.h:272
as_node * as_node_create(struct as_cluster_s *cluster, as_node_info *node_info)
AS_EXTERN void as_node_destroy(as_node *node)
uint32_t sync_conns_closed
Definition: as_node.h:318
uint32_t address_index
Definition: as_node.h:247
static void as_node_store(as_node **trg, as_node *src)
Definition: as_node.h:485
as_queue queue
Definition: as_node.h:184
uint32_t ref_count
Definition: as_node.h:125
uint32_t address4_size
Definition: as_node.h:252
uint8_t perform_login
Definition: as_node.h:358
as_vector aliases
Definition: as_node.h:267
static bool as_host_equals(as_host *h1, as_host *h2)
Definition: as_node.h:610
uint32_t peers_generation
Definition: as_node.h:328
#define AS_NODE_NAME_SIZE
Definition: as_node.h:50
as_socket socket
Definition: as_node.h:400
char * tls_name
Definition: as_node.h:237
#define AS_EXTERN
Definition: as_std.h:25
uint64_t expiration
Definition: as_node.h:167
uint32_t index
Definition: as_node.h:353
uint32_t sync_conns_opened
Definition: as_node.h:313
as_racks * racks
Definition: as_node.h:298
as_async_conn_pool * async_conn_pools
Definition: as_node.h:283
uint32_t ref_count
Definition: as_node.h:217
#define as_load_uint8_acq(_target)
Definition: as_atomic_win.h:97
static void as_node_close_socket(as_node *node, as_socket *sock)
Definition: as_node.h:573
uint32_t pad
Definition: as_node.h:140
void as_node_add_address(as_node *node, struct sockaddr *addr)
char * name
Definition: as_host.h:37
uint32_t size
Definition: as_node.h:135
uint32_t features
Definition: as_node.h:390
static bool as_node_is_active(const as_node *node)
Definition: as_node.h:444
bool as_node_has_rack(as_node *node, const char *ns, int rack_id)
bool partition_changed
Definition: as_node.h:368
uint32_t ref_count
Definition: as_node.h:157
as_session * session
Definition: as_node.h:410
as_status as_node_authenticate_connection(struct as_cluster_s *cluster, uint64_t deadline_ms)
uint32_t limit
Definition: as_node.h:194
struct as_conn_pool_s * pool
Definition: as_socket.h:87
uint32_t features
Definition: as_node.h:232
uint64_t last_used
Definition: as_socket.h:88
int rack_id
Definition: as_node.h:113
#define as_store_uint8_rls(_target, _value)
uint32_t failures
Definition: as_node.h:348
as_socket info_socket
Definition: as_node.h:303
void as_node_signal_login(as_node *node)
as_async_conn_pool * pipe_conn_pools
Definition: as_node.h:288
uint32_t partition_generation
Definition: as_node.h:227
uint32_t opened
Definition: as_node.h:199
void as_node_add_alias(as_node *node, const char *hostname, uint16_t port)
#define as_store_ptr_rls(_target, _value)
static as_node * as_node_load(as_node **node)
Definition: as_node.h:465
uint32_t closed
Definition: as_node.h:204
as_session * session
Definition: as_node.h:293
uint16_t port
Definition: as_node.h:96
#define as_load_ptr(_target)
Definition: as_atomic_win.h:47
int rack_id
Definition: as_node.h:130
uint32_t partition_ref_count
Definition: as_node.h:222
uint16_t port
Definition: as_host.h:47
static void as_node_release(as_node *node)
Definition: as_node.h:495
uint32_t peers_count
Definition: as_node.h:333
static const char * as_node_get_address_string(as_node *node)
Definition: as_node.h:537
static void as_node_info_destroy(as_node_info *node_info)
Definition: as_node.h:620
uint32_t address6_size
Definition: as_node.h:257
void as_socket_close(as_socket *sock)
static void as_session_release(as_session *session)
Definition: as_node.h:655
uint32_t friends
Definition: as_node.h:343
void as_node_create_min_connections(as_node *node)
#define as_fence_acq()
#define as_incr_uint32(_target)
uint32_t conn_iter
Definition: as_node.h:308
uint32_t error_count
Definition: as_node.h:323
as_address * addresses
Definition: as_node.h:262
#define AS_MAX_NAMESPACE_SIZE
Definition: as_partition.h:40
void as_node_release_delayed(as_node *node)
uint32_t token_length
Definition: as_node.h:162
as_status as_node_get_connection(as_error *err, as_node *node, uint32_t socket_timeout, uint64_t deadline_ms, as_socket *sock)
static bool as_conn_pool_push_head(as_conn_pool *pool, as_socket *sock)
Definition: as_conn_pool.h:99
static void as_node_reserve(as_node *node)
Definition: as_node.h:475
static as_address * as_node_get_address(as_node *node)
Definition: as_node.h:528