DPDK 22.11.0-rc2
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
        rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                "%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do { \
        if (v->qsbr_cnt[thread_id].lock_cnt) \
                rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                        "%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b
 * elements. A given thread ID is converted to an index into the array
 * plus a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
        RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
                __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
        ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

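/*
 * Illustrative example (not part of this header): with 64 thread IDs per
 * bitmap element, thread_id = 70 maps to array element 70 >> 6 = 1 and
 * bit position 70 & 0x3f = 6 within that element, i.e.
 *
 *      uint64_t *elem = __RTE_QSBR_THRID_ARRAY_ELM(v,
 *                              thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT);
 *      uint64_t bit = 1UL << (thread_id & __RTE_QSBR_THRID_MASK);
 */
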
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
        uint64_t cnt;
        /**< Quiescent state counter. Value 0 indicates the thread is offline.
         *   A 64b counter avoids having to handle counter overflow.
         */
        uint32_t lock_cnt;
        /**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter:
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
        uint64_t token __rte_cache_aligned;
        /**< Token incremented by the writer for each new grace period. */
        uint64_t acked_token;
        /**< Least token acknowledged by all readers in the last check. */

        uint32_t num_elems __rte_cache_aligned;
        /**< Number of elements in the thread ID bitmap array. */
        uint32_t num_threads;
        /**< Number of threads currently registered with this QS variable. */
        uint32_t max_threads;
        /**< Maximum number of threads this QS variable supports. */

        struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
        /**< Per-thread quiescent state counters ('max_threads' entries),
         *   followed by the registered thread ID bitmap array.
         */
} __rte_cache_aligned;

/** User callback to free 'n' resources pointed to by 'e'; 'p' is the pointer
 *  given in the defer queue parameters.
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

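/*
 * Illustrative sketch (not part of this header): a minimal
 * rte_rcu_qsbr_free_resource_t callback. It assumes the defer queue was
 * created with esize = sizeof(void *) and that 'e' points to 'n' such
 * pointers, each allocated with rte_malloc() (rte_free() comes from
 * rte_malloc.h). The name 'test_free_resource' is hypothetical.
 *
 *      static void
 *      test_free_resource(void *p, void *e, unsigned int n)
 *      {
 *              void **objs = (void **)e;
 *              unsigned int i;
 *
 *              RTE_SET_USED(p);
 *              for (i = 0; i < n; i++)
 *                      rte_free(objs[i]);
 *      }
 */
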
#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
/**< Maximum length of the defer queue name. */

/** Pass this flag if the defer queue APIs will not be called concurrently
 *  from multiple threads.
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/** Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
        const char *name;        /**< Name of the defer queue. */
        uint32_t flags;          /**< Flags, e.g. RTE_RCU_QSBR_DQ_MT_UNSAFE. */
        uint32_t size;           /**< Number of entries in the defer queue. */
        uint32_t esize;          /**< Element size in bytes, a multiple of 4. */
        uint32_t trigger_reclaim_limit; /**< Auto-reclaim threshold (entries). */
        uint32_t max_reclaim_size;      /**< Max resources reclaimed in one go. */
        rte_rcu_qsbr_free_resource_t free_fn; /**< Callback to free a resource. */
        void *p;                 /**< Pointer passed to free_fn. */
        struct rte_rcu_qsbr *v;  /**< RCU QSBR variable for this defer queue. */
};

/* RTE defer queue structure.
 * The defer queue holds entries that have been deleted from the
 * data structure but are not yet freed.
 */
struct rte_rcu_qsbr_dq;

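/*
 * Illustrative sketch (not part of this header): creating a defer queue
 * for an already-initialized QS variable 'v'. The sizes below are
 * arbitrary example values and 'test_free_resource' is the hypothetical
 * callback sketched above.
 *
 *      struct rte_rcu_qsbr_dq_parameters params = {0};
 *      struct rte_rcu_qsbr_dq *dq;
 *
 *      params.name = "example_dq";
 *      params.size = 1024;
 *      params.esize = sizeof(void *);
 *      params.trigger_reclaim_limit = 64;
 *      params.max_reclaim_size = 32;
 *      params.free_fn = test_free_resource;
 *      params.p = NULL;
 *      params.v = v;
 *      dq = rte_rcu_qsbr_dq_create(&params);
 *      if (dq == NULL)
 *              rte_panic("defer queue creation failed\n");
 */
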
/** Return the size of the memory occupied by a QS variable for 'max_threads'. */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/** Initialize a QS variable to track up to 'max_threads' reader threads. */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

/** Register a reader thread, identified by 'thread_id', to report its
 *  quiescent state on 'v'.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/** Remove a reader thread from the list of threads reporting their
 *  quiescent state on 'v'.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

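/*
 * Illustrative sketch (not part of this header): allocating and
 * initializing a QS variable for up to 8 reader threads and registering
 * one of them. rte_zmalloc() comes from rte_malloc.h; the thread count
 * and names are hypothetical.
 *
 *      #define EXAMPLE_MAX_THREADS 8
 *
 *      struct rte_rcu_qsbr *v;
 *      size_t sz = rte_rcu_qsbr_get_memsize(EXAMPLE_MAX_THREADS);
 *
 *      v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *      if (v == NULL || rte_rcu_qsbr_init(v, EXAMPLE_MAX_THREADS) != 0)
 *              rte_panic("QSBR variable setup failed\n");
 *
 *      // in each reader thread, with a unique thread_id in [0, 7]
 *      rte_rcu_qsbr_thread_register(v, thread_id);
 */
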
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* Copy the current value of token.
         * The fence at the end of the function will ensure that
         * the following will not move down after the load of any shared
         * data structure.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

        /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
         * 'cnt' (64b) is accessed atomically.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                t, __ATOMIC_RELAXED);

        /* The subsequent load of the data structure should not
         * move above the store. Hence a store-load barrier
         * is required.
         * If the load of the data structure moves above the store,
         * writer might not see that the reader is online, even though
         * the reader is referencing the shared data structure.
         */
        rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* The reader can go offline only after the load of the
         * data structure is completed. i.e. any load of the
         * data structure can not move after this store.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

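/*
 * Illustrative sketch (not part of this header): a registered reader that
 * goes offline while it is blocked outside the data path. 'lookup()' and
 * 'wait_for_work()' stand in for application code and are hypothetical.
 *
 *      rte_rcu_qsbr_thread_online(v, thread_id);
 *      lookup(shared_ds, key);         // access the shared structure
 *      rte_rcu_qsbr_thread_offline(v, thread_id);
 *      wait_for_work();                // writer need not wait for us here
 */
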
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
                __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Increment the lock counter */
        __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
                1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
                __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Decrement the lock counter */
        __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
                1, __ATOMIC_RELEASE);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
                "Lock counter %u. Nested locks?\n",
                v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

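/*
 * Illustrative sketch (not part of this header): marking a reader-side
 * critical section with the debug helpers. The calls compile to nothing
 * unless RTE_LIBRTE_RCU_DEBUG is defined; with it enabled, reporting a
 * quiescent state from inside such a section is flagged. 'lookup()' is
 * hypothetical.
 *
 *      rte_rcu_qsbr_lock(v, thread_id);
 *      entry = lookup(shared_ds, key); // use 'entry' only in this section
 *      rte_rcu_qsbr_unlock(v, thread_id);
 */
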
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        /* Release the changes to the shared data structure.
         * This store release will ensure that changes to any data
         * structure are visible to the workers before the token
         * update is visible.
         */
        t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

        return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* Acquire the changes to the shared data structure released
         * by rte_rcu_qsbr_start.
         * Later loads of the shared data structure should not move
         * above this load. Hence, use load-acquire.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

        /* Check if there are updates available from the writer.
         * Inform the writer that updates are visible to this reader.
         * Prior loads of the shared data structure should not move
         * beyond this store. Hence use store-release.
         */
        if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
                __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                        t, __ATOMIC_RELEASE);

        __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
                __func__, t, thread_id);
}

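/*
 * Illustrative sketch (not part of this header): a reader polling loop
 * that reports a quiescent state once per iteration, after it has
 * dropped all references to the shared data structure. 'process_burst'
 * and 'done' are hypothetical.
 *
 *      while (!done) {
 *              process_burst(shared_ds);
 *              rte_rcu_qsbr_quiescent(v, thread_id);
 *      }
 */
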
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i, j, id;
        uint64_t bmap;
        uint64_t c;
        uint64_t *reg_thread_id;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
                i < v->num_elems;
                i++, reg_thread_id++) {
                /* Load the current registered thread bit map before
                 * loading the reader thread quiescent state counters.
                 */
                bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

                while (bmap) {
                        j = __builtin_ctzl(bmap);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
                                __func__, t, wait, bmap, id + j);
                        c = __atomic_load_n(
                                &v->qsbr_cnt[id + j].cnt,
                                __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, id + j);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (unlikely(c !=
                                __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
                                /* This thread is not in quiescent state */
                                if (!wait)
                                        return 0;

                                rte_pause();
                                /* This thread might have unregistered.
                                 * Re-read the bitmap.
                                 */
                                bmap = __atomic_load_n(reg_thread_id,
                                        __ATOMIC_ACQUIRE);

                                continue;
                        }

                        /* This thread is in quiescent state. Use the counter
                         * to find the least acknowledged token among all the
                         * readers.
                         */
                        if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
                                acked_token = c;

                        bmap &= ~(1UL << j);
                }
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i;
        struct rte_rcu_qsbr_cnt *cnt;
        uint64_t c;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
                        __func__, t, wait, i);
                while (1) {
                        c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, i);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
                                break;

                        /* This thread is not in quiescent state */
                        if (!wait)
                                return 0;

                        rte_pause();
                }

                /* This thread is in quiescent state. Use the counter to find
                 * the least acknowledged token among all the readers.
                 */
                if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
                        acked_token = c;
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        RTE_ASSERT(v != NULL);

        /* Check if all the readers have already acknowledged this token */
        if (likely(t <= v->acked_token)) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d",
                        __func__, t, wait);
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: status: least acked token = %" PRIu64,
                        __func__, v->acked_token);
                return 1;
        }

        if (likely(v->num_threads == v->max_threads))
                return __rte_rcu_qsbr_check_all(v, t, wait);
        else
                return __rte_rcu_qsbr_check_selective(v, t, wait);
}

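/*
 * Illustrative sketch (not part of this header): the writer-side
 * grace-period handshake. The entry is removed from the shared data
 * structure first, then freed only after every registered reader has
 * acknowledged the token. 'remove_entry' and 'entry' are hypothetical;
 * rte_free() comes from rte_malloc.h.
 *
 *      uint64_t token;
 *
 *      remove_entry(shared_ds, entry);
 *      token = rte_rcu_qsbr_start(v);
 *      // other work can be done here while readers catch up
 *      rte_rcu_qsbr_check(v, token, true);     // wait == true blocks
 *      rte_free(entry);
 */
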
/** Block until all registered readers have acknowledged the latest token.
 *  'thread_id' is the caller's ID if it is itself a registered reader,
 *  else RTE_QSBR_THRID_INVALID.
 */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

/** Dump the details of the given QS variable to the file 'f'. */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

/** Create a defer queue using the QS variable and free callback given in
 *  'params'. Returns NULL on error.
 */
__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

/** Enqueue one resource ('esize' bytes copied from 'e') to the defer queue
 *  for later reclamation.
 */
__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

/** Free up to 'n' resources from the defer queue that all registered
 *  readers have stopped referencing.
 */
__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
                unsigned int *freed, unsigned int *pending, unsigned int *available);

/** Reclaim all remaining resources and delete the defer queue. */
__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

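/*
 * Illustrative sketch (not part of this header): using the defer queue
 * instead of blocking in the writer. The deleted entry is enqueued with
 * the current token and reclaimed later, either automatically on a
 * subsequent enqueue or explicitly as below. 'remove_entry', 'entry' and
 * 'handle_enqueue_failure' are hypothetical.
 *
 *      unsigned int freed, pending, avail;
 *
 *      remove_entry(shared_ds, entry);
 *      if (rte_rcu_qsbr_dq_enqueue(dq, &entry) != 0)
 *              handle_enqueue_failure(entry);  // queue full, nothing reclaimable yet
 *
 *      // e.g. at tear-down: reclaim whatever is safe, then delete the queue
 *      rte_rcu_qsbr_dq_reclaim(dq, UINT32_MAX, &freed, &pending, &avail);
 *      rte_rcu_qsbr_dq_delete(dq);
 */
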
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */