/usr/include/urcu/static/wfcqueue.h is in liburcu-dev 0.8.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 | #ifndef _URCU_WFCQUEUE_STATIC_H
#define _URCU_WFCQUEUE_STATIC_H
/*
* urcu/static/wfcqueue.h
*
* Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue
*
* TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu/wfcqueue.h for
* linking dynamically with the userspace rcu library.
*
* Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <pthread.h>
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <urcu/compiler.h>
#include <urcu/uatomic.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
* Concurrent queue with wait-free enqueue/blocking dequeue.
*
* This queue has been designed and implemented collaboratively by
* Mathieu Desnoyers and Lai Jiangshan. Inspired from
* half-wait-free/half-blocking queue implementation done by Paul E.
* McKenney.
*
* Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
*
* Synchronization table:
*
* External synchronization techniques described in the API below are
* required between pairs marked with "X". No external synchronization
* is required between pairs marked with "-".
*
* Legend:
* [1] cds_wfcq_enqueue
* [2] __cds_wfcq_splice (destination queue)
* [3] __cds_wfcq_dequeue
* [4] __cds_wfcq_splice (source queue)
* [5] __cds_wfcq_first
* [6] __cds_wfcq_next
*
* [1] [2] [3] [4] [5] [6]
* [1] - - - - - -
* [2] - - - - - -
* [3] - - X X X X
* [4] - - X - X X
* [5] - - X X - -
* [6] - - X X - -
*
* Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
*
* For convenience, cds_wfcq_dequeue_blocking() and
* cds_wfcq_splice_blocking() hold the dequeue lock.
*
* Besides locking, mutual exclusion of dequeue, splice and iteration
* can be ensured by performing all of those operations from a single
* thread, without requiring any lock.
*/
/*
 * Adaptive busy-wait tuning used by ___cds_wfcq_busy_wait(): every
 * WFCQ_ADAPT_ATTEMPTS-th consecutive call sleeps through poll() for
 * WFCQ_WAIT milliseconds; the other calls only relax the CPU.
 */
#define WFCQ_ADAPT_ATTEMPTS 10 /* Busy-wait calls per sleep */
#define WFCQ_WAIT 10 /* Sleep duration, in milliseconds */
/*
 * cds_wfcq_node_init: initialize wait-free queue node.
 *
 * Clears @node's next pointer. A NULL next pointer is what the
 * dequeue/iteration helpers in this file treat as "no successor
 * published yet". Also used to reset the queue's own head node.
 */
static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node)
{
node->next = NULL;
}
/*
 * cds_wfcq_init: initialize wait-free queue.
 *
 * Puts the queue in its empty state (head node without successor, tail
 * pointing at the head node) and initializes the dequeue mutex stored
 * in @head with default attributes. Mutex initialization failure is
 * caught by assert().
 */
static inline void _cds_wfcq_init(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int ret;

	/* Empty queue: no first node, and the tail designates the head node. */
	head->node.next = NULL;
	tail->p = &head->node;

	/* Default-attribute mutex protecting dequeue/splice/iteration. */
	ret = pthread_mutex_init(&head->lock, NULL);
	assert(!ret);
}
/*
 * cds_wfcq_empty: return whether wait-free queue is empty.
 *
 * No memory barrier is issued. No mutual exclusion is required.
 *
 * The tail pointer is the linearisation point of the enqueuers, so the
 * queue is empty only when tail->p designates the head node. Looking
 * at head->node.next alone is not enough: an enqueuer preempted between
 * its xchg() of the tail and the store to the previous node's next
 * pointer would make a non-empty queue look empty.
 */
static inline bool _cds_wfcq_empty(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	/*
	 * Even though the tail->p check alone is sufficient to find out
	 * if the queue is empty, head->node.next is tested first as the
	 * common case, so that dequeuers do not frequently access the
	 * enqueuers' tail->p cache line.
	 */
	if (CMM_LOAD_SHARED(head->node.next) != NULL)
		return false;

	/* No first node visible: confirm with the tail pointer. */
	return CMM_LOAD_SHARED(tail->p) == &head->node;
}
/*
 * cds_wfcq_dequeue_lock: take the dequeue mutex of a queue.
 *
 * Locks the mutex stored in @head. @tail is not used by this function.
 * A locking failure is caught by assert().
 */
static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int ret = pthread_mutex_lock(&head->lock);

	assert(!ret);
}
/*
 * cds_wfcq_dequeue_unlock: release the dequeue mutex of a queue.
 *
 * Unlocks the mutex stored in @head. @tail is not used by this
 * function. An unlocking failure is caught by assert().
 */
static inline void _cds_wfcq_dequeue_unlock(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int ret = pthread_mutex_unlock(&head->lock);

	assert(!ret);
}
/*
 * ___cds_wfcq_append: append the node chain [new_head .. new_tail] at
 * the end of a queue.
 *
 * The chain is published in two steps: the tail pointer is atomically
 * exchanged with @new_tail, then the previous tail's next pointer is
 * set to @new_head. For a single node, @new_head == @new_tail.
 *
 * Returns false if the queue was empty prior to the append (the
 * previous tail was the head node), true otherwise.
 */
static inline bool ___cds_wfcq_append(struct cds_wfcq_head *head,
struct cds_wfcq_tail *tail,
struct cds_wfcq_node *new_head,
struct cds_wfcq_node *new_tail)
{
struct cds_wfcq_node *old_tail;
/*
 * Implicit memory barrier before uatomic_xchg() orders earlier
 * stores to data structure containing node and setting
 * node->next to NULL before publication.
 */
old_tail = uatomic_xchg(&tail->p, new_tail);
/*
 * Implicit memory barrier after uatomic_xchg() orders store to
 * tail->p before store to old_tail->next.
 *
 * At this point, dequeuers see a NULL tail->p->next, which
 * indicates that the queue is being appended to. The following
 * store will append the chain starting at new_head to the queue
 * from a dequeuer perspective.
 */
CMM_STORE_SHARED(old_tail->next, new_head);
/*
 * Return false if queue was empty prior to adding the node,
 * else return true.
 */
return old_tail != &head->node;
}
/*
 * cds_wfcq_enqueue: enqueue a node into a wait-free queue.
 *
 * Issues a full memory barrier before enqueue. No mutual exclusion is
 * required.
 *
 * Returns false if the queue was empty prior to adding the node.
 * Returns true otherwise.
 */
static inline bool _cds_wfcq_enqueue(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *new_tail)
{
	bool was_non_empty;

	/* A lone node is both the first and the last node of the appended chain. */
	was_non_empty = ___cds_wfcq_append(head, tail, new_tail, new_tail);
	return was_non_empty;
}
/*
 * ___cds_wfcq_busy_wait: adaptive busy-wait.
 *
 * @attempt: caller-owned counter of consecutive busy-wait calls. It is
 *           incremented on each blocking call and reset to 0 each time
 *           the function sleeps.
 * @blocking: non-zero if the caller is allowed to wait.
 *
 * In blocking mode, performs one waiting step: a CPU relax, or a
 * WFCQ_WAIT ms sleep on every WFCQ_ADAPT_ATTEMPTS-th consecutive call.
 *
 * Returns true if nonblocking and needs to block, false otherwise.
 */
static inline bool
___cds_wfcq_busy_wait(int *attempt, int blocking)
{
	if (!blocking)
		return true;
	if (++(*attempt) >= WFCQ_ADAPT_ATTEMPTS) {
		/*
		 * poll() is only used as a sleep here. Callers re-check
		 * their condition after every call, so a shortened or
		 * failed sleep is harmless: discard the result explicitly.
		 */
		(void) poll(NULL, 0, WFCQ_WAIT);	/* Wait for 10ms */
		*attempt = 0;
	} else {
		caa_cpu_relax();
	}
	return false;
}
/*
 * ___cds_wfcq_node_sync_next: wait for an enqueuer to complete its
 * enqueue and return the node following @node.
 *
 * In blocking mode, loops with adaptive busy-waiting until node->next
 * becomes non-NULL. In nonblocking mode, returns CDS_WFCQ_WOULDBLOCK
 * as soon as node->next is observed to be NULL.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking)
{
	int spins = 0;

	for (;;) {
		struct cds_wfcq_node *successor = CMM_LOAD_SHARED(node->next);

		if (successor != NULL)
			return successor;
		/* Successor not published yet: wait, or give up if nonblocking. */
		if (___cds_wfcq_busy_wait(&spins, blocking))
			return CDS_WFCQ_WOULDBLOCK;
	}
}
/*
 * ___cds_wfcq_first: get first node of a queue, without dequeuing.
 *
 * @blocking: non-zero to wait for a concurrent enqueue to publish the
 *            first node, zero to give up instead of waiting.
 *
 * Returns NULL if the queue is empty, CDS_WFCQ_WOULDBLOCK if
 * nonblocking and waiting would be needed, the first node otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_first(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		int blocking)
{
	struct cds_wfcq_node *first = NULL;

	if (!_cds_wfcq_empty(head, tail)) {
		first = ___cds_wfcq_node_sync_next(&head->node, blocking);
		/* Load head->node.next before loading node's content */
		cmm_smp_read_barrier_depends();
	}
	return first;
}
/*
 * __cds_wfcq_first_blocking: get first node of a queue, without dequeuing.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Dequeue/splice/iteration mutual exclusion should be ensured by the
 * caller.
 *
 * Used by for-like iteration macros in urcu/wfcqueue.h:
 * __cds_wfcq_for_each_blocking()
 * __cds_wfcq_for_each_blocking_safe()
 *
 * Returns NULL if queue is empty, first node otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_first_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	const int blocking = 1;

	return ___cds_wfcq_first(head, tail, blocking);
}
/*
 * __cds_wfcq_first_nonblocking: get first node of a queue, without dequeuing.
 *
 * Same as __cds_wfcq_first_blocking, but returns CDS_WFCQ_WOULDBLOCK
 * instead of waiting when it would need to block.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_first_nonblocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	const int blocking = 0;

	return ___cds_wfcq_first(head, tail, blocking);
}
/*
 * ___cds_wfcq_next: get the node following @node, without dequeuing.
 *
 * @head is not used by this helper.
 * @blocking: non-zero to wait for a concurrent enqueue to publish the
 *            next node, zero to give up instead of waiting.
 *
 * Returns NULL if @node is the last node of the queue,
 * CDS_WFCQ_WOULDBLOCK if nonblocking and waiting would be needed, the
 * next node otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_next(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *node,
		int blocking)
{
	struct cds_wfcq_node *successor;

	/*
	 * Even though the tail->p check below is sufficient to find out
	 * if we reached the end of the queue, node->next is tested
	 * first as the common case, so that iteration on nodes does not
	 * frequently access the enqueuers' tail->p cache line.
	 */
	successor = CMM_LOAD_SHARED(node->next);
	if (successor == NULL) {
		/* Load node->next before tail->p */
		cmm_smp_rmb();
		if (CMM_LOAD_SHARED(tail->p) == node)
			return NULL;
		/* An enqueue is in progress after @node: wait for its next pointer. */
		successor = ___cds_wfcq_node_sync_next(node, blocking);
	}
	/* Load node->next before loading next's content */
	cmm_smp_read_barrier_depends();
	return successor;
}
/*
 * __cds_wfcq_next_blocking: get next node of a queue, without dequeuing.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Dequeue/splice/iteration mutual exclusion should be ensured by the
 * caller.
 *
 * Used by for-like iteration macros in urcu/wfcqueue.h:
 * __cds_wfcq_for_each_blocking()
 * __cds_wfcq_for_each_blocking_safe()
 *
 * Returns NULL if reached end of queue, non-NULL next queue node
 * otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_next_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *node)
{
	const int blocking = 1;

	return ___cds_wfcq_next(head, tail, node, blocking);
}
/*
 * __cds_wfcq_next_nonblocking: get next node of a queue, without dequeuing.
 *
 * Same as __cds_wfcq_next_blocking, but returns CDS_WFCQ_WOULDBLOCK
 * instead of waiting when it would need to block.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_next_nonblocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail,
		struct cds_wfcq_node *node)
{
	const int blocking = 0;

	return ___cds_wfcq_next(head, tail, node, blocking);
}
/*
 * ___cds_wfcq_dequeue_with_state: dequeue the first node of a queue.
 *
 * @state: optional (may be NULL). When provided, it is reset to 0 on
 *         entry, and CDS_WFCQ_STATE_LAST is set when the dequeued node
 *         was the last one and the tail was moved back to the head node.
 * @blocking: non-zero to wait for concurrent enqueuers to publish next
 *            pointers, zero to return CDS_WFCQ_WOULDBLOCK instead.
 *
 * Returns NULL if the queue is empty, CDS_WFCQ_WOULDBLOCK if
 * nonblocking and waiting would be needed (the queue is then left
 * unchanged from a dequeuer perspective), the dequeued node otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_with_state(struct cds_wfcq_head *head,
struct cds_wfcq_tail *tail,
int *state,
int blocking)
{
struct cds_wfcq_node *node, *next;
if (state)
*state = 0;
if (_cds_wfcq_empty(head, tail)) {
return NULL;
}
/* First node of the queue; may need to wait for an in-flight enqueue. */
node = ___cds_wfcq_node_sync_next(&head->node, blocking);
if (!blocking && node == CDS_WFCQ_WOULDBLOCK) {
return CDS_WFCQ_WOULDBLOCK;
}
if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
/*
 * @node is probably the only node in the queue.
 * Try to move the tail back to &head->node.
 * head->node.next is set to NULL here, and stays
 * NULL if the cmpxchg succeeds. Should the
 * cmpxchg fail due to a concurrent enqueue,
 * head->node.next will be set to the next node.
 * The implicit memory barrier before
 * uatomic_cmpxchg() orders load node->next
 * before loading tail->p.
 * The implicit memory barrier before uatomic_cmpxchg()
 * orders load head->node.next before loading node's
 * content.
 */
