PMDK C++ bindings  1.13.0-git107.g7e59f08f
This is the C++ bindings documentation for PMDK's libpmemobj.
mpsc_queue.hpp
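The listing below is the source of pmem::obj::experimental::mpsc_queue, a persistent-memory-aware multi-producer single-consumer queue. A minimal usage sketch follows, assuming a pool whose root object holds a persistent_ptr to mpsc_queue::pmem_log_type; the pool path, layout name, root struct and log size are illustrative and not part of this header:

#include <libpmemobj++/experimental/mpsc_queue.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/string_view.hpp>
#include <libpmemobj++/transaction.hpp>

#include <iostream>
#include <string>

using queue_type = pmem::obj::experimental::mpsc_queue;

/* Illustrative root structure: the log itself must live in persistent memory. */
struct root {
	pmem::obj::persistent_ptr<queue_type::pmem_log_type> log;
};

int main()
{
	/* "/path/to/pool" and "mpsc_example" are placeholders. */
	auto pop = pmem::obj::pool<root>::open("/path/to/pool", "mpsc_example");
	auto proot = pop.root();

	/* Allocate the persistent log on first use. */
	if (proot->log == nullptr) {
		pmem::obj::transaction::run(pop, [&] {
			proot->log = pmem::obj::make_persistent<
				queue_type::pmem_log_type>(10000);
		});
	}

	/* The queue object itself is volatile; only the log is persistent. */
	queue_type queue(*proot->log, 1 /* max_workers */);

	/* Recover anything left in the log by a previous run. */
	queue.try_consume_batch([&](queue_type::batch_type batch) {
		for (pmem::obj::string_view str : batch)
			std::cout << std::string(str.data(), str.size())
				  << std::endl;
	});

	/* Each producing thread registers its own worker. */
	auto w = queue.register_worker();
	w.try_produce("Hello world");

	queue.try_consume_batch([&](queue_type::batch_type batch) {
		for (pmem::obj::string_view str : batch)
			std::cout << std::string(str.data(), str.size())
				  << std::endl;
	});

	return 0;
}

try_produce() returns false when the ring buffer has no room for the message, and try_consume_batch() returns false when there was nothing to consume, so callers are expected to retry or back off as appropriate.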
1 // SPDX-License-Identifier: BSD-3-Clause
2 /* Copyright 2021, Intel Corporation */
3 
9 #ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
10 #define LIBPMEMOBJ_MPSC_QUEUE_HPP
11 
19 
20 #include <atomic>
21 #include <cstddef>
22 #include <cstring>
23 #include <iterator>
24 #include <memory>
25 
26 namespace pmem
27 {
28 
29 namespace obj
30 {
31 
32 namespace experimental
33 {
34 
52 class mpsc_queue {
53 public:
54  class worker;
55  class pmem_log_type;
56  class batch_type;
57 
58  mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1);
59 
60  worker register_worker();
61 
62  template <typename Function>
63  bool try_consume_batch(Function &&f);
64 
65 private:
66  struct first_block {
67  static constexpr size_t CAPACITY =
68  pmem::detail::CACHELINE_SIZE - sizeof(size_t);
69  static constexpr size_t DIRTY_FLAG =
70  (1ULL << (sizeof(size_t) * 8 - 1));
71 
72  pmem::obj::p<size_t> size;
73  char data[CAPACITY];
74  };
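 /* With 64-byte cachelines and an 8-byte size_t this gives CAPACITY == 56,
  * i.e. up to 56 bytes of payload fit next to the size field in a single
  * cacheline, and DIRTY_FLAG is the top bit (1ULL << 63) of the size
  * field. */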
75 
76  struct iterator {
77  iterator(char *data, char *end);
78 
79  iterator &operator++();
80 
81  bool operator==(const iterator &rhs) const;
82  bool operator!=(const iterator &rhs) const;
83 
84  pmem::obj::string_view operator*() const;
85 
86  private:
87  first_block *seek_next(first_block *);
88 
89  char *data;
90  char *end;
91  };
92 
93  void clear_cachelines(first_block *block, size_t size);
94  void restore_offsets();
95 
96  size_t consume_cachelines(size_t *offset);
97  void release_cachelines(size_t len);
98 
99  inline pmem::detail::id_manager &get_id_manager();
100 
101  /* ringbuf_t handle. Important: mpsc_queue operates on cachelines, hence
102  * the ringbuf_produce/release functions are called with the number of
103  * cachelines, not bytes. */
104  std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
105  char *buf;
106  pmem::obj::pool_base pop;
107  size_t buf_size;
108  pmem_log_type *pmem;
109 
110  /* Stores offset and length of next message to be consumed. Only
111  * valid if ring_buffer->consume_in_progress. */
112  size_t consume_offset = 0;
113  size_t consume_len = 0;
114 
115 public:
120  class batch_type {
121  public:
122  batch_type(iterator begin, iterator end);
123 
124  iterator begin() const;
125  iterator end() const;
126 
127  private:
128  iterator begin_;
129  iterator end_;
130  };
131 
143  class worker {
144  public:
145  worker(mpsc_queue *q);
146  ~worker();
147 
148  worker(const worker &) = delete;
149  worker &operator=(const worker &) = delete;
150 
151  worker(worker &&other);
152  worker &operator=(worker &&other);
153 
154  template <typename Function = void (*)(pmem::obj::string_view)>
155  bool try_produce(
156  pmem::obj::string_view data,
157  Function &&on_produce =
158  [](pmem::obj::string_view target) {});
159 
160  private:
161  mpsc_queue *queue;
162  ringbuf::ringbuf_worker_t *w;
163  size_t id;
164 
165  ptrdiff_t acquire_cachelines(size_t len);
166  void produce_cachelines();
167  void store_to_log(pmem::obj::string_view data, char *log_data);
168 
169  friend class mpsc_queue;
170  };
171 
182  class pmem_log_type {
183  public:
184  pmem_log_type(size_t size);
185 
186  pmem::obj::string_view data();
187 
188  private:
189  pmem::obj::vector<char> data_;
190  pmem::obj::p<size_t> written;
191 
192  friend class mpsc_queue;
193  };
194 };
195 
203 inline mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
204 {
205  pop = pmem::obj::pool_by_vptr(&pmem);
206 
207  auto buf_data = pmem.data();
208 
209  buf = const_cast<char *>(buf_data.data());
210  buf_size = buf_data.size();
211 
212  assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);
213 
214  ring_buffer =
215  std::unique_ptr<ringbuf::ringbuf_t>(new ringbuf::ringbuf_t(
216  max_workers, buf_size / pmem::detail::CACHELINE_SIZE));
217 
218  this->pmem = &pmem;
219 
220  restore_offsets();
221 }
222 
223 ptrdiff_t
224 mpsc_queue::worker::acquire_cachelines(size_t len)
225 {
226  assert(len % pmem::detail::CACHELINE_SIZE == 0);
227  auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
228  len / pmem::detail::CACHELINE_SIZE);
229 
230  if (ret < 0)
231  return ret;
232 
233  return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
234 }
235 
236 void
237 mpsc_queue::worker::produce_cachelines()
238 {
239  ringbuf_produce(queue->ring_buffer.get(), w);
240 }
241 
242 size_t
243 mpsc_queue::consume_cachelines(size_t *offset)
244 {
245  auto ret = ringbuf_consume(ring_buffer.get(), offset);
246  if (ret) {
247  *offset *= pmem::detail::CACHELINE_SIZE;
248  return ret * pmem::detail::CACHELINE_SIZE;
249  }
250 
251  return 0;
252 }
253 
254 void
255 mpsc_queue::release_cachelines(size_t len)
256 {
257  assert(len % pmem::detail::CACHELINE_SIZE == 0);
258  ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
259 }
260 
261 void
262 mpsc_queue::restore_offsets()
263 {
264  /* Invariant */
265  assert(pmem->written < buf_size);
266 
267  /* XXX: implement restore_offset function in ringbuf */
268 
269  auto w = register_worker();
270 
271  if (!pmem->written) {
272  /* If pmem->written == 0, it means that the consumer should start
273  * reading from the beginning. There might be elements produced
274  * anywhere in the log. Since we want to prohibit any producers
275  * from overwriting the original content, mark the entire log
276  * as produced. */
277 
278  auto acq = w.acquire_cachelines(buf_size -
279  pmem::detail::CACHELINE_SIZE);
280  assert(acq == 0);
281  (void)acq;
282 
283  w.produce_cachelines();
284 
285  return;
286  }
287 
288  /* If pmem->written != 0 there still might be elements in the log.
289  * Moreover, to guarantee proper order of elements on recovery, we must
290  * restore the consumer offset. (If we started consuming from the
291  * beginning of the log, we could consume newer elements first.) Offsets
292  * are restored by the following operations:
293  *
294  * produce(pmem->written);
295  * consume();
296  * produce(size - pmem->written);
297  * produce(pmem->written - CACHELINE_SIZE);
298  *
299  * This results in producer offset equal to pmem->written -
300  * CACHELINE_SIZE and consumer offset equal to pmem->written.
301  */
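 /* Worked example: with buf_size == 8 cachelines and pmem->written == 3
  * cachelines, produce(3) moves the producer offset to 3,
  * consume() + release(3) moves the consumer offset to 3, produce(5)
  * wraps the producer back to offset 0, and produce(2) leaves it at
  * offset 2, i.e. pmem->written - CACHELINE_SIZE. */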
302 
303  auto acq = w.acquire_cachelines(pmem->written);
304  assert(acq == 0);
305  w.produce_cachelines();
306 
307  /* Restore consumer offset */
308  size_t offset;
309  auto len = consume_cachelines(&offset);
310  assert(len == pmem->written);
311  release_cachelines(len);
312 
313  assert(offset == 0);
314  assert(len == pmem->written);
315 
316  acq = w.acquire_cachelines(buf_size - pmem->written);
317  assert(acq >= 0);
318  assert(static_cast<size_t>(acq) == pmem->written);
319  w.produce_cachelines();
320 
321  acq = w.acquire_cachelines(pmem->written -
322  pmem::detail::CACHELINE_SIZE);
323  assert(acq == 0);
324  (void)acq;
325  w.produce_cachelines();
326 }
327 
333 inline mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
334  : data_(size, 0), written(0)
335 {
336 }
337 
344 inline pmem::obj::string_view
345 mpsc_queue::pmem_log_type::data()
346 {
347  auto addr = reinterpret_cast<uintptr_t>(&data_[0]);
348  auto aligned_addr =
349  pmem::detail::align_up(addr, pmem::detail::CACHELINE_SIZE);
350 
351  auto size = data_.size() - (aligned_addr - addr);
352  auto aligned_size =
353  pmem::detail::align_down(size, pmem::detail::CACHELINE_SIZE);
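 /* Example: if data_ starts 24 bytes past a cacheline boundary and
  * data_.size() == 10000, align_up skips the first 40 bytes, size
  * becomes 9960 and aligned_size == 9920, i.e. 155 whole cachelines. */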
354 
355  return pmem::obj::string_view(
356  reinterpret_cast<const char *>(aligned_addr), aligned_size);
357 }
358 
359 inline pmem::detail::id_manager &
360 mpsc_queue::get_id_manager()
361 {
362  static pmem::detail::id_manager manager;
363  return manager;
364 }
365 
374 inline mpsc_queue::worker
375 mpsc_queue::register_worker()
376 {
377  return worker(this);
378 }
379 
399 template <typename Function>
400 inline bool
401 mpsc_queue::try_consume_batch(Function &&f)
402 {
403  if (pmemobj_tx_stage() != TX_STAGE_NONE)
404  throw pmem::transaction_scope_error(
405  "Function called inside a transaction scope.");
406 
407  bool consumed = false;
408 
409  /* Need to call try_consume twice, as some data may be at the end
410  * of the buffer, and some may be at the beginning. The ring buffer does
411  * not merge those two parts into one try_consume. If all data was
412  * consumed during the first try_consume, the second will do nothing. */
413  for (int i = 0; i < 2; i++) {
414  /* If there is no consume in progress, it's safe to call
415  * ringbuf_consume. */
416  if (!ring_buffer->consume_in_progress) {
417  size_t offset;
418  auto len = consume_cachelines(&offset);
419  if (!len)
420  return consumed;
421 
422  consume_offset = offset;
423  consume_len = len;
424  } else {
425  assert(consume_len != 0);
426  }
427 
428 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
429  ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
430 #endif
431 
432  auto data = buf + consume_offset;
433  auto begin = iterator(data, data + consume_len);
434  auto end = iterator(data + consume_len, data + consume_len);
435 
436  pmem::obj::flat_transaction::run(pop, [&] {
437  if (begin != end) {
438  consumed = true;
439  f(batch_type(begin, end));
440  }
441 
442  auto b = reinterpret_cast<first_block *>(data);
443  clear_cachelines(b, consume_len);
444 
445  if (consume_offset + consume_len < buf_size)
446  pmem->written = consume_offset + consume_len;
447  else if (consume_offset + consume_len == buf_size)
448  pmem->written = 0;
449  else
450  assert(false);
451  });
452 
453 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
454  ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
455 #endif
456 
457  release_cachelines(consume_len);
458 
459  assert(!ring_buffer->consume_in_progress);
460 
461  /* XXX: it would be better to call f once - hide
462  * wraparound behind iterators */
463  /* XXX: add param to ringbuf_consume and do not
464  * call store_explicit in consume */
465  }
466 
467  return consumed;
468 }
469 
474 inline mpsc_queue::worker::worker(mpsc_queue *q)
475 {
476  queue = q;
477  auto &manager = queue->get_id_manager();
478 
479 #if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
480  ANNOTATE_BENIGN_RACE_SIZED(
481  &manager, sizeof(std::mutex),
482  "https://bugs.kde.org/show_bug.cgi?id=416286");
483 #endif
484 
485  id = manager.get();
486 
487  assert(id < q->ring_buffer->nworkers);
488 
489  w = ringbuf_register(queue->ring_buffer.get(), id);
490 }
491 
495 inline mpsc_queue::worker::worker(worker &&other)
496 {
497  *this = std::move(other);
498 }
499 
503 inline mpsc_queue::worker &
504 mpsc_queue::worker::operator=(worker &&other)
505 {
506  if (this != &other) {
507  queue = other.queue;
508  w = other.w;
509  id = other.id;
510 
511  other.queue = nullptr;
512  other.w = nullptr;
513  }
514  return *this;
515 }
516 
521 inline mpsc_queue::worker::~worker()
522 {
523  if (w) {
524  ringbuf_unregister(queue->ring_buffer.get(), w);
525  auto &manager = queue->get_id_manager();
526  manager.release(id);
527  }
528 }
529 
542 template <typename Function>
543 bool
544 mpsc_queue::worker::try_produce(pmem::obj::string_view data,
545  Function &&on_produce)
546 {
547  auto req_size =
548  pmem::detail::align_up(data.size() + sizeof(first_block::size),
549  pmem::detail::CACHELINE_SIZE);
550  auto offset = acquire_cachelines(req_size);
551 
552 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
553  ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
554 #endif
555 
556  if (offset == -1)
557  return false;
558 
559  store_to_log(data, queue->buf + offset);
560 
561 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
562  ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
563 #endif
564 
565  on_produce(pmem::obj::string_view(
566  queue->buf + offset + sizeof(first_block::size), data.size()));
567 
568  produce_cachelines();
569 
570  return true;
571 }
572 
573 inline void
574 mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
575 {
576  assert(reinterpret_cast<uintptr_t>(log_data) %
577  pmem::detail::CACHELINE_SIZE ==
578  0);
579 
580 /* Invariant: the producer can only produce data to cachelines whose
581  * first 8 bytes are zeroed.
582  */
583 #ifndef NDEBUG
584  auto b = reinterpret_cast<first_block *>(log_data);
585  auto s = pmem::detail::align_up(data.size() + sizeof(first_block::size),
586  pmem::detail::CACHELINE_SIZE);
587  auto e = b + s / pmem::detail::CACHELINE_SIZE;
588  while (b < e) {
589  assert(b->size == 0);
590  b++;
591  }
592 #endif
593 
594  assert(reinterpret_cast<first_block *>(log_data)->size == 0);
595 
596  first_block fblock;
597  fblock.size = data.size() | size_t(first_block::DIRTY_FLAG);
598 
599  /*
600  * The first step is to copy up to 56B of data and store
601  * data.size() with the DIRTY flag set. After that, we store
602  * the rest of the data in two steps:
603  * 1. Remainder of the data is aligned down to
604  * cacheline and copied.
605  * Now, we are left with between 0 and 63 bytes. If
606  * nonzero:
607  * 2. Create a stack allocated cacheline-sized
608  * buffer, fill in the remainder of the data, and
609  * copy the entire cacheline. After all data is
610  * stored, we clear the dirty flag from size.
611  *
612  * This is done so that we avoid a cache-miss on
613  * misaligned writes.
614  */
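 /* Example: for data.size() == 200 and 64-byte cachelines, ncopy == 56
  * (stored in the first cacheline together with the dirty size),
  * rcopy == 128 (two full cachelines copied directly from the source)
  * and lcopy == 16 (staged in the stack buffer and written out as one
  * whole cacheline before the dirty flag is cleared). */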
615 
616  size_t ncopy = (std::min)(data.size(), size_t(first_block::CAPACITY));
617  std::copy_n(data.data(), ncopy, fblock.data);
618 
619  pmemobj_memcpy(queue->pop.handle(), log_data,
620  reinterpret_cast<char *>(&fblock),
621  pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
622 
623  size_t remaining_size = ncopy > data.size() ? 0 : data.size() - ncopy;
624 
625  const char *srcof = data.data() + ncopy;
626  size_t rcopy = pmem::detail::align_down(remaining_size,
627  pmem::detail::CACHELINE_SIZE);
628  size_t lcopy = remaining_size - rcopy;
629 
630  char last_cacheline[pmem::detail::CACHELINE_SIZE];
631  if (lcopy != 0)
632  std::copy_n(srcof + rcopy, lcopy, last_cacheline);
633 
634  if (rcopy != 0) {
635  char *dest = log_data + pmem::detail::CACHELINE_SIZE;
636 
637  pmemobj_memcpy(queue->pop.handle(), dest, srcof, rcopy,
638  PMEMOBJ_F_MEM_NODRAIN |
639  PMEMOBJ_F_MEM_NONTEMPORAL);
640  }
641 
642  if (lcopy != 0) {
643  void *dest = log_data + pmem::detail::CACHELINE_SIZE + rcopy;
644 
645  pmemobj_memcpy(queue->pop.handle(), dest, last_cacheline,
646  pmem::detail::CACHELINE_SIZE,
647  PMEMOBJ_F_MEM_NODRAIN |
648  PMEMOBJ_F_MEM_NONTEMPORAL);
649  }
650 
651  pmemobj_drain(queue->pop.handle());
652 
653  fblock.size &= (~size_t(first_block::DIRTY_FLAG));
654 
655  pmemobj_memcpy(queue->pop.handle(), log_data,
656  reinterpret_cast<char *>(&fblock),
657  pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
658 }
659 
666 inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
667  : begin_(begin_), end_(end_)
668 {
669 }
670 
677 inline mpsc_queue::iterator
678 mpsc_queue::batch_type::begin() const
679 {
680  return begin_;
681 }
682 
689 inline mpsc_queue::iterator
690 mpsc_queue::batch_type::end() const
691 {
692  return end_;
693 }
694 
695 mpsc_queue::iterator::iterator(char *data, char *end) : data(data), end(end)
696 {
697  auto b = reinterpret_cast<first_block *>(data);
698  auto next = seek_next(b);
699  assert(next >= b);
700  this->data = reinterpret_cast<char *>(next);
701 }
702 
703 void
704 mpsc_queue::clear_cachelines(first_block *block, size_t size)
705 {
706  assert(size % pmem::detail::CACHELINE_SIZE == 0);
707  assert(pmemobj_tx_stage() == TX_STAGE_WORK);
708 
709  auto end = block +
710  static_cast<ptrdiff_t>(size / pmem::detail::CACHELINE_SIZE);
711 
712  while (block < end) {
713  /* data in block might be uninitialized. */
714  detail::conditional_add_to_tx(&block->size, 1,
715  POBJ_XADD_ASSUME_INITIALIZED);
716  block->size = 0;
717  block++;
718  }
719 
720  assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
721 }
722 
723 mpsc_queue::iterator &
724 mpsc_queue::iterator::operator++()
725 {
726  auto block = reinterpret_cast<first_block *>(data);
727  assert(block->size != 0);
728 
729  auto element_size =
730  pmem::detail::align_up(block->size + sizeof(block->size),
731  pmem::detail::CACHELINE_SIZE);
732 
733  block += element_size / pmem::detail::CACHELINE_SIZE;
734 
735  auto next = seek_next(block);
736  assert(next >= block);
737  block = next;
738 
739  data = reinterpret_cast<char *>(block);
740 
741  return *this;
742 }
743 
744 bool
745 mpsc_queue::iterator::operator==(const mpsc_queue::iterator &rhs) const
746 {
747  return data == rhs.data;
748 }
749 
750 bool
751 mpsc_queue::iterator::operator!=(const mpsc_queue::iterator &rhs) const
752 {
753  return data != rhs.data;
754 }
755 
756 pmem::obj::string_view mpsc_queue::iterator::operator*() const
757 {
758  auto b = reinterpret_cast<first_block *>(data);
759  return pmem::obj::string_view(b->data, b->size);
760 }
761 
762 mpsc_queue::first_block *
763 mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
764 {
765  auto e = reinterpret_cast<first_block *>(end);
766 
767  /* Advance to the first unconsumed element. Each cacheline can be in one of
768  * 3 states:
769  * 1. First 8 bytes (size) are equal to 0 - there is no data in this
770  * cacheline.
771  * 2. First 8 bytes (size) are non-zero and have dirty flag set - next
772  * size bytes are junk.
773  * 3. First 8 bytes (size) are non-zero and have dirty flag unset - next
774  * size bytes are ready to be consumed (they represent consistent data).
775  */
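 /* For example, a size field of 0x8000000000000064 is state 2: a
  * 100-byte element whose dirty flag is still set, so seek_next skips
  * align_up(100 + 8, 64) / 64 == 2 cachelines; once the producer
  * clears the flag the same cacheline becomes state 3. */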
776  while (b < e) {
777  if (b->size == 0) {
778  b++;
779  } else if (b->size & size_t(first_block::DIRTY_FLAG)) {
780  auto size =
781  b->size & (~size_t(first_block::DIRTY_FLAG));
782  auto aligned_size = pmem::detail::align_up(
783  size + sizeof(b->size),
784  pmem::detail::CACHELINE_SIZE);
785 
786  b += aligned_size / pmem::detail::CACHELINE_SIZE;
787  } else {
788  break;
789  }
790  }
791 
792  assert(b <= e);
793 
794  return b;
795 }
796 
797 } /* namespace experimental */
798 } /* namespace obj */
799 } /* namespace pmem */
800 
801 #endif /* LIBPMEMOBJ_MPSC_QUEUE_HPP */