All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_partition_tracker.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2024 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_std.h>
20 #include <aerospike/as_partition.h>
22 #include <aerospike/as_policy.h>
23 #include <aerospike/as_vector.h>
24 #include <pthread.h>
25 
26 #ifdef __cplusplus
27 extern "C" {
28 #endif
29 
30 /******************************************************************************
31  * TYPES
32  *****************************************************************************/
33 struct as_node_s;
34 struct as_cluster_s;
35 struct as_error_s;
36 
37 /**
38  * @private
39  * List of partitions assigned to a node.
40  */
41 typedef struct as_node_partitions_s {
42  struct as_node_s* node;
45  uint64_t record_count;
46  uint64_t record_max;
48  bool retry;
50 
51 /**
52  * @private
53  * Scan/Query partition tracker.
54  */
55 typedef struct as_partition_tracker_s {
56  pthread_mutex_t lock;
58  uint32_t node_capacity;
59  struct as_node_s* node_filter;
62  uint64_t max_records;
63  uint64_t record_count;
64  uint64_t deadline;
66  uint32_t parts_capacity;
68  uint32_t socket_timeout;
69  uint32_t total_timeout;
70  uint32_t max_retries;
71  uint32_t iteration;
72  bool check_max;
74 
75 /******************************************************************************
76  * FUNCTIONS
77  ******************************************************************************/
78 
79 void
81  as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
82  uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all,
83  bool paginate, uint32_t cluster_size
84  );
85 
86 void
88  as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
89  uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all, bool paginate, struct as_node_s* node
90  );
91 
94  as_partition_tracker* pt, struct as_cluster_s* cluster, const as_policy_base* policy,
95  uint64_t max_records, as_policy_replica replica, as_partitions_status** parts_all,
96  bool paginate, uint32_t cluster_size, as_partition_filter* pf, struct as_error_s* err
97  );
98 
101  as_partition_tracker* pt, struct as_cluster_s* cluster, const char* ns, struct as_error_s* err
102  );
103 
104 void
106  as_partition_tracker* pt, as_node_partitions* np, uint32_t part_id
107  );
108 
109 static inline void
111  as_partition_tracker* pt, as_node_partitions* np, as_digest* digest, uint32_t n_partitions
112  )
113 {
114  uint32_t part_id = as_partition_getid(digest->value, n_partitions);
116  ps->parts[part_id - ps->part_begin].digest = *digest;
117  np->record_count++;
118 }
119 
120 static inline void
122  as_partition_tracker* pt, as_node_partitions* np, as_digest* digest, uint64_t bval,
123  uint32_t n_partitions
124  )
125 {
126  uint32_t part_id = as_partition_getid(digest->value, n_partitions);
128  as_partition_status* p = &ps->parts[part_id - ps->part_begin];
129  p->digest = *digest;
130  p->bval = bval;
131  np->record_count++;
132 }
133 
134 static inline bool
136 {
137  // Sync scan/query runs in multiple threads, so atomics are required.
138  if (pt && pt->check_max && (as_aaf_uint64(&pt->record_count, 1) > pt->max_records)) {
139  // Record was returned, but would exceed max_records. Discard record
140  // and mark node for retry on next scan/query page.
141  np->retry = true;
142  return true;
143  }
144  return false;
145 }
146 
147 static inline bool
149 {
150  // Async scan/query runs in a single event loop thread, so atomics are not necessary.
151  if (pt && pt->check_max && (++pt->record_count > pt->max_records)) {
152  // Record was returned, but would exceed max_records. Discard record
153  // and mark node for retry on next scan/query page.
154  np->retry = true;
155  return true;
156  }
157  return false;
158 }
159 
160 static inline uint16_t
162 {
163  return *(uint16_t*)as_vector_get(list, index);
164 }
165 
166 static inline as_partition_status*
168 {
169  uint16_t part_id = *(uint16_t*)as_vector_get(list, index);
171  return &ps->parts[part_id - ps->part_begin];
172 }
173 
174 as_status
176  as_partition_tracker* pt, struct as_cluster_s* cluster, struct as_error_s* err
177  );
178 
179 bool
182  );
183 
184 void
186 
187 static inline void
189 {
190  // Mark all partitions for retry on fatal errors.
191  if (parts_all) {
192  parts_all->retry = true;
193  }
194 }
195 
196 #ifdef __cplusplus
197 } // end extern "C"
198 #endif
as_policy_replica replica
as_status as_partition_tracker_init_filter(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, uint32_t cluster_size, as_partition_filter *pf, struct as_error_s *err)
as_namespace ns
Definition: as_scan.h:267
as_policy_replica
Definition: as_policy.h:272
static void as_partition_tracker_set_last(as_partition_tracker *pt, as_node_partitions *np, as_digest *digest, uint64_t bval, uint32_t n_partitions)
as_status
Definition: as_status.h:30
#define as_aaf_uint64(_target, _value)
bool as_partition_tracker_should_retry(as_partition_tracker *pt, as_node_partitions *np, as_status status)
as_digest_value value
Definition: as_key.h:96
void as_partition_tracker_part_unavailable(as_partition_tracker *pt, as_node_partitions *np, uint32_t part_id)
static void as_partition_error(as_partitions_status *parts_all)
static void * as_vector_get(as_vector *vector, uint32_t index)
Definition: as_vector.h:112
static bool as_partition_tracker_reached_max_records_sync(as_partition_tracker *pt, as_node_partitions *np)
as_status as_partition_tracker_assign(as_partition_tracker *pt, struct as_cluster_s *cluster, const char *ns, struct as_error_s *err)
as_partition_status parts[]
static bool as_partition_tracker_reached_max_records_async(as_partition_tracker *pt, as_node_partitions *np)
void as_partition_tracker_init_node(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, struct as_node_s *node)
as_partitions_status * parts_all
void as_partition_tracker_init_nodes(as_partition_tracker *pt, struct as_cluster_s *cluster, const as_policy_base *policy, uint64_t max_records, as_policy_replica replica, as_partitions_status **parts_all, bool paginate, uint32_t cluster_size)
as_status as_partition_tracker_is_complete(as_partition_tracker *pt, struct as_cluster_s *cluster, struct as_error_s *err)
struct as_node_s * node_filter
static uint16_t as_partition_tracker_get_id(as_vector *list, uint32_t index)
struct as_node_s * node
static uint32_t as_partition_getid(const uint8_t *digest, uint32_t n_partitions)
Definition: as_partition.h:121
void as_partition_tracker_destroy(as_partition_tracker *pt)
static as_partition_status * as_partition_tracker_get_status(as_partition_tracker *pt, as_vector *list, uint32_t index)
static void as_partition_tracker_set_digest(as_partition_tracker *pt, as_node_partitions *np, as_digest *digest, uint32_t n_partitions)