_cds_wfcq_node_init(&head->node);
if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) {
/* Queue is now empty: @node was its last node. */
if (state)
*state |= CDS_WFCQ_STATE_LAST;
return node;
}
/* cmpxchg failed: an enqueuer moved the tail past @node. */
next = ___cds_wfcq_node_sync_next(node, blocking);
/*
 * In nonblocking mode, if we would need to block to
 * get node's next, set the head next node pointer
 * (currently NULL) back to its original value.
 */
if (!blocking && next == CDS_WFCQ_WOULDBLOCK) {
head->node.next = node;
return CDS_WFCQ_WOULDBLOCK;
}
}
/*
 * Move queue head forward.
 */
head->node.next = next;
/* Load head->node.next before loading node's content */
cmm_smp_read_barrier_depends();
return node;
}
/*
 * __cds_wfcq_dequeue_with_state_blocking: dequeue node from queue, with state.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * It is valid to reuse and free a dequeued node immediately.
 * Dequeue/splice/iteration mutual exclusion should be ensured by the
 * caller.
 *
 * Returns NULL if the queue is empty, the dequeued node otherwise.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail, int *state)
{
	const int blocking = 1;

	return ___cds_wfcq_dequeue_with_state(head, tail, state, blocking);
}
/*
 * ___cds_wfcq_dequeue_blocking: dequeue node from queue.
 *
 * Same as __cds_wfcq_dequeue_with_state_blocking, but without saving
 * state.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int *const no_state = NULL;	/* state flags are not reported */

	return ___cds_wfcq_dequeue_with_state_blocking(head, tail, no_state);
}
/*
 * __cds_wfcq_dequeue_with_state_nonblocking: dequeue node, with state.
 *
 * Same as __cds_wfcq_dequeue_with_state_blocking, but returns
 * CDS_WFCQ_WOULDBLOCK instead of waiting when it would need to block.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_with_state_nonblocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail, int *state)
{
	const int blocking = 0;

	return ___cds_wfcq_dequeue_with_state(head, tail, state, blocking);
}
/*
 * ___cds_wfcq_dequeue_nonblocking: dequeue node from queue.
 *
 * Same as __cds_wfcq_dequeue_with_state_nonblocking, but without saving
 * state.
 */
