/*
 * Branch prediction hint; falls back to a plain expression where
 * __builtin_expect() is unavailable (MSVC).
 */
#ifdef _WIN32
#define __predict_false(x) (x)
#else
#define __predict_false(x) __builtin_expect((x) != 0, 0)
#endif

namespace experimental

static constexpr size_t RBUF_OFF_MASK = 0x00000000ffffffffUL;
static constexpr size_t WRAP_LOCK_BIT = 0x8000000000000000UL;
static constexpr size_t RBUF_OFF_MAX = UINT64_MAX & ~WRAP_LOCK_BIT;

static constexpr size_t WRAP_COUNTER = 0x7fffffff00000000UL;

/* Advance the wrap-around counter; the offset bits are discarded. */
static size_t
WRAP_INCR(size_t x)
{
	return ((x + 0x100000000UL) & WRAP_COUNTER);
}

typedef uint64_t ringbuf_off_t;
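
/*
 * Layout of a ringbuf_off_t value, as implied by the masks above:
 * bits 0..31 hold the byte offset (RBUF_OFF_MASK), bits 32..62 hold the
 * wrap-around counter (WRAP_COUNTER), and bit 63 (WRAP_LOCK_BIT) marks
 * the value as not yet stable; readers spin until it clears (see
 * stable_nextoff() and stable_seenoff()).
 */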

struct ringbuf_worker_t {
	std::atomic<ringbuf_off_t> seen_off{0};
	std::atomic<int> registered{0};
};

struct ringbuf_t {
	/* Ring buffer space (in bytes). */
	size_t space;

	/*
	 * The 'next' offset is updated by the producers; WRAP_LOCK_BIT is
	 * set while a wrap-around is being published, in which case the
	 * producer also updates the 'end' offset.
	 */
	std::atomic<ringbuf_off_t> next{0};
	std::atomic<ringbuf_off_t> end{0};

	/* The 'written' offset is updated by the consumer. */
	std::atomic<ringbuf_off_t> written{0};
	size_t nworkers;
	std::unique_ptr<ringbuf_worker_t[]> workers;

	/* Set by ringbuf_consume(), cleared by ringbuf_release(). */
	bool consume_in_progress;

	ringbuf_t(size_t max_workers, size_t length)
	    : workers(new ringbuf_worker_t[max_workers])
	{
		if (length >= RBUF_OFF_MASK)
			throw std::out_of_range("ringbuf length too big");

		space = length;
		end = RBUF_OFF_MAX;
		nworkers = max_workers;
		consume_in_progress = false;

#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		/* These atomics are accessed without locks by design. */
		VALGRIND_HG_DISABLE_CHECKING(&next, sizeof(next));
		VALGRIND_HG_DISABLE_CHECKING(&end, sizeof(end));
		VALGRIND_HG_DISABLE_CHECKING(&written, sizeof(written));

		for (size_t i = 0; i < max_workers; i++) {
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].seen_off,
				sizeof(workers[i].seen_off));
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].registered,
				sizeof(workers[i].registered));
		}
#endif
	}
};

/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and return the pointer to its local state.
 */
inline ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
	ringbuf_worker_t *w = &rbuf->workers[i];

	w->seen_off = RBUF_OFF_MAX;
	std::atomic_store_explicit<int>(&w->registered, true,
					std::memory_order_release);
	return w;
}

/*
 * ringbuf_unregister: de-register the worker.
 */
inline void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	w->registered = false;
	(void)rbuf; /* unused */
}

/*
 * stable_nextoff: capture and return a stable value of the 'next' offset.
 * Spins with exponential back-off while WRAP_LOCK_BIT is set.
 */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
	ringbuf_off_t next;
	for (pmem::detail::atomic_backoff backoff;;) {
		next = std::atomic_load_explicit<ringbuf_off_t>(
			&rbuf->next, std::memory_order_acquire);
		if (next & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	assert((next & RBUF_OFF_MASK) < rbuf->space);
	return next;
}

/*
 * stable_seenoff: capture and return a stable value of the worker's
 * 'seen' offset.
 */
static inline ringbuf_off_t
stable_seenoff(ringbuf_worker_t *w)
{
	ringbuf_off_t seen_off;
	for (pmem::detail::atomic_backoff backoff;;) {
		seen_off = std::atomic_load_explicit<ringbuf_off_t>(
			&w->seen_off, std::memory_order_acquire);
		if (seen_off & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	return seen_off;
}

/*
 * ringbuf_acquire: request a space of a given length in the ring buffer.
 * Returns the offset at which the space is available, or -1 on failure.
 */
inline ptrdiff_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
{
	ringbuf_off_t seen, next, target;

	assert(len > 0 && len <= rbuf->space);
	assert(w->seen_off == RBUF_OFF_MAX);

	do {
		ringbuf_off_t written;

		/*
		 * Get the stable 'next' offset and save it as the 'seen'
		 * offset, marked with WRAP_LOCK_BIT while the acquire is
		 * still in flight.
		 */
		seen = stable_nextoff(rbuf);
		next = seen & RBUF_OFF_MASK;
		assert(next < rbuf->space);
		std::atomic_store_explicit<ringbuf_off_t>(
			&w->seen_off, next | WRAP_LOCK_BIT,
			std::memory_order_relaxed);

		/* The producer must not catch up with the 'written' offset. */
		target = next + len;
		written = rbuf->written;
		if (__predict_false(next < written && target >= written)) {
			/* The producer must wait. */
			std::atomic_store_explicit<ringbuf_off_t>(
				&w->seen_off, RBUF_OFF_MAX,
				std::memory_order_release);
			return -1;
		}

		if (__predict_false(target >= rbuf->space)) {
			const bool exceed = target > rbuf->space;

			/*
			 * Wrap-around: restart from the beginning; keep
			 * WRAP_LOCK_BIT set until 'end' is published.
			 */
			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
			if ((target & RBUF_OFF_MASK) >= written) {
				std::atomic_store_explicit<ringbuf_off_t>(
					&w->seen_off, RBUF_OFF_MAX,
					std::memory_order_release);
				return -1;
			}
			/* Increment the wrap-around counter. */
			target |= WRAP_INCR(seen & WRAP_COUNTER);
		} else {
			/* Preserve the wrap-around counter. */
			target |= seen & WRAP_COUNTER;
		}
	} while (!std::atomic_compare_exchange_weak<ringbuf_off_t>(
		&rbuf->next, &seen, target));

	/* The range is acquired: clear WRAP_LOCK_BIT in the 'seen' offset. */
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off,
		w->seen_off & ~WRAP_LOCK_BIT,
		std::memory_order_relaxed);

	/* On wrap-around, publish the 'end' offset and release the lock bit. */
	if (__predict_false(target & WRAP_LOCK_BIT)) {
		/* Cannot wrap-around again if the consumer did not catch up. */
		assert(rbuf->written <= next);
		assert(rbuf->end == RBUF_OFF_MAX);
		rbuf->end = next;
		next = 0;

		std::atomic_store_explicit<ringbuf_off_t>(
			&rbuf->next, (target & ~WRAP_LOCK_BIT),
			std::memory_order_release);
	}
	assert((target & RBUF_OFF_MASK) <= rbuf->space);
	return (ptrdiff_t)next;
}
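
/*
 * Usage note: ringbuf_t holds only control state; the payload lives in a
 * separate, caller-managed buffer of 'space' bytes which the returned
 * offset indexes into.  An acquired range becomes visible to the consumer
 * only after the matching ringbuf_produce() call.
 */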

/*
 * ringbuf_produce: indicate that the acquired range in the buffer is
 * produced and is ready to be consumed.
 */
inline void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	(void)rbuf; /* unused */
	assert(w->registered);
	assert(w->seen_off != RBUF_OFF_MAX);
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off, RBUF_OFF_MAX,
						  std::memory_order_release);
}

/*
 * ringbuf_consume: get a contiguous range which is ready to be consumed.
 * Returns its length and sets *offset.  Nested consumes are not allowed.
 */
inline size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
	assert(!rbuf->consume_in_progress);

	ringbuf_off_t written = rbuf->written, next, ready;
	size_t towrite;
retry:
	/* The area between 'written' and 'next' is the preliminary range. */
	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
	if (written == next) {
		/* If producers did not advance, then nothing to do. */
		return 0;
	}

	/* Find the smallest 'seen' offset that is not behind 'written'. */
	ready = RBUF_OFF_MAX;

	for (unsigned i = 0; i < rbuf->nworkers; i++) {
		ringbuf_worker_t *w = &rbuf->workers[i];
		ringbuf_off_t seen_off;

		/* Skip workers which have not registered. */
		if (!std::atomic_load_explicit<int>(&w->registered,
						    std::memory_order_relaxed))
			continue;
		seen_off = stable_seenoff(w);

		if (seen_off >= written) {
			ready = std::min<ringbuf_off_t>(seen_off, ready);
		}
		assert(ready >= written);
	}

	/* Determine whether a wrap-around occurred and deduce the safe range. */
	if (next < written) {
		const ringbuf_off_t end =
			std::min<ringbuf_off_t>(rbuf->space, rbuf->end);

		/*
		 * Nothing left before the end of the buffer: wrap the
		 * consumer around and retry from the beginning.
		 */
		if (ready == RBUF_OFF_MAX && written == end) {
			/* Clear the 'end' offset if it was set. */
			if (rbuf->end != RBUF_OFF_MAX) {
				rbuf->end = RBUF_OFF_MAX;
			}
			written = 0;
			std::atomic_store_explicit<ringbuf_off_t>(
				&rbuf->written, written,
				std::memory_order_release);
			goto retry;
		}

		/* There is still data to consume at the end of the buffer. */
		assert(ready > next);
		ready = std::min<ringbuf_off_t>(ready, end);
		assert(ready >= written);
	} else {
		/* Regular case: up to the observed 'ready' or 'next' offset. */
		ready = std::min<ringbuf_off_t>(ready, next);
	}
	towrite = ready - written;
	*offset = written;

	assert(ready >= written);
	assert(towrite <= rbuf->space);

	rbuf->consume_in_progress = true;
	return towrite;
}

/*
 * ringbuf_release: indicate that the consumed range can now be released
 * and may be reused by the producers.
 */
inline void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
	rbuf->consume_in_progress = false;

	const size_t nwritten = rbuf->written + nbytes;

	assert(rbuf->written <= rbuf->space);
	assert(rbuf->written <= rbuf->end);
	assert(nwritten <= rbuf->space);

	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}
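
/*
 * Usage sketch (illustrative, not part of the original header): one
 * producer/consumer round trip over a caller-managed payload buffer.
 * The function name, buffer and sizes below are made up for the example
 * and assume the declarations above are in scope.
 */
inline void
ringbuf_usage_sketch()
{
	ringbuf_t rbuf(1 /* max_workers */, 1024 /* length */);
	char data[1024]; /* caller-managed payload storage */

	/* Producer side: register once, then acquire, fill and publish. */
	ringbuf_worker_t *w = ringbuf_register(&rbuf, 0);
	const char msg[] = "hello";
	ptrdiff_t off = ringbuf_acquire(&rbuf, w, sizeof(msg));
	if (off >= 0) {
		for (size_t i = 0; i < sizeof(msg); i++)
			data[static_cast<size_t>(off) + i] = msg[i];
		ringbuf_produce(&rbuf, w);
	}

	/* Consumer side: take a contiguous ready range, then release it. */
	size_t roff;
	size_t len = ringbuf_consume(&rbuf, &roff);
	if (len != 0) {
		/* data[roff .. roff + len) holds the produced bytes. */
		ringbuf_release(&rbuf, len);
	}

	ringbuf_unregister(&rbuf, w);
}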