9 #ifndef LIBPMEMOBJ_MPSC_QUEUE_HPP
10 #define LIBPMEMOBJ_MPSC_QUEUE_HPP
32 namespace experimental
62 template <
typename Function>
67 static constexpr
size_t CAPACITY =
68 pmem::detail::CACHELINE_SIZE -
sizeof(size_t);
69 static constexpr
size_t DIRTY_FLAG =
70 (1ULL << (
sizeof(size_t) * 8 - 1));
77 iterator(
char *data,
char *end);
79 iterator &operator++();
81 bool operator==(
const iterator &rhs)
const;
82 bool operator!=(
const iterator &rhs)
const;
87 first_block *seek_next(first_block *);
93 void clear_cachelines(first_block *block,
size_t size);
94 void restore_offsets();
96 size_t consume_cachelines(
size_t *offset);
97 void release_cachelines(
size_t len);
104 std::unique_ptr<ringbuf::ringbuf_t> ring_buffer;
112 size_t consume_offset = 0;
113 size_t consume_len = 0;
124 iterator
begin()
const;
125 iterator
end()
const;
154 template <
typename Function =
void (*)(pmem::obj::
string_view)>
157 Function &&on_produce =
162 ringbuf::ringbuf_worker_t *w;
165 ptrdiff_t acquire_cachelines(
size_t len);
166 void produce_cachelines();
207 auto buf_data =
pmem.data();
209 buf =
const_cast<char *
>(buf_data.data());
210 buf_size = buf_data.size();
212 assert(buf_size % pmem::detail::CACHELINE_SIZE == 0);
215 std::unique_ptr<ringbuf::ringbuf_t>(
new ringbuf::ringbuf_t(
216 max_workers, buf_size / pmem::detail::CACHELINE_SIZE));
224 mpsc_queue::worker::acquire_cachelines(
size_t len)
226 assert(len % pmem::detail::CACHELINE_SIZE == 0);
227 auto ret = ringbuf_acquire(queue->ring_buffer.get(), w,
228 len / pmem::detail::CACHELINE_SIZE);
233 return ret *
static_cast<ptrdiff_t
>(pmem::detail::CACHELINE_SIZE);
237 mpsc_queue::worker::produce_cachelines()
239 ringbuf_produce(queue->ring_buffer.get(), w);
243 mpsc_queue::consume_cachelines(
size_t *offset)
245 auto ret = ringbuf_consume(ring_buffer.get(), offset);
247 *offset *= pmem::detail::CACHELINE_SIZE;
248 return ret * pmem::detail::CACHELINE_SIZE;
255 mpsc_queue::release_cachelines(
size_t len)
257 assert(len % pmem::detail::CACHELINE_SIZE == 0);
258 ringbuf_release(ring_buffer.get(), len / pmem::detail::CACHELINE_SIZE);
262 mpsc_queue::restore_offsets()
265 assert(
pmem->written < buf_size);
271 if (!
pmem->written) {
278 auto acq = w.acquire_cachelines(buf_size -
279 pmem::detail::CACHELINE_SIZE);
283 w.produce_cachelines();
303 auto acq = w.acquire_cachelines(
pmem->written);
305 w.produce_cachelines();
309 auto len = consume_cachelines(&offset);
310 assert(len ==
pmem->written);
311 release_cachelines(len);
314 assert(len ==
pmem->written);
316 acq = w.acquire_cachelines(buf_size -
pmem->written);
318 assert(
static_cast<size_t>(acq) ==
pmem->written);
319 w.produce_cachelines();
321 acq = w.acquire_cachelines(
pmem->written -
322 pmem::detail::CACHELINE_SIZE);
325 w.produce_cachelines();
334 : data_(size, 0), written(0)
347 auto addr =
reinterpret_cast<uintptr_t
>(&data_[0]);
349 pmem::detail::align_up(addr, pmem::detail::CACHELINE_SIZE);
351 auto size = data_.size() - (aligned_addr - addr);
353 pmem::detail::align_down(size, pmem::detail::CACHELINE_SIZE);
356 reinterpret_cast<const char *
>(aligned_addr), aligned_size);
360 mpsc_queue::get_id_manager()
399 template <
typename Function>
403 if (pmemobj_tx_stage() != TX_STAGE_NONE)
405 "Function called inside a transaction scope.");
407 bool consumed =
false;
413 for (
int i = 0; i < 2; i++) {
416 if (!ring_buffer->consume_in_progress) {
418 auto len = consume_cachelines(&offset);
422 consume_offset = offset;
425 assert(consume_len != 0);
428 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
429 ANNOTATE_HAPPENS_AFTER(ring_buffer.get());
432 auto data = buf + consume_offset;
433 auto begin = iterator(data, data + consume_len);
434 auto end = iterator(data + consume_len, data + consume_len);
442 auto b =
reinterpret_cast<first_block *
>(data);
443 clear_cachelines(b, consume_len);
445 if (consume_offset + consume_len < buf_size)
446 pmem->written = consume_offset + consume_len;
447 else if (consume_offset + consume_len == buf_size)
453 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
454 ANNOTATE_HAPPENS_BEFORE(ring_buffer.get());
457 release_cachelines(consume_len);
459 assert(!ring_buffer->consume_in_progress);
477 auto &manager = queue->get_id_manager();
479 #if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
480 ANNOTATE_BENIGN_RACE_SIZED(
481 &manager,
sizeof(std::mutex),
482 "https://bugs.kde.org/show_bug.cgi?id=416286");
487 assert(id < q->ring_buffer->nworkers);
489 w = ringbuf_register(queue->ring_buffer.get(),
id);
497 *
this = std::move(other);
504 mpsc_queue::worker::operator=(
worker &&other)
506 if (
this != &other) {
511 other.queue =
nullptr;
524 ringbuf_unregister(queue->ring_buffer.get(), w);
525 auto &manager = queue->get_id_manager();
542 template <
typename Function>
545 Function &&on_produce)
548 pmem::detail::align_up(data.
size() +
sizeof(first_block::size),
549 pmem::detail::CACHELINE_SIZE);
550 auto offset = acquire_cachelines(req_size);
552 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
553 ANNOTATE_HAPPENS_AFTER(queue->ring_buffer.get());
559 store_to_log(data, queue->buf + offset);
561 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
562 ANNOTATE_HAPPENS_BEFORE(queue->ring_buffer.get());
566 queue->buf + offset +
sizeof(first_block::size), data.
size()));
568 produce_cachelines();
576 assert(
reinterpret_cast<uintptr_t
>(log_data) %
577 pmem::detail::CACHELINE_SIZE ==
584 auto b =
reinterpret_cast<first_block *
>(log_data);
585 auto s = pmem::detail::align_up(data.
size() +
sizeof(first_block::size),
586 pmem::detail::CACHELINE_SIZE);
587 auto e = b + s / pmem::detail::CACHELINE_SIZE;
589 assert(b->size == 0);
594 assert(
reinterpret_cast<first_block *
>(log_data)->size == 0);
597 fblock.size = data.
size() | size_t(first_block::DIRTY_FLAG);
616 size_t ncopy = (std::min)(data.
size(), size_t(first_block::CAPACITY));
617 std::copy_n(data.
data(), ncopy, fblock.data);
619 pmemobj_memcpy(queue->pop.handle(), log_data,
620 reinterpret_cast<char *
>(&fblock),
621 pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
623 size_t remaining_size = ncopy > data.
size() ? 0 : data.
size() - ncopy;
625 const char *srcof = data.
data() + ncopy;
626 size_t rcopy = pmem::detail::align_down(remaining_size,
627 pmem::detail::CACHELINE_SIZE);
628 size_t lcopy = remaining_size - rcopy;
630 char last_cacheline[pmem::detail::CACHELINE_SIZE];
632 std::copy_n(srcof + rcopy, lcopy, last_cacheline);
635 char *dest = log_data + pmem::detail::CACHELINE_SIZE;
637 pmemobj_memcpy(queue->pop.handle(), dest, srcof, rcopy,
638 PMEMOBJ_F_MEM_NODRAIN |
639 PMEMOBJ_F_MEM_NONTEMPORAL);
643 void *dest = log_data + pmem::detail::CACHELINE_SIZE + rcopy;
645 pmemobj_memcpy(queue->pop.handle(), dest, last_cacheline,
646 pmem::detail::CACHELINE_SIZE,
647 PMEMOBJ_F_MEM_NODRAIN |
648 PMEMOBJ_F_MEM_NONTEMPORAL);
651 pmemobj_drain(queue->pop.handle());
653 fblock.size &= (~size_t(first_block::DIRTY_FLAG));
655 pmemobj_memcpy(queue->pop.handle(), log_data,
656 reinterpret_cast<char *
>(&fblock),
657 pmem::detail::CACHELINE_SIZE, PMEMOBJ_F_MEM_NONTEMPORAL);
667 : begin_(begin_), end_(end_)
677 inline mpsc_queue::iterator
689 inline mpsc_queue::iterator
695 mpsc_queue::iterator::iterator(
char *data,
char *end) : data(data), end(end)
697 auto b =
reinterpret_cast<first_block *
>(data);
698 auto next = seek_next(b);
700 this->data =
reinterpret_cast<char *
>(next);
704 mpsc_queue::clear_cachelines(first_block *block,
size_t size)
706 assert(size % pmem::detail::CACHELINE_SIZE == 0);
707 assert(pmemobj_tx_stage() == TX_STAGE_WORK);
710 static_cast<ptrdiff_t
>(size / pmem::detail::CACHELINE_SIZE);
712 while (block < end) {
715 POBJ_XADD_ASSUME_INITIALIZED);
720 assert(end <=
reinterpret_cast<first_block *
>(buf + buf_size));
723 mpsc_queue::iterator &
724 mpsc_queue::iterator::operator++()
726 auto block =
reinterpret_cast<first_block *
>(data);
727 assert(block->size != 0);
730 pmem::detail::align_up(block->size +
sizeof(block->size),
731 pmem::detail::CACHELINE_SIZE);
733 block += element_size / pmem::detail::CACHELINE_SIZE;
735 auto next = seek_next(block);
736 assert(next >= block);
739 data =
reinterpret_cast<char *
>(block);
745 mpsc_queue::iterator::operator==(
const mpsc_queue::iterator &rhs)
const
747 return data == rhs.
data;
751 mpsc_queue::iterator::operator!=(
const mpsc_queue::iterator &rhs)
const
753 return data != rhs.
data;
758 auto b =
reinterpret_cast<first_block *
>(data);
762 mpsc_queue::first_block *
763 mpsc_queue::iterator::seek_next(mpsc_queue::first_block *b)
765 auto e =
reinterpret_cast<first_block *
>(end);
779 }
else if (b->size &
size_t(first_block::DIRTY_FLAG)) {
781 b->size & (~size_t(first_block::DIRTY_FLAG));
782 auto aligned_size = pmem::detail::align_up(
783 size +
sizeof(b->size),
784 pmem::detail::CACHELINE_SIZE);
786 b += aligned_size / pmem::detail::CACHELINE_SIZE;
Our partial std::string_view implementation.
Definition: string_view.hpp:48
constexpr size_type size() const noexcept
Returns count of characters stored in this pmem::obj::string_view data.
Definition: string_view.hpp:397
constexpr const CharT * data() const noexcept
Returns pointer to data stored in this pmem::obj::string_view.
Definition: string_view.hpp:359
Type representing the range of the mpsc_queue elements.
Definition: mpsc_queue.hpp:120
batch_type(iterator begin, iterator end)
Constructs batch_type object.
Definition: mpsc_queue.hpp:666
iterator begin() const
Returns an iterator to the beginning of the accessed range of the mpsc_queue.
Definition: mpsc_queue.hpp:678
iterator end() const
Returns an iterator to the end of the accessed range of the mpsc_queue.
Definition: mpsc_queue.hpp:690
Type representing persistent data, which may be managed by mpsc_queue.
Definition: mpsc_queue.hpp:182
pmem_log_type(size_t size)
Constructs pmem_log_type object.
Definition: mpsc_queue.hpp:333
pmem::obj::string_view data()
Returns pmem::obj::string_view which allows read-only access to the underlying buffer.
Definition: mpsc_queue.hpp:345
mpsc_queue producer worker class.
Definition: mpsc_queue.hpp:143
~worker()
Default destructor of worker object.
Definition: mpsc_queue.hpp:521
bool try_produce(pmem::obj::string_view data, Function &&on_produce=[](pmem::obj::string_view target) {})
Copies data from pmem::obj::string_view into the mpsc_queue.
Definition: mpsc_queue.hpp:544
worker(mpsc_queue *q)
Default constructor of worker object.
Definition: mpsc_queue.hpp:474
Persistent memory aware implementation of multi producer single consumer queue.
Definition: mpsc_queue.hpp:52
bool try_consume_batch(Function &&f)
Evaluates callback function f() for the data, which is ready to be consumed.
Definition: mpsc_queue.hpp:401
mpsc_queue(pmem_log_type &pmem, size_t max_workers=1)
mpsc_queue constructor.
Definition: mpsc_queue.hpp:203
worker register_worker()
Registers the producer worker.
Definition: mpsc_queue.hpp:375
static void run(obj::pool_base &pool, std::function< void()> tx, Locks &... locks)
Execute a closure-like transaction and lock locks.
Definition: transaction.hpp:810
The non-template pool base class.
Definition: pool.hpp:51
Custom transaction error class.
Definition: pexceptions.hpp:167
Commonly used functionality.
A persistent version of thread-local storage.
basic_string_view< char > string_view
The most typical string_view usage - the char specialization.
Definition: string_view.hpp:169
persistent_ptr transactional allocation functions for objects.
void conditional_add_to_tx(const T *that, std::size_t count=1, uint64_t flags=0)
Conditionally add 'count' objects to a transaction.
Definition: common.hpp:176
pool_base pool_by_vptr(const T *that)
Retrieve pool handle for the given pointer.
Definition: utils.hpp:32
Persistent memory namespace.
Definition: allocation_flag.hpp:15
Persistent smart pointer.
Lock-free multi-producer single-consumer (MPSC) ring buffer.
Our partial std::string_view implementation.
This structure is used for assigning unique thread ids so that those ids will be reused in case of thread exit.
Definition: enumerable_thread_specific.hpp:35
C++ pmemobj transactions.