static inline struct cds_wfcq_node *
___cds_wfcq_dequeue_nonblocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int *const no_state = NULL;	/* state flags are not reported */

	return ___cds_wfcq_dequeue_with_state_nonblocking(head, tail, no_state);
}
/*
 * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q.
 *
 * Dequeue all nodes from src_q.
 * dest_q must be already initialized.
 * Mutual exclusion for src_q should be ensured by the caller as
 * specified in the "Synchronization table".
 *
 * @blocking: non-zero to wait for a concurrent enqueue on src_q to
 *            publish its first node, zero to give up instead.
 *
 * Returns enum cds_wfcq_ret which indicates the state of the src or
 * dest queue:
 *   CDS_WFCQ_RET_SRC_EMPTY      src_q was empty, nothing was moved,
 *   CDS_WFCQ_RET_WOULDBLOCK     nonblocking and waiting would be needed,
 *   CDS_WFCQ_RET_DEST_EMPTY     nodes moved, dest_q was empty before,
 *   CDS_WFCQ_RET_DEST_NON_EMPTY nodes moved, dest_q was not empty before.
 */
static inline enum cds_wfcq_ret
___cds_wfcq_splice(
struct cds_wfcq_head *dest_q_head,
struct cds_wfcq_tail *dest_q_tail,
struct cds_wfcq_head *src_q_head,
struct cds_wfcq_tail *src_q_tail,
int blocking)
{
struct cds_wfcq_node *head, *tail;
int attempt = 0;
/*
 * Initial emptiness check to speed up cases where queue is
 * empty: only require loads to check if queue is empty.
 */
if (_cds_wfcq_empty(src_q_head, src_q_tail))
return CDS_WFCQ_RET_SRC_EMPTY;
for (;;) {
/*
 * Open-coded _cds_wfcq_empty() by testing result of
 * uatomic_xchg, as well as tail pointer vs head node
 * address.
 */
head = uatomic_xchg(&src_q_head->node.next, NULL);
if (head)
break; /* non-empty */
if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node)
return CDS_WFCQ_RET_SRC_EMPTY;
/* An enqueue on src_q is in progress: wait, or give up if nonblocking. */
if (___cds_wfcq_busy_wait(&attempt, blocking))
return CDS_WFCQ_RET_WOULDBLOCK;
}
/*
 * Memory barrier implied before uatomic_xchg() orders store to
 * src_q->head before store to src_q->tail. This is required by
 * concurrent enqueue on src_q, which exchanges the tail before
 * updating the previous tail's next pointer.
 */
tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node);
/*
 * Append the spliced content of src_q into dest_q. Does not
 * require mutual exclusion on dest_q (wait-free).
 */
if (___cds_wfcq_append(dest_q_head, dest_q_tail, head, tail))
return CDS_WFCQ_RET_DEST_NON_EMPTY;
else
return CDS_WFCQ_RET_DEST_EMPTY;
}
/*
 * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
 *
 * Dequeue all nodes from src_q.
 * dest_q must be already initialized.
 * Mutual exclusion for src_q should be ensured by the caller as
 * specified in the "Synchronization table".
 * Returns enum cds_wfcq_ret which indicates the state of the src or
 * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
 */
static inline enum cds_wfcq_ret
___cds_wfcq_splice_blocking(
		struct cds_wfcq_head *dest_q_head,
		struct cds_wfcq_tail *dest_q_tail,
		struct cds_wfcq_head *src_q_head,
		struct cds_wfcq_tail *src_q_tail)
{
	const int blocking = 1;

	return ___cds_wfcq_splice(dest_q_head, dest_q_tail,
			src_q_head, src_q_tail, blocking);
}
/*
 * __cds_wfcq_splice_nonblocking: enqueue all src_q nodes at the end of dest_q.
 *
 * Same as __cds_wfcq_splice_blocking, but returns
 * CDS_WFCQ_RET_WOULDBLOCK instead of waiting when it would need to
 * block.
 */
static inline enum cds_wfcq_ret
___cds_wfcq_splice_nonblocking(
		struct cds_wfcq_head *dest_q_head,
		struct cds_wfcq_tail *dest_q_tail,
		struct cds_wfcq_head *src_q_head,
		struct cds_wfcq_tail *src_q_tail)
{
	const int blocking = 0;

	return ___cds_wfcq_splice(dest_q_head, dest_q_tail,
			src_q_head, src_q_tail, blocking);
}
/*
 * cds_wfcq_dequeue_with_state_blocking: dequeue a node from a wait-free queue.
 *
 * Content written into the node before enqueue is guaranteed to be
 * consistent, but no other memory ordering is ensured.
 * Mutual exclusion with cds_wfcq_splice_blocking and dequeue lock is
 * ensured: the dequeue lock is held around the dequeue operation.
 * It is valid to reuse and free a dequeued node immediately.
 *
 * Returns NULL if the queue is empty, the dequeued node otherwise.
 */
static inline struct cds_wfcq_node *
_cds_wfcq_dequeue_with_state_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail, int *state)
{
	struct cds_wfcq_node *dequeued;

	_cds_wfcq_dequeue_lock(head, tail);
	dequeued = ___cds_wfcq_dequeue_with_state_blocking(head, tail, state);
	_cds_wfcq_dequeue_unlock(head, tail);

	return dequeued;
}
/*
 * cds_wfcq_dequeue_blocking: dequeue node from queue.
 *
 * Same as cds_wfcq_dequeue_with_state_blocking, but without saving
 * state.
 */
static inline struct cds_wfcq_node *
_cds_wfcq_dequeue_blocking(struct cds_wfcq_head *head,
		struct cds_wfcq_tail *tail)
{
	int *const no_state = NULL;	/* state flags are not reported */

	return _cds_wfcq_dequeue_with_state_blocking(head, tail, no_state);
}
/*
* cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
*
* Dequeue all nodes from src_q.
* dest_q must be already initialized.
* Content written into the node before enqueue is guaranteed to be
* consistent, but no other memory ordering is ensured.
* Mutual exclusion with cds_wfcq_dequeue_blocking and dequeue lock is
* ensured.
* Returns enum cds_wfcq_ret which indicates the state of the src or
* dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
*/
static inline enum cds_wfcq_ret
_cds_wfcq_splice_blocking(
struct cds_wfcq_head *dest_q_head,
struct cds_wfcq_tail *dest_q_tail,
struct cds_wfcq_head *src_q_head,
struct cds_wfcq_tail *src_q_tail)
{
enum cds_wfcq_ret ret;
_cds_wfcq_dequeue_lock(src_q_head, src_q_tail);
ret = ___cds_wfcq_splice_blocking(dest_q_head, dest_q_tail,
src_q_head, src_q_tail);
_cds_wfcq_dequeue_unlock(src_q_head, src_q_tail);
return ret;
}
#ifdef __cplusplus
}
#endif
#endif /* _URCU_WFCQUEUE_STATIC_H */
|