All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_async.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
20 #include <aerospike/as_command.h>
21 #include <aerospike/as_cluster.h>
23 #include <aerospike/as_listener.h>
24 #include <citrusleaf/alloc.h>
25 
26 #ifdef __cplusplus
27 extern "C" {
28 #endif
29 
30 /******************************************************************************
31  * TYPES
32  *****************************************************************************/
33 
34 #define AS_ASYNC_TYPE_WRITE 0
35 #define AS_ASYNC_TYPE_RECORD 1
36 #define AS_ASYNC_TYPE_VALUE 2
37 #define AS_ASYNC_TYPE_BATCH 3
38 #define AS_ASYNC_TYPE_SCAN 4
39 #define AS_ASYNC_TYPE_QUERY 5
40 #define AS_ASYNC_TYPE_INFO 6
41 #define AS_ASYNC_TYPE_SCAN_PARTITION 7
42 #define AS_ASYNC_TYPE_QUERY_PARTITION 8
43 #define AS_ASYNC_TYPE_CONNECTOR 9
44 
45 #define AS_AUTHENTICATION_MAX_SIZE 158
46 
47 #define AS_ASYNC_CONNECTION_COMPLETE 0
48 #define AS_ASYNC_CONNECTION_PENDING 1
49 #define AS_ASYNC_CONNECTION_ERROR 2
50 
51 typedef struct as_async_write_command {
54  uint8_t space[];
56 
57 typedef struct as_async_record_command {
60  uint8_t space[];
62 
63 typedef struct as_async_value_command {
66  uint8_t space[];
68 
69 typedef struct as_async_info_command {
72  uint8_t space[];
74 
75 /******************************************************************************
76  * FUNCTIONS
77  *****************************************************************************/
78 
79 static inline as_event_command*
81  as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
82  as_policy_replica replica, as_async_write_listener listener, void* udata,
83  as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
84  as_event_parse_results_fn parse_results
85  )
86 {
87  // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
88  // Then, round up memory size in 1KB increments.
89  size_t s = (sizeof(as_async_write_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
90  as_event_command* cmd = (as_event_command*)cf_malloc(s);
92  cmd->total_deadline = policy->total_timeout;
93  cmd->socket_timeout = policy->socket_timeout;
94  cmd->max_retries = policy->max_retries;
95  cmd->iteration = 0;
96  cmd->replica = as_command_write_replica(replica);
97  cmd->event_loop = as_event_assign(event_loop);
98  cmd->cluster = cluster;
99  cmd->node = NULL;
100  cmd->ns = pi->ns;
101  cmd->partition = pi->partition;
102  cmd->udata = udata;
103  cmd->parse_results = parse_results;
104  cmd->pipe_listener = pipe_listener;
105  cmd->buf = wcmd->space;
106  cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_write_command));
107  cmd->type = AS_ASYNC_TYPE_WRITE;
110  cmd->flags = 0;
111  cmd->replica_size = pi->replica_size;
112  cmd->replica_index = 0;
113  wcmd->listener = listener;
114  return cmd;
115 }
116 
117 static inline as_event_command*
119  as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
120  as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec,
121  uint8_t flags, as_async_record_listener listener, void* udata, as_event_loop* event_loop,
122  as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results
123  )
124 {
125  // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
126  // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
127  // read to reuse buffer for small socket write sizes.
128  size_t s = (sizeof(as_async_record_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
129  as_event_command* cmd = (as_event_command*)cf_malloc(s);
131  cmd->total_deadline = policy->total_timeout;
132  cmd->socket_timeout = policy->socket_timeout;
133  cmd->max_retries = policy->max_retries;
134  cmd->iteration = 0;
135  cmd->replica = replica;
136  cmd->event_loop = as_event_assign(event_loop);
137  cmd->cluster = cluster;
138  cmd->node = NULL;
139  cmd->ns = pi->ns;
140  cmd->partition = pi->partition;
141  cmd->udata = udata;
142  cmd->parse_results = parse_results;
143  cmd->pipe_listener = pipe_listener;
144  cmd->buf = rcmd->space;
145  cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_record_command));
146  cmd->type = AS_ASYNC_TYPE_RECORD;
149  cmd->flags = flags;
150 
151  if (deserialize) {
153  }
154 
155  if (heap_rec) {
157  }
158 
159  cmd->replica_size = pi->replica_size;
160  cmd->replica_index = replica_index;
161  rcmd->listener = listener;
162  return cmd;
163 }
164 
165 static inline as_event_command*
167  as_cluster* cluster, const as_policy_base* policy, as_partition_info* pi,
168  as_policy_replica replica, as_async_value_listener listener, void* udata,
169  as_event_loop* event_loop, as_pipe_listener pipe_listener, size_t size,
170  as_event_parse_results_fn parse_results
171  )
172 {
173  // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
174  // Then, round up memory size in 4KB increments to reduce fragmentation and to allow socket
175  // read to reuse buffer for small socket write sizes.
176  size_t s = (sizeof(as_async_value_command) + size + AS_AUTHENTICATION_MAX_SIZE + 4095) & ~4095;
177  as_event_command* cmd = (as_event_command*)cf_malloc(s);
179  cmd->total_deadline = policy->total_timeout;
180  cmd->socket_timeout = policy->socket_timeout;
181  cmd->max_retries = policy->max_retries;
182  cmd->iteration = 0;
183  cmd->replica = as_command_write_replica(replica);
184  cmd->event_loop = as_event_assign(event_loop);
185  cmd->cluster = cluster;
186  cmd->node = NULL;
187  cmd->ns = pi->ns;
188  cmd->partition = pi->partition;
189  cmd->udata = udata;
190  cmd->parse_results = parse_results;
191  cmd->pipe_listener = pipe_listener;
192  cmd->buf = vcmd->space;
193  cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_value_command));
194  cmd->type = AS_ASYNC_TYPE_VALUE;
197  cmd->flags = 0;
198  cmd->replica_size = pi->replica_size;
199  cmd->replica_index = 0;
200  vcmd->listener = listener;
201  return cmd;
202 }
203 
204 static inline as_event_command*
206  as_node* node, const as_policy_info* policy, as_async_info_listener listener, void* udata,
207  as_event_loop* event_loop, size_t size
208  )
209 {
210  // Allocate enough memory to cover: struct size + write buffer size + auth max buffer size
211  // Then, round up memory size in 1KB increments.
212  size_t s = (sizeof(as_async_info_command) + size + AS_AUTHENTICATION_MAX_SIZE + 1023) & ~1023;
213  as_event_command* cmd = (as_event_command*)cf_malloc(s);
215  cmd->total_deadline = policy->timeout;
216  cmd->socket_timeout = policy->timeout;
217  cmd->max_retries = 1;
218  cmd->iteration = 0;
220  cmd->event_loop = as_event_assign(event_loop);
221  cmd->cluster = node->cluster;
222  cmd->node = node;
223  cmd->ns = NULL;
224  cmd->partition = NULL;
225  cmd->udata = udata;
227  cmd->pipe_listener = NULL;
228  cmd->buf = icmd->space;
229  cmd->read_capacity = (uint32_t)(s - size - sizeof(as_async_info_command));
230  cmd->type = AS_ASYNC_TYPE_INFO;
233  cmd->flags = 0;
234  cmd->replica_size = 1;
235  cmd->replica_index = 0;
236  icmd->listener = listener;
237  return cmd;
238 }
239 
240 #ifdef __cplusplus
241 } // end extern "C"
242 #endif
as_event_loop * event_loop
as_event_command command
Definition: as_async.h:58
void(* as_async_value_listener)(as_error *err, as_val *val, void *udata, as_event_loop *event_loop)
Definition: as_listener.h:61
as_event_parse_results_fn parse_results
#define AS_ASYNC_TYPE_RECORD
Definition: as_async.h:35
uint32_t socket_timeout
Definition: as_policy.h:405
static as_event_command * as_async_info_command_create(as_node *node, const as_policy_info *policy, as_async_info_listener listener, void *udata, as_event_loop *event_loop, size_t size)
Definition: as_async.h:205
as_policy_replica
Definition: as_policy.h:272
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
uint32_t max_retries
Definition: as_policy.h:442
#define AS_ASYNC_TYPE_VALUE
Definition: as_async.h:36
struct as_cluster_s * cluster
Definition: as_node.h:272
#define AS_ASYNC_FLAGS_HEAP_REC
void(* as_async_record_listener)(as_error *err, as_record *record, void *udata, as_event_loop *event_loop)
Definition: as_listener.h:48
as_event_command command
Definition: as_async.h:52
as_cluster * cluster
void(* as_async_info_listener)(as_error *err, char *response, void *udata, as_event_loop *event_loop)
Definition: as_listener.h:85
#define AS_AUTHENTICATION_MAX_SIZE
Definition: as_async.h:45
uint32_t timeout
Definition: as_policy.h:1258
const char * ns
Definition: as_partition.h:91
static as_policy_replica as_command_write_replica(as_policy_replica replica)
Definition: as_command.h:688
as_async_info_listener listener
Definition: as_async.h:71
as_async_record_listener listener
Definition: as_async.h:59
void(* as_async_write_listener)(as_error *err, void *udata, as_event_loop *event_loop)
Definition: as_listener.h:36
#define AS_ASYNC_TYPE_INFO
Definition: as_async.h:40
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition: as_listener.h:72
static as_event_command * as_async_value_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_value_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results)
Definition: as_async.h:166
static as_event_command * as_async_record_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, uint8_t replica_index, bool deserialize, bool heap_rec, uint8_t flags, as_async_record_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results)
Definition: as_async.h:118
as_async_value_listener listener
Definition: as_async.h:65
#define AS_ASYNC_STATE_UNREGISTERED
bool as_event_command_parse_info(as_event_command *cmd)
uint32_t total_timeout
Definition: as_policy.h:420
as_async_write_listener listener
Definition: as_async.h:53
as_pipe_listener pipe_listener
as_event_command command
Definition: as_async.h:64
as_policy_replica replica
static as_event_loop * as_event_assign(as_event_loop *event_loop)
#define AS_ASYNC_FLAGS_DESERIALIZE
#define AS_ASYNC_TYPE_WRITE
Definition: as_async.h:34
uint8_t replica_size
Definition: as_partition.h:94
#define AS_MESSAGE_TYPE
Definition: as_proto.h:38
#define AS_INFO_MESSAGE_TYPE
Definition: as_proto.h:36
as_event_command command
Definition: as_async.h:70
static as_event_command * as_async_write_command_create(as_cluster *cluster, const as_policy_base *policy, as_partition_info *pi, as_policy_replica replica, as_async_write_listener listener, void *udata, as_event_loop *event_loop, as_pipe_listener pipe_listener, size_t size, as_event_parse_results_fn parse_results)
Definition: as_async.h:80