PMDK C++ bindings  1.13.0-git107.g7e59f08f
This is the C++ bindings documentation for PMDK's libpmemobj.
ringbuf.hpp
/*
 * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */

/**
 * @file
 * Lock-free multi-producer, single-consumer (MPSC) ring buffer.
 */

#ifndef RINGBUF_HPP
#define RINGBUF_HPP

#include <cstddef>

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <stdexcept>

#include <libpmemobj++/detail/atomic_backoff.hpp>

#ifdef _WIN32
#define __predict_false(x) (x)
#else
#define __predict_false(x) __builtin_expect((x) != 0, 0)
#endif /* _WIN32 */

namespace pmem
{

namespace obj
{

namespace experimental
{

namespace ringbuf
{
/*
 * Offsets are 64-bit values: the low 32 bits hold the byte offset into
 * the buffer (RBUF_OFF_MASK), bits 32..62 hold a wrap-around counter
 * (WRAP_COUNTER) and the top bit is the WRAP_LOCK_BIT.
 */
static constexpr size_t RBUF_OFF_MASK = 0x00000000ffffffffUL;
static constexpr size_t WRAP_LOCK_BIT = 0x8000000000000000UL;
static constexpr size_t RBUF_OFF_MAX = UINT64_MAX & ~WRAP_LOCK_BIT;

static constexpr size_t WRAP_COUNTER = 0x7fffffff00000000UL;
static size_t
WRAP_INCR(size_t x)
{
        return ((x + 0x100000000UL) & WRAP_COUNTER);
}

typedef uint64_t ringbuf_off_t;

struct ringbuf_worker_t {
        std::atomic<ringbuf_off_t> seen_off{0};
        std::atomic<int> registered{0};
};

struct ringbuf_t {
        /* Ring buffer space. */
        size_t space;

        /*
         * The NEXT hand is atomically updated by the producer.
         * WRAP_LOCK_BIT is set in case of wrap-around; in such case,
         * the producer can update the 'end' offset.
         */
        std::atomic<ringbuf_off_t> next{0};
        std::atomic<ringbuf_off_t> end{0};

        /* The following are updated by the consumer. */
        std::atomic<ringbuf_off_t> written{0};
        unsigned nworkers;
        std::unique_ptr<ringbuf_worker_t[]> workers;

        /* Set by ringbuf_consume, reset by ringbuf_release. */
        bool consume_in_progress;

        /**
         * Constructs a new ringbuf instance.
         *
         * @param[in] max_workers maximum number of producers.
         * @param[in] length size of the ring buffer (in bytes).
         *
         * @throw std::out_of_range if length is not less than RBUF_OFF_MASK.
         */
        ringbuf_t(size_t max_workers, size_t length)
            : workers(new ringbuf_worker_t[max_workers])
        {
                if (length >= RBUF_OFF_MASK)
                        throw std::out_of_range("ringbuf length too big");

                space = length;
                end = RBUF_OFF_MAX;
                nworkers = max_workers;
                consume_in_progress = false;

                /* Helgrind/Drd does not understand std::atomic */
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
                VALGRIND_HG_DISABLE_CHECKING(&next, sizeof(next));
                VALGRIND_HG_DISABLE_CHECKING(&end, sizeof(end));
                VALGRIND_HG_DISABLE_CHECKING(&written, sizeof(written));

                for (size_t i = 0; i < max_workers; i++) {
                        VALGRIND_HG_DISABLE_CHECKING(
                                &workers[i].seen_off,
                                sizeof(workers[i].seen_off));
                        VALGRIND_HG_DISABLE_CHECKING(
                                &workers[i].registered,
                                sizeof(workers[i].registered));
                }
#endif
        }
};

/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and pass the pointer to its local store.
 */
inline ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
        ringbuf_worker_t *w = &rbuf->workers[i];

        w->seen_off = RBUF_OFF_MAX;
        std::atomic_store_explicit<int>(&w->registered, true,
                                        std::memory_order_release);
        return w;
}

/*
 * ringbuf_unregister: unregister the worker (thread/process).
 */
inline void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
        w->registered = false;
        (void)rbuf;
}

/*
 * stable_nextoff: capture and return a stable value of the 'next' offset.
 */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
        ringbuf_off_t next;
        for (pmem::detail::atomic_backoff backoff;;) {
                next = std::atomic_load_explicit<ringbuf_off_t>(
                        &rbuf->next, std::memory_order_acquire);
                if (next & WRAP_LOCK_BIT) {
                        backoff.pause();
                } else {
                        break;
                }
        }
        assert((next & RBUF_OFF_MASK) < rbuf->space);
        return next;
}

/*
 * stable_seenoff: capture and return a stable value of the 'seen' offset.
 */
static inline ringbuf_off_t
stable_seenoff(ringbuf_worker_t *w)
{
        ringbuf_off_t seen_off;
        for (pmem::detail::atomic_backoff backoff;;) {
                seen_off = std::atomic_load_explicit<ringbuf_off_t>(
                        &w->seen_off, std::memory_order_acquire);
                if (seen_off & WRAP_LOCK_BIT) {
                        backoff.pause();
                } else {
                        break;
                }
        }
        return seen_off;
}

/*
 * ringbuf_acquire: request a space of a given length in the ring buffer.
 *
 * => On success: returns the offset at which the space is available.
 * => On failure: returns -1.
 */
inline ptrdiff_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
{
        ringbuf_off_t seen, next, target;

        assert(len > 0 && len <= rbuf->space);
        assert(w->seen_off == RBUF_OFF_MAX);

        do {
                ringbuf_off_t written;

                /*
                 * Get the stable 'next' offset. Save the observed 'next'
                 * value (i.e. the 'seen' offset), but mark the value as
                 * unstable (set WRAP_LOCK_BIT).
                 *
                 * Note: CAS will issue a memory_order_release for us and
                 * thus ensures that it reaches global visibility together
                 * with new 'next'.
                 */
                seen = stable_nextoff(rbuf);
                next = seen & RBUF_OFF_MASK;
                assert(next < rbuf->space);
                std::atomic_store_explicit<ringbuf_off_t>(
                        &w->seen_off, next | WRAP_LOCK_BIT,
                        std::memory_order_relaxed);

                /*
                 * Compute the target offset. Key invariant: we cannot
                 * go beyond the WRITTEN offset or catch up with it.
                 */
                target = next + len;
                written = rbuf->written;
                if (__predict_false(next < written && target >= written)) {
                        /* The producer must wait. */
                        std::atomic_store_explicit<ringbuf_off_t>(
                                &w->seen_off, RBUF_OFF_MAX,
                                std::memory_order_release);
                        return -1;
                }

                if (__predict_false(target >= rbuf->space)) {
                        const bool exceed = target > rbuf->space;

                        /*
                         * Wrap-around and start from the beginning.
                         *
                         * If we would exceed the buffer, then attempt to
                         * acquire the WRAP_LOCK_BIT and use the space in
                         * the beginning. If we used all space exactly to
                         * the end, then reset to 0.
                         *
                         * Check the invariant again.
                         */
                        target = exceed ? (WRAP_LOCK_BIT | len) : 0;
                        if ((target & RBUF_OFF_MASK) >= written) {
                                std::atomic_store_explicit<ringbuf_off_t>(
                                        &w->seen_off, RBUF_OFF_MAX,
                                        std::memory_order_release);
                                return -1;
                        }
                        /* Increment the wrap-around counter. */
                        target |= WRAP_INCR(seen & WRAP_COUNTER);
                } else {
                        /* Preserve the wrap-around counter. */
                        target |= seen & WRAP_COUNTER;
                }
        } while (!std::atomic_compare_exchange_weak<ringbuf_off_t>(
                &rbuf->next, &seen, target));

        /*
         * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
         * thus indicating that it is stable now.
         *
         * No need for memory_order_release, since CAS issued a fence.
         */
        std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off,
                w->seen_off & ~WRAP_LOCK_BIT,
                std::memory_order_relaxed);

        /*
         * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
         * the remaining space and need to wrap-around), then save the
         * 'end' offset and release the lock.
         */
        if (__predict_false(target & WRAP_LOCK_BIT)) {
                /* Cannot wrap-around again if consumer did not catch-up. */
                assert(rbuf->written <= next);
                assert(rbuf->end == RBUF_OFF_MAX);
                rbuf->end = next;
                next = 0;

                /*
                 * Unlock: ensure the 'end' offset reaches global
                 * visibility before the lock is released.
                 */
                std::atomic_store_explicit<ringbuf_off_t>(
                        &rbuf->next, (target & ~WRAP_LOCK_BIT),
                        std::memory_order_release);
        }
        assert((target & RBUF_OFF_MASK) <= rbuf->space);
        return (ptrdiff_t)next;
}

/*
 * ringbuf_produce: indicate the acquired range in the buffer is produced
 * and is ready to be consumed.
 */
inline void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
        (void)rbuf;
        assert(w->registered);
        assert(w->seen_off != RBUF_OFF_MAX);
        std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off, RBUF_OFF_MAX,
                std::memory_order_release);
}

/*
 * ringbuf_consume: get a contiguous range which is ready to be consumed.
 *
 * Nested consumes are not allowed.
 */
inline size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
        assert(!rbuf->consume_in_progress);

        ringbuf_off_t written = rbuf->written, next, ready;
        size_t towrite;
retry:
        /*
         * Get the stable 'next' offset. Note: stable_nextoff() issued
         * a load memory barrier. The area between the 'written' offset
         * and the 'next' offset will be the *preliminary* target buffer
         * area to be consumed.
         */
        next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
        if (written == next) {
                /* If producers did not advance, then nothing to do. */
                return 0;
        }

        /*
         * Observe the 'ready' offset of each producer.
         *
         * At this point, some producer might have already triggered the
         * wrap-around and some (or all) seen 'ready' values might be in
         * the range between 0 and 'written'. We have to skip them.
         */
        ready = RBUF_OFF_MAX;

        for (unsigned i = 0; i < rbuf->nworkers; i++) {
                ringbuf_worker_t *w = &rbuf->workers[i];
                ringbuf_off_t seen_off;

                /*
                 * Skip if the worker has not registered.
                 *
                 * Get a stable 'seen' value. This is necessary since we
                 * want to discard the stale 'seen' values.
                 */
                if (!std::atomic_load_explicit<int>(&w->registered,
                                std::memory_order_relaxed))
                        continue;
                seen_off = stable_seenoff(w);

                /*
                 * Ignore the offsets after the possible wrap-around.
                 * We are interested in the smallest seen offset that is
                 * not behind the 'written' offset.
                 */
                if (seen_off >= written) {
                        ready = std::min<ringbuf_off_t>(seen_off, ready);
                }
                assert(ready >= written);
        }

        /*
         * Finally, we need to determine whether wrap-around occurred
         * and deduce the safe 'ready' offset.
         */
        if (next < written) {
                const ringbuf_off_t end =
                        std::min<ringbuf_off_t>(rbuf->space, rbuf->end);

                /*
                 * Wrap-around case. Check for the cut off first.
                 *
                 * Reset the 'written' offset if it reached the end of
                 * the buffer or the 'end' offset (if set by a producer).
                 * However, we must check that the producer is actually
                 * done (the observed 'ready' offsets are clear).
                 */
                if (ready == RBUF_OFF_MAX && written == end) {
                        /*
                         * Clear the 'end' offset if it was set.
                         */
                        if (rbuf->end != RBUF_OFF_MAX) {
                                rbuf->end = RBUF_OFF_MAX;
                        }

                        /*
                         * Wrap-around the consumer and start from zero.
                         */
                        written = 0;
                        std::atomic_store_explicit<ringbuf_off_t>(
                                &rbuf->written, written,
                                std::memory_order_release);
                        goto retry;
                }

                /*
                 * We cannot wrap-around yet; there is data to consume at
                 * the end. The ready range is the smallest of the observed
                 * 'ready' or the 'end' offset. If neither is set, then
                 * the actual end of the buffer.
                 */
                assert(ready > next);
                ready = std::min<ringbuf_off_t>(ready, end);
                assert(ready >= written);
        } else {
                /*
                 * Regular case. Up to the observed 'ready' (if set)
                 * or the 'next' offset.
                 */
                ready = std::min<ringbuf_off_t>(ready, next);
        }
        towrite = ready - written;
        *offset = written;

        assert(ready >= written);
        assert(towrite <= rbuf->space);

        if (towrite)
                rbuf->consume_in_progress = true;

        return towrite;
}

/*
 * ringbuf_release: indicate that the consumed range can now be released.
 */
inline void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
        rbuf->consume_in_progress = false;

        const size_t nwritten = rbuf->written + nbytes;

        assert(rbuf->written <= rbuf->space);
        assert(rbuf->written <= rbuf->end);
        assert(nwritten <= rbuf->space);

        rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}

} /* namespace ringbuf */
} /* namespace experimental */
} /* namespace obj */
} /* namespace pmem */

#endif /* RINGBUF_HPP */
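
For orientation, here is a minimal usage sketch of the API defined in this header: one producer worker acquires space, copies its payload into a caller-owned byte array, and marks it produced; a single consumer then obtains a contiguous ready range and releases it. Note that ringbuf_t manages offsets only, not the data itself. The include path (assumed to be libpmemobj++/detail/ringbuf.hpp), the main() wrapper, the buffer size, the worker index and the message are all assumptions of this example, not requirements of the header.

#include <libpmemobj++/detail/ringbuf.hpp> /* assumed install path */

#include <cstring>
#include <vector>

namespace rb = pmem::obj::experimental::ringbuf;

int
main()
{
        constexpr size_t buf_size = 1024;
        std::vector<char> data(buf_size); /* caller-owned payload storage */
        rb::ringbuf_t ring(1, buf_size);  /* control structure, offsets only */

        /* Producer side (worker 0): acquire space, copy, mark as produced. */
        rb::ringbuf_worker_t *w = rb::ringbuf_register(&ring, 0);
        const char msg[] = "hello";
        ptrdiff_t off = rb::ringbuf_acquire(&ring, w, sizeof(msg));
        if (off >= 0) {
                std::memcpy(&data[static_cast<size_t>(off)], msg, sizeof(msg));
                rb::ringbuf_produce(&ring, w);
        }

        /* Consumer side: take a contiguous ready range, then release it. */
        size_t cons_off;
        size_t len = rb::ringbuf_consume(&ring, &cons_off);
        if (len > 0) {
                /* ... process 'len' bytes starting at data[cons_off] ... */
                rb::ringbuf_release(&ring, len);
        }

        rb::ringbuf_unregister(&ring, w);
        return 0;
}

In concurrent use, each producer thread registers its own worker slot (indices 0 through max_workers - 1) and may call ringbuf_acquire and ringbuf_produce independently, while ringbuf_consume and ringbuf_release must be driven by a single consumer, since nested consumes are not allowed.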