All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
aerospike_query.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 /**
20  * @defgroup query_operations Query Operations
21  * @ingroup client_operations
22  *
23  * The Aerospike Query Operations provide the ability to query data in the
24  * Aerospike database. The queries can optionally be performed on secondary indexes,
25  * which have been created in the database.
26  *
27  * ## Usage
28  *
29  * Before you can execute a query, you first need to build a query using
30  * as_query. See as_query for details on building queries.
31  *
32  * Once you have a query defined, then you can execute the query.
33  * When aerospike_query_foreach() is executed, it will process the results
34  * and create records on the stack. Because the records are on the stack,
35  * they will only be available within the context of the callback function.
36  *
37  * ## Walk-through
38  *
39  * First, we define a query using as_query. The query will be for the "test"
40  * namespace and "demo" set. We will add a where predicate on "bin2", on which
41  * we have already created a secondary index.
42  *
43  * ~~~~~~~~~~{.c}
44  * as_query query;
45  * as_query_init(&query, "test", "demo");
46  *
47  * as_query_where_init(&query, 1);
48  * as_query_where(&query, "bin2", as_integer_equals(100));
49  * ~~~~~~~~~~
50  *
51  * Now that we have a query defined, we want to execute it using
52  * aerospike_query_foreach().
53  *
54  * ~~~~~~~~~~{.c}
55  * if (aerospike_query_foreach(&as, &err, NULL, &query, callback, NULL) != AEROSPIKE_OK) {
56  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
57  * }
58  * ~~~~~~~~~~
59  *
60  * The callback provided to the function above is implemented as:
61  *
62  * ~~~~~~~~~~{.c}
63  * bool callback(const as_val* val, void* udata)
64  * {
65  * if (!val) {
66  * return false; // Query complete.
67  * }
68  *
69  * as_record* rec = as_record_fromval(val);
70  * // Process record
71  * // Do not call as_record_destroy() because the calling function will do that for you.
72  * return true;
73  * }
74  * ~~~~~~~~~~
75  *
76  * When you are finished with the query, you should destroy the resources allocated to it.
77  *
78  * ~~~~~~~~~~{.c}
79  * as_query_destroy(&query);
80  * ~~~~~~~~~~
81  */
82 
83 #include <aerospike/aerospike.h>
84 #include <aerospike/as_error.h>
85 #include <aerospike/as_event.h>
86 #include <aerospike/as_job.h>
87 #include <aerospike/as_policy.h>
88 #include <aerospike/as_query.h>
89 #include <aerospike/as_record.h>
90 #include <aerospike/as_status.h>
91 #include <aerospike/as_stream.h>
92 
93 #ifdef __cplusplus
94 extern "C" {
95 #endif
96 
97 /******************************************************************************
98  * TYPES
99  *****************************************************************************/
100 
101 /**
102  * This callback will be called for each value or record returned from a query.
103  * Multiple threads will likely be calling this callback in parallel. Therefore,
104  * your callback implementation should be thread safe.
105  *
106  * A regular foreground query always returns as_record instances:
107  *
108  * ~~~~~~~~~~{.c}
109  * bool callback(const as_val* val, void* udata)
110  * {
111  * if (!val) {
112  * return false; // Query complete.
113  * }
114  *
115  * as_record* rec = as_record_fromval(val);
116  * // Process record
117  * // Do not call as_record_destroy() because the calling function will do that for you.
118  * return true;
119  * }
120  * ~~~~~~~~~~
121  *
122  * An aggregation query using a UDF returns as_val instances. The as_val type depends on
123  * what the UDF returns:
124  *
125  * ~~~~~~~~~~{.c}
126  * bool callback(const as_val* val, void* udata)
127  * {
128  * if (!val) {
129  * return false; // Query complete.
130  * }
131  *
132  * // Ensure UDF returned val is the expected type:
133  * as_integer* i = as_integer_fromval(val);
134  *
135  * if (!i) {
136  * return false;
137  * }
138  *
139  * // Process integer
140  * return true;
141  * }
142  * ~~~~~~~~~~
143  *
144  * @param val The value received from the query.
145  * @param udata User-data provided to the calling function.
146  *
147  * @return `true` to continue to the next value. Otherwise, iteration will end.
148  * @ingroup query_operations
149  */
150 typedef bool (*aerospike_query_foreach_callback)(const as_val* val, void* udata); // A NULL val marks query completion, as handled in the examples above.
151 
152 /**
153  * Asynchronous query user callback. This function is called for each record returned.
154  * This function is also called once when the query completes or an error has occurred.
155  *
156  * @param err This error structure is only populated on command failure. NULL on success.
157  * @param record Returned record. The record will be NULL on final query completion or query
158  * error.
159  * @param udata User data that is forwarded from asynchronous command function.
160  * @param event_loop Event loop that this command was executed on. Use this event loop when
161  * running nested asynchronous commands when single threaded behavior is
162  * desired for the group of commands.
163  *
164  * @return `true` to continue to the next value. Otherwise, the query will end.
165  * @ingroup query_operations
166  */
167 typedef bool (*as_async_query_record_listener)(
168  as_error* err, as_record* record, void* udata, as_event_loop* event_loop
169  );
170 
171 /******************************************************************************
172  * FUNCTIONS
173  *****************************************************************************/
174 
175 /**
176  * Execute a query and call the callback function for each result item.
177  * Multiple threads will likely be calling the callback in parallel. Therefore,
178  * your callback implementation should be thread safe.
179  *
180  * ~~~~~~~~~~{.c}
181  * bool callback(const as_val* val, void* udata)
182  * {
183  * if (!val) {
184  * return false; // Query complete.
185  * }
186  *
187  * as_record* rec = as_record_fromval(val);
188  * // Process record
189  * // Do not call as_record_destroy() because the calling function will do that for you.
190  * return true;
191  * }
192  *
193  * as_query query;
194  * as_query_init(&query, "test", "demo");
195  * as_query_select(&query, "bin1");
196  * as_query_where(&query, "bin2", as_integer_equals(100));
197  *
198  * if (aerospike_query_foreach(&as, &err, NULL, &query, callback, NULL) != AEROSPIKE_OK) {
199  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
200  * }
201  * as_query_destroy(&query);
202  * ~~~~~~~~~~
203  *
204  * @param as Aerospike cluster instance.
205  * @param err Error detail structure that is populated if an error occurs.
206  * @param policy Query policy configuration parameters, pass in NULL for default.
207  * @param query Query definition.
208  * @param callback Query callback function called for each result value.
209  * @param udata User-data to be passed to the callback.
210  *
211  * @return AEROSPIKE_OK on success, otherwise an error.
212  * @ingroup query_operations
213  */
214 AS_EXTERN as_status
215 aerospike_query_foreach(
216  aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
217  aerospike_query_foreach_callback callback, void* udata
218  );
219 
220 /**
221  * Query records with a partition filter. Multiple threads will likely be calling the callback
222  * in parallel. Therefore, your callback implementation should be thread safe.
223  * Requires server version 6.0+.
224  *
225  * ~~~~~~~~~~{.c}
226  * bool callback(const as_val* val, void* udata)
227  * {
228  * if (!val) {
229  * return false; // Query complete.
230  * }
231  *
232  * as_record* rec = as_record_fromval(val);
233  * // Process record
234  * // Do not call as_record_destroy() because the calling function will do that for you.
235  * return true;
236  * }
237  *
238  * as_query query;
239  * as_query_init(&query, "test", "demo");
240  * as_query_select(&query, "bin1");
241  * as_query_where(&query, "bin2", as_integer_equals(100));
242  *
243  * as_partition_filter pf;
244  * as_partition_filter_set_range(&pf, 0, 1024);
245  *
246  * if (aerospike_query_partitions(&as, &err, NULL, &query, &pf, callback, NULL) != AEROSPIKE_OK) {
247  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
248  * }
249  * as_query_destroy(&query);
250  * ~~~~~~~~~~
251  *
252  * @param as Aerospike cluster instance.
253  * @param err Error detail structure that is populated if an error occurs.
254  * @param policy Query policy configuration parameters, pass in NULL for default.
255  * @param query Query definition.
256  * @param pf Partition filter.
257  * @param callback Query callback function called for each result value.
258  * @param udata User-data to be passed to the callback.
259  *
260  * @return AEROSPIKE_OK on success. Otherwise an error occurred.
261  * @ingroup query_operations
262  */
263 AS_EXTERN as_status
264 aerospike_query_partitions(
265  aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
266  as_partition_filter* pf, aerospike_query_foreach_callback callback, void* udata
267  );
268 
269 /**
270  * Asynchronously execute a query and call the listener function for each result item.
271  * Standard queries are supported, but aggregation queries are not supported in async mode.
272  *
273  * ~~~~~~~~~~{.c}
274  * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
275  * {
276  * if (err) {
277  * printf("Query failed: %d %s\n", err->code, err->message);
278  * return false;
279  * }
280  *
281  * if (! record) {
282  * printf("Query ended\n");
283  * return false;
284  * }
285  *
286  * // Process record
287  * // Do not call as_record_destroy() because the calling function will do that for you.
288  * return true;
289  * }
290  *
291  * as_query query;
292  * as_query_init(&query, "test", "demo");
293  * as_query_select(&query, "bin1");
294  * as_query_where(&query, "bin2", as_integer_equals(100));
295  *
296  * if (aerospike_query_async(&as, &err, NULL, &query, my_listener, NULL, NULL) != AEROSPIKE_OK) {
297  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
298  * }
299  * as_query_destroy(&query);
300  * ~~~~~~~~~~
301  *
302  * @param as Aerospike cluster instance.
303  * @param err Error detail structure that is populated if an error occurs.
304  * @param policy Query policy configuration parameters, pass in NULL for default.
305  * @param query Query definition.
306  * @param listener The function to be called for each returned value.
307  * @param udata User-data to be passed to the callback.
308  * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
309  * chosen by round-robin.
310  *
311  * @return AEROSPIKE_OK if async query successfully queued. Otherwise an error.
312  * @ingroup query_operations
313  */
314 AS_EXTERN as_status
315 aerospike_query_async(
316  aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
317  as_async_query_record_listener listener, void* udata, as_event_loop* event_loop
318  );
319 
320 /**
321  * Asynchronously query records with a partition filter. Standard queries are supported, but
322  * aggregation queries are not supported in async mode.
323  * Requires server version 6.0+.
324  *
325  * ~~~~~~~~~~{.c}
326  * bool my_listener(as_error* err, as_record* record, void* udata, as_event_loop* event_loop)
327  * {
328  * if (err) {
329  * printf("Query failed: %d %s\n", err->code, err->message);
330  * return false;
331  * }
332  *
333  * if (! record) {
334  * printf("Query ended\n");
335  * return false;
336  * }
337  *
338  * // Process record
339  * // Do not call as_record_destroy() because the calling function will do that for you.
340  * return true;
341  * }
342  *
343  * as_query query;
344  * as_query_init(&query, "test", "demo");
345  * as_query_select(&query, "bin1");
346  * as_query_where(&query, "bin2", as_integer_equals(100));
347  *
348  * as_partition_filter pf;
349  * as_partition_filter_set_range(&pf, 0, 1024);
350  *
351  * if (aerospike_query_partitions_async(&as, &err, NULL, &query, &pf, my_listener, NULL, NULL)
352  * != AEROSPIKE_OK) {
353  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
354  * }
355  * as_query_destroy(&query);
356  * ~~~~~~~~~~
357  *
358  * @param as Aerospike cluster instance.
359  * @param err Error detail structure that is populated if an error occurs.
360  * @param policy Query policy configuration parameters, pass in NULL for default.
361  * @param query Query definition.
362  * @param pf Partition filter.
363  * @param listener The function to be called for each returned value.
364  * @param udata User-data to be passed to the callback.
365  * @param event_loop Event loop assigned to run this command. If NULL, an event loop will be
366  * chosen by round-robin.
367  *
368  * @return AEROSPIKE_OK if async query successfully queued. Otherwise an error.
369  * @ingroup query_operations
370  */
371 AS_EXTERN as_status
372 aerospike_query_partitions_async(
373  aerospike* as, as_error* err, const as_policy_query* policy, as_query* query,
374  as_partition_filter* pf, as_async_query_record_listener listener, void* udata,
375  as_event_loop* event_loop
376  );
377 
378 /**
379  * Apply user defined function on records that match the query filter. Records are not returned to
380  * the client. This asynchronous server call will return before the command is complete. The user
381  * can optionally wait for command completion.
382  *
383  * ~~~~~~~~~~{.c}
384  * as_query query;
385  * as_query_init(&query, "test", "demo");
386  * as_query_select(&query, "bin1");
387  * as_query_where(&query, "bin2", as_integer_equals(100));
388  * as_query_apply(&query, "my_lua.lua", "my_lua_function", NULL);
389  * uint64_t query_id = 0;
390  *
391  * if (aerospike_query_background(&as, &err, NULL, &query, &query_id) == AEROSPIKE_OK) {
392  * aerospike_query_wait(&as, &err, NULL, &query, query_id, 0);
393  * }
394  * else {
395  * fprintf(stderr, "error(%d) %s at [%s:%d]", err.code, err.message, err.file, err.line);
396  * }
397  * as_query_destroy(&query);
398  * ~~~~~~~~~~
399  *
400  * @param as Aerospike cluster instance.
401  * @param err Error detail structure that is populated if an error occurs.
402  * @param policy Write configuration parameters, pass in NULL for default.
403  * @param query The query to execute against the cluster.
404  * @param query_id The id for the query job, which can be used for obtaining query status.
405  *
406  * @return AEROSPIKE_OK on success, otherwise an error.
407  * @ingroup query_operations
408  */
409 AS_EXTERN as_status
410 aerospike_query_background(
411  aerospike* as, as_error* err, const as_policy_write* policy,
412  const as_query* query, uint64_t* query_id
413  );
414 
415 /**
416  * Wait for a background query to be completed by servers.
417  *
418  * @param as Aerospike cluster instance.
419  * @param err Error detail structure that is populated if an error occurs.
420  * @param policy Info configuration parameters, pass in NULL for default.
421  * @param query The query that was executed against the cluster.
422  * @param query_id The id for the query job, which can be used for obtaining query status.
423  * @param interval_ms Polling interval in milliseconds. If zero, 1000 ms is used.
424  *
425  * @return AEROSPIKE_OK on success, otherwise an error.
426  * @ingroup query_operations
427  */
428 static inline as_status
429 aerospike_query_wait(
430  aerospike* as, as_error* err, const as_policy_info* policy,
431  const as_query* query, uint64_t query_id, uint32_t interval_ms
432  )
433 {
434  const char* module = (query->where.size > 0)? "query" : "scan"; // No where predicates: the job is looked up under the "scan" module instead of "query".
435  return aerospike_job_wait(as, err, policy, module, query_id, interval_ms);
436 }
437 
438 /**
439  * Check the progress of a background query running on the database.
440  *
441  * @param as Aerospike cluster instance.
442  * @param err Error detail structure that is populated if an error occurs.
443  * @param policy Info configuration parameters, pass in NULL for default.
444  * @param query The query that was executed against the cluster.
445  * @param query_id The id for the query job, which can be used for obtaining query status.
446  * @param info Information about this background query, to be populated by this operation.
447  *
448  * @return AEROSPIKE_OK on success, otherwise an error.
449  * @ingroup query_operations
450  */
451 static inline as_status
452 aerospike_query_info(
453  aerospike* as, as_error* err, const as_policy_info* policy,
454  const as_query* query, uint64_t query_id, as_job_info* info
455  )
456 {
457  const char* module = (query->where.size > 0)? "query" : "scan"; // No where predicates: the job is looked up under the "scan" module instead of "query".
458  return aerospike_job_info(as, err, policy, module, query_id, false, info); // stop_if_in_progress is passed as false.
459 }
460 
461 #ifdef __cplusplus
462 } // end extern "C"
463 #endif
AS_EXTERN as_status aerospike_query_background(aerospike *as, as_error *err, const as_policy_write *policy, const as_query *query, uint64_t *query_id)
as_status
Definition: as_status.h:30
AS_EXTERN as_status aerospike_query_async(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_async_query_record_listener listener, void *udata, as_event_loop *event_loop)
AS_EXTERN as_status aerospike_query_partitions(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_partition_filter *pf, aerospike_query_foreach_callback callback, void *udata)
Definition: as_val.h:61
bool(* as_async_query_record_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
#define AS_EXTERN
Definition: as_std.h:25
AS_EXTERN as_status aerospike_query_foreach(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, aerospike_query_foreach_callback callback, void *udata)
AS_EXTERN as_status aerospike_job_info(aerospike *as, as_error *err, const as_policy_info *policy, const char *module, uint64_t job_id, bool stop_if_in_progress, as_job_info *info)
static as_status aerospike_query_wait(aerospike *as, as_error *err, const as_policy_info *policy, const as_query *query, uint64_t query_id, uint32_t interval_ms)
AS_EXTERN as_status aerospike_job_wait(aerospike *as, as_error *err, const as_policy_info *policy, const char *module, uint64_t job_id, uint32_t interval_ms)
as_query_predicates where
Definition: as_query.h:508
bool(* aerospike_query_foreach_callback)(const as_val *val, void *udata)
AS_EXTERN as_status aerospike_query_partitions_async(aerospike *as, as_error *err, const as_policy_query *policy, as_query *query, as_partition_filter *pf, as_async_query_record_listener listener, void *udata, as_event_loop *event_loop)
static as_status aerospike_query_info(aerospike *as, as_error *err, const as_policy_info *policy, const as_query *query, uint64_t query_id, as_job_info *info)