PMDK C++ bindings  1.13.0-git23.gf49772ac
This is the C++ bindings documentation for PMDK's libpmemobj.
mpsc_queue.hpp
1 // SPDX-License-Identifier: BSD-3-Clause
2 /* Copyright 2021, Intel Corporation */
3 
9 #ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
10 #define LIBPMEMOBJ_MPSC_QUEUE_HPP
11 
14 #include <libpmemobj++/detail/ringbuf.hpp>
19 
20 #include <atomic>
21 #include <cstddef>
22 #include <cstring>
23 #include <iterator>
24 #include <memory>
25 
26 namespace pmem
27 {
28 
29 namespace obj
30 {
31 
32 namespace experimental
33 {
34 
50 class mpsc_queue {
51 public:
52  class worker;
53  class pmem_log_type;
54  class batch_type;
55 
56  mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1);
57 
 58  worker register_worker();
 59 
60  template <typename Function>
61  bool try_consume_batch(Function &&f);
62 
63 private:
64  struct first_block {
65  static constexpr size_t CAPACITY =
66  pmem::detail::CACHELINE_SIZE - sizeof(size_t);
67  static constexpr size_t DIRTY_FLAG =
68  (1ULL << (sizeof(size_t) * 8 - 1));
69 
 70  pmem::obj::p<size_t> size;
 71  char data[CAPACITY];
72  };
73 
74  struct iterator {
75  iterator(char *data, char *end);
76 
77  iterator &operator++();
78 
79  bool operator==(const iterator &rhs) const;
80  bool operator!=(const iterator &rhs) const;
81 
82  pmem::obj::string_view operator*() const;
83 
84  private:
85  first_block *seek_next(first_block *);
86 
87  char *data;
88  char *end;
89  };
90 
91  void clear_cachelines(first_block *block, size_t size);
92  void restore_offsets();
93 
94  size_t consume_cachelines(size_t *offset);
95  void release_cachelines(size_t len);
96 
97  inline pmem::detail::id_manager &get_id_manager();
98 
99  /* ringbuf_t handle. Important: mpsc_queue operates on cachelines hence
100  * ringbuf_produce/release functions are called with number of
101  * cachelines, not bytes. */
102  std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
103  char *buf;
 104  pmem::obj::pool_base pop;
 105  size_t buf_size;
 106  pmem_log_type *pmem;
 107 
108  /* Stores offset and length of next message to be consumed. Only
109  * valid if ring_buffer->consume_in_progress. */
110  size_t consume_offset = 0;
111  size_t consume_len = 0;
112 
113 public:
118  class batch_type {
119  public:
120  batch_type(iterator begin, iterator end);
121 
122  iterator begin() const;
123  iterator end() const;
124 
125  private:
126  iterator begin_;
127  iterator end_;
128  };
129 
141  class worker {
142  public:
143  worker(mpsc_queue *q);
144  ~worker();
145 
146  worker(const worker &) = delete;
147  worker &operator=(const worker &) = delete;
148 
149  worker(worker &&other);
150  worker &operator=(worker &&other);
151 
152  template <typename Function = void (*)(pmem::obj::string_view)>
 153  bool try_produce(
 154  pmem::obj::string_view data,
155  Function &&on_produce =
156  [](pmem::obj::string_view target) {});
157 
158  private:
159  mpsc_queue *queue;
160  ringbuf::ringbuf_worker_t *w;
161  size_t id;
162 
163  ptrdiff_t acquire_cachelines(size_t len);
164  void produce_cachelines();
165  void store_to_log(pmem::obj::string_view data, char *log_data);
166 
167  friend class mpsc_queue;
168  };
169 
 180  class pmem_log_type {
 181  public:
182  pmem_log_type(size_t size);
183 
 184  pmem::obj::string_view data();
 185 
186  private:
188  pmem::obj::p<size_t> written;
189 
190  friend class mpsc_queue;
191  };
192 };
193 
 201 inline mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers)
 202 {
 203  pop = pmem::obj::pool_by_vptr(&pmem);
 204 
205  auto buf_data = pmem.data();
206 
207  buf = const_cast<char *>(buf_data.data());
208  buf_size = buf_data.size();
209 
210  assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);
211 
212  ring_buffer =
213  std::unique_ptr<ringbuf::ringbuf_t>(new ringbuf::ringbuf_t(
214  max_workers, buf_size / pmem::detail::CACHELINE_SIZE));
215 
216  this->pmem = &pmem;
217 
218  restore_offsets();
219 }
220 
221 ptrdiff_t
222 mpsc_queue::worker::acquire_cachelines(size_t len)
223 {
224  assert(len % pmem::detail::CACHELINE_SIZE == 0);
225  auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
226  len / pmem::detail::CACHELINE_SIZE);
227 
228  if (ret < 0)
229  return ret;
230 
231  return ret * static_cast<ptrdiff_t>(pmem::detail::CACHELINE_SIZE);
232 }
233 
234 void
235 mpsc_queue::worker::produce_cachelines()
236 {
237  ringbuf_produce(queue->ring_buffer.get(), w);
238 }
239 
240 size_t
241 mpsc_queue::consume_cachelines(size_t *offset)
242 {
243  auto ret = ringbuf_consume(ring_buffer.get(), offset);
244  if (ret) {
245  *offset *= pmem::detail::CACHELINE_SIZE;
246  return ret * pmem::detail::CACHELINE_SIZE;
247  }
248 
249  return 0;
250 }
251 
252 void
253 mpsc_queue::release_cachelines(size_t len)
254 {
255  assert(len % pmem::detail::CACHELINE_SIZE == 0);
256  ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
257 }
258 
259 void
260 mpsc_queue::restore_offsets()
261 {
262  /* Invariant */
263  assert(pmem->written < buf_size);
264 
265  /* XXX: implement restore_offset function in ringbuf */
266 
267  auto w = register_worker();
268 
269  if (!pmem->written) {
270  /* If pmem->written == 0 it means that consumer should start
271  * reading from the beginning. There might be elements produced
272  * anywhere in the log. Since we want to prohibit any producers
273  * from overwriting the original content - mark the entire log
274  * as produced. */
275 
276  auto acq = w.acquire_cachelines(buf_size -
277  pmem::detail::CACHELINE_SIZE);
278  assert(acq == 0);
279  (void)acq;
280 
281  w.produce_cachelines();
282 
283  return;
284  }
285 
 286  /* If pmem->written != 0 there still might be elements in the log.
 287  * Moreover, to guarantee proper order of elements on recovery, we must
 288  * restore the consumer offset. (If we started consuming from the
 289  * beginning of the log, we could consume newer elements first.) Offsets
 290  * are restored by the following operations:
291  *
292  * produce(pmem->written);
293  * consume();
294  * produce(size - pmem->written);
295  * produce(pmem->written - CACHELINE_SIZE);
296  *
297  * This results in producer offset equal to pmem->written -
298  * CACHELINE_SIZE and consumer offset equal to pmem->written.
299  */
300 
301  auto acq = w.acquire_cachelines(pmem->written);
302  assert(acq == 0);
303  w.produce_cachelines();
304 
305  /* Restore consumer offset */
306  size_t offset;
307  auto len = consume_cachelines(&offset);
308  assert(len == pmem->written);
309  release_cachelines(len);
310 
311  assert(offset == 0);
312  assert(len == pmem->written);
313 
314  acq = w.acquire_cachelines(buf_size - pmem->written);
315  assert(acq >= 0);
316  assert(static_cast<size_t>(acq) == pmem->written);
317  w.produce_cachelines();
318 
319  acq = w.acquire_cachelines(pmem->written -
320  pmem::detail::CACHELINE_SIZE);
321  assert(acq == 0);
322  (void)acq;
323  w.produce_cachelines();
324 }
325 
 331 inline mpsc_queue::pmem_log_type::pmem_log_type(size_t size)
 332  : data_(size, 0), written(0)
333 {
334 }
335 
 342 inline pmem::obj::string_view
 343 mpsc_queue::pmem_log_type::data()
 344 {
345  auto addr = reinterpret_cast<uintptr_t>(&data_[0]);
346  auto aligned_addr =
347  pmem::detail::align_up(addr, pmem::detail::CACHELINE_SIZE);
348 
349  auto size = data_.size() - (aligned_addr - addr);
350  auto aligned_size =
351  pmem::detail::align_down(size, pmem::detail::CACHELINE_SIZE);
352 
353  return pmem::obj::string_view(
354  reinterpret_cast<const char *>(aligned_addr), aligned_size);
355 }
356 
 357 inline pmem::detail::id_manager &
 358 mpsc_queue::get_id_manager()
359 {
360  static pmem::detail::id_manager manager;
361  return manager;
362 }
363 
372 inline mpsc_queue::worker
 373 mpsc_queue::register_worker()
 374 {
375  return worker(this);
376 }
377 
397 template <typename Function>
398 inline bool
 399 mpsc_queue::try_consume_batch(Function &&f)
 400 {
401  if (pmemobj_tx_stage() != TX_STAGE_NONE)
 402  throw pmem::transaction_scope_error(
 403  "Function called inside a transaction scope.");
404 
405  bool consumed = false;
406 
 407  /* Need to call try_consume twice, as some data may be at the end
 408  * of the buffer, and some may be at the beginning. Ringbuffer does not
 409  * merge those two parts into one try_consume. If all data was
 410  * consumed during the first try_consume, the second will do nothing. */
411  for (int i = 0; i < 2; i++) {
412  /* If there is no consume in progress, it's safe to call
413  * ringbuf_consume. */
414  if (!ring_buffer->consume_in_progress) {
415  size_t offset;
416  auto len = consume_cachelines(&offset);
417  if (!len)
418  return consumed;
419 
420  consume_offset = offset;
421  consume_len = len;
422  } else {
423  assert(consume_len != 0);
424  }
425 
426 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
427  ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
428 #endif
429 
430  auto data = buf + consume_offset;
431  auto begin = iterator(data, data + consume_len);
432  auto end = iterator(data + consume_len, data + consume_len);
 433 
 434  pmem::obj::flat_transaction::run(pop, [&] {
 435  if (begin != end) {
436  consumed = true;
437  f(batch_type(begin, end));
438  }
439 
440  auto b = reinterpret_cast<first_block *>(data);
441  clear_cachelines(b, consume_len);
442 
443  if (consume_offset + consume_len < buf_size)
444  pmem->written = consume_offset + consume_len;
445  else if (consume_offset + consume_len == buf_size)
446  pmem->written = 0;
447  else
448  assert(false);
449  });
450 
451 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
452  ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
453 #endif
454 
455  release_cachelines(consume_len);
456 
457  assert(!ring_buffer->consume_in_progress);
458 
459  /* XXX: it would be better to call f once - hide
460  * wraparound behind iterators */
461  /* XXX: add param to ringbuf_consume and do not
462  * call store_explicit in consume */
463  }
464 
465  return consumed;
466 }
467 
468 inline mpsc_queue::worker::worker(mpsc_queue *q)
469 {
470  queue = q;
471  auto &manager = queue->get_id_manager();
472 
473 #if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
474  ANNOTATE_BENIGN_RACE_SIZED(
475  &manager, sizeof(std::mutex),
476  "https://bugs.kde.org/show_bug.cgi?id=416286");
477 #endif
478 
479  id = manager.get();
480 
481  assert(id < q->ring_buffer->nworkers);
482 
483  w = ringbuf_register(queue->ring_buffer.get(), id);
484 }
485 
486 inline mpsc_queue::worker::worker(mpsc_queue::worker &&other)
487 {
488  *this = std::move(other);
489 }
490 
491 inline mpsc_queue::worker &
492 mpsc_queue::worker::operator=(worker &&other)
493 {
494  if (this != &other) {
495  queue = other.queue;
496  w = other.w;
497  id = other.id;
498 
499  other.queue = nullptr;
500  other.w = nullptr;
501  }
502  return *this;
503 }
504 
505 inline mpsc_queue::worker::~worker()
506 {
507  if (w) {
508  ringbuf_unregister(queue->ring_buffer.get(), w);
509  auto &manager = queue->get_id_manager();
510  manager.release(id);
511  }
512 }
513 
526 template <typename Function>
527 bool
 528 mpsc_queue::worker::try_produce(pmem::obj::string_view data,
 529  Function &&on_produce)
530 {
531  auto req_size =
532  pmem::detail::align_up(data.size() + sizeof(first_block::size),
533  pmem::detail::CACHELINE_SIZE);
534  auto offset = acquire_cachelines(req_size);
535 
536 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
537  ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
538 #endif
539 
540  if (offset == -1)
541  return false;
542 
543  store_to_log(data, queue->buf + offset);
544 
545 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
546  ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
547 #endif
548 
549  on_produce(pmem::obj::string_view(
550  queue->buf + offset + sizeof(first_block::size), data.size()));
551 
552  produce_cachelines();
553 
554  return true;
555 }
556 
557 inline void
558 mpsc_queue::worker::store_to_log(pmem::obj::string_view data, char *log_data)
559 {
560  assert(reinterpret_cast<uintptr_t>(log_data) %
561  pmem::detail::CACHELINE_SIZE ==
562  0);
563 
564 /* Invariant: producer can only produce data to cachelines which have
565  * first 8 bytes zeroed.
566  */
567 #ifndef NDEBUG
568  auto b = reinterpret_cast<first_block *>(log_data);
569  auto s = pmem::detail::align_up(data.size() + sizeof(first_block::size),
570  pmem::detail::CACHELINE_SIZE);
571  auto e = b + s / pmem::detail::CACHELINE_SIZE;
572  while (b < e) {
573  assert(b->size == 0);
574  b++;
575  }
576 #endif
577 
578  assert(reinterpret_cast<first_block *>(log_data)->size == 0);
579 
580  first_block fblock;
581  fblock.size = data.size() | size_t(first_block::DIRTY_FLAG);
582 
583  /*
584  * First step is to copy up to 56B of data and store
585  * data.size() with DIRTY flag set. After that, we store
586  * rest of the data in two steps:
587  * 1. Remainder of the data is aligned down to
588  * cacheline and copied.
589  * Now, we are left with between 0 to 63 bytes. If
590  * nonzero:
591  * 2. Create a stack allocated cacheline-sized
592  * buffer, fill in the remainder of the data, and
593  * copy the entire cacheline. After all data is
594  * stored, we clear the dirty flag from size.
595  *
596  * This is done so that we avoid a cache-miss on
597  * misaligned writes.
598  */
599 
600  size_t ncopy = (std::min)(data.size(), size_t(first_block::CAPACITY));
601  std::copy_n(data.data(), ncopy, fblock.data);
602 
603  pmemobj_memcpy(queue->pop.handle(), log_data,
604  reinterpret_cast<char *>(&fblock),
605  pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
606 
607  size_t remaining_size = ncopy > data.size() ? 0 : data.size() - ncopy;
608 
609  const char *srcof = data.data() + ncopy;
610  size_t rcopy = pmem::detail::align_down(remaining_size,
611  pmem::detail::CACHELINE_SIZE);
612  size_t lcopy = remaining_size - rcopy;
613 
614  char last_cacheline[pmem::detail::CACHELINE_SIZE];
615  if (lcopy != 0)
616  std::copy_n(srcof + rcopy, lcopy, last_cacheline);
617 
618  if (rcopy != 0) {
619  char *dest = log_data + pmem::detail::CACHELINE_SIZE;
620 
621  pmemobj_memcpy(queue->pop.handle(), dest, srcof, rcopy,
622  PMEMOBJ_F_MEM_NODRAIN |
623  PMEMOBJ_F_MEM_NONTEMPORAL);
624  }
625 
626  if (lcopy != 0) {
627  void *dest = log_data + pmem::detail::CACHELINE_SIZE + rcopy;
628 
629  pmemobj_memcpy(queue->pop.handle(), dest, last_cacheline,
630  pmem::detail::CACHELINE_SIZE,
631  PMEMOBJ_F_MEM_NODRAIN |
632  PMEMOBJ_F_MEM_NONTEMPORAL);
633  }
634 
635  pmemobj_drain(queue->pop.handle());
636 
637  fblock.size &= (~size_t(first_block::DIRTY_FLAG));
638 
639  pmemobj_memcpy(queue->pop.handle(), log_data,
640  reinterpret_cast<char *>(&fblock),
641  pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
642 }
643 
644 inline mpsc_queue::batch_type::batch_type(iterator begin_, iterator end_)
645  : begin_(begin_), end_(end_)
646 {
647 }
648 
655 inline mpsc_queue::iterator
 656 mpsc_queue::batch_type::begin() const
 657 {
658  return begin_;
659 }
660 
667 inline mpsc_queue::iterator
 668 mpsc_queue::batch_type::end() const
 669 {
670  return end_;
671 }
672 
673 mpsc_queue::iterator::iterator(char *data, char *end) : data(data), end(end)
674 {
675  auto b = reinterpret_cast<first_block *>(data);
676  auto next = seek_next(b);
677  assert(next >= b);
678  this->data = reinterpret_cast<char *>(next);
679 }
680 
681 void
682 mpsc_queue::clear_cachelines(first_block *block, size_t size)
683 {
684  assert(size % pmem::detail::CACHELINE_SIZE == 0);
685  assert(pmemobj_tx_stage() == TX_STAGE_WORK);
686 
687  auto end = block +
688  static_cast<ptrdiff_t>(size / pmem::detail::CACHELINE_SIZE);
689 
690  while (block < end) {
691  /* data in block might be uninitialized. */
692  detail::conditional_add_to_tx(&block->size, 1,
693  POBJ_XADD_ASSUME_INITIALIZED);
694  block->size = 0;
695  block++;
696  }
697 
698  assert(end <= reinterpret_cast<first_block *>(buf + buf_size));
699 }
700 
701 mpsc_queue::iterator &
 702 mpsc_queue::iterator::operator++()
 703 {
704  auto block = reinterpret_cast<first_block *>(data);
705  assert(block->size != 0);
706 
707  auto element_size =
708  pmem::detail::align_up(block->size + sizeof(block->size),
709  pmem::detail::CACHELINE_SIZE);
710 
711  block += element_size / pmem::detail::CACHELINE_SIZE;
712 
713  auto next = seek_next(block);
714  assert(next >= block);
715  block = next;
716 
717  data = reinterpret_cast<char *>(block);
718 
719  return *this;
720 }
721 
722 bool
723 mpsc_queue::iterator::operator==(const mpsc_queue::iterator &rhs) const
724 {
725  return data == rhs.data;
726 }
727 
728 bool
729 mpsc_queue::iterator::operator!=(const mpsc_queue::iterator &rhs) const
730 {
731  return data != rhs.data;
732 }
733 
734 pmem::obj::string_view mpsc_queue::iterator::operator*() const
735 {
736  auto b = reinterpret_cast<first_block *>(data);
737  return pmem::obj::string_view(b->data, b->size);
738 }
739 
740 mpsc_queue::first_block *
741 mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
742 {
743  auto e = reinterpret_cast<first_block *>(end);
744 
 745  /* Advance to the first unconsumed element. Each cacheline can be in one of
746  * 3 states:
747  * 1. First 8 bytes (size) are equal to 0 - there is no data in this
748  * cacheline.
749  * 2. First 8 bytes (size) are non-zero and have dirty flag set - next
750  * size bytes are junk.
751  * 3. First 8 bytes (size) are non-zero and have dirty flag unset - next
752  * size bytes are ready to be consumed (they represent consistent data).
753  */
754  while (b < e) {
755  if (b->size == 0) {
756  b++;
757  } else if (b->size & size_t(first_block::DIRTY_FLAG)) {
758  auto size =
759  b->size & (~size_t(first_block::DIRTY_FLAG));
760  auto aligned_size = pmem::detail::align_up(
761  size + sizeof(b->size),
762  pmem::detail::CACHELINE_SIZE);
763 
764  b += aligned_size / pmem::detail::CACHELINE_SIZE;
765  } else {
766  break;
767  }
768  }
769 
770  assert(b <= e);
771 
772  return b;
773 }
774 
775 } /* namespace experimental */
776 } /* namespace obj */
777 } /* namespace pmem */
778 
779 #endif /* LIBPMEMOBJ_MPSC_QUEUE_HPP */
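Example usage (not part of the header above): a minimal sketch showing how the pieces fit together. The pool file path, layout name and log size are placeholders chosen for illustration, the persistent log is kept in the pool root, and error handling is omitted.

#include <libpmemobj++/experimental/mpsc_queue.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>
#include <libpmemobj++/string_view.hpp>
#include <libpmemobj++/transaction.hpp>

#include <iostream>
#include <string>

namespace exp = pmem::obj::experimental;

/* Pool root: owns the persistent log backing the queue. */
struct root {
	pmem::obj::persistent_ptr<exp::mpsc_queue::pmem_log_type> log;
};

int
main()
{
	/* Path, layout name and log size are placeholders. */
	auto pop = pmem::obj::pool<root>::open("/path/to/pool",
					       "mpsc_queue_example");
	auto r = pop.root();

	if (r->log == nullptr) {
		pmem::obj::transaction::run(pop, [&] {
			r->log = pmem::obj::make_persistent<
				exp::mpsc_queue::pmem_log_type>(10000);
		});
	}

	/* The queue itself is volatile; it is rebuilt on top of the
	 * persistent log on every application start. */
	exp::mpsc_queue queue(*r->log, 1 /* max_workers */);

	/* Each producing thread registers its own worker. */
	auto worker = queue.register_worker();
	worker.try_produce("Hello World!");

	/* The single consumer processes whatever is ready to be consumed,
	 * element by element. */
	queue.try_consume_batch([&](exp::mpsc_queue::batch_type batch) {
		for (pmem::obj::string_view element : batch)
			std::cout << std::string(element.data(),
						 element.size())
				  << std::endl;
	});

	pop.close();
	return 0;
}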
Documented entities in this file (brief descriptions, with the line of each definition):

pmem::obj::experimental::mpsc_queue (mpsc_queue.hpp:50)
 Persistent memory aware implementation of multi producer single consumer queue.
mpsc_queue::mpsc_queue(pmem_log_type &pmem, size_t max_workers = 1) (mpsc_queue.hpp:201)
 mpsc_queue constructor.
worker mpsc_queue::register_worker() (mpsc_queue.hpp:373)
 Registers the producer worker.
bool mpsc_queue::try_consume_batch(Function &&f) (mpsc_queue.hpp:399)
 Evaluates the callback function f() for the data which is ready to be consumed.
mpsc_queue::worker (mpsc_queue.hpp:141)
 mpsc_queue producer worker class.
bool mpsc_queue::worker::try_produce(pmem::obj::string_view data, Function &&on_produce = [](pmem::obj::string_view target) {}) (mpsc_queue.hpp:528)
 Copies data from a pmem::obj::string_view into the mpsc_queue.
mpsc_queue::batch_type (mpsc_queue.hpp:118)
 Type representing the range of the mpsc_queue elements.
iterator mpsc_queue::batch_type::begin() const (mpsc_queue.hpp:656)
 Returns an iterator to the beginning of the accessed range of the mpsc_queue.
iterator mpsc_queue::batch_type::end() const (mpsc_queue.hpp:668)
 Returns an iterator to the end of the accessed range of the mpsc_queue.
mpsc_queue::pmem_log_type (mpsc_queue.hpp:180)
 Type representing persistent data, which may be managed by mpsc_queue.
mpsc_queue::pmem_log_type::pmem_log_type(size_t size) (mpsc_queue.hpp:331)
 Constructs the pmem_log_type object.
pmem::obj::string_view mpsc_queue::pmem_log_type::data() (mpsc_queue.hpp:343)
 Returns a pmem::obj::string_view which allows read-only access to the underlying buffer.
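For reference, the on-media layout that store_to_log() and seek_next() rely on: every entry starts on a cacheline boundary with an 8-byte size word whose most significant bit is the DIRTY_FLAG, followed by the payload, padded up to a whole number of cachelines; up to first_block::CAPACITY (56) payload bytes share the first cacheline with the size word. Below is a small standalone sketch of that arithmetic, assuming a 64-byte cacheline (the header itself takes the value from pmem::detail::CACHELINE_SIZE).

#include <cassert>
#include <cstddef>

/* Assumed constants; in the header they come from pmem::detail and
 * mpsc_queue::first_block. */
constexpr size_t CACHELINE_SIZE = 64;
constexpr size_t DIRTY_FLAG = 1ULL << (sizeof(size_t) * 8 - 1);

constexpr size_t
align_up(size_t v, size_t a)
{
	return (v + a - 1) / a * a;
}

/* Bytes an element with `payload` bytes of data occupies in the log:
 * the 8-byte size word plus the payload, rounded up to full cachelines. */
constexpr size_t
element_footprint(size_t payload)
{
	return align_up(payload + sizeof(size_t), CACHELINE_SIZE);
}

int
main()
{
	/* Up to 56 payload bytes fit next to the size word. */
	static_assert(element_footprint(56) == 64, "single cacheline");
	static_assert(element_footprint(57) == 128, "spills into a second one");

	/* While store_to_log() is still copying, the size word carries the
	 * dirty bit; seek_next() treats such entries as junk and skips them. */
	size_t stored = size_t(100) | DIRTY_FLAG;
	assert((stored & ~DIRTY_FLAG) == 100);
	(void)stored;
	return 0;
}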