All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_event_internal.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2023 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_admin.h>
20 #include <aerospike/as_cluster.h>
21 #include <aerospike/as_listener.h>
22 #include <aerospike/as_queue.h>
23 #include <aerospike/as_proto.h>
24 #include <aerospike/as_socket.h>
25 #include <citrusleaf/cf_ll.h>
26 #include <pthread.h>
27 
28 #if defined(AS_USE_LIBEV)
29 #include <ev.h>
30 #elif defined(AS_USE_LIBUV)
31 #include <uv.h>
32 struct as_uv_tls;
33 #elif defined(AS_USE_LIBEVENT)
34 #include <event2/event.h>
35 #else
36 #endif
37 
38 #ifdef __cplusplus
39 extern "C" {
40 #endif
41 
42 /******************************************************************************
43  * TYPES
44  *****************************************************************************/
45 
46 #define AS_ASYNC_STATE_UNREGISTERED 0
47 #define AS_ASYNC_STATE_REGISTERED 1
48 #define AS_ASYNC_STATE_DELAY_QUEUE 2
49 #define AS_ASYNC_STATE_CONNECT 3
50 #define AS_ASYNC_STATE_TLS_CONNECT 4
51 #define AS_ASYNC_STATE_AUTH_WRITE 5
52 #define AS_ASYNC_STATE_AUTH_READ_HEADER 6
53 #define AS_ASYNC_STATE_AUTH_READ_BODY 7
54 #define AS_ASYNC_STATE_COMMAND_WRITE 8
55 #define AS_ASYNC_STATE_COMMAND_READ_HEADER 9
56 #define AS_ASYNC_STATE_COMMAND_READ_BODY 10
57 #define AS_ASYNC_STATE_QUEUE_ERROR 11
58 #define AS_ASYNC_STATE_RETRY 12
59 
60 #define AS_ASYNC_FLAGS_DESERIALIZE 1
61 #define AS_ASYNC_FLAGS_READ 2
62 #define AS_ASYNC_FLAGS_HAS_TIMER 4
63 #define AS_ASYNC_FLAGS_USING_SOCKET_TIMER 8
64 #define AS_ASYNC_FLAGS_EVENT_RECEIVED 16
65 #define AS_ASYNC_FLAGS_FREE_BUF 32
66 #define AS_ASYNC_FLAGS_LINEARIZE 64
67 #define AS_ASYNC_FLAGS_HEAP_REC 128
68 
69 #define AS_ASYNC_AUTH_RETURN_CODE 1
70 
71 #define AS_EVENT_CONNECTION_COMPLETE 0
72 #define AS_EVENT_CONNECTION_PENDING 1
73 #define AS_EVENT_CONNECTION_ERROR 2
74 
75 #define AS_EVENT_QUEUE_INITIAL_CAPACITY 256
76 
77 struct as_event_command;
78 struct as_event_executor;
79 
80 typedef struct {
81 #if defined(AS_USE_LIBEV)
82  struct ev_io watcher;
83  as_socket socket;
84 #elif defined(AS_USE_LIBUV)
85  uv_tcp_t socket;
86  struct as_uv_tls* tls;
87  // Reuse memory for requests, because only one request is active at a time.
88  union {
89  uv_connect_t connect;
90  uv_write_t write;
91  } req;
92  uint64_t last_used;
93 #elif defined(AS_USE_LIBEVENT)
94  struct event watcher;
95  as_socket socket;
96 #else
97 #endif
98  int watching;
99  bool pipeline;
101 
102 typedef struct {
106 
107 typedef struct {
109  void* udata;
111 
112 typedef void (*as_event_executable) (as_event_loop* event_loop, void* udata);
113 typedef bool (*as_event_parse_results_fn) (struct as_event_command* cmd);
114 typedef void (*as_event_executor_complete_fn) (struct as_event_executor* executor);
115 
116 typedef struct as_event_command {
117 #if defined(AS_USE_LIBEV)
118  struct ev_timer timer;
119 #elif defined(AS_USE_LIBUV)
120  uv_timer_t timer;
121 #elif defined(AS_USE_LIBEVENT)
122  struct event timer;
123 #else
124 #endif
125  uint64_t total_deadline;
126  uint32_t socket_timeout;
127  uint32_t max_retries;
128  uint32_t iteration;
135  const char* ns;
136  void* partition; // as_partition* or as_partition_shm*
137  void* udata;
140  cf_ll_element pipe_link;
141 
142  uint8_t* buf;
144  uint32_t write_offset;
145  uint32_t write_len;
146  uint32_t read_capacity;
147  uint32_t len;
148  uint32_t pos;
149 
150  uint8_t type;
151  uint8_t proto_type;
152  uint8_t proto_type_rcv;
153  uint8_t state;
154  uint8_t flags;
155  uint8_t replica_size;
156  uint8_t replica_index;
157  uint8_t replica_index_sc; // Used in batch only.
159 
160 typedef struct {
162  void* udata;
164 
165 typedef struct as_event_executor {
166  pthread_mutex_t lock;
170  void* udata;
172  char* ns;
173  uint64_t cluster_key;
174  uint32_t max_concurrent;
175  uint32_t max;
176  uint32_t count;
177  uint32_t queued;
178  bool notify;
179  bool valid;
181 
182 /******************************************************************************
183  * COMMON FUNCTIONS
184  *****************************************************************************/
185 
186 as_status
188 
189 void
191 
192 bool
194 
195 bool
197 
198 bool
200 
201 void
203 
204 void
206 
207 void
209 
210 bool
211 as_event_command_retry(as_event_command* cmd, bool timeout);
212 
213 void
215 
216 void
218 
219 void
221 
222 void
223 as_event_executor_error(as_event_executor* executor, as_error* err, uint32_t command_count);
224 
225 void
226 as_event_executor_cancel(as_event_executor* executor, uint32_t queued_count);
227 
228 void
230 
231 void
233 
234 void
236 
237 void
239 
240 void
242 
243 void
245 
246 bool
248 
249 bool
251 
252 bool
254 
255 bool
257 
258 void
260 
261 void
263 
264 void
266 
267 void
269 
270 /******************************************************************************
271  * IMPLEMENTATION SPECIFIC FUNCTIONS
272  *****************************************************************************/
273 
274 bool
276 
277 void
279 
280 /**
281  * Schedule execution of function on specified event loop.
282  * Command is placed on event loop queue and is never executed directly.
283  */
284 bool
285 as_event_execute(as_event_loop* event_loop, as_event_executable executable, void* udata);
286 
287 void
289 
290 void
292 
293 void
295 
296 /******************************************************************************
297  * LIBEV INLINE FUNCTIONS
298  *****************************************************************************/
299 
300 #if defined(AS_USE_LIBEV)
301 
302 void as_ev_timer_cb(struct ev_loop* loop, ev_timer* timer, int revents);
303 void as_ev_repeat_cb(struct ev_loop* loop, ev_timer* timer, int revents);
304 
305 static inline bool
306 as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
307 {
308  return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
309 }
310 
311 static inline bool
312 as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
313 {
314  return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
315 }
316 
317 static inline int
319 {
320  return as_socket_validate_fd(conn->socket.fd);
321 }
322 
323 static inline void
325 {
326  as_socket_close(&conn->socket);
327  cf_free(conn);
328 }
329 
330 static inline void
332 {
333  conn->socket.last_used = cf_getns();
334 }
335 
336 static inline void
337 as_event_timer_once(as_event_command* cmd, uint64_t timeout)
338 {
339  ev_timer_init(&cmd->timer, as_ev_timer_cb, (double)timeout / 1000.0, 0.0);
340  cmd->timer.data = cmd;
341  ev_timer_start(cmd->event_loop->loop, &cmd->timer);
343 }
344 
345 static inline void
346 as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
347 {
348  ev_init(&cmd->timer, as_ev_repeat_cb);
349  cmd->timer.repeat = (double)repeat / 1000.0;
350  cmd->timer.data = cmd;
351  ev_timer_again(cmd->event_loop->loop, &cmd->timer);
353 }
354 
355 static inline void
357 {
358  ev_timer_again(cmd->event_loop->loop, &cmd->timer);
359 }
360 
361 static inline void
363 {
364  if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
365  ev_timer_stop(cmd->event_loop->loop, &cmd->timer);
366  }
367 }
368 
369 static inline void
371 {
372  ev_io_stop(cmd->event_loop->loop, &conn->watcher);
373  conn->watching = 0;
374 }
375 
376 static inline void
378 {
379  // This method only needed for libuv pipelined connections.
380 }
381 
382 static inline void
384 {
386 }
387 
388 /******************************************************************************
389  * LIBUV INLINE FUNCTIONS
390  *****************************************************************************/
391 
392 #elif defined(AS_USE_LIBUV)
393 
394 void as_uv_timer_cb(uv_timer_t* timer);
395 void as_uv_repeat_cb(uv_timer_t* timer);
397 
398 static inline bool
399 as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
400 {
401  return as_socket_current_trim(conn->last_used, max_socket_idle_ns);
402 }
403 
404 static inline bool
405 as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
406 {
407  return as_socket_current_tran(conn->last_used, max_socket_idle_ns);
408 }
409 
410 static inline int
412 {
413  // Libuv does not have a peek function, so use fd directly.
414  uv_os_fd_t fd;
415 
416  if (uv_fileno((uv_handle_t*)&conn->socket, &fd) == 0) {
418  }
419  return -1;
420 }
421 
422 static inline void
424 {
425  conn->last_used = cf_getns();
426 }
427 
428 static inline void
429 as_event_timer_once(as_event_command* cmd, uint64_t timeout)
430 {
431  if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
432  uv_timer_init(cmd->event_loop->loop, &cmd->timer);
433  cmd->timer.data = cmd;
434  }
435  uv_timer_start(&cmd->timer, as_uv_timer_cb, timeout, 0);
437 }
438 
439 static inline void
440 as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
441 {
442  if (!(cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER)) {
443  uv_timer_init(cmd->event_loop->loop, &cmd->timer);
444  cmd->timer.data = cmd;
445  }
446  uv_timer_start(&cmd->timer, as_uv_repeat_cb, repeat, repeat);
448 }
449 
450 static inline void
452 {
453  // libuv socket timers automatically repeat.
454 }
455 
456 static inline void
458 {
459  if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
460  uv_timer_stop(&cmd->timer);
461  }
462 }
463 
464 static inline void
466 {
467  // uv_read_stop() will handle case where read is already stopped.
468  // Do not set watching to zero because conn is still initialized and active.
469  // libuv works differently here.
470  uv_read_stop((uv_stream_t*)conn);
471 }
472 
473 static inline void
475 {
476  uv_read_stop((uv_stream_t*)conn);
477 }
478 
479 void
480 as_uv_timer_closed(uv_handle_t* handle);
481 
482 static inline void
484 {
485  if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
486  // libuv requires that cmd can't be freed until timer is closed.
487  uv_close((uv_handle_t*)&cmd->timer, as_uv_timer_closed);
488  }
489  else {
491  }
492 }
493 
494 /******************************************************************************
495  * LIBEVENT INLINE FUNCTIONS
496  *****************************************************************************/
497 
498 #elif defined(AS_USE_LIBEVENT)
499 
500 void as_libevent_timer_cb(evutil_socket_t sock, short events, void* udata);
501 void as_libevent_repeat_cb(evutil_socket_t sock, short events, void* udata);
502 
503 static inline bool
504 as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
505 {
506  return as_socket_current_trim(conn->socket.last_used, max_socket_idle_ns);
507 }
508 
509 static inline bool
510 as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
511 {
512  return as_socket_current_tran(conn->socket.last_used, max_socket_idle_ns);
513 }
514 
515 static inline int
517 {
518  return as_socket_validate_fd(conn->socket.fd);
519 }
520 
521 static inline void
523 {
524  as_socket_close(&conn->socket);
525  cf_free(conn);
526 }
527 
528 static inline void
530 {
531  conn->socket.last_used = cf_getns();
532 }
533 
534 static inline void
535 as_event_timer_once(as_event_command* cmd, uint64_t timeout)
536 {
537  evtimer_assign(&cmd->timer, cmd->event_loop->loop, as_libevent_timer_cb, cmd);
538  struct timeval tv;
539  tv.tv_sec = (uint32_t)timeout / 1000;
540  tv.tv_usec = ((uint32_t)timeout % 1000) * 1000;
541  evtimer_add(&cmd->timer, &tv);
543 }
544 
545 static inline void
546 as_event_timer_repeat(as_event_command* cmd, uint64_t repeat)
547 {
548  event_assign(&cmd->timer, cmd->event_loop->loop, -1, EV_PERSIST, as_libevent_repeat_cb, cmd);
549  struct timeval tv;
550  tv.tv_sec = (uint32_t)repeat / 1000;
551  tv.tv_usec = ((uint32_t)repeat % 1000) * 1000;
552  evtimer_add(&cmd->timer, &tv);
554 }
555 
556 static inline void
558 {
559  // libevent socket timers automatically repeat.
560 }
561 
562 static inline void
564 {
565  if (cmd->flags & AS_ASYNC_FLAGS_HAS_TIMER) {
566  evtimer_del(&cmd->timer);
567  }
568 }
569 
570 static inline void
572 {
573  event_del(&conn->watcher);
574  conn->watching = 0;
575 }
576 
577 static inline void
579 {
580  // This method only needed for libuv pipelined connections.
581 }
582 
583 static inline void
585 {
587 }
588 
589 /******************************************************************************
590  * EVENT_LIB NOT DEFINED INLINE FUNCTIONS
591  *****************************************************************************/
592 
593 #else
594 
595 static inline bool
596 as_event_conn_current_trim(as_event_connection* conn, uint64_t max_socket_idle_ns)
597 {
598  return false;
599 }
600 
601 static inline bool
602 as_event_conn_current_tran(as_event_connection* conn, uint64_t max_socket_idle_ns)
603 {
604  return false;
605 }
606 
607 static inline int
609 {
610  return -1;
611 }
612 
613 static inline void
615 {
616 }
617 
618 static inline void
620 {
621 }
622 
623 static inline void
624 as_event_timer_once(as_event_command* cmd, uint64_t timeout)
625 {
626 }
627 
628 static inline void
630 {
631 }
632 
633 static inline void
635 {
636 }
637 
638 static inline void
640 {
641 }
642 
643 static inline void
645 {
646 }
647 
648 static inline void
650 {
651 }
652 
653 static inline void
655 {
656 }
657 
658 #endif
659 
660 /******************************************************************************
661  * COMMON INLINE FUNCTIONS
662  *****************************************************************************/
663 
664 static inline as_event_loop*
666 {
667  // Assign event loop using round robin distribution if not specified.
668  return event_loop ? event_loop : as_event_loop_get();
669 }
670 
671 static inline void
673 {
674  // Authentication write buffer is always located after command write buffer.
675  uint8_t* buf = (uint8_t*)cmd + cmd->write_offset + cmd->write_len;
676  uint32_t len = as_authenticate_set(cmd->cluster, session, buf);
677  cmd->len = cmd->write_len + len;
678  cmd->pos = cmd->write_len;
679 }
680 
681 static inline void
683 {
684  // Authenticate read buffer uses the standard read buffer (buf).
685  cmd->len = sizeof(as_proto);
686  cmd->pos = 0;
688 }
689 
690 static inline bool
692 {
693  // Authenticate read buffer uses the standard read buffer (buf).
694  as_proto* proto = (as_proto*)cmd->buf;
695 
696  if (! as_event_proto_parse_auth(cmd, proto)) {
697  return false;
698  }
699 
700  cmd->len = (uint32_t)proto->sz;
701  cmd->pos = 0;
703  return true;
704 }
705 
706 static inline void
708 {
709  cmd->len = cmd->write_len;
710  cmd->pos = 0;
711 }
712 
713 static inline void
714 as_async_conn_pool_init(as_async_conn_pool* pool, uint32_t min_size, uint32_t max_size)
715 {
716  as_queue_init(&pool->queue, sizeof(void*), max_size);
717  pool->min_size = min_size;
718  pool->limit = max_size;
719  pool->opened = 0;
720  pool->closed = 0;
721 }
722 
723 static inline bool
725 {
726  if (pool->queue.total >= pool->limit) {
727  return false;
728  }
729  pool->queue.total++;
730  return true;
731 }
732 
733 static inline bool
735 {
736  if (pool->queue.total > pool->limit) {
737  return false;
738  }
739  return as_queue_push_head(&pool->queue, &conn);
740 }
741 
742 static inline bool
744 {
745  if (pool->queue.total > pool->limit) {
746  return false;
747  }
748  return as_queue_push(&pool->queue, &conn);
749 }
750 
751 static inline void
753 {
755  as_queue_decr_total(&pool->queue);
756  pool->closed++;
757 }
758 
759 static inline void
761 {
763  as_event_release_connection(cmd->conn, pool);
765 }
766 
767 static inline void
769 {
770  as_async_conn_pool* pool = cmd->pipe_listener != NULL ?
771  &cmd->node->pipe_conn_pools[cmd->event_loop->index] :
772  &cmd->node->async_conn_pools[cmd->event_loop->index];
773 
774  as_queue_decr_total(&pool->queue);
775 }
776 
777 static inline void
779 {
780  as_event_connection* conn = cmd->conn;
781 
782  if (conn) {
783  if (conn->watching > 0) {
784  as_event_stop_watcher(cmd, conn);
785  as_event_release_connection(conn, pool);
787  }
788  else {
789  cf_free(conn);
790  as_queue_decr_total(&pool->queue);
791  pool->closed++;
792  }
793  }
794 }
795 
796 static inline bool
798 {
799  if (cmd->pipe_listener) {
800  return false;
801  }
802 
803  as_event_stop_watcher(cmd, cmd->conn);
805  return as_event_command_retry(cmd, false);
806 }
807 
808 static inline void
810 {
811  // Use this function to free batch/scan/query commands that were never started.
812  as_node_release(cmd->node);
813  cf_free(cmd);
814 }
815 
816 static inline void
818 {
819  as_queue_destroy(&event_loop->queue);
820  as_queue_destroy(&event_loop->delay_queue);
821  as_queue_destroy(&event_loop->pipe_cb_queue);
822  pthread_mutex_destroy(&event_loop->lock);
823 }
824 
825 #ifdef __cplusplus
826 } // end extern "C"
827 #endif
as_event_loop * event_loop
uint32_t min_size
Definition: as_node.h:189
as_event_parse_results_fn parse_results
as_queue queue
Definition: as_event.h:125
as_queue delay_queue
Definition: as_event.h:126
bool as_event_command_parse_success_failure(as_event_command *cmd)
static void as_event_timer_stop(as_event_command *cmd)
void(* as_event_executable)(as_event_loop *event_loop, void *udata)
#define AS_ASYNC_FLAGS_USING_SOCKET_TIMER
static void as_event_command_destroy(as_event_command *cmd)
as_policy_replica
Definition: as_policy.h:272
as_event_state * event_state
static void as_event_release_connection(as_event_connection *conn, as_async_conn_pool *pool)
as_event_executor_complete_fn complete_fn
bool(* as_event_parse_results_fn)(struct as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_HEADER
as_pipe_listener listener
as_status
Definition: as_status.h:30
void as_event_command_write_start(as_event_command *cmd)
AS_EXTERN bool as_queue_init(as_queue *queue, uint32_t item_size, uint32_t capacity)
#define as_socket_fd
Definition: as_socket.h:32
static bool as_event_socket_retry(as_event_command *cmd)
void as_event_command_free(as_event_command *cmd)
static bool as_event_set_auth_parse_header(as_event_command *cmd)
static void as_event_set_auth_read_header(as_event_command *cmd)
static void as_node_incr_error_count(as_node *node)
Definition: as_cluster.h:547
bool as_event_command_parse_header(as_event_command *cmd)
static void as_event_command_release(as_event_command *cmd)
pthread_mutex_t lock
Definition: as_event.h:124
void as_event_node_destroy(as_node *node)
void(* as_event_executor_complete_fn)(struct as_event_executor *executor)
void as_event_process_timer(as_event_command *cmd)
void as_event_notify_error(as_event_command *cmd, as_error *err)
static void as_event_release_async_connection(as_event_command *cmd)
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
void as_event_socket_error(as_event_command *cmd, as_error *err)
as_queue queue
Definition: as_node.h:184
void as_event_query_complete(as_event_command *cmd)
as_cluster * cluster
static void as_event_timer_repeat(as_event_command *cmd, uint64_t repeat)
struct as_event_command ** commands
static bool as_async_conn_pool_push(as_async_conn_pool *pool, as_event_connection *conn)
static void as_event_set_auth_write(as_event_command *cmd, as_session *session)
as_queue pipe_cb_queue
Definition: as_event.h:127
bool as_event_create_loop(as_event_loop *event_loop)
static bool as_async_conn_pool_push_head(as_async_conn_pool *pool, as_event_connection *conn)
as_event_loop * event_loop
as_status as_event_command_execute(as_event_command *cmd, as_error *err)
as_async_conn_pool * async_conn_pools
Definition: as_node.h:283
int as_socket_validate_fd(as_socket_fd fd)
void * loop
Definition: as_event.h:120
static void as_event_set_conn_last_used(as_event_connection *conn)
static bool as_event_conn_current_trim(as_event_connection *conn, uint64_t max_socket_idle_ns)
as_event_executable executable
as_proto proto
Definition: as_proto.h:35
bool as_event_proto_parse_auth(as_event_command *cmd, as_proto *proto)
bool as_event_command_retry(as_event_command *cmd, bool timeout)
void as_event_close_cluster(as_cluster *cluster)
void as_event_connector_success(as_event_command *cmd)
#define AS_ASYNC_STATE_AUTH_READ_BODY
uint32_t limit
Definition: as_node.h:194
static void as_event_stop_watcher(as_event_command *cmd, as_event_connection *conn)
void(* as_pipe_listener)(void *udata, as_event_loop *event_loop)
Definition: as_listener.h:72
static void as_event_set_write(as_event_command *cmd)
void as_event_response_error(as_event_command *cmd, as_error *err)
static void as_event_decr_conn(as_event_command *cmd)
void as_event_execute_retry(as_event_command *cmd)
void as_event_total_timeout(as_event_command *cmd)
static void as_event_loop_destroy(as_event_loop *event_loop)
bool as_event_execute(as_event_loop *event_loop, as_event_executable executable, void *udata)
as_async_conn_pool * pipe_conn_pools
Definition: as_node.h:288
void as_event_connect(as_event_command *cmd, as_async_conn_pool *pool)
uint32_t opened
Definition: as_node.h:199
void as_event_register_external_loop(as_event_loop *event_loop)
void as_event_command_schedule(as_event_command *cmd)
static void as_event_timer_again(as_event_command *cmd)
static void as_event_close_connection(as_event_connection *conn)
uint32_t closed
Definition: as_node.h:204
uint32_t index
Definition: as_event.h:129
uint32_t total
Definition: as_queue.h:63
bool as_event_command_parse_info(as_event_command *cmd)
static void as_event_connection_timeout(as_event_command *cmd, as_async_conn_pool *pool)
void as_event_socket_timeout(as_event_command *cmd)
static as_event_loop * as_event_loop_get()
Definition: as_event.h:380
void as_event_executor_cancel(as_event_executor *executor, uint32_t queued_count)
cf_ll_element pipe_link
as_event_connection * conn
void as_event_executor_complete(as_event_executor *executor)
void as_event_executor_error(as_event_executor *executor, as_error *err, uint32_t command_count)
uint32_t as_authenticate_set(struct as_cluster_s *cluster, struct as_session_s *session, uint8_t *buffer)
static void as_node_release(as_node *node)
Definition: as_node.h:495
pthread_mutex_t lock
as_pipe_listener pipe_listener
void as_socket_close(as_socket *sock)
as_policy_replica replica
bool as_event_command_parse_result(as_event_command *cmd)
static bool as_socket_current_trim(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition: as_socket.h:207
struct as_event_command * cmd
static int as_event_conn_validate(as_event_connection *conn)
static as_event_loop * as_event_assign(as_event_loop *event_loop)
static bool as_event_conn_current_tran(as_event_connection *conn, uint64_t max_socket_idle_ns)
bool as_event_proto_parse(as_event_command *cmd, as_proto *proto)
as_event_connection base
static void as_event_stop_read(as_event_connection *conn)
void as_event_error_callback(as_event_command *cmd, as_error *err)
AS_EXTERN void as_queue_destroy(as_queue *queue)
static void as_event_timer_once(as_event_command *cmd, uint64_t timeout)
void as_event_parse_error(as_event_command *cmd, as_error *err)
static bool as_async_conn_pool_incr_total(as_async_conn_pool *pool)
void as_event_batch_complete(as_event_command *cmd)
static bool as_socket_current_tran(uint64_t last_used, uint64_t max_socket_idle_ns)
Definition: as_socket.h:197
bool as_event_decompress(as_event_command *cmd)
void as_event_create_connections(as_node *node, as_async_conn_pool *pools)
#define AS_ASYNC_FLAGS_HAS_TIMER
static void as_async_conn_pool_init(as_async_conn_pool *pool, uint32_t min_size, uint32_t max_size)
uint32_t command_sent_counter
static void as_queue_decr_total(as_queue *queue)
Definition: as_queue.h:217