All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
as_queue_mt.h
Go to the documentation of this file.
1 /*
2  * Copyright 2008-2019 Aerospike, Inc.
3  *
4  * Portions may be licensed to Aerospike, Inc. under one or more contributor
5  * license agreements.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14  * License for the specific language governing permissions and limitations under
15  * the License.
16  */
17 #pragma once
18 
19 #include <aerospike/as_queue.h>
20 #include <aerospike/as_std.h>
21 #include <pthread.h>
22 
23 #ifdef __cplusplus
24 extern "C" {
25 #endif
26 
27 /******************************************************************************
28  * TYPES
29  ******************************************************************************/
30 
31 /**
32  * A thread-safe multi-threaded dynamic queue implementation.
33  * as_queue_mt is not part of the generic as_val family.
34  */
35 typedef struct as_queue_mt_s {
36  /**
37  * The queue.
38  */
40 
41  /**
42  * The lock.
43  */
44  pthread_mutex_t lock;
45 
46  /**
47  * The notify/wait condition variable.
48  */
49  pthread_cond_t cond;
50 } as_queue_mt;
51 
52 /******************************************************************************
53  * MACROS
54  ******************************************************************************/
55 
/**
 * Special wait_ms values accepted by as_queue_mt_pop() and
 * as_queue_mt_pop_tail():
 *
 * AS_QUEUE_FOREVER: wait for an entry with no time limit.
 * AS_QUEUE_NOWAIT:  do not wait at all.
 *
 * The negative value is parenthesized so the macro always expands to a
 * single self-contained expression.
 */
#define AS_QUEUE_FOREVER (-1)
#define AS_QUEUE_NOWAIT 0
58 
/**
 * Set up a stack allocated as_queue_mt whose item storage also lives on the
 * stack. If the queue later needs to be resized, the stack storage is
 * transferred to the heap.
 *
 * The embedded queue is initialized through as_queue_inita(); the mutex and
 * the condition variable are initialized with default (NULL) attributes.
 *
 * NOTE: this macro expands to several separate statements, ends with its own
 * semicolon, and evaluates __q more than once. Use it as a stand-alone
 * statement (not as the unbraced body of an if/else or loop) and pass an
 * argument without side effects.
 */
#define as_queue_mt_inita(__q, __item_size, __capacity) \
	as_queue_inita(&(__q)->queue, __item_size, __capacity); \
	pthread_mutex_init(&(__q)->lock, NULL); \
	pthread_cond_init(&(__q)->cond, NULL);
68 
69 /******************************************************************************
70  * FUNCTIONS
71  ******************************************************************************/
72 
73 /**
74  * Initialize a stack allocated as_queue, with item storage on the heap.
75  */
76 AS_EXTERN bool
77 as_queue_mt_init(as_queue_mt* queue, uint32_t item_size, uint32_t capacity);
78 
79 /**
80  * Create a heap allocated as_queue, with item storage on the heap.
81  */
83 as_queue_mt_create(uint32_t item_size, uint32_t capacity);
84 
85 /**
86  * Release queue memory.
87  */
88 static inline void
90 {
91  pthread_cond_destroy(&queue->cond);
92  pthread_mutex_destroy(&queue->lock);
93  as_queue_destroy(&queue->queue);
94 }
95 
96 /**
97  * Get the number of elements currently in the queue.
98  */
99 static inline uint32_t
101 {
102  pthread_mutex_lock(&queue->lock);
103  uint32_t size = as_queue_size(&queue->queue);
104  pthread_mutex_unlock(&queue->lock);
105  return size;
106 }
107 
108 /**
109  * Is queue empty?
110  */
111 static inline bool
113 {
114  pthread_mutex_lock(&queue->lock);
115  bool empty = as_queue_empty(&queue->queue);
116  pthread_mutex_unlock(&queue->lock);
117  return empty;
118 }
119 
120 /**
121  * Push to the tail of the queue.
122  */
123 static inline bool
124 as_queue_mt_push(as_queue_mt* queue, const void* ptr)
125 {
126  pthread_mutex_lock(&queue->lock);
127  bool status = as_queue_push(&queue->queue, ptr);
128 
129  if (status) {
130  pthread_cond_signal(&queue->cond);
131  }
132  pthread_mutex_unlock(&queue->lock);
133  return status;
134 }
135 
136 /**
137  * Push element on the queue only if size < capacity.
138  */
139 static inline bool
140 as_queue_mt_push_limit(as_queue_mt* queue, const void* ptr)
141 {
142  pthread_mutex_lock(&queue->lock);
143  bool status = as_queue_push_limit(&queue->queue, ptr);
144 
145  if (status) {
146  pthread_cond_signal(&queue->cond);
147  }
148  pthread_mutex_unlock(&queue->lock);
149  return status;
150 }
151 
152 /**
153  * Push to the front of the queue.
154  */
155 static inline bool
156 as_queue_mt_push_head(as_queue_mt* queue, const void* ptr)
157 {
158  pthread_mutex_lock(&queue->lock);
159  bool status = as_queue_push_head(&queue->queue, ptr);
160 
161  if (status) {
162  pthread_cond_signal(&queue->cond);
163  }
164  pthread_mutex_unlock(&queue->lock);
165  return status;
166 }
167 
168 /**
169  * Push to the front of the queue only if size < capacity.
170  */
171 static inline bool
172 as_queue_mt_push_head_limit(as_queue_mt* queue, const void* ptr)
173 {
174  pthread_mutex_lock(&queue->lock);
175  bool status = as_queue_push_head_limit(&queue->queue, ptr);
176 
177  if (status) {
178  pthread_cond_signal(&queue->cond);
179  }
180  pthread_mutex_unlock(&queue->lock);
181  return status;
182 }
183 
184 /**
185  * Pop from the head of the queue.
186  *
187  * If the queue is empty, wait_ms is the maximum time in milliseconds to wait
188  * for an available entry. If wait_ms is AS_QUEUE_FOREVER (-1), the wait time will be forever.
189  * If wait_ms is AS_QUEUE_NOWAIT (0), the function will not wait.
190  *
191  * The return value is true if an entry was successfully retrieved.
192  */
193 AS_EXTERN bool
194 as_queue_mt_pop(as_queue_mt* queue, void* ptr, int wait_ms);
195 
196 /**
197  * Pop from the tail of the queue.
198  *
199  * If the queue is empty, wait_ms is the maximum time in milliseconds to wait
200  * for an available entry. If wait_ms is AS_QUEUE_FOREVER (-1), the wait time will be forever.
201  * If wait_ms is AS_QUEUE_NOWAIT (0), the function will not wait.
202  *
203  * The return value is true if an entry was successfully retrieved.
204  */
205 AS_EXTERN bool
206 as_queue_mt_pop_tail(as_queue_mt* queue, void* ptr, int wait_ms);
207 
208 #ifdef __cplusplus
209 } // end extern "C"
210 #endif
static uint32_t as_queue_mt_size(as_queue_mt *queue)
Definition: as_queue_mt.h:100
AS_EXTERN bool as_queue_mt_pop(as_queue_mt *queue, void *ptr, int wait_ms)
as_queue queue
Definition: as_queue_mt.h:39
AS_EXTERN bool as_queue_mt_pop_tail(as_queue_mt *queue, void *ptr, int wait_ms)
AS_EXTERN as_queue_mt * as_queue_mt_create(uint32_t item_size, uint32_t capacity)
AS_EXTERN bool as_queue_push_head(as_queue *queue, const void *ptr)
AS_EXTERN bool as_queue_push(as_queue *queue, const void *ptr)
static bool as_queue_mt_push_limit(as_queue_mt *queue, const void *ptr)
Definition: as_queue_mt.h:140
static bool as_queue_mt_push_head(as_queue_mt *queue, const void *ptr)
Definition: as_queue_mt.h:156
#define AS_EXTERN
Definition: as_std.h:25
pthread_mutex_t lock
Definition: as_queue_mt.h:44
static uint32_t as_queue_size(as_queue *queue)
Definition: as_queue.h:114
AS_EXTERN bool as_queue_push_head_limit(as_queue *queue, const void *ptr)
static void as_queue_mt_destroy(as_queue_mt *queue)
Definition: as_queue_mt.h:89
AS_EXTERN bool as_queue_push_limit(as_queue *queue, const void *ptr)
static bool as_queue_mt_push_head_limit(as_queue_mt *queue, const void *ptr)
Definition: as_queue_mt.h:172
static bool as_queue_mt_push(as_queue_mt *queue, const void *ptr)
Definition: as_queue_mt.h:124
pthread_cond_t cond
Definition: as_queue_mt.h:49
static bool as_queue_empty(as_queue *queue)
Definition: as_queue.h:123
AS_EXTERN void as_queue_destroy(as_queue *queue)
AS_EXTERN bool as_queue_mt_init(as_queue_mt *queue, uint32_t item_size, uint32_t capacity)
static bool as_queue_mt_empty(as_queue_mt *queue)
Definition: as_queue_mt.h:112