All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_cluster.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_atomic.h>
20 #include <aerospike/as_config.h>
21 #include <aerospike/as_node.h>
22 #include <aerospike/as_partition.h>
23 #include <aerospike/as_policy.h>
25 
26 #ifdef __cplusplus
27 extern "C" {
28 #endif
29 
30 /******************************************************************************
31  * TYPES
32  *****************************************************************************/
33 
34 /**
35  * @private
36  * Reference counted array of server node pointers.
37  */
38 typedef struct as_nodes_s {
39  /**
40  * @private
41  * Reference count of node array.
42  */
43  uint32_t ref_count;
44 
45  /**
46  * @private
47  * Length of node array.
48  */
49  uint32_t size;
50 
51  /**
52  * @private
53  * Server node array.
54  */
55  as_node* array[];
56 } as_nodes;
57 
/**
 * @private
 * Cluster state for a single event loop.
 */
typedef struct as_event_state_s {
	/**
	 * Cluster's pending command count for this event loop.
	 */
	int pending;

	/**
	 * Is cluster closed for this event loop.
	 */
	bool closed;
} as_event_state;
73 
/**
 * @private
 * Signature of a release function for reference counted data.
 *
 * @param value		Pointer to the data being released.
 */
typedef void (*as_release_fn)(void* value);
79 
80 /**
81  * @private
82  * Reference counted data to be garbage collected.
83  */
84 typedef struct as_gc_item_s {
85  /**
86  * @private
87  * Reference counted data to be garbage collected.
88  */
89  void* data;
90 
91  /**
92  * @private
93  * Release function.
94  */
96 } as_gc_item;
97 
98 /**
99  * Cluster of server nodes.
100  */
101 typedef struct as_cluster_s {
102  /**
103  * @private
104  * Active nodes in cluster.
105  */
107 
108  /**
109  * @private
110  * Hints for best node for a partition.
111  */
113 
114  /**
115  * @private
116  * Nodes to be garbage collected.
117  */
118  as_vector* /* <as_gc_item> */ gc;
119 
120  /**
121  * @private
122  * Shared memory implementation of cluster.
123  */
124  struct as_shm_info_s* shm_info;
125 
126  /**
127  * @private
128  * User name in UTF-8 encoded bytes.
129  */
130  char* user;
131 
132  /**
133  * @private
134  * Password in clear text.
135  */
136  char* password;
137 
138  /**
139  * @private
140  * Password in hashed format.
141  */
143 
144  /**
145  * @private
146  * Expected cluster name for all nodes. May be null.
147  */
149 
150  /**
151  * Cluster event function that will be called when nodes are added/removed from the cluster.
152  */
154 
155  /**
156  * Cluster event user data that will be passed back to event_callback.
157  */
159 
160  /**
161  * Cluster state for all event loops.
162  */
164 
165  /**
166  * @private
167  * Initial seed hosts specified by user.
168  */
169  as_vector* /* <as_host> */ seeds;
170 
171  /**
172  * @private
173  * A IP translation table is used in cases where different clients use different server
174  * IP addresses. This may be necessary when using clients from both inside and outside
175  * a local area network. Default is no translation.
176  *
177  * The key is the IP address returned from friend info requests to other servers. The
178  * value is the real IP address used to connect to the server.
179  */
180  as_vector* /* <as_addr_map> */ ip_map;
181 
182  /**
183  * @private
184  * TLS parameters
185  */
187 
188  /**
189  * @private
190  * Pool of threads used to query server nodes in parallel for batch, scan and query.
191  */
193 
194  /**
195  * @private
196  * Cluster tend thread.
197  */
198  pthread_t tend_thread;
199 
200  /**
201  * @private
202  * Lock for adding/removing seeds.
203  */
204  pthread_mutex_t seed_lock;
205 
206  /**
207  * @private
208  * Lock for the tend thread to wait on with the tend interval as timeout.
209  * Normally locked, resulting in waiting a full interval between
210  * tend iterations. Upon cluster shutdown, unlocked by the main
211  * thread, allowing a fast termination of the tend thread.
212  */
213  pthread_mutex_t tend_lock;
214 
215  /**
216  * @private
217  * Tend thread identifier to be used with tend_lock.
218  */
219  pthread_cond_t tend_cond;
220 
221  /**
222  * @private
223  * Maximum socket idle to validate connections in transactions.
224  */
226 
227  /**
228  * @private
229  * Maximum socket idle to trim peak connections to min connections.
230  */
232 
233  /**
234  * @private
235  * Rack ids
236  */
237  int* rack_ids;
238 
239  /**
240  * @private
241  * Rack ids size
242  */
243  uint32_t rack_ids_size;
244 
245  /**
246  * @private
247  * Max errors per node per error_rate_window.
248  */
249  uint32_t max_error_rate;
250 
251  /**
252  * @private
253  * Number of tend iterations defining window for max_error_rate.
254  */
256 
257  /**
258  * @private
259  * Milliseconds between cluster tends.
260  */
261  uint32_t tend_interval;
262 
263  /**
264  * @private
265  * Cluster tend counter.
266  */
267  uint32_t tend_count;
268 
269  /**
270  * @private
271  * Minimum sync connections per node.
272  */
274 
275  /**
276  * @private
277  * Maximum sync connections per node.
278  */
280 
281  /**
282  * @private
283  * Minimum async connections per node.
284  */
286 
287  /**
288  * @private
289  * Maximum async (non-pipeline) connections per node.
290  */
292 
293  /**
294  * @private
295  * Maximum pipeline connections per node.
296  */
298 
299  /**
300  * @private
301  * Number of synchronous connection pools used for each node.
302  */
304 
305  /**
306  * @private
307  * Initial connection timeout in milliseconds.
308  */
309  uint32_t conn_timeout_ms;
310 
311  /**
312  * @private
313  * Node login timeout in milliseconds.
314  */
316 
317  /**
318  * @private
319  * Random node index.
320  */
321  uint32_t node_index;
322 
323  /**
324  * @private
325  * Count of add node failures in the most recent cluster tend iteration.
326  */
328 
329  /**
330  * @private
331  * Assign tend thread to this specific CPU ID.
332  */
334 
335  /**
336  * @private
337  * Authentication mode.
338  */
340 
341  /**
342  * @private
343  * Total number of data partitions used by cluster.
344  */
345  uint16_t n_partitions;
346 
347  /**
348  * @private
349  * If "services-alternate" should be used instead of "services"
350  */
352 
353  /**
354  * @private
355  * Request server rack ids.
356  */
358 
359  /**
360  * @private
361  * Is authentication enabled
362  */
364 
365  /**
366  * @private
367  * Does cluster support partition queries.
368  */
370 
371  /**
372  * @private
373  * Fail on cluster init if seed node and all peers are not reachable.
374  */
376 
377  /**
378  * @private
379  * Should continue to tend cluster.
380  */
381  volatile bool valid;
382 } as_cluster;
383 
384 /******************************************************************************
385  * FUNCTIONS
386  ******************************************************************************/
387 
388 /**
389  * Create and initialize cluster.
390  */
391 as_status
392 as_cluster_create(as_config* config, as_error* err, as_cluster** cluster);
393 
394 /**
395  * Close all connections and release memory associated with cluster.
396  */
397 void
399 
400 /**
401  * Is cluster connected to any server nodes.
402  */
403 bool
405 
406 /**
407  * Get all node names in cluster.
408  */
409 void
410 as_cluster_get_node_names(as_cluster* cluster, int* n_nodes, char** node_names);
411 
412 /**
413  * Reserve reference counted access to cluster nodes.
414  */
415 static inline as_nodes*
417 {
418  as_nodes* nodes = (as_nodes*)as_load_ptr((void* const*)&cluster->nodes);
419  as_incr_uint32(&nodes->ref_count);
420  return nodes;
421 }
422 
423 /**
424  * Release each individual node and free nodes struct.
425  */
426 AS_EXTERN void
427 as_nodes_destroy(as_nodes* nodes);
428 
429 /**
430  * Release reference counted access to cluster nodes.
431  */
432 static inline void
434 {
435  if (as_aaf_uint32_rls(&nodes->ref_count, -1) == 0) {
436  as_fence_acq();
437  as_nodes_destroy(nodes);
438  }
439 }
440 
441 /**
442  * Reserve nodes. Return error if cluster is empty.
443  */
446 
447 /**
448  * Release nodes.
449  */
450 static inline void
452 {
453  as_nodes_release(nodes);
454 }
455 
456 /**
457  * Verify cluster contains nodes and return node count.
458  */
459 as_status
460 as_cluster_validate_size(as_cluster* cluster, as_error* err, uint32_t* size);
461 
462 /**
463  * Add seed to cluster.
464  */
465 AS_EXTERN void
466 as_cluster_add_seed(as_cluster* cluster, const char* hostname, const char* tls_name, uint16_t port);
467 
468 /**
469  * Remove seed from cluster.
470  */
471 AS_EXTERN void
472 as_cluster_remove_seed(as_cluster* cluster, const char* hostname, uint16_t port);
473 
474 /**
475  * @private
476  * Change user and password that is used to authenticate with cluster servers.
477  */
478 void
479 as_cluster_change_password(as_cluster* cluster, const char* user, const char* password, const char* password_hash);
480 
481 /**
482  * @private
483  * Get random node in the cluster.
484  * as_nodes_release() must be called when done with node.
485  */
488 
489 /**
490  * @private
491  * Get node given node name.
492  * as_nodes_release() must be called when done with node.
493  */
495 as_node_get_by_name(as_cluster* cluster, const char* name);
496 
497 /**
498  * @private
499  * Get mapped node given partition and replica. This function does not reserve the node.
500  * The caller must reserve the node for future use.
501  */
502 as_node*
504  as_cluster* cluster, const char* ns, as_partition* p, as_node* prev_node,
505  as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
506  );
507 
508 struct as_partition_shm_s;
509 
510 /**
511  * @private
512  * Get mapped node given partition and replica. The function does not reserve the node.
513  * The caller must reserve the node for future use.
514  */
515 as_node*
517  as_cluster* cluster, const char* ns, struct as_partition_shm_s* partition,
518  as_node* prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
519  );
520 
521 /**
522  * @private
523  * Get mapped node given partition and replica. This function does not reserve the node.
524  * The caller must reserve the node for future use.
525  */
526 static inline as_node*
528  as_cluster* cluster, const char* ns, void* partition, as_node* prev_node,
529  as_policy_replica replica, uint8_t replica_size, uint8_t* replica_index
530  )
531 {
532  if (cluster->shm_info) {
533  return as_partition_shm_get_node(cluster, ns, (struct as_partition_shm_s*)partition,
534  prev_node, replica, replica_size, replica_index);
535  }
536  else {
537  return as_partition_reg_get_node(cluster, ns, (as_partition*)partition, prev_node,
538  replica, replica_size, replica_index);
539  }
540 }
541 
542 /**
543  * @private
544  * Increment node's error count.
545  */
546 static inline void
548 {
549  if (node->cluster->max_error_rate > 0) {
550  as_incr_uint32(&node->error_count);
551  }
552 }
553 
554 /**
555  * @private
556  * Reset node's error count.
557  */
558 static inline void
560 {
561  as_store_uint32(&node->error_count, 0);
562 }
563 
564 /**
565  * @private
566  * Get node's error count.
567  */
568 static inline uint32_t
570 {
571  return as_load_uint32(&node->error_count);
572 }
573 
574 /**
575  * @private
576  * Validate node's error count.
577  */
578 static inline bool
580 {
581  uint32_t max = node->cluster->max_error_rate;
582  return max == 0 || max >= as_load_uint32(&node->error_count);
583 }
584 
585 /**
586  * @private
587  * Close connection and increment node's error count.
588  */
589 static inline void
591 {
592  as_node_close_connection(node, sock, pool);
594 }
595 
596 /**
597  * @private
598  * Put connection in pool and increment node's error count.
599  */
600 static inline void
602 {
603  as_node_put_connection(node, sock);
605 }
606 
607 #ifdef __cplusplus
608 } // end extern "C"
609 #endif
#define as_aaf_uint32_rls(_target, _value)
static void as_node_put_connection(as_node *node, as_socket *sock)
Definition: as_node.h:584
as_thread_pool thread_pool
Definition: as_cluster.h:192
#define as_store_uint32(_target, _value)
uint32_t conn_pools_per_node
Definition: as_cluster.h:303
pthread_mutex_t tend_lock
Definition: as_cluster.h:213
char * password_hash
Definition: as_cluster.h:142
uint32_t ref_count
Definition: as_cluster.h:43
as_namespace ns
Definition: as_scan.h:267
as_policy_replica
Definition: as_policy.h:272
static void as_node_close_connection(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition: as_node.h:561
as_nodes * nodes
Definition: as_cluster.h:106
static void as_node_put_conn_error(as_node *node, as_socket *sock)
Definition: as_cluster.h:601
as_status
Definition: as_status.h:30
struct as_cluster_s * cluster
Definition: as_node.h:272
void(* as_release_fn)(void *value)
Definition: as_cluster.h:78
static void as_node_incr_error_count(as_node *node)
Definition: as_cluster.h:547
as_status as_cluster_create(as_config *config, as_error *err, as_cluster **cluster)
void * data
Definition: as_cluster.h:89
as_tls_context * tls_ctx
Definition: as_cluster.h:186
uint32_t conn_timeout_ms
Definition: as_cluster.h:309
char * password
Definition: as_cluster.h:136
AS_EXTERN void as_nodes_destroy(as_nodes *nodes)
static void as_cluster_release_all_nodes(as_nodes *nodes)
Definition: as_cluster.h:451
uint32_t min_conns_per_node
Definition: as_cluster.h:273
as_release_fn release_fn
Definition: as_cluster.h:95
pthread_t tend_thread
Definition: as_cluster.h:198
as_node * as_partition_reg_get_node(as_cluster *cluster, const char *ns, as_partition *p, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)
AS_EXTERN as_status as_cluster_reserve_all_nodes(as_cluster *cluster, as_error *err, as_nodes **nodes)
bool use_services_alternate
Definition: as_cluster.h:351
uint32_t async_max_conns_per_node
Definition: as_cluster.h:291
uint32_t max_error_rate
Definition: as_cluster.h:249
as_vector * seeds
Definition: as_cluster.h:169
uint32_t max_conns_per_node
Definition: as_cluster.h:279
#define AS_EXTERN
Definition: as_std.h:25
uint32_t invalid_node_count
Definition: as_cluster.h:327
void as_cluster_get_node_names(as_cluster *cluster, int *n_nodes, char **node_names)
uint32_t login_timeout_ms
Definition: as_cluster.h:315
uint32_t async_min_conns_per_node
Definition: as_cluster.h:285
char * user
Definition: as_cluster.h:130
as_vector * ip_map
Definition: as_cluster.h:180
as_partition_tables partition_tables
Definition: as_cluster.h:112
as_cluster_event_callback event_callback
Definition: as_cluster.h:153
uint32_t node_index
Definition: as_cluster.h:321
as_auth_mode
Definition: as_config.h:91
as_auth_mode auth_mode
Definition: as_cluster.h:339
pthread_cond_t tend_cond
Definition: as_cluster.h:219
uint64_t max_socket_idle_ns_tran
Definition: as_cluster.h:225
uint32_t tend_interval
Definition: as_cluster.h:261
uint32_t tend_count
Definition: as_cluster.h:267
#define as_load_uint32(_target)
Definition: as_atomic_win.h:56
int tend_thread_cpu
Definition: as_cluster.h:333
char * cluster_name
Definition: as_cluster.h:148
uint32_t size
Definition: as_cluster.h:49
bool has_partition_query
Definition: as_cluster.h:369
bool auth_enabled
Definition: as_cluster.h:363
uint16_t n_partitions
Definition: as_cluster.h:345
uint32_t pipe_max_conns_per_node
Definition: as_cluster.h:297
uint32_t rack_ids_size
Definition: as_cluster.h:243
static as_node * as_partition_get_node(as_cluster *cluster, const char *ns, void *partition, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)
Definition: as_cluster.h:527
struct as_shm_info_s * shm_info
Definition: as_cluster.h:124
bool fail_if_not_connected
Definition: as_cluster.h:375
void(* as_cluster_event_callback)(as_cluster_event *event)
Definition: as_config.h:176
pthread_mutex_t seed_lock
Definition: as_cluster.h:204
AS_EXTERN void as_cluster_add_seed(as_cluster *cluster, const char *hostname, const char *tls_name, uint16_t port)
static bool as_node_valid_error_count(as_node *node)
Definition: as_cluster.h:579
uint64_t max_socket_idle_ns_trim
Definition: as_cluster.h:231
volatile bool valid
Definition: as_cluster.h:381
#define as_load_ptr(_target)
Definition: as_atomic_win.h:47
void as_cluster_destroy(as_cluster *cluster)
static void as_node_reset_error_count(as_node *node)
Definition: as_cluster.h:559
AS_EXTERN as_node * as_node_get_by_name(as_cluster *cluster, const char *name)
int * rack_ids
Definition: as_cluster.h:237
as_vector * gc
Definition: as_cluster.h:118
as_status as_cluster_validate_size(as_cluster *cluster, as_error *err, uint32_t *size)
AS_EXTERN void as_cluster_remove_seed(as_cluster *cluster, const char *hostname, uint16_t port)
static uint32_t as_node_get_error_count(as_node *node)
Definition: as_cluster.h:569
static as_nodes * as_nodes_reserve(as_cluster *cluster)
Definition: as_cluster.h:416
bool as_cluster_is_connected(as_cluster *cluster)
#define as_fence_acq()
#define as_incr_uint32(_target)
uint32_t error_count
Definition: as_node.h:323
AS_EXTERN as_node * as_node_get_random(as_cluster *cluster)
void as_cluster_change_password(as_cluster *cluster, const char *user, const char *password, const char *password_hash)
bool rack_aware
Definition: as_cluster.h:357
as_event_state * event_state
Definition: as_cluster.h:163
void * event_callback_udata
Definition: as_cluster.h:158
static void as_nodes_release(as_nodes *nodes)
Definition: as_cluster.h:433
static void as_node_close_conn_error(as_node *node, as_socket *sock, as_conn_pool *pool)
Definition: as_cluster.h:590
uint32_t error_rate_window
Definition: as_cluster.h:255
as_node * as_partition_shm_get_node(as_cluster *cluster, const char *ns, struct as_partition_shm_s *partition, as_node *prev_node, as_policy_replica replica, uint8_t replica_size, uint8_t *replica_index)