All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
aerospike_scan.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 /**
20  * @defgroup scan_operations Scan Operations
21  * @ingroup client_operations
22  *
23  * Aerospike Scan Operations provide the ability to scan all records of a namespace and set
24  * in an Aerospike database.
25  *
26  * ## Usage
27  *
28  * Before you can execute a scan, you first need to define a scan using as_scan. See as_scan
29  * for details on defining scans.
30  *
31  * Once you have a scan defined, then you can execute the scan using either:
32  *
33  * - aerospike_scan_foreach() — Execute a scan on the database, then process the results.
34  * - aerospike_scan_background() — Send a scan to the database, and do not wait for completion.
35  * The scan is given an id, which can be used to query the scan status.
36  *
37  * When aerospike_scan_foreach() is executed, it will process the results and create records
38  * on the stack. Because the records are on the stack, they will only be available within the
39  * context of the callback function.
40  *
41  * When aerospike_scan_background() is executed, the client will not wait for results from the
42  * database. Instead, the client will be given a scan_id, which can be used to query the scan
43  * status on the database via aerospike_scan_info().
44  *
45  * ## Walk-through
46  *
47  * First, we build a scan using as_scan. The scan will be on the "test" namespace and "demo" set.
48  * We will select only bins "a" and "b" to be returned for each record.
49  *
50  * ~~~~~~~~~~{.c}
51  * as_scan scan;
52  * as_scan_init(&scan, "test", "demo");
53  *
54  * as_scan_select_inita(&scan, 2);
55  * as_scan_select(&scan, "a");
56  * as_scan_select(&scan, "b");
57  * ~~~~~~~~~~
58  *
59  * Now that we have a scan defined, we want to execute it using
60  * aerospike_scan_foreach().
61  *
62  * ~~~~~~~~~~{.c}
63  * if (aerospike_scan_foreach(&as, &err, NULL, &scan, callback, NULL) != AEROSPIKE_OK) {
64  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
65  * }
66  * ~~~~~~~~~~
67  *
68  * The callback provided to the function above is implemented as:
69  *
70  * ~~~~~~~~~~{.c}
71  * bool callback(const as_val* val, void* udata)
72  * {
73  * if (!val) {
74  * return false; // Scan complete.
75  * }
76  *
77  * as_record* rec = as_record_fromval(val);
78  * // Process record
79  * // Do not call as_record_destroy() because the calling function will do that for you.
80  * return true;
81  * }
82  * ~~~~~~~~~~
83  *
84  * When you are finished with the scan, you should destroy the resources allocated to it:
85  *
86  * ~~~~~~~~~~{.c}
87  * as_scan_destroy(&scan);
88  * ~~~~~~~~~~
89  */
90 
91 #include <aerospike/aerospike.h>
92 #include <aerospike/as_listener.h>
93 #include <aerospike/as_error.h>
95 #include <aerospike/as_policy.h>
96 #include <aerospike/as_record.h>
97 #include <aerospike/as_scan.h>
98 #include <aerospike/as_status.h>
99 #include <aerospike/as_val.h>
100 
101 #ifdef __cplusplus
102 extern "C" {
103 #endif
104 
105 /******************************************************************************
106  * TYPES
107  *****************************************************************************/
108 
109 /**
110  * This callback will be called for each value or record returned from a synchronous scan.
111  * Multiple threads will likely be calling this callback in parallel. Therefore,
112  * your callback implementation should be thread safe.
113  *
114  * @param val The value received from the scan.
115  * @param udata User-data provided to the calling function.
116  *
117  * @return `true` to continue to the next value. Otherwise, the scan will end.
118  *
119  * @ingroup scan_operations
120  */
121 typedef bool (*aerospike_scan_foreach_callback)(const as_val* val, void* udata);
122 
123 /**
124  * Asynchronous scan user callback. This function is called for each record returned.
125  * This function is also called once when the scan completes or an error has occurred.
126  *
127  * @param err This error structure is only populated when the command fails.
128  * NULL on success.
129  * @param record Returned record. The record will be NULL on final scan completion or scan
130  * error.
131  * @param udata User data that is forwarded from asynchronous command function.
132  * @param event_loop Event loop that this command was executed on. Use this event loop when
133  * running nested asynchronous commands when single threaded behavior is
134  * desired for the group of commands.
135  *
136  * @return `true` to continue to the next value. Otherwise, the scan will end.
137  *
138  * @ingroup scan_operations
139  */
140 typedef bool (*as_async_scan_listener)(
141  as_error* err, as_record* record, void* udata, as_event_loop* event_loop
142  );
143 
144 /******************************************************************************
145  * FUNCTIONS
146  *****************************************************************************/
147 
148 /**
149  * Scan the records in the specified namespace and set in the cluster.
150  *
151  * The scan will be run in the background by the database servers; the client does not wait for it to complete.
152  * No callback will be called in this case.
153  *
154  * ~~~~~~~~~~{.c}
155  * as_scan scan;
156  * as_scan_init(&scan, "test", "demo");
157  *
158  * uint64_t scanid = 0;
159  *
160  * if (aerospike_scan_background(&as, &err, NULL, &scan, &scanid) != AEROSPIKE_OK) {
161  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
162  * }
163  * else {
164  * printf("Running background scan job: %" PRIu64, scanid);
165  * }
166  * as_scan_destroy(&scan);
167  * ~~~~~~~~~~
168  *
169  * The scanid can be used to query the status of the scan running in the
170  * database via aerospike_scan_info().
171  *
172  * @param as The aerospike instance to use for this operation.
173  * @param err The as_error to be populated if an error occurs.
174  * @param policy Scan policy configuration parameters, pass in NULL for default.
175  * @param scan The scan to execute against the cluster.
176  * @param scan_id The id for the scan job, which can be used for obtaining scan status.
177  *
178  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
179  *
180  * @ingroup scan_operations
181  */
184  aerospike* as, as_error* err, const as_policy_scan* policy, const as_scan* scan,
185  uint64_t* scan_id
186  );
187 
188 /**
189  * Wait for a background scan to be completed by servers.
190  *
191  * ~~~~~~~~~~{.c}
192  * uint64_t scan_id = 1234;
193  * aerospike_scan_wait(&as, &err, NULL, scan_id, 0);
194  * ~~~~~~~~~~
195  *
196  * @param as The aerospike instance to use for this operation.
197  * @param err The as_error to be populated if an error occurs.
198  * @param policy Info policy configuration parameters, pass in NULL for default.
199  * @param scan_id The id for the scan job.
200  * @param interval_ms The polling interval in milliseconds. If zero, 1000 ms is used.
201  *
202  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
203  */
206  aerospike* as, as_error* err, const as_policy_info* policy, uint64_t scan_id,
207  uint32_t interval_ms
208  );
209 
210 /**
211  * Check the progress of a background scan running on the database. The status
212  * of the scan running on the database will be populated into an as_scan_info.
213  *
214  * ~~~~~~~~~~{.c}
215  * uint64_t scan_id = 1234;
216  * as_scan_info scan_info;
217  *
218  * if (aerospike_scan_info(&as, &err, NULL, scan_id, &scan_info) != AEROSPIKE_OK) {
219  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
220  * }
221  * else {
222  * printf("Scan id=%" PRIu64 ", status=%d percent=%d", scan_id, scan_info.status, scan_info.progress_pct);
223  * }
224  * ~~~~~~~~~~
225  *
226  * @param as The aerospike instance to use for this operation.
227  * @param err The as_error to be populated if an error occurs.
228  * @param policy Info policy configuration parameters, pass in NULL for default.
229  * @param scan_id The id for the scan job to check the status of.
230  * @param info Information about this scan, to be populated by this operation.
231  *
232  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
233  *
234  * @ingroup scan_operations
235  */
238  aerospike* as, as_error* err, const as_policy_info* policy, uint64_t scan_id, as_scan_info* info
239  );
240 
241 /**
242  * Scan the records in the specified namespace and set in the cluster.
243  *
244  * Call the callback function for each record scanned. When all records have
245  * been scanned, then callback will be called with a NULL value for the record.
246  *
247  * If "scan.concurrent" is true (default false), the callback code must be thread-safe.
248  *
249  * ~~~~~~~~~~{.c}
250  * bool callback(const as_val* val, void* udata)
251  * {
252  * if (!val) {
253  * return false; // Scan complete.
254  * }
255  *
256  * as_record* rec = as_record_fromval(val);
257  * // Process record
258  * // Do not call as_record_destroy() because the calling function will do that for you.
259  * return true;
260  * }
261  *
262  * as_scan scan;
263  * as_scan_init(&scan, "test", "demo");
264  *
265  * if (aerospike_scan_foreach(&as, &err, NULL, &scan, callback, NULL) != AEROSPIKE_OK) {
266  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
267  * }
268  * as_scan_destroy(&scan);
269  * ~~~~~~~~~~
270  *
271  * @param as The aerospike instance to use for this operation.
272  * @param err The as_error to be populated if an error occurs.
273  * @param policy Scan policy configuration parameters, pass in NULL for default.
274  * @param scan The scan to execute against the cluster.
275  * @param callback The function to be called for each record scanned.
276  * @param udata User-data to be passed to the callback.
277  *
278  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
279  *
280  * @ingroup scan_operations
281  */
284  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan,
285  aerospike_scan_foreach_callback callback, void* udata
286  );
287 
288 /**
289  * Scan the records in the specified namespace and set for a single node.
290  *
291  * The callback function will be called for each record scanned. When all records have
292  * been scanned, then callback will be called with a NULL value for the record.
293  *
294  * ~~~~~~~~~~{.c}
295  * bool callback(const as_val* val, void* udata)
296  * {
297  * if (!val) {
298  * return false; // Scan complete.
299  * }
300  *
301  * as_record* rec = as_record_fromval(val);
302  * // Process record
303  * // Do not call as_record_destroy() because the calling function will do that for you.
304  * return true;
305  * }
306  *
307  * char* node_names = NULL;
308  * int n_nodes = 0;
309  * as_cluster_get_node_names(as.cluster, &n_nodes, &node_names);
310  *
311  * if (n_nodes <= 0)
312  * return <error>;
313  *
314  * as_scan scan;
315  * as_scan_init(&scan, "test", "demo");
316  *
317  * if (aerospike_scan_node(&as, &err, NULL, &scan, node_names, callback, NULL) != AEROSPIKE_OK ) {
318  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
319  * }
320  *
321  * free(node_names);
322  * as_scan_destroy(&scan);
323  * ~~~~~~~~~~
324  *
325  * @param as The aerospike instance to use for this operation.
326  * @param err The as_error to be populated if an error occurs.
327  * @param policy Scan policy configuration parameters, pass in NULL for default.
328  * @param scan The scan to execute against the cluster.
329  * @param node_name The node name to scan.
330  * @param callback The function to be called for each record scanned.
331  * @param udata User-data to be passed to the callback.
332  *
333  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
334  *
335  * @ingroup scan_operations
336  */
339  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan,
340  const char* node_name, aerospike_scan_foreach_callback callback, void* udata
341  );
342 
343 /**
344  * Scan records in specified namespace, set and partition filter.
345  *
346  * Call the callback function for each record scanned. When all records have
347  * been scanned, then callback will be called with a NULL value for the record.
348  *
349  * If "scan.concurrent" is true (default false), the callback code must be thread-safe.
350  *
351  * ~~~~~~~~~~{.c}
352  * bool callback(const as_val* val, void* udata)
353  * {
354  * if (!val) {
355  * return false; // Scan complete.
356  * }
357  *
358  * as_record* rec = as_record_fromval(val);
359  * // Process record
360  * // Do not call as_record_destroy() because the calling function will do that for you.
361  * return true;
362  * }
363  *
364  * as_scan scan;
365  * as_scan_init(&scan, "test", "demo");
366  *
367  * as_partition_filter pf;
368  * as_partition_filter_set_range(&pf, 0, 1024);
369  *
370  * if (aerospike_scan_partitions(&as, &err, NULL, &scan, &pf, callback, NULL) != AEROSPIKE_OK) {
371  * printf("error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
372  * }
373  * as_scan_destroy(&scan);
374  * ~~~~~~~~~~
375  *
376  * @param as The aerospike instance to use for this operation.
377  * @param err The as_error to be populated if an error occurs.
378  * @param policy Scan policy configuration parameters, pass in NULL for default.
379  * @param scan The scan to execute against the cluster.
380  * @param pf Partition filter.
381  * @param callback The function to be called for each record scanned.
382  * @param udata User-data to be passed to the callback.
383  *
384  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
385  *
386  * @ingroup scan_operations
387  */
390  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan,
391  as_partition_filter* pf, aerospike_scan_foreach_callback callback, void* udata
392  );
393 
394 /**
395  * Asynchronously scan the records in the specified namespace and set in the cluster.
396  *
397  * Call the listener function for each record scanned. When all records have
398  * been scanned, then listener will be called with a NULL value for the record.
399  *
400  * Scans of each node will be run on the same event loop, so the listener's implementation does
401  * not need to be thread safe.
402  *
403  * ~~~~~~~~~~{.c}
404  * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
405  * {
406  * if (err) {
407  * printf("Scan failed: %d %s\n", err->code, err->message);
408  * return false;
409  * }
410  *
411  * if (! record) {
412  * printf("Scan ended\n");
413  * return false;
414  * }
415  *
416  * // Process record
417  * // Do not call as_record_destroy() because the calling function will do that for you.
418  * return true;
419  * }
420  *
421  * as_scan scan;
422  * as_scan_init(&scan, "test", "demo");
423  *
424  * as_status status = aerospike_scan_async(&as, &err, NULL, &scan, NULL, my_listener, NULL, NULL);
425  * as_scan_destroy(&scan);
426  * ~~~~~~~~~~
427  *
428  * @param as The aerospike instance to use for this operation.
429  * @param err The as_error to be populated if an error occurs.
430  * @param policy Scan policy configuration parameters, pass in NULL for default.
431  * @param scan The scan to execute against the cluster.
432  * @param scan_id The id for the scan job. Use NULL if the scan_id will not be used.
433  * @param listener The function to be called for each record scanned.
434  * @param udata User-data to be passed to the listener.
435  * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
436  * chosen by round-robin.
437  *
438  * @return AEROSPIKE_OK if async scan successfully queued. Otherwise an error.
439  *
440  * @ingroup scan_operations
441  */
444  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan, uint64_t* scan_id,
445  as_async_scan_listener listener, void* udata, as_event_loop* event_loop
446  );
447 
448 /**
449  * Asynchronously scan the records in the specified namespace and set for a single node.
450  *
451  * The listener function will be called for each record scanned. When all records have
452  * been scanned, then listener will be called with a NULL value for the record.
453  *
454  * ~~~~~~~~~~{.c}
455  * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
456  * {
457  * if (err) {
458  * printf("Scan failed: %d %s\n", err->code, err->message);
459  * return false;
460  * }
461  *
462  * if (! record) {
463  * printf("Scan ended\n");
464  * return false;
465  * }
466  *
467  * // Process record
468  * // Do not call as_record_destroy() because the calling function will do that for you.
469  * return true;
470  * }
471  *
472  * char* node_names = NULL;
473  * int n_nodes = 0;
474  * as_cluster_get_node_names(as.cluster, &n_nodes, &node_names);
475  *
476  * if (n_nodes <= 0)
477  * return <error>;
478  *
479  * as_scan scan;
480  * as_scan_init(&scan, "test", "demo");
481  *
482  * as_status status = aerospike_scan_node_async(&as, &err, NULL, &scan, NULL, node_names, my_listener, NULL, NULL);
483  *
484  * free(node_names);
485  * as_scan_destroy(&scan);
486  * ~~~~~~~~~~
487  *
488  * @param as The aerospike instance to use for this operation.
489  * @param err The as_error to be populated if an error occurs.
490  * @param policy Scan policy configuration parameters, pass in NULL for default.
491  * @param scan The scan to execute against the cluster.
492  * @param scan_id The id for the scan job. Use NULL if the scan_id will not be used.
493  * @param node_name The node name to scan.
494  * @param listener The function to be called for each record scanned.
495  * @param udata User-data to be passed to the listener.
496  * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
497  * chosen by round-robin.
498  *
499  * @return AEROSPIKE_OK if async scan successfully queued. Otherwise an error.
500  *
501  * @ingroup scan_operations
502  */
505  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan, uint64_t* scan_id,
506  const char* node_name, as_async_scan_listener listener, void* udata, as_event_loop* event_loop
507  );
508 
509 /**
510  * Asynchronously scan records in specified namespace, set and partition filter.
511  *
512  * Call the listener function for each record scanned. When all records have
513  * been scanned, then listener will be called with a NULL value for the record.
514  *
515  * Scans of each node will be run on the same event loop, so the listener's implementation does
516  * not need to be thread safe.
517  *
518  * ~~~~~~~~~~{.c}
519  * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
520  * {
521  * if (err) {
522  * printf("Scan failed: %d %s\n", err->code, err->message);
523  * return false;
524  * }
525  *
526  * if (! record) {
527  * printf("Scan ended\n");
528  * return false;
529  * }
530  *
531  * // Process record
532  * // Do not call as_record_destroy() because the calling function will do that for you.
533  * return true;
534  * }
535  *
536  * as_scan scan;
537  * as_scan_init(&scan, "test", "demo");
538  *
539  * as_partition_filter pf;
540  * as_partition_filter_set_range(&pf, 0, 1024);
541  *
542  * as_status status = aerospike_scan_partitions_async(&as, &err, NULL, &scan, &pf, my_listener, NULL, NULL);
543  * as_scan_destroy(&scan);
544  * ~~~~~~~~~~
545  *
546  * @param as The aerospike instance to use for this operation.
547  * @param err The as_error to be populated if an error occurs.
548  * @param policy Scan policy configuration parameters, pass in NULL for default.
549  * @param scan The scan to execute against the cluster.
550  * @param pf Partition filter.
551  * @param listener The function to be called for each record scanned.
552  * @param udata User-data to be passed to the listener.
553  * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
554  * chosen by round-robin.
555  *
556  * @return AEROSPIKE_OK if async scan successfully queued. Otherwise an error.
557  *
558  * @ingroup scan_operations
559  */
562  aerospike* as, as_error* err, const as_policy_scan* policy, as_scan* scan,
563  as_partition_filter* pf, as_async_scan_listener listener, void* udata, as_event_loop* event_loop
564  );
565 
566 #ifdef __cplusplus
567 } // end extern "C"
568 #endif
AS_EXTERN as_status aerospike_scan_async(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, uint64_t *scan_id, as_async_scan_listener listener, void *udata, as_event_loop *event_loop)
as_status
Definition: as_status.h:30
bool(* aerospike_scan_foreach_callback)(const as_val *val, void *udata)
AS_EXTERN as_status aerospike_scan_wait(aerospike *as, as_error *err, const as_policy_info *policy, uint64_t scan_id, uint32_t interval_ms)
AS_EXTERN as_status aerospike_scan_info(aerospike *as, as_error *err, const as_policy_info *policy, uint64_t scan_id, as_scan_info *info)
Definition: as_val.h:61
AS_EXTERN as_status aerospike_scan_partitions_async(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, as_partition_filter *pf, as_async_scan_listener listener, void *udata, as_event_loop *event_loop)
#define AS_EXTERN
Definition: as_std.h:25
bool(* as_async_scan_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
AS_EXTERN as_status aerospike_scan_foreach(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, aerospike_scan_foreach_callback callback, void *udata)
AS_EXTERN as_status aerospike_scan_node(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, const char *node_name, aerospike_scan_foreach_callback callback, void *udata)
AS_EXTERN as_status aerospike_scan_background(aerospike *as, as_error *err, const as_policy_scan *policy, const as_scan *scan, uint64_t *scan_id)
AS_EXTERN as_status aerospike_scan_node_async(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, uint64_t *scan_id, const char *node_name, as_async_scan_listener listener, void *udata, as_event_loop *event_loop)
AS_EXTERN as_status aerospike_scan_partitions(aerospike *as, as_error *err, const as_policy_scan *policy, as_scan *scan, as_partition_filter *pf, aerospike_scan_foreach_callback callback, void *udata)