PMDK C++ bindings  1.12.1-rc1
This is the C++ bindings documentation for PMDK's libpmemobj.
concurrent_hash_map.hpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 /* Copyright 2019-2020, Intel Corporation */
3 
10 #ifndef PMEMOBJ_CONCURRENT_HASH_MAP_HPP
11 #define PMEMOBJ_CONCURRENT_HASH_MAP_HPP
12 
#include <libpmemobj++/detail/common.hpp>
#include <libpmemobj++/detail/pair.hpp>
#include <libpmemobj++/detail/template_helpers.hpp>

#include <libpmemobj++/defrag.hpp>
#include <libpmemobj++/make_persistent.hpp>
#include <libpmemobj++/mutex.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/transaction.hpp>

#include <libpmemobj++/detail/persistent_pool_ptr.hpp>
#include <libpmemobj++/shared_mutex.hpp>

#include <libpmemobj++/detail/enumerable_thread_specific.hpp>

#include <atomic>
#include <cassert>
#include <functional>
#include <initializer_list>
#include <iterator> // for std::distance
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
41 
42 namespace std
43 {
47 template <typename T>
48 struct hash<pmem::obj::p<T>> {
49  size_t
50  operator()(const pmem::obj::p<T> &x) const
51  {
52  return hash<T>()(x.get_ro());
53  }
54 };
55 } /* namespace std */
56 
57 namespace pmem
58 {
59 namespace obj
60 {
61 
62 namespace concurrent_hash_map_internal
63 {
/**
 * Scoped lock over a reader-writer mutex.
 *
 * The lock remembers which mutex it holds and whether it was taken in
 * writer (exclusive) or reader (shared) mode, and releases it in the
 * matching mode either on release() or on destruction.
 *
 * SharedMutexT must provide lock()/unlock(), lock_shared()/unlock_shared(),
 * try_lock() and try_lock_shared().
 */
template <typename SharedMutexT>
class shared_mutex_scoped_lock {
	using rw_mutex_type = SharedMutexT;

public:
	shared_mutex_scoped_lock(const shared_mutex_scoped_lock &) = delete;
	shared_mutex_scoped_lock &
	operator=(const shared_mutex_scoped_lock &) = delete;

	/** Creates a lock object that does not hold any mutex. */
	shared_mutex_scoped_lock() : mutex(nullptr), is_writer(false)
	{
	}

	/** Creates a lock object and immediately acquires m. */
	shared_mutex_scoped_lock(rw_mutex_type &m, bool write = true)
	    : mutex(nullptr)
	{
		acquire(m, write);
	}

	/** Releases the mutex if one is still held. */
	~shared_mutex_scoped_lock()
	{
		if (mutex != nullptr)
			release();
	}

	/**
	 * Blocks until m is acquired.
	 *
	 * @param[in] m mutex to acquire.
	 * @param[in] write true for exclusive mode, false for shared mode.
	 */
	void
	acquire(rw_mutex_type &m, bool write = true)
	{
		/* Record the mode and the mutex first, then block on it. */
		is_writer = write;
		mutex = &m;

		if (is_writer) {
			mutex->lock();
		} else {
			mutex->lock_shared();
		}
	}

	/**
	 * Releases the held mutex in the mode it was acquired in.
	 * A mutex must be held when this is called.
	 */
	void
	release()
	{
		assert(mutex);

		/* Forget the mutex before unlocking it. */
		rw_mutex_type *held = mutex;
		mutex = nullptr;

		if (is_writer)
			held->unlock();
		else
			held->unlock_shared();
	}

	/**
	 * Attempts to acquire m without blocking. No mutex may be held by
	 * this object when it is called.
	 *
	 * @return true when the mutex was acquired, false otherwise.
	 */
	bool
	try_acquire(rw_mutex_type &m, bool write = true)
	{
		assert(!mutex);

		is_writer = write;

		const bool locked =
			is_writer ? m.try_lock() : m.try_lock_shared();
		if (!locked)
			return false;

		mutex = &m;
		return true;
	}

protected:
	/** The mutex currently held, or nullptr when nothing is held. */
	rw_mutex_type *mutex;

	/** True when the mutex is (or was last requested) in writer mode. */
	bool is_writer;
}; /* class shared_mutex_scoped_lock */
148 
149 template <typename ScopedLockType>
150 using scoped_lock_upgrade_to_writer =
151  decltype(std::declval<ScopedLockType>().upgrade_to_writer());
152 
153 template <typename ScopedLockType>
154 using scoped_lock_has_upgrade_to_writer =
155  detail::supports<ScopedLockType, scoped_lock_upgrade_to_writer>;
156 
157 template <typename ScopedLockType>
158 using scoped_lock_downgrade_to_reader =
159  decltype(std::declval<ScopedLockType>().downgrade_to_reader());
160 
161 template <typename ScopedLockType>
162 using scoped_lock_has_downgrade_to_reader =
163  detail::supports<ScopedLockType, scoped_lock_downgrade_to_reader>;
164 
165 template <typename ScopedLockType,
166  bool = scoped_lock_has_upgrade_to_writer<ScopedLockType>::value
167  &&scoped_lock_has_downgrade_to_reader<ScopedLockType>::value>
168 class scoped_lock_traits {
169 public:
170  using scope_lock_type = ScopedLockType;
171 
172  static bool
173  initial_rw_state(bool write)
174  {
175  /* For upgradeable locks, initial state is always read */
176  return false;
177  }
178 
179  static bool
180  upgrade_to_writer(scope_lock_type &lock)
181  {
182  return lock.upgrade_to_writer();
183  }
184 
185  static bool
186  downgrade_to_reader(scope_lock_type &lock)
187  {
188  return lock.downgrade_to_reader();
189  }
190 };
191 
192 template <typename ScopedLockType>
193 class scoped_lock_traits<ScopedLockType, false> {
194 public:
195  using scope_lock_type = ScopedLockType;
196 
197  static bool
198  initial_rw_state(bool write)
199  {
200  /* For non-upgradeable locks, we take lock in required mode
201  * immediately */
202  return write;
203  }
204 
205  static bool
206  upgrade_to_writer(scope_lock_type &lock)
207  {
208  /* This overload is for locks which do not support upgrade
209  * operation. For those locks, upgrade_to_writer should not be
210  * called when holding a read lock */
211  return true;
212  }
213 
214  static bool
215  downgrade_to_reader(scope_lock_type &lock)
216  {
217  /* This overload is for locks which do not support downgrade
218  * operation. For those locks, downgrade_to_reader should never
219  * be called */
220  assert(false);
221 
222  return false;
223  }
224 };
225 }
226 
/**
 * Forward declaration of concurrent_hash_map, carrying the default template
 * arguments: std::hash<Key> for hashing, std::equal_to<Key> for key
 * comparison, pmem::obj::shared_mutex as the mutex type and
 * shared_mutex_scoped_lock<MutexType> as the scoped lock type.
 */
227 template <typename Key, typename T, typename Hash = std::hash<Key>,
228  typename KeyEqual = std::equal_to<Key>,
229  typename MutexType = pmem::obj::shared_mutex,
230  typename ScopedLockType = concurrent_hash_map_internal::
231  shared_mutex_scoped_lock<MutexType>>
232 class concurrent_hash_map;
233 
235 namespace concurrent_hash_map_internal
236 {
237 /* Helper method which throws an exception when called in a tx */
238 static inline void
239 check_outside_tx()
240 {
241  if (pmemobj_tx_stage() != TX_STAGE_NONE)
243  "Function called inside transaction scope.");
244 }
245 
246 template <typename Hash>
247 using transparent_key_equal = typename Hash::transparent_key_equal;
248 
249 template <typename Hash>
250 using has_transparent_key_equal = detail::supports<Hash, transparent_key_equal>;
251 
252 template <typename Hash, typename Pred,
253  bool = has_transparent_key_equal<Hash>::value>
254 struct key_equal_type {
255  using type = typename Hash::transparent_key_equal;
256 };
257 
258 template <typename Hash, typename Pred>
259 struct key_equal_type<Hash, Pred, false> {
260  using type = Pred;
261 };
262 
/**
 * Debug-only check that mtx is not currently locked.
 *
 * In debug builds a default-constructed ScopedLockType tries to acquire the
 * mutex; the attempt is asserted to succeed and the lock is released right
 * away. With NDEBUG defined the function does nothing.
 */
template <typename Mutex, typename ScopedLockType>
void
assert_not_locked(Mutex &mtx)
{
#ifdef NDEBUG
	(void)mtx;
#else
	ScopedLockType probe;
	assert(probe.try_acquire(mtx));
	probe.release();
#endif
}
275 
276 template <typename Key, typename T, typename MutexType, typename ScopedLockType>
277 struct hash_map_node {
279  using mutex_t = MutexType;
280 
282  using scoped_t = ScopedLockType;
283 
284  using value_type = detail::pair<const Key, T>;
285 
287  using node_ptr_t = detail::persistent_pool_ptr<
288  hash_map_node<Key, T, mutex_t, scoped_t>>;
289 
291  node_ptr_t next;
292 
294  mutex_t mutex;
295 
297  value_type item;
298 
299  hash_map_node(const node_ptr_t &_next, const Key &key)
300  : next(_next),
301  item(std::piecewise_construct, std::forward_as_tuple(key),
302  std::forward_as_tuple())
303  {
304  }
305 
306  hash_map_node(const node_ptr_t &_next, const Key &key, const T &t)
307  : next(_next), item(key, t)
308  {
309  }
310 
311  hash_map_node(const node_ptr_t &_next, value_type &&i)
312  : next(_next), item(std::move(i))
313  {
314  }
315 
316  template <typename... Args>
317  hash_map_node(const node_ptr_t &_next, Args &&... args)
318  : next(_next), item(std::forward<Args>(args)...)
319  {
320  }
321 
322  hash_map_node(const node_ptr_t &_next, const value_type &i)
323  : next(_next), item(i)
324  {
325  }
326 
328  hash_map_node(const hash_map_node &) = delete;
329 
331  hash_map_node &operator=(const hash_map_node &) = delete;
332 }; /* struct node */
333 
338 template <typename Bucket>
339 class segment_traits {
340 public:
342  using segment_index_t = size_t;
343  using size_type = size_t;
344  using bucket_type = Bucket;
345 
346 protected:
348  constexpr static size_type max_allocation_size = PMEMOBJ_MAX_ALLOC_SIZE;
349 
351  constexpr static segment_index_t first_big_block = 27;
352  /* TODO: avoid hardcoded value; need constexpr similar to:
353  * Log2(max_allocation_size / sizeof(bucket_type)) */
354 
356  constexpr static size_type big_block_size = size_type(1)
357  << first_big_block;
358 
359  /* Block size in bytes cannot exceed max_allocation_size */
360  static_assert((big_block_size * sizeof(bucket_type)) <
361  max_allocation_size,
362  "Block size exceeds max_allocation_size");
363 
365  constexpr static segment_index_t
366  first_block_in_segment(segment_index_t seg)
367  {
368  return seg < first_big_block
369  ? seg
370  : (first_big_block +
371  (segment_index_t(1) << (seg - first_big_block)) - 1);
372  }
373 
375  constexpr static size_type
376  blocks_in_segment(segment_index_t seg)
377  {
378  return seg < first_big_block
379  ? segment_index_t(1)
380  : segment_index_t(1) << (seg - first_big_block);
381  }
382 
384  constexpr static size_type
385  block_size(segment_index_t b)
386  {
387  return b < first_big_block ? segment_size(b ? b : 1)
388  : big_block_size;
389  }
390 
391 public:
393  constexpr static segment_index_t embedded_segments = 1;
394 
396  constexpr static size_type embedded_buckets = 1 << embedded_segments;
397 
399  constexpr static segment_index_t number_of_segments = 32;
400 
402  static const size_type first_block = 8;
403 
405  constexpr static segment_index_t
406  number_of_blocks()
407  {
408  return first_block_in_segment(number_of_segments);
409  }
410 
412  static segment_index_t
413  segment_index_of(size_type index)
414  {
415  return segment_index_t(detail::Log2(index | 1));
416  }
417 
419  constexpr static segment_index_t
420  segment_base(segment_index_t k)
421  {
422  return (segment_index_t(1) << k) & ~segment_index_t(1);
423  }
424 
426  constexpr static size_type
427  segment_size(segment_index_t k)
428  {
429  return size_type(1) << k; // fake value for k == 0
430  }
431  static_assert(
432  embedded_segments < first_big_block,
433  "Number of embedded segments cannot exceed max_allocation_size");
434 }; /* End of class segment_traits */
435 
452 template <typename BlockTable, typename SegmentTraits, bool is_const>
453 class segment_facade_impl : public SegmentTraits {
454 private:
455  using traits_type = SegmentTraits;
456  using traits_type::block_size;
457  using traits_type::blocks_in_segment;
458  using traits_type::embedded_buckets;
459  using traits_type::embedded_segments;
460  using traits_type::first_block;
461  using traits_type::first_block_in_segment;
462  using traits_type::segment_base;
463  using traits_type::segment_size;
464 
465 public:
466  using table_reference =
467  typename std::conditional<is_const, const BlockTable &,
468  BlockTable &>::type;
469 
470  using table_pointer =
471  typename std::conditional<is_const, const BlockTable *,
472  BlockTable *>::type;
473 
474  using bucket_type = typename traits_type::bucket_type;
475  using segment_index_t = typename traits_type::segment_index_t;
476  using size_type = typename traits_type::size_type;
477 
479  segment_facade_impl(table_reference table, segment_index_t s)
480  : my_table(&table), my_seg(s)
481  {
482  assert(my_seg < traits_type::number_of_segments);
483  }
484 
486  segment_facade_impl(const segment_facade_impl &src)
487  : my_table(src.my_table), my_seg(src.my_seg)
488  {
489  }
490 
491  segment_facade_impl(segment_facade_impl &&src) = default;
492 
494  segment_facade_impl &
495  operator=(const segment_facade_impl &src)
496  {
497  my_table = src.my_table;
498  my_seg = src.my_seg;
499  return *this;
500  }
501 
503  segment_facade_impl &
504  operator=(segment_facade_impl &&src)
505  {
506  my_table = src.my_table;
507  my_seg = src.my_seg;
508  return *this;
509  }
510 
517  bucket_type &operator[](size_type i) const
518  {
519  assert(i < size());
520 
521  segment_index_t table_block = first_block_in_segment(my_seg);
522  size_type b_size = block_size(table_block);
523 
524  table_block += i / b_size;
525  i = i % b_size;
526 
527  return (*my_table)[table_block][static_cast<std::ptrdiff_t>(i)];
528  }
529 
533  segment_facade_impl &
534  operator++()
535  {
536  ++my_seg;
537  return *this;
538  }
539 
543  segment_facade_impl
544  operator++(int)
545  {
546  segment_facade_impl tmp = *this;
547  ++(*this);
548  return tmp;
549  }
550 
554  segment_facade_impl &
555  operator--()
556  {
557  --my_seg;
558  return *this;
559  }
560 
564  segment_facade_impl
565  operator--(int)
566  {
567  segment_facade_impl tmp = *this;
568  --(*this);
569  return tmp;
570  }
571 
575  segment_facade_impl &
576  operator+=(segment_index_t off)
577  {
578  my_seg += off;
579  return *this;
580  }
581 
585  segment_facade_impl &
586  operator-=(segment_index_t off)
587  {
588  my_seg -= off;
589  return *this;
590  }
591 
595  segment_facade_impl
596  operator+(segment_index_t off) const
597  {
598  return segment_facade_impl(*(this->my_table),
599  this->my_seg + off);
600  }
601 
605  segment_facade_impl
606  operator-(segment_index_t off) const
607  {
608  return segment_facade_impl(*(this->my_table),
609  this->my_seg - off);
610  }
611 
615  void
616  enable(pool_base &pop)
617  {
618  assert(my_seg >= embedded_segments);
619 
620  if (my_seg < first_block) {
621  enable_first_block(pop);
622  } else {
623  enable_big_segment(pop);
624  }
625  }
626 
630  void
631  disable()
632  {
633  assert(my_seg >= embedded_segments);
634 
635  if (my_seg < first_block) {
636  if (my_seg == embedded_segments) {
637  size_type sz = segment_size(first_block) -
638  embedded_buckets;
639  delete_persistent<bucket_type[]>(
640  (*my_table)[my_seg], sz);
641  }
642  (*my_table)[my_seg] = nullptr;
643  } else {
644  block_range blocks = segment_blocks(my_seg);
645 
646  for (segment_index_t b = blocks.first;
647  b < blocks.second; ++b) {
648  if ((*my_table)[b] != nullptr) {
649  delete_persistent<bucket_type[]>(
650  (*my_table)[b], block_size(b));
651  (*my_table)[b] = nullptr;
652  }
653  }
654  }
655  }
656 
660  constexpr size_type
661  size() const
662  {
663  return segment_size(my_seg ? my_seg : 1);
664  }
665 
671  bool
672  is_valid() const
673  {
674  block_range blocks = segment_blocks(my_seg);
675 
676  for (segment_index_t b = blocks.first; b < blocks.second; ++b) {
677  if ((*my_table)[b] == nullptr)
678  return false;
679  }
680 
681  return true;
682  }
683 
684 private:
685  using block_range = std::pair<segment_index_t, segment_index_t>;
686 
690  static block_range
691  segment_blocks(segment_index_t seg)
692  {
693  segment_index_t begin = first_block_in_segment(seg);
694 
695  return block_range(begin, begin + blocks_in_segment(seg));
696  }
697 
698  void
699  enable_first_block(pool_base &pop)
700  {
701  assert(my_seg == embedded_segments);
702  {
703  flat_transaction::manual tx(pop);
704 
705  size_type sz =
706  segment_size(first_block) - embedded_buckets;
707  (*my_table)[my_seg] =
708  make_persistent<bucket_type[]>(sz);
709 
710  persistent_ptr<bucket_type> base =
711  (*my_table)[embedded_segments].raw();
712 
713  for (segment_index_t s = my_seg + 1; s < first_block;
714  ++s) {
715  std::ptrdiff_t off =
716  static_cast<std::ptrdiff_t>(
717  segment_base(s) -
718  segment_base(my_seg));
719 
720  (*my_table)[s] = (base + off).raw();
721  }
722 
724  }
725  }
726 
727  void
728  enable_big_segment(pool_base &pop)
729  {
730  block_range blocks = segment_blocks(my_seg);
731  {
732  flat_transaction::manual tx(pop);
733 
734  for (segment_index_t b = blocks.first;
735  b < blocks.second; ++b) {
736  assert((*my_table)[b] == nullptr);
737  (*my_table)[b] = make_persistent<bucket_type[]>(
738  block_size(b));
739  }
740 
742  }
743  }
744 
746  table_pointer my_table;
747 
749  segment_index_t my_seg;
750 }; /* End of class segment_facade_impl */
751 
758 template <typename Key, typename T, typename MutexType, typename ScopedLockType>
759 class hash_map_base {
760 public:
761  using mutex_t = MutexType;
762  using scoped_t = ScopedLockType;
763 
765  using size_type = size_t;
766 
768  using hashcode_type = size_t;
769 
771  using node = hash_map_node<Key, T, mutex_t, scoped_t>;
772 
774  using node_ptr_t = detail::persistent_pool_ptr<node>;
775 
777  struct bucket {
778  using mutex_t = MutexType;
779  using scoped_t = ScopedLockType;
780 
782  mutex_t mutex;
783 
785  p<std::atomic<uint64_t>> rehashed;
786 
788  node_ptr_t node_list;
789 
791  bucket() : node_list(nullptr)
792  {
793 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
794  VALGRIND_HG_DISABLE_CHECKING(&rehashed,
795  sizeof(rehashed));
796 #endif
797  rehashed.get_rw() = false;
798  }
799 
805  bool
806  is_rehashed(std::memory_order order)
807  {
808  return rehashed.get_ro().load(order);
809  }
810 
811  void
812  set_rehashed(std::memory_order order)
813  {
814  rehashed.get_rw().store(true, order);
815  }
816 
818  bucket(const bucket &) = delete;
819 
821  bucket &operator=(const bucket &) = delete;
822  }; /* End of struct bucket */
823 
825  using segment_traits_t = segment_traits<bucket>;
826 
828  using segment_index_t = typename segment_traits_t::segment_index_t;
829 
831  static const size_type embedded_buckets =
832  segment_traits_t::embedded_buckets;
833 
835  static const size_type first_block = segment_traits_t::first_block;
836 
838  constexpr static size_type block_table_size =
839  segment_traits_t::number_of_blocks();
840 
842  using segment_ptr_t = persistent_ptr<bucket[]>;
843 
845  using bucket_ptr_t = persistent_ptr<bucket>;
846 
848  using blocks_table_t = segment_ptr_t[block_table_size];
849 
851  using segment_enable_mutex_t = pmem::obj::mutex;
852 
854  struct tls_data_t {
855  p<int64_t> size_diff = 0;
856  std::aligned_storage<56, 8> padding;
857  };
858 
859  using tls_t = detail::enumerable_thread_specific<tls_data_t>;
860 
861  enum feature_flags : uint32_t { FEATURE_CONSISTENT_SIZE = 1 };
862 
864  struct features {
865  p<uint32_t> compat;
866  p<uint32_t> incompat;
867  };
868 
869  /* --------------------------------------------------------- */
870 
872  p<uint64_t> my_pool_uuid;
873 
876  features layout_features;
877 
880  std::aligned_storage<sizeof(size_t), sizeof(size_t)>::type
881  my_mask_reserved;
882 
884  /* my_mask always restored on restart. */
885  std::atomic<hashcode_type> my_mask;
886 
887  /* Size of value (key and value pair) stored in a pool */
888  std::size_t value_size;
889 
891  std::aligned_storage<24, 8>::type padding1;
892 
897  blocks_table_t my_table;
898 
899  /* It must be in separate cache line from my_mask due to performance
900  * effects */
902  std::atomic<size_type> my_size;
903 
905  std::aligned_storage<24, 8>::type padding2;
906 
908  persistent_ptr<tls_t> tls_ptr;
909 
915  p<size_t> on_init_size;
916 
918  std::aligned_storage<40, 8>::type reserved;
919 
921  segment_enable_mutex_t my_segment_enable_mutex;
922 
924  bucket my_embedded_segment[embedded_buckets];
925 
926  /* --------------------------------------------------------- */
927 
929  static constexpr features
930  header_features()
931  {
932  return {FEATURE_CONSISTENT_SIZE, 0};
933  }
934 
935  const std::atomic<hashcode_type> &
936  mask() const noexcept
937  {
938  return my_mask;
939  }
940 
941  std::atomic<hashcode_type> &
942  mask() noexcept
943  {
944  return my_mask;
945  }
946 
947  size_t
948  size() const
949  {
950  return my_size.load(std::memory_order_relaxed);
951  }
952 
953  p<int64_t> &
954  thread_size_diff()
955  {
956  assert(this->tls_ptr != nullptr);
957  return this->tls_ptr->local().size_diff;
958  }
959 
961  void
962  tls_restore()
963  {
964  assert(this->tls_ptr != nullptr);
965 
966  pool_base pop = pool_base{pmemobj_pool_by_ptr(this)};
967 
968  int64_t last_run_size = 0;
969  for (auto &data : *tls_ptr)
970  last_run_size += data.size_diff;
971 
972  /* Make sure that on_init_size + last_run_size >= 0 */
973  assert(last_run_size >= 0 ||
974  static_cast<int64_t>(static_cast<size_t>(last_run_size) +
975  on_init_size) >= 0);
976 
977  flat_transaction::run(pop, [&] {
978  on_init_size += static_cast<size_t>(last_run_size);
979  tls_ptr->clear();
980  });
981 
982  this->my_size = on_init_size;
983  }
984 
986  using const_segment_facade_t =
987  segment_facade_impl<blocks_table_t, segment_traits_t, true>;
988 
990  using segment_facade_t =
991  segment_facade_impl<blocks_table_t, segment_traits_t, false>;
992 
994  hash_map_base()
995  {
996  static_assert(
997  sizeof(size_type) == sizeof(std::atomic<size_type>),
998  "std::atomic should have the same layout as underlying integral type");
999 
1000 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1001  VALGRIND_HG_DISABLE_CHECKING(&my_mask, sizeof(my_mask));
1002 #endif
1003  layout_features = {0, 0};
1004 
1005  PMEMoid oid = pmemobj_oid(this);
1006 
1007  assert(!OID_IS_NULL(oid));
1008 
1009  my_pool_uuid = oid.pool_uuid_lo;
1010 
1011  pool_base pop = get_pool_base();
1012  /* enable embedded segments */
1013  for (size_type i = 0; i < segment_traits_t::embedded_segments;
1014  ++i) {
1015  my_table[i] =
1016  pmemobj_oid(my_embedded_segment +
1017  segment_traits_t::segment_base(i));
1018  segment_facade_t seg(my_table, i);
1019  mark_rehashed<false>(pop, seg);
1020  }
1021 
1022  on_init_size = 0;
1023 
1024  value_size = 0;
1025 
1026  this->tls_ptr = nullptr;
1027  }
1028 
1029  /*
1030  * Should be called before concurrent_hash_map destructor is called.
1031  * Otherwise, program can terminate if an exception occurs wile freeing
1032  * memory inside dtor.
1033  */
1034  void
1035  free_tls()
1036  {
1037  auto pop = get_pool_base();
1038 
1039  if ((layout_features.compat & FEATURE_CONSISTENT_SIZE) &&
1040  tls_ptr) {
1041  flat_transaction::run(pop, [&] {
1042  delete_persistent<tls_t>(tls_ptr);
1043  tls_ptr = nullptr;
1044  });
1045  }
1046  }
1047 
1051  void
1052  calculate_mask()
1053  {
1054 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1055  VALGRIND_HG_DISABLE_CHECKING(&my_size, sizeof(my_size));
1056  VALGRIND_HG_DISABLE_CHECKING(&my_mask, sizeof(my_mask));
1057 #endif
1058 #if LIBPMEMOBJ_CPP_VG_PMEMCHECK_ENABLED
1059  VALGRIND_PMC_REMOVE_PMEM_MAPPING(&my_size, sizeof(my_size));
1060  VALGRIND_PMC_REMOVE_PMEM_MAPPING(&my_mask, sizeof(my_mask));
1061 #endif
1062 
1063  hashcode_type m = embedded_buckets - 1;
1064 
1065  const_segment_facade_t segment(
1066  my_table, segment_traits_t::embedded_segments);
1067 
1068  while (segment.is_valid()) {
1069  m += segment.size();
1070  ++segment;
1071  }
1072 
1073  mask().store(m, std::memory_order_relaxed);
1074  }
1075 
1079  template <bool Flush = true>
1080  void
1081  mark_rehashed(pool_base &pop, segment_facade_t &segment)
1082  {
1083  for (size_type i = 0; i < segment.size(); ++i) {
1084  bucket *b = &(segment[i]);
1085 
1086  assert_not_locked<mutex_t, scoped_t>(b->mutex);
1087 
1088  b->set_rehashed(std::memory_order_relaxed);
1089  }
1090 
1091  if (Flush) {
1092  /* Flush in separate loop to avoid read-after-flush */
1093  for (size_type i = 0; i < segment.size(); ++i) {
1094  bucket *b = &(segment[i]);
1095  pop.flush(b->rehashed);
1096  }
1097 
1098  pop.drain();
1099  }
1100  }
1101 
1105  void
1106  enable_segment(segment_index_t k, bool is_initial = false)
1107  {
1108  assert(k);
1109 
1110  pool_base pop = get_pool_base();
1111  size_type sz;
1112 
1113  if (k >= first_block) {
1114  segment_facade_t new_segment(my_table, k);
1115 
1116  sz = new_segment.size();
1117  if (!new_segment.is_valid())
1118  new_segment.enable(pop);
1119 
1120  if (is_initial) {
1121  mark_rehashed(pop, new_segment);
1122  }
1123 
1124  /* double it to get entire capacity of the container */
1125  sz <<= 1;
1126  } else {
1127  /* the first block */
1128  assert(k == segment_traits_t::embedded_segments);
1129 
1130  for (segment_index_t i = k; i < first_block; ++i) {
1131  segment_facade_t new_segment(my_table, i);
1132 
1133  if (!new_segment.is_valid())
1134  new_segment.enable(pop);
1135 
1136  if (is_initial) {
1137  mark_rehashed(pop, new_segment);
1138  }
1139  }
1140 
1141  sz = segment_traits_t::segment_size(first_block);
1142  }
1143 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1144  ANNOTATE_HAPPENS_BEFORE(&my_mask);
1145 #endif
1146  mask().store(sz - 1, std::memory_order_release);
1147  }
1148 
1153  bucket *
1154  get_bucket(hashcode_type h) const
1155  {
1156  segment_index_t s = segment_traits_t::segment_index_of(h);
1157 
1158  h -= segment_traits_t::segment_base(s);
1159 
1160  const_segment_facade_t segment(my_table, s);
1161 
1162  assert(segment.is_valid());
1163 
1164  return &(segment[h]);
1165  }
1166 
1170  inline bool
1171  check_mask_race(hashcode_type h, hashcode_type &m) const
1172  {
1173  hashcode_type m_now, m_old = m;
1174 
1175  m_now = mask().load(std::memory_order_acquire);
1176 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
1177  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
1178 #endif
1179 
1180  if (m_old != m_now)
1181  return check_rehashing_collision(h, m_old, m = m_now);
1182 
1183  return false;
1184  }
1185 
1189  bool
1190  check_rehashing_collision(hashcode_type h, hashcode_type m_old,
1191  hashcode_type m) const
1192  {
1193  assert(m_old != m);
1194 
1195  if ((h & m_old) != (h & m)) {
1196  /* mask changed for this hashcode, rare event condition
1197  * above proves that 'h' has some other bits set beside
1198  * 'm_old', find next applicable mask after m_old */
1199 
1200  for (++m_old; !(h & m_old); m_old <<= 1)
1201  ;
1202 
1203  m_old = (m_old << 1) - 1; /* get full mask from a bit */
1204 
1205  assert((m_old & (m_old + 1)) == 0 && m_old <= m);
1206 
1207  /* check whether it is rehashing/ed */
1208  bucket *b = get_bucket(h & m_old);
1209  return b->is_rehashed(std::memory_order_acquire);
1210  }
1211 
1212  return false;
1213  }
1214 
1219  template <typename Node, typename... Args>
1220  void
1221  insert_new_node_internal(bucket *b,
1222  detail::persistent_pool_ptr<Node> &new_node,
1223  Args &&... args)
1224  {
1225  assert(pmemobj_tx_stage() == TX_STAGE_WORK);
1226 
1227  new_node = pmem::obj::make_persistent<Node>(
1228  b->node_list, std::forward<Args>(args)...);
1229  b->node_list = new_node; /* bucket is locked */
1230  }
1231 
1236  template <typename Node, typename... Args>
1237  size_type
1238  insert_new_node(bucket *b, detail::persistent_pool_ptr<Node> &new_node,
1239  Args &&... args)
1240  {
1241  pool_base pop = get_pool_base();
1242 
1243  /*
1244  * This is only true when called from singlethreaded methods
1245  * like swap() or operator=. In that case it's safe to directly
1246  * modify on_init_size.
1247  */
1248  if (pmemobj_tx_stage() == TX_STAGE_WORK) {
1249  insert_new_node_internal(b, new_node,
1250  std::forward<Args>(args)...);
1251  this->on_init_size++;
1252  } else {
1253  auto &size_diff = thread_size_diff();
1254 
1256  insert_new_node_internal(
1257  b, new_node,
1258  std::forward<Args>(args)...);
1259  ++size_diff;
1260  });
1261  }
1262 
1263  /* Increment volatile size */
1264  return ++(this->my_size);
1265  }
1266 
1271  bool
1272  check_growth(hashcode_type m, size_type sz)
1273  {
1274  if (sz >= m) {
1275  segment_index_t new_seg =
1276  static_cast<segment_index_t>(detail::Log2(
1277  m +
1278  1)); /* optimized segment_index_of */
1279 
1280  assert(segment_facade_t(my_table, new_seg - 1)
1281  .is_valid());
1282 
1283  std::unique_lock<segment_enable_mutex_t> lock(
1284  my_segment_enable_mutex, std::try_to_lock);
1285 
1286  if (lock) {
1287  if (mask().load(std::memory_order_relaxed) ==
1288  m) {
1289  /* Otherwise, other thread enable this
1290  * segment */
1291  enable_segment(new_seg);
1292 
1293  return true;
1294  }
1295  }
1296  }
1297 
1298  return false;
1299  }
1300 
1304  void
1305  reserve(size_type buckets)
1306  {
1307  if (buckets == 0)
1308  return;
1309 
1310  --buckets;
1311 
1312  bool is_initial = this->size() == 0;
1313 
1314  for (size_type m = mask(); buckets > m; m = mask())
1315  enable_segment(
1316  segment_traits_t::segment_index_of(m + 1),
1317  is_initial);
1318  }
1319 
1324  void
1325  internal_swap(hash_map_base<Key, T, mutex_t, scoped_t> &table)
1326  {
1327  pool_base p = get_pool_base();
1328  {
1330 
1331  this->my_pool_uuid.swap(table.my_pool_uuid);
1332 
1333  /*
1334  * As internal_swap can only be called
1335  * from one thread, and there can be an outer
1336  * transaction we must make sure that mask and size
1337  * changes are transactional
1338  */
1339  flat_transaction::snapshot((size_t *)&this->my_mask);
1340  flat_transaction::snapshot((size_t *)&this->my_size);
1341 
1342  this->mask() = table.mask().exchange(
1343  this->mask(), std::memory_order_relaxed);
1344 
1345  this->my_size = table.my_size.exchange(
1346  this->my_size, std::memory_order_relaxed);
1347 
1348  /* Swap consistent size */
1349  std::swap(this->tls_ptr, table.tls_ptr);
1350 
1351  for (size_type i = 0; i < embedded_buckets; ++i)
1352  this->my_embedded_segment[i].node_list.swap(
1353  table.my_embedded_segment[i].node_list);
1354 
1355  for (size_type i = segment_traits_t::embedded_segments;
1356  i < block_table_size; ++i)
1357  this->my_table[i].swap(table.my_table[i]);
1358 
1360  }
1361  }
1362 
1367  pool_base
1368  get_pool_base()
1369  {
1370  PMEMobjpool *pop =
1371  pmemobj_pool_by_oid(PMEMoid{my_pool_uuid, 0});
1372 
1373  return pool_base(pop);
1374  }
1375 }; /* End of class hash_map_base */
1376 
1382 template <typename Container, bool is_const>
1383 class hash_map_iterator {
1384 public:
1385  using iterator_category = std::forward_iterator_tag;
1386  using difference_type = ptrdiff_t;
1387  using map_type = Container;
1388  using value_type = typename map_type::value_type;
1389  using node = typename map_type::node;
1390  using bucket = typename map_type::bucket;
1391  using map_ptr = typename std::conditional<is_const, const map_type *,
1392  map_type *>::type;
1393  using reference =
1394  typename std::conditional<is_const,
1395  typename map_type::const_reference,
1396  typename map_type::reference>::type;
1397  using pointer =
1398  typename std::conditional<is_const,
1399  typename map_type::const_pointer,
1400  typename map_type::pointer>::type;
1401 
1402  template <typename C, bool M, bool U>
1403  friend bool operator==(const hash_map_iterator<C, M> &i,
1404  const hash_map_iterator<C, U> &j);
1405 
1406  template <typename C, bool M, bool U>
1407  friend bool operator!=(const hash_map_iterator<C, M> &i,
1408  const hash_map_iterator<C, U> &j);
1409 
1410  friend class hash_map_iterator<map_type, true>;
1411 
1412 #if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
1413 private:
1414  template <typename Key, typename T, typename Hash, typename KeyEqual,
1415  typename MutexType, typename ScopedLockType>
1416  friend class ::pmem::obj::concurrent_hash_map;
1417 #else
1418 public: /* workaround */
1419 #endif
1420  hash_map_iterator(map_ptr map, size_t index)
1421  : my_map(map), my_index(index), my_bucket(nullptr), my_node(nullptr)
1422  {
1423  if (my_index <= my_map->mask()) {
1424  bucket_accessor acc(my_map, my_index);
1425  my_bucket = acc.get();
1426  my_node = static_cast<node *>(
1427  my_bucket->node_list.get(my_map->my_pool_uuid));
1428 
1429  if (!my_node) {
1430  advance_to_next_bucket();
1431  }
1432  }
1433  }
1434 
1435 public:
1437  hash_map_iterator() = default;
1438 
1440  hash_map_iterator(const hash_map_iterator &other)
1441  : my_map(other.my_map),
1442  my_index(other.my_index),
1443  my_bucket(other.my_bucket),
1444  my_node(other.my_node)
1445  {
1446  }
1447 
1449  template <typename U = void,
1450  typename = typename std::enable_if<is_const, U>::type>
1451  hash_map_iterator(const hash_map_iterator<map_type, false> &other)
1452  : my_map(other.my_map),
1453  my_index(other.my_index),
1454  my_bucket(other.my_bucket),
1455  my_node(other.my_node)
1456  {
1457  }
1458 
1459  hash_map_iterator &operator=(const hash_map_iterator &it) = default;
1460 
1462  reference operator*() const
1463  {
1464  assert(my_node);
1465  return my_node->item;
1466  }
1467 
1469  pointer operator->() const
1470  {
1471  return &operator*();
1472  }
1473 
/**
 * Pre-increment: move to the next node in the current bucket's chain, or to
 * the first node of a following bucket when the chain is exhausted.
 *
 * my_node is dereferenced without a check, so the iterator must currently
 * reference a node.
 *
 * @return reference to this iterator.
 */
1475  hash_map_iterator &
1476  operator++()
1477  {
1478  my_node = static_cast<node *>(
1479  my_node->next.get((my_map->my_pool_uuid)));
1480 
1481  if (!my_node)
1482  advance_to_next_bucket();
1483 
1484  return *this;
1485  }
1486 
/**
 * Post-increment: advances this iterator via operator++() and returns a
 * copy taken before the advance.
 */
1488  hash_map_iterator
1489  operator++(int)
1490  {
1491  hash_map_iterator old(*this);
1492  operator++();
1493  return old;
1494  }
1495 
1496 private:
1498  map_ptr my_map = nullptr;
1499 
1501  size_t my_index = 0;
1502 
1504  bucket *my_bucket = nullptr;
1505 
1507  node *my_node = nullptr;
1508 
/**
 * Small helper that resolves a bucket index to a bucket pointer through
 * map->get_bucket(). It only stores the pointer; no locking call is made
 * here.
 */
1509  class bucket_accessor {
1510  public:
 /** Look up bucket @p index in map @p m. */
1511  bucket_accessor(map_ptr m, size_t index)
1512  {
1513  my_bucket = m->get_bucket(index);
1514  }
1515 
 /** @return the bucket pointer obtained in the constructor. */
1516  bucket *
1517  get() const
1518  {
1519  return my_bucket;
1520  }
1521 
1522  private:
1523  bucket *my_bucket;
1524  };
1525 
/**
 * Move the iterator to the first node of the next non-empty bucket.
 *
 * Scans bucket indices my_index + 1 .. mask(). On success my_bucket, my_node
 * and my_index describe the bucket found. If no further bucket holds a
 * node, my_bucket and my_node become null and my_index is left one past the
 * mask, which is the state end() constructs.
 *
 * Asserts that the iterator currently has a bucket.
 */
1526  void
1527  advance_to_next_bucket()
1528  {
1529  size_t k = my_index + 1;
1530 
1531  assert(my_bucket);
1532 
1533  while (k <= my_map->mask()) {
1534  bucket_accessor acc(my_map, k);
1535  my_bucket = acc.get();
1536 
1537  if (my_bucket->node_list) {
1538  my_node = static_cast<node *>(
1539  my_bucket->node_list.get(
1540  my_map->my_pool_uuid));
1541 
1542  my_index = k;
1543 
1544  return;
1545  }
1546 
1547  ++k;
1548  }
1549 
 /* Ran off the end: reset the raw pointers with nullptr, matching
  * the constructors and member initialisers of this class. */
1550  my_bucket = nullptr;
1551  my_node = nullptr;
1552  my_index = k;
1553  }
1554 };
1555 
/**
 * Equality of two iterators over the same container type (const and
 * non-const may be mixed): equal when both reference the same node and the
 * same map. The bucket index is not compared.
 */
1556 template <typename Container, bool M, bool U>
1557 bool
1558 operator==(const hash_map_iterator<Container, M> &i,
1559  const hash_map_iterator<Container, U> &j)
1560 {
1561  return i.my_node == j.my_node && i.my_map == j.my_map;
1562 }
1563 
/**
 * Inequality of two iterators: true when they reference different nodes or
 * different maps (the negation of operator==).
 */
1564 template <typename Container, bool M, bool U>
1565 bool
1566 operator!=(const hash_map_iterator<Container, M> &i,
1567  const hash_map_iterator<Container, U> &j)
1568 {
1569  return i.my_node != j.my_node || i.my_map != j.my_map;
1570 }
1571 } /* namespace concurrent_hash_map_internal */
1624 template <typename Key, typename T, typename Hash, typename KeyEqual,
1625  typename MutexType, typename ScopedLockType>
1627  : protected concurrent_hash_map_internal::hash_map_base<Key, T, MutexType,
1628  ScopedLockType> {
1629  template <typename Container, bool is_const>
1630  friend class concurrent_hash_map_internal::hash_map_iterator;
1631 
1632 public:
1633  using size_type = typename concurrent_hash_map_internal::hash_map_base<
1634  Key, T, MutexType, ScopedLockType>::size_type;
1635  using hashcode_type =
1636  typename concurrent_hash_map_internal::hash_map_base<
1637  Key, T, MutexType, ScopedLockType>::hashcode_type;
1638  using key_type = Key;
1639  using mapped_type = T;
1640  using value_type = typename concurrent_hash_map_internal::hash_map_base<
1641  Key, T, MutexType, ScopedLockType>::node::value_type;
1642  using difference_type = ptrdiff_t;
1643  using pointer = value_type *;
1644  using const_pointer = const value_type *;
1645  using reference = value_type &;
1646  using const_reference = const value_type &;
1647  using iterator = concurrent_hash_map_internal::hash_map_iterator<
1648  concurrent_hash_map, false>;
1649  using const_iterator = concurrent_hash_map_internal::hash_map_iterator<
1650  concurrent_hash_map, true>;
1651  using hasher = Hash;
1652  using key_equal = typename concurrent_hash_map_internal::key_equal_type<
1653  Hash, KeyEqual>::type;
1654 
1655 protected:
1656  using mutex_t = MutexType;
1657  using scoped_t = ScopedLockType;
1658  /*
1659  * Explicitly use methods and types from template base class
1660  */
1661  using hash_map_base =
1662  concurrent_hash_map_internal::hash_map_base<Key, T, mutex_t,
1663  scoped_t>;
1664  using hash_map_base::calculate_mask;
1665  using hash_map_base::check_growth;
1666  using hash_map_base::check_mask_race;
1667  using hash_map_base::embedded_buckets;
1668  using hash_map_base::FEATURE_CONSISTENT_SIZE;
1669  using hash_map_base::get_bucket;
1670  using hash_map_base::get_pool_base;
1671  using hash_map_base::header_features;
1672  using hash_map_base::insert_new_node;
1673  using hash_map_base::internal_swap;
1674  using hash_map_base::layout_features;
1675  using hash_map_base::mask;
1676  using hash_map_base::reserve;
1677  using tls_t = typename hash_map_base::tls_t;
1678  using node = typename hash_map_base::node;
1679  using node_mutex_t = typename node::mutex_t;
1680  using node_ptr_t = typename hash_map_base::node_ptr_t;
1681  using bucket = typename hash_map_base::bucket;
1682  using bucket_lock_type = typename bucket::scoped_t;
1683  using segment_index_t = typename hash_map_base::segment_index_t;
1684  using segment_traits_t = typename hash_map_base::segment_traits_t;
1685  using segment_facade_t = typename hash_map_base::segment_facade_t;
1686  using scoped_lock_traits_type =
1687  concurrent_hash_map_internal::scoped_lock_traits<scoped_t>;
1688 
1689  friend class const_accessor;
1690  using persistent_node_ptr_t = detail::persistent_pool_ptr<node>;
1691 
/**
 * Free the persistent node referenced by @p n.
 *
 * The pool-relative pointer is cast to a node pointer, converted to a
 * persistent_ptr using this map's pool UUID and handed to
 * delete_persistent<node>.
 */
1692  void
1693  delete_node(const node_ptr_t &n)
1694  {
1695  delete_persistent<node>(
1696  detail::static_persistent_pool_pointer_cast<node>(n)
1697  .get_persistent_ptr(this->my_pool_uuid));
1698  }
1699 
/**
 * Linear search of bucket @p b's node chain for @p key.
 *
 * Keys are compared with key_equal. Asserts that the bucket has already
 * been rehashed. No lock is taken in this function.
 *
 * @return pointer to the first node whose key compares equal, or a null
 *	pointer when the chain holds no such node.
 */
1700  template <typename K>
1701  persistent_node_ptr_t
1702  search_bucket(const K &key, bucket *b) const
1703  {
1704  assert(b->is_rehashed(std::memory_order_relaxed));
1705 
1706  persistent_node_ptr_t n =
1707  detail::static_persistent_pool_pointer_cast<node>(
1708  b->node_list);
1709 
 /* Walk the chain until a matching key or the end is reached. */
1710  while (n &&
1711  !key_equal{}(key,
1712  n.get(this->my_pool_uuid)->item.first)) {
1713  n = detail::static_persistent_pool_pointer_cast<node>(
1714  n.get(this->my_pool_uuid)->next);
1715  }
1716 
1717  return n;
1718  }
1719 
/**
 * Scoped bucket lock combined with bucket lookup.
 *
 * Derives from bucket_lock_type and remembers the bucket it locked.
 * Acquiring a bucket that has not been rehashed yet triggers rehashing of
 * that bucket. Not copyable; movable.
 */
1724  class bucket_accessor : public bucket_lock_type {
1725  bucket *my_b;
1726 
1727  public:
 /**
  * Move constructor: takes over the bucket pointer and the lock state
  * (mutex pointer and writer flag) of @p b and resets @p b to the
  * unlocked, empty state.
  */
1728  bucket_accessor(bucket_accessor &&b) noexcept : my_b(b.my_b)
1729  {
1730  bucket_lock_type::mutex = b.bucket_lock_type::mutex;
1731  bucket_lock_type::is_writer =
1732  b.bucket_lock_type::is_writer;
1733  b.my_b = nullptr;
1734  b.bucket_lock_type::mutex = nullptr;
1735  b.bucket_lock_type::is_writer = false;
1736  }
1737 
 /*
  * NOTE(review): source line 1738 (the first line of this constructor's
  * declarator) is absent from this listing. The call below implies its
  * first parameter is the map pointer forwarded to acquire() as `base`.
  */
1739  const hashcode_type h, bool writer = false)
1740  {
1741  acquire(base, h, writer);
1742  }
1743 
1744  bucket_accessor(const bucket_accessor &other) = delete;
1745 
1746  bucket_accessor &
1747  operator=(const bucket_accessor &other) = delete;
1748 
 /**
  * Find the bucket for masked hashcode @p h in @p base and lock it.
  *
  * If the bucket is not rehashed and a write lock can be taken without
  * blocking, the flag is re-checked under the lock and the bucket is
  * rehashed if still needed; the lock is then held as a writer lock
  * whatever @p writer says. Otherwise the lock is acquired in the mode
  * requested by @p writer.
  */
1753  inline void
1754  acquire(concurrent_hash_map *base, const hashcode_type h,
1755  bool writer = false)
1756  {
1757  my_b = base->get_bucket(h);
1758 
1759  if (my_b->is_rehashed(std::memory_order_acquire) ==
1760  false &&
1761  bucket_lock_type::try_acquire(this->my_b->mutex,
1762  /*write=*/true)) {
1763  if (my_b->is_rehashed(
1764  std::memory_order_relaxed) ==
1765  false) {
1766  /* recursive rehashing */
1767  base->rehash_bucket<false>(my_b, h);
1768  }
1769  } else {
1770  bucket_lock_type::acquire(my_b->mutex, writer);
1771  }
1772 
1773  assert(my_b->is_rehashed(std::memory_order_relaxed));
1774  }
1775 
 /** @return the writer flag of the underlying scoped lock. */
1779  bool
1780  is_writer() const
1781  {
1782  return bucket_lock_type::is_writer;
1783  }
1784 
 /** @return the bucket selected by the last acquire(). */
1789  bucket *
1790  get() const
1791  {
1792  return my_b;
1793  }
1794 
 /** Member access on the held bucket; same pointer as get(). */
1799  bucket *operator->() const
1800  {
1801  return this->get();
1802  }
1803  };
1804 
1809  bucket *my_b;
1810 
1811  public:
1813  const hashcode_type h,
1814  bool writer = false)
1815  {
1816  acquire(base, h, writer);
1817  }
1818 
1819  /*
1820  * Find a bucket by masked hashcode, optionally rehash
1821  */
/*
 * Serial variant: looks up the bucket for masked hashcode `h` and, when it
 * is not rehashed yet, rehashes it through rehash_bucket<true>. No lock
 * call is made here, and the `writer` argument is not used in the body.
 */
1822  inline void
1823  acquire(concurrent_hash_map *base, const hashcode_type h,
1824  bool writer = false)
1825  {
1826  my_b = base->get_bucket(h);
1827 
1828  if (my_b->is_rehashed(std::memory_order_relaxed) ==
1829  false) {
1830  /* recursive rehashing */
1831  base->rehash_bucket<true>(my_b, h);
1832  }
1833 
1834  assert(my_b->is_rehashed(std::memory_order_relaxed));
1835  }
1836 
/**
 * Always reports writer access. rehash_bucket() checks is_writer() before
 * attempting a lock upgrade, so with this accessor the upgrade branch is
 * never taken.
 */
1843  bool
1844  is_writer() const
1845  {
1846  return true;
1847  }
1848 
1853  bucket *
1854  get() const
1855  {
1856  return my_b;
1857  }
1858 
1863  bucket *operator->() const
1864  {
1865  return this->get();
1866  }
1867  };
1868 
/**
 * Compute the hash code of the key stored in node @p n.
 *
 * The node pointer is resolved against this map's pool UUID and the key
 * (item.first) is hashed with a default-constructed hasher.
 */
1869  hashcode_type
1870  get_hash_code(node_ptr_t &n)
1871  {
1872  return hasher{}(
1873  detail::static_persistent_pool_pointer_cast<node>(n)(
1874  this->my_pool_uuid)
1875  ->item.first);
1876  }
1877 
/**
 * Populate the not-yet-rehashed bucket @p b_new (index @p h) by moving into
 * it every node of its parent bucket whose hash selects @p h under the full
 * mask, then mark @p b_new as rehashed and persist the flag.
 *
 * The parent bucket is the one at index h with its topmost set bit
 * cleared. It is accessed through serial_bucket_accessor when @p serial is
 * true and through the locking bucket_accessor otherwise.
 *
 * @param b_new bucket to fill; must have index > 1.
 * @param h masked hashcode (index) of @p b_new.
 */
1878  template <bool serial>
1879  void
1880  rehash_bucket(bucket *b_new, const hashcode_type h)
1881  {
1882  using accessor_type = typename std::conditional<
1883  serial, serial_bucket_accessor, bucket_accessor>::type;
1884 
1885  using scoped_lock_traits_type =
1886  concurrent_hash_map_internal::scoped_lock_traits<
1887  accessor_type>;
1888 
1889  /* First two bucket should be always rehashed */
1890  assert(h > 1);
1891 
1892  pool_base pop = get_pool_base();
1893  node_ptr_t *p_new = &(b_new->node_list);
1894 
1895  /* This condition is only true when there was a failure just
1896  * before setting rehashed flag */
1897  if (*p_new != nullptr) {
1898  assert(!b_new->is_rehashed(std::memory_order_relaxed));
1899 
 /* The node list is already in place: only the flag is missing. */
1900  b_new->set_rehashed(std::memory_order_relaxed);
1901  pop.persist(b_new->rehashed);
1902 
1903  return;
1904  }
1905 
1906  /* get parent mask from the topmost bit */
 /* NOTE(review): `1u` is an unsigned int; if hashcode_type is wider and
  * Log2(h) can reach the width of unsigned int, this shift would overflow.
  * Confirm the range of h or widen the literal. */
1907  hashcode_type mask = (1u << detail::Log2(h)) - 1;
1908  assert((h & mask) < h);
1909  accessor_type b_old(
1910  this, h & mask,
1911  scoped_lock_traits_type::initial_rw_state(true));
1912 
 /* NOTE(review): source line 1913 is absent from this listing. The `});`
  * at line 1956 suggests a lambda scope (presumably a transaction) opens
  * here; confirm against the original header. */
1914  /* get full mask for new bucket */
1915  mask = (mask << 1) | 1;
1916  assert((mask & (mask + 1)) == 0 && (h & mask) == h);
1917 
1918  restart:
 /* p_old always points at the link that holds the current node `n`,
  * so a node can be unlinked by overwriting *p_old. */
1919  for (node_ptr_t *p_old = &(b_old->node_list),
1920  n = *p_old;
1921  n; n = *p_old) {
1922  hashcode_type c = get_hash_code(n);
1923 #ifndef NDEBUG
1924  hashcode_type bmask = h & (mask >> 1);
1925 
1926  bmask = bmask == 0
1927  ? 1 /* minimal mask of parent bucket */
1928  : (1u << (detail::Log2(bmask) + 1)) - 1;
1929 
1930  assert((c & bmask) == (h & bmask));
1931 #endif
1932 
1933  if ((c & mask) == h) {
 /* A failed upgrade restarts the scan from the head of the
  * parent bucket. */
1934  if (!b_old.is_writer() &&
1935  !scoped_lock_traits_type::
1936  upgrade_to_writer(b_old)) {
1937  goto restart;
1938  /* node ptr can be invalid due
1939  * to concurrent erase */
1940  }
1941 
1942  /* Add to new b_new */
1943  *p_new = n;
1944 
1945  /* exclude from b_old */
1946  *p_old = n(this->my_pool_uuid)->next;
1947 
1948  p_new = &(n(this->my_pool_uuid)->next);
1949  } else {
1950  /* iterate to next item */
1951  p_old = &(n(this->my_pool_uuid)->next);
1952  }
1953  }
1954 
 /* Terminate the chain built in b_new. */
1955  *p_new = nullptr;
1956  });
1957 
1958  /* mark rehashed */
1959  b_new->set_rehashed(std::memory_order_release);
1960  pop.persist(b_new->rehashed);
1961  }
1962 
/**
 * Validate the layout stored in the pool against this build of the class.
 *
 * @throw pmem::layout_error when the stored incompat flags differ from
 *	header_features().incompat, or when FEATURE_CONSISTENT_SIZE is set
 *	and the stored value_size differs from sizeof(value_type).
 */
1963  void
1964  check_incompat_features()
1965  {
1966  if (layout_features.incompat != header_features().incompat)
1967  throw pmem::layout_error(
1968  "Incompat flags mismatch, for more details go to: https://pmem.io/pmdk/cpp_obj/ \n");
1969 
 /* value_size is compared only when FEATURE_CONSISTENT_SIZE is set. */
1970  if ((layout_features.compat & FEATURE_CONSISTENT_SIZE) &&
1971  this->value_size != sizeof(value_type))
1972  throw pmem::layout_error(
1973  "Size of value_type is different than the one stored in the pool \n");
1974  }
1975 
1976 public:
1977  class accessor;
1982  : protected node::scoped_t /*which derived from no_copy*/ {
1983  friend class concurrent_hash_map<Key, T, Hash, KeyEqual,
1984  mutex_t, scoped_t>;
1985  friend class accessor;
1987  using node::scoped_t::try_acquire;
1988 
1989  public:
1993  using value_type =
1994  const typename concurrent_hash_map::value_type;
1995 
/** @return true when this accessor does not reference a node. */
2000  bool
2001  empty() const
2002  {
2003  return !my_node;
2004  }
2005 
/*
 * NOTE(review): the declarator line (source line 2013) is absent from this
 * listing; callers use `result.release()`, so this is presumably release().
 *
 * Verifies the call is made outside of a transaction, then, if a node is
 * held, releases the item lock and clears the node pointer. Calling it on
 * an empty accessor only performs the transaction check.
 */
2012  void
2014  {
2015  concurrent_hash_map_internal::check_outside_tx();
2016 
2017  if (my_node) {
2018  node::scoped_t::release();
2019  my_node = 0;
2020  }
2021  }
2022 
2026  const_reference operator*() const
2027  {
2028  assert(my_node);
2029 
2030  return my_node->item;
2031  }
2032 
2036  const_pointer operator->() const
2037  {
2038  return &operator*();
2039  }
2040 
/**
 * Create an empty accessor (null node, value-initialised hash).
 * Performs the check_outside_tx() check.
 */
2046  const_accessor() : my_node(OID_NULL), my_hash()
2047  {
2048  concurrent_hash_map_internal::check_outside_tx();
2049  }
2050 
2055  {
2056  my_node = OID_NULL; // scoped lock's release() is called
2057  // in its destructor
2058  }
2059 
2060  protected:
2061  node_ptr_t my_node;
2062 
2063  hashcode_type my_hash;
2064  };
2065 
/**
 * Accessor giving non-const access to the referenced item. It extends
 * const_accessor, changing only the value type and the dereference
 * operators.
 */
2070  class accessor : public const_accessor {
2071  public:
2073  using value_type = typename concurrent_hash_map::value_type;
2074 
 /** @return mutable reference to the held item; asserts a node is held. */
2076  reference operator*() const
2077  {
2078  assert(this->my_node);
2079 
2080  return this->my_node->item;
2081  }
2082 
 /** @return pointer to the held item. */
2084  pointer operator->() const
2085  {
2086  return &operator*();
2087  }
2088  };
2089 
2093  concurrent_hash_map() : hash_map_base()
2094  {
2096  }
2097 
2102  concurrent_hash_map(size_type n) : hash_map_base()
2103  {
2105 
2106  reserve(n);
2107  }
2108 
2112  concurrent_hash_map(const concurrent_hash_map &table) : hash_map_base()
2113  {
2115 
2116  reserve(table.size());
2117 
2118  internal_copy(table);
2119  }
2120 
2124  concurrent_hash_map(concurrent_hash_map &&table) : hash_map_base()
2125  {
2127 
2128  swap(table);
2129  }
2130 
2134  template <typename I>
2135  concurrent_hash_map(I first, I last)
2136  {
2138 
2139  reserve(static_cast<size_type>(std::distance(first, last)));
2140 
2141  internal_copy(first, last);
2142  }
2143 
2147  concurrent_hash_map(std::initializer_list<value_type> il)
2148  {
2150 
2151  reserve(il.size());
2152 
2153  internal_copy(il.begin(), il.end());
2154  }
2155 
/*
 * NOTE(review): the declarator line (source line 2165) is absent from this
 * listing; the deprecation message on the following overload names this
 * function runtime_initialize(void).
 *
 * Prepares the map for use: validates layout features, recomputes the
 * mask, and brings the size bookkeeping up to date.
 *
 * - Map created without FEATURE_CONSISTENT_SIZE: the size is recounted by
 *   iterating the map; then, in one transaction, the TLS object is
 *   allocated, on_init_size and value_size are stored, and the feature
 *   flag is set.
 * - Otherwise: tls_restore() is called on the existing TLS data.
 *
 * May throw pmem::layout_error from check_incompat_features().
 */
2164  void
2166  {
2167  check_incompat_features();
2168 
2169  calculate_mask();
2170 
2171  /*
2172  * Handle case where hash_map was created without
2173  * FEATURE_CONSISTENT_SIZE.
2174  */
2175  if (!(layout_features.compat & FEATURE_CONSISTENT_SIZE)) {
2176  auto actual_size =
2177  std::distance(this->begin(), this->end());
2178  assert(actual_size >= 0);
2179 
2180  this->my_size = static_cast<size_t>(actual_size);
2181 
2182  auto pop = get_pool_base();
2183  flat_transaction::run(pop, [&] {
2184  this->tls_ptr = make_persistent<tls_t>();
2185  this->on_init_size =
2186  static_cast<size_t>(actual_size);
2187  this->value_size = sizeof(value_type);
2188 
2189  layout_features.compat |=
2190  FEATURE_CONSISTENT_SIZE;
2191  });
2192  } else {
2193  assert(this->tls_ptr != nullptr);
2194  this->tls_restore();
2195  }
2196 
 /* Debug-only check: the stored size must match the element count. */
2197  assert(this->size() ==
2198  size_type(std::distance(this->begin(), this->end())));
2199  }
2200 
/**
 * Deprecated initialisation entry point.
 *
 * Validates layout features and recomputes the mask. When
 * @p graceful_shutdown is false the size is recounted by iterating the map;
 * when true the stored size is only checked by an assert.
 *
 * May throw pmem::layout_error from check_incompat_features().
 */
2201  [[deprecated(
2202  "runtime_initialize(bool) is now deprecated, use runtime_initialize(void)")]] void
2203  runtime_initialize(bool graceful_shutdown)
2204  {
2205  check_incompat_features();
2206 
2207  calculate_mask();
2208 
2209  if (!graceful_shutdown) {
2210  auto actual_size =
2211  std::distance(this->begin(), this->end());
2212  assert(actual_size >= 0);
2213  this->my_size = static_cast<size_type>(actual_size);
2214  } else {
2215  assert(this->size() ==
2216  size_type(std::distance(this->begin(),
2217  this->end())));
2218  }
2219  }
2220 
/*
 * NOTE(review): the declarator line (source line 2233) is absent from this
 * listing; the body reads a parameter named `table` and passes it to
 * internal_copy, so this is presumably the copy-assignment operator.
 *
 * Self-assignment is a no-op. Otherwise the current contents are cleared
 * and the elements of `table` are copied in. Returns *this.
 */
2232  concurrent_hash_map &
2234  {
2235  if (this != &table) {
2236  clear();
2237  internal_copy(table);
2238  }
2239 
2240  return *this;
2241  }
2242 
2255  operator=(std::initializer_list<value_type> il)
2256  {
2257  clear();
2258 
2259  reserve(il.size());
2260 
2261  internal_copy(il.begin(), il.end());
2262 
2263  return *this;
2264  }
2265 
2274  void rehash(size_type n = 0);
2275 
2282  void clear();
2283 
/*
 * NOTE(review): the declarator line (source line 2302) is absent from this
 * listing; the cleanup block that follows calls free_data(), which is
 * presumably this function.
 *
 * Returns immediately when tls_ptr is null. Otherwise, in one transaction,
 * clears the map and frees the TLS data.
 */
2301  void
2303  {
2304  if (!this->tls_ptr)
2305  return;
2306 
2307  auto pop = get_pool_base();
2308 
2309  flat_transaction::run(pop, [&] {
2310  clear();
2311  this->free_tls();
2312  });
2313  }
2314 
2324  {
2325  try {
2326  free_data();
2327  } catch (...) {
2328  std::terminate();
2329  }
2330  }
2331 
2332  //------------------------------------------------------------------------
2333  // STL support - not thread-safe methods
2334  //------------------------------------------------------------------------
2335 
2342  iterator
2344  {
2345  return iterator(this, 0);
2346  }
2347 
2352  iterator
2354  {
2355  return iterator(this, mask() + 1);
2356  }
2357 
2362  const_iterator
2363  begin() const
2364  {
2365  return const_iterator(this, 0);
2366  }
2367 
2372  const_iterator
2373  end() const
2374  {
2375  return const_iterator(this, mask() + 1);
2376  }
2377 
2381  size_type
2382  size() const
2383  {
2384  return hash_map_base::size();
2385  }
2386 
2390  bool
2391  empty() const
2392  {
2393  return this->size() == 0;
2394  }
2395 
2399  size_type
2400  max_size() const
2401  {
2402  return (~size_type(0)) / sizeof(node);
2403  }
2404 
2408  size_type
2410  {
2411  return mask() + 1;
2412  }
2413 
2417  void swap(concurrent_hash_map &table);
2418 
2419  //------------------------------------------------------------------------
2420  // concurrent map operations
2421  //------------------------------------------------------------------------
2422 
/**
 * Check whether @p key is present.
 *
 * Must be called outside of a transaction (check_outside_tx). The lookup is
 * done by internal_find with no accessor, so no item lock is taken.
 *
 * @return the boolean result of internal_find converted to size_type.
 */
2428  size_type
2429  count(const Key &key) const
2430  {
2431  concurrent_hash_map_internal::check_outside_tx();
2432 
 /* internal_find is non-const, hence the const_cast. */
2433  return const_cast<concurrent_hash_map *>(this)->internal_find(
2434  key, nullptr, false);
2435  }
2436 
2448  template <typename K,
2449  typename = typename std::enable_if<
2450  concurrent_hash_map_internal::
2451  has_transparent_key_equal<hasher>::value,
2452  K>::type>
2453  size_type
2454  count(const K &key) const
2455  {
2456  concurrent_hash_map_internal::check_outside_tx();
2457 
2458  return const_cast<concurrent_hash_map *>(this)->internal_find(
2459  key, nullptr, false);
2460  }
2461 
/**
 * Find @p key and, on success, make @p result reference the item with the
 * item lock requested in read mode (write == false).
 *
 * Any item previously held by @p result is released first. Must be called
 * outside of a transaction.
 *
 * @return true if the key was found.
 */
2468  bool
2469  find(const_accessor &result, const Key &key) const
2470  {
2471  concurrent_hash_map_internal::check_outside_tx();
2472 
2473  result.release();
2474 
2475  return const_cast<concurrent_hash_map *>(this)->internal_find(
2476  key, &result, false);
2477  }
2478 
2492  template <typename K,
2493  typename = typename std::enable_if<
2494  concurrent_hash_map_internal::
2495  has_transparent_key_equal<hasher>::value,
2496  K>::type>
2497  bool
2498  find(const_accessor &result, const K &key) const
2499  {
2500  concurrent_hash_map_internal::check_outside_tx();
2501 
2502  result.release();
2503 
2504  return const_cast<concurrent_hash_map *>(this)->internal_find(
2505  key, &result, false);
2506  }
2507 
/**
 * Find @p key and, on success, make @p result reference the item with the
 * item lock requested in write mode (write == true).
 *
 * Any item previously held by @p result is released first. Must be called
 * outside of a transaction.
 *
 * @return true if the key was found.
 */
2514  bool
2515  find(accessor &result, const Key &key)
2516  {
2517  concurrent_hash_map_internal::check_outside_tx();
2518 
2519  result.release();
2520 
2521  return internal_find(key, &result, true);
2522  }
2523 
2537  template <typename K,
2538  typename = typename std::enable_if<
2539  concurrent_hash_map_internal::
2540  has_transparent_key_equal<hasher>::value,
2541  K>::type>
2542  bool
2543  find(accessor &result, const K &key)
2544  {
2545  concurrent_hash_map_internal::check_outside_tx();
2546 
2547  result.release();
2548 
2549  return internal_find(key, &result, true);
2550  }
/**
 * Insert an item for @p key if it is absent, constructing the node from the
 * key alone, and make @p result reference the item (read mode).
 *
 * Any item previously held by @p result is released first. Must be called
 * outside of a transaction.
 *
 * @return true if a new item was inserted, false if the key already existed.
 */
2558  bool
2559  insert(const_accessor &result, const Key &key)
2560  {
2561  concurrent_hash_map_internal::check_outside_tx();
2562 
2563  result.release();
2564 
2565  return internal_insert(key, &result, false, key);
2566  }
2567 
2575  bool
2576  insert(accessor &result, const Key &key)
2577  {
2578  concurrent_hash_map_internal::check_outside_tx();
2579 
2580  result.release();
2581 
2582  return internal_insert(key, &result, true, key);
2583  }
2584 
2592  bool
2593  insert(const_accessor &result, const value_type &value)
2594  {
2595  concurrent_hash_map_internal::check_outside_tx();
2596 
2597  result.release();
2598 
2599  return internal_insert(value.first, &result, false, value);
2600  }
2601 
/**
 * Insert a copy of @p value if its key is absent and make @p result
 * reference the item (write mode).
 *
 * Any item previously held by @p result is released first. Must be called
 * outside of a transaction.
 *
 * @return true if a new item was inserted, false if the key already existed.
 */
2609  bool
2610  insert(accessor &result, const value_type &value)
2611  {
2612  concurrent_hash_map_internal::check_outside_tx();
2613 
2614  result.release();
2615 
2616  return internal_insert(value.first, &result, true, value);
2617  }
2618 
2625  bool
2626  insert(const value_type &value)
2627  {
2628  concurrent_hash_map_internal::check_outside_tx();
2629 
2630  return internal_insert(value.first, nullptr, false, value);
2631  }
2632 
2640  bool
2641  insert(const_accessor &result, value_type &&value)
2642  {
2643  concurrent_hash_map_internal::check_outside_tx();
2644 
2645  result.release();
2646 
2647  return internal_insert(value.first, &result, false,
2648  std::move(value));
2649  }
2650 
2658  bool
2659  insert(accessor &result, value_type &&value)
2660  {
2661  concurrent_hash_map_internal::check_outside_tx();
2662 
2663  result.release();
2664 
2665  return internal_insert(value.first, &result, true,
2666  std::move(value));
2667  }
2668 
/**
 * Insert @p value by move if its key is absent. No accessor is passed, so
 * no item lock is taken. Must be called outside of a transaction.
 *
 * @return true if a new item was inserted, false if the key already existed.
 */
2675  bool
2676  insert(value_type &&value)
2677  {
2678  concurrent_hash_map_internal::check_outside_tx();
2679 
2680  return internal_insert(value.first, nullptr, false,
2681  std::move(value));
2682  }
2683 
/**
 * Insert every element of the range [first, last) by calling insert(*it)
 * for each. Must be called outside of a transaction.
 */
2689  template <typename I>
2690  void
2691  insert(I first, I last)
2692  {
2693  concurrent_hash_map_internal::check_outside_tx();
2694 
2695  for (; first != last; ++first)
2696  insert(*first);
2697  }
2698 
2704  void
2705  insert(std::initializer_list<value_type> il)
2706  {
2707  concurrent_hash_map_internal::check_outside_tx();
2708 
2709  insert(il.begin(), il.end());
2710  }
2711 
/**
 * Insert (key, obj) if @p key is absent; otherwise assign @p obj to the
 * mapped value of the existing item.
 *
 * The item is held through a write accessor for the duration of the call.
 * Must be called outside of a transaction.
 *
 * @return true if a new item was inserted, false if an existing item was
 *	assigned to.
 */
2720  template <typename M>
2721  bool
2722  insert_or_assign(const key_type &key, M &&obj)
2723  {
2724  concurrent_hash_map_internal::check_outside_tx();
2725 
2726  accessor acc;
2727  auto result = internal_insert(key, &acc, true, key,
2728  std::forward<M>(obj));
2729 
2730  if (!result) {
2731  pool_base pop = get_pool_base();
 /* NOTE(review): source lines 2732 and 2734 are absent from this
  * listing; presumably they wrap the assignment in a transaction on
  * `pop`. Confirm against the original header. */
2733  acc->second = std::forward<M>(obj);
2735  }
2736 
2737  return result;
2738  }
2739 
/**
 * Rvalue-key overload: insert (key, obj), moving @p key into the new node,
 * if the key is absent; otherwise assign @p obj to the mapped value of the
 * existing item.
 *
 * The item is held through a write accessor for the duration of the call.
 * Must be called outside of a transaction.
 *
 * @return true if a new item was inserted, false if an existing item was
 *	assigned to.
 */
2748  template <typename M>
2749  bool
2750  insert_or_assign(key_type &&key, M &&obj)
2751  {
2752  concurrent_hash_map_internal::check_outside_tx();
2753 
2754  accessor acc;
 /* `key` is passed both as the lookup key (by reference) and as a
  * moved constructor argument. */
2755  auto result = internal_insert(key, &acc, true, std::move(key),
2756  std::forward<M>(obj));
2757 
2758  if (!result) {
2759  pool_base pop = get_pool_base();
 /* NOTE(review): source lines 2760 and 2762 are absent from this
  * listing; presumably they wrap the assignment in a transaction on
  * `pop`. Confirm against the original header. */
2761  acc->second = std::forward<M>(obj);
2763  }
2764 
2765  return result;
2766  }
2767 
2776  template <
2777  typename K, typename M,
2778  typename = typename std::enable_if<
2779  concurrent_hash_map_internal::has_transparent_key_equal<
2780  hasher>::value &&
2781  std::is_constructible<key_type, K>::value,
2782  K>::type>
2783  bool
2784  insert_or_assign(K &&key, M &&obj)
2785  {
2786  concurrent_hash_map_internal::check_outside_tx();
2787 
2788  accessor acc;
2789  auto result =
2790  internal_insert(key, &acc, true, std::forward<K>(key),
2791  std::forward<M>(obj));
2792 
2793  if (!result) {
2794  pool_base pop = get_pool_base();
2796  acc->second = std::forward<M>(obj);
2798  }
2799 
2800  return result;
2801  }
2802 
/**
 * Remove the item with key @p key, if present. Must be called outside of a
 * transaction.
 *
 * @return true if an item was removed, false if the key was not found.
 */
2811  bool
2812  erase(const Key &key)
2813  {
2814  concurrent_hash_map_internal::check_outside_tx();
2815 
2816  return internal_erase(key);
2817  }
2818 
/**
 * Defragment the keys and values stored in a range of buckets.
 *
 * The range is given as percentages of the current bucket index space:
 * buckets from start_percent% up to (start_percent + amount_percent)% of
 * the mask are visited, from the highest index down to the lowest. A bucket
 * is skipped when its lock, or the lock of any of its items, cannot be
 * obtained by push_and_try_lock().
 *
 * @param start_percent start of the range, in [0, 100).
 * @param amount_percent size of the range; start_percent + amount_percent
 *	must be greater than start_percent and at most 100.
 *
 * @return the result of running the defrag object on the collected
 *	elements.
 *
 * @throw std::range_error when the range is not valid. NaN arguments are
 *	rejected as well: the check is written positively so that any
 *	comparison failing because of NaN leads to the throw instead of
 *	reaching the floating-point to size_t conversions below.
 */
2837  pobj_defrag_result
2838  defragment(double start_percent = 0, double amount_percent = 100)
2839  {
2840  double end_percent = start_percent + amount_percent;
2841  if (!(start_percent >= 0 && start_percent < 100 &&
2842  end_percent >= 0 && end_percent <= 100 &&
2843  start_percent < end_percent)) {
2844  throw std::range_error("incorrect range");
2845  }
2846 
2847  size_t max_index = mask().load(std::memory_order_acquire);
2848  size_t start_index =
2849  static_cast<size_t>((start_percent * max_index) / 100);
2850  size_t end_index =
2851  static_cast<size_t>((end_percent * max_index) / 100);
2852 
2853  /* Make sure we do not use too big index, even in case of
2854  * rounding errors. */
2855  end_index = (std::min)(end_index, max_index);
2856 
2857 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
2858  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
2859 #endif
2860 
2861  /* Create defrag object for elements in the current pool */
2862  pmem::obj::defrag my_defrag(this->get_pool_base());
2863  mutex_vector mv;
2864 
2865  /*
2866  * Locks are taken in the backward order to avoid deadlocks
2867  * with the rehashing of buckets.
2868  *
2869  * We do '+ 1' and '- 1' to handle the 'i == 0' case.
2870  */
2871  for (size_t i = end_index + 1; i >= start_index + 1; i--) {
2872  /*
2873  * All locks will be unlocked automatically
2874  * in the destructor of 'mv'.
2875  */
2876  bucket *b = mv.push_and_try_lock(this, i - 1);
2877  if (b == nullptr)
2878  continue;
2879 
2880  defrag_save_nodes(b, my_defrag);
2881  }
2882 
2883  return my_defrag.run();
2884  }
2885 
2900  template <typename K,
2901  typename = typename std::enable_if<
2902  concurrent_hash_map_internal::
2903  has_transparent_key_equal<hasher>::value,
2904  K>::type>
2905  bool
2906  erase(const K &key)
2907  {
2908  concurrent_hash_map_internal::check_outside_tx();
2909 
2910  return internal_erase(key);
2911  }
2912 
2913 protected:
2914  /*
2915  * Try to acquire the mutex for read or write.
2916  *
2917  * If acquiring succeeds returns true, otherwise retries for few times.
2918  * If acquiring fails after all attempts returns false.
2919  */
2920  bool try_acquire_item(const_accessor *result, node_mutex_t &mutex,
2921  bool write);
2922 
2928  public:
2929  using mutex_t = MutexType;
2930 
/**
 * Lock bucket @p h of @p base as a writer, keep that lock in this vector,
 * and check that every item in the bucket can be write-locked.
 *
 * Each item lock is tried through a const_accessor local to one loop
 * iteration, so it is not kept beyond that iteration; only the bucket
 * accessor stays in the vector.
 *
 * @return the locked bucket, or nullptr when an item lock could not be
 *	obtained. In that case the bucket accessor is removed from the
 *	vector again.
 */
2932  bucket *
2933  push_and_try_lock(concurrent_hash_map *base, hashcode_type h)
2934  {
2935  vec.emplace_back(base, h, true /*writer*/);
2936  bucket *b = vec.back().get();
2937 
2938  auto node_ptr = static_cast<node *>(
2939  b->node_list.get(base->my_pool_uuid));
2940 
2941  while (node_ptr) {
2942  const_accessor ca;
2943  if (!base->try_acquire_item(&ca,
2944  node_ptr->mutex,
2945  /*write=*/true)) {
 /* Give up on this bucket: drop its accessor from the vector. */
2946  vec.pop_back();
2947  return nullptr;
2948  }
2949 
2950  node_ptr =
2951  static_cast<node *>(node_ptr->next.get(
2952  (base->my_pool_uuid)));
2953  }
2954 
2955  return b;
2956  }
2957 
2958  private:
2959  std::vector<bucket_accessor> vec;
2960  };
2961 
2962  template <typename K>
2963  bool internal_find(const K &key, const_accessor *result, bool write);
2964 
2965  template <typename K, typename... Args>
2966  bool internal_insert(const K &key, const_accessor *result, bool write,
2967  Args &&... args);
2968 
2969  /* Obtain pointer to node and lock bucket */
/*
 * Search the bucket held by `b` for `key`.
 *
 * When the key is not found, Bucket_rw_lock is true and `b` is not yet a
 * writer, the lock is upgraded to writer. If upgrade_to_writer() returns
 * false the bucket is searched again; a node found by that second search
 * is returned after downgrading the lock back to reader.
 *
 * Returns the node pointer, or a null pointer when the key is absent.
 */
2970  template <bool Bucket_rw_lock, typename K>
2971  persistent_node_ptr_t
2972  get_node(const K &key, bucket_accessor &b)
2973  {
2974  /* find a node */
2975  auto n = search_bucket(key, b.get());
2976 
2977  if (!n) {
2978  if (Bucket_rw_lock && !b.is_writer() &&
2979  !scoped_lock_traits_type::upgrade_to_writer(b)) {
2980  /* Rerun search_list, in case another
2981  * thread inserted the item during the
2982  * upgrade. */
2983  n = search_bucket(key, b.get());
2984  if (n) {
2985  /* unfortunately, it did */
2986  scoped_lock_traits_type::
2987  downgrade_to_reader(b);
2988  return n;
2989  }
2990  }
2991  }
2992 
2993  return n;
2994  }
2995 
2996  template <typename K>
2997  bool internal_erase(const K &key);
2998 
2999  void clear_segment(segment_index_t s);
3000 
3004  void internal_copy(const concurrent_hash_map &source);
3005 
3006  template <typename I>
3007  void internal_copy(I first, I last);
3008 
/*
 * NOTE(review): the declarator line (source line 3014) is absent from this
 * listing; defragment() calls defrag_save_nodes(b, my_defrag) and the body
 * uses `b` and `defrag`, so this is presumably that function.
 *
 * Registers the key and the mapped value of every node in bucket `b` with
 * the `defrag` object.
 */
3013  void
3015  {
3016  auto node_ptr = static_cast<node *>(
3017  b->node_list.get(this->my_pool_uuid));
3018 
3019  while (node_ptr) {
3020  /*
3021  * We do not perform the defragmentation
3022  * on node pointers, because nodes
3023  * always have the same size.
3024  */
3025  defrag.add(node_ptr->item.first);
3026  defrag.add(node_ptr->item.second);
3027 
3028  node_ptr = static_cast<node *>(
3029  node_ptr->next.get((this->my_pool_uuid)));
3030  }
3031  }
3032 }; // class concurrent_hash_map
3033 
/**
 * Try to lock an item's mutex through the accessor @p result.
 *
 * One attempt is made first. If it fails, attempts are repeated with a
 * detail::atomic_backoff between them until one succeeds or
 * bounded_pause() returns false.
 *
 * @param result accessor that will own the lock.
 * @param mutex the item's mutex.
 * @param write true for a write lock, false for a read lock.
 *
 * @return true if the lock was acquired, false if the attempts were
 *	given up.
 */
3034 template <typename Key, typename T, typename Hash, typename KeyEqual,
3035  typename MutexType, typename ScopedLockType>
3036 bool
3037 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3038  ScopedLockType>::try_acquire_item(const_accessor *result,
3039  node_mutex_t &mutex,
3040  bool write)
3041 {
3042  /* acquire the item */
3043  if (!result->try_acquire(mutex, write)) {
3044  for (detail::atomic_backoff backoff(true);;) {
3045  if (result->try_acquire(mutex, write))
3046  break;
3047 
3048  if (!backoff.bounded_pause())
3049  return false;
3050  }
3051  }
3052 
3053  return true;
3054 }
3055 
/**
 * Common implementation of count() and find().
 *
 * Locks the bucket selected by the key's hash under the current mask and
 * searches it. If the key is missing and check_mask_race() reports that the
 * mask changed, the lookup is retried. When @p result is given, the item
 * lock is acquired through it; if that lock cannot be obtained within the
 * bounded wait of try_acquire_item(), the bucket lock is released, the
 * thread yields, the mask is reloaded and the operation restarts.
 *
 * @param key key to look up.
 * @param result accessor to fill, or nullptr when only presence is needed;
 *	if given it must be empty.
 * @param write true to lock the item for writing, false for reading.
 *
 * @return true if the key was found (with @p result, if given, set to the
 *	node and its hash), false otherwise.
 */
3056 template <typename Key, typename T, typename Hash, typename KeyEqual,
3057  typename MutexType, typename ScopedLockType>
3058 template <typename K>
3059 bool
3060 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3061  ScopedLockType>::internal_find(const K &key,
3062  const_accessor *result,
3063  bool write)
3064 {
3065  assert(!result || !result->my_node);
3066 
3067  hashcode_type m = mask().load(std::memory_order_acquire);
3068 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3069  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3070 #endif
3071 
 /* The mask must have the form 2^k - 1. */
3072  assert((m & (m + 1)) == 0);
3073 
3074  hashcode_type const h = hasher{}(key);
3075 
3076  persistent_node_ptr_t node;
3077 
3078  while (true) {
3079  /* get bucket and acquire the lock */
3080  bucket_accessor b(
3081  this, h & m,
3082  scoped_lock_traits_type::initial_rw_state(false));
3083  node = get_node<false>(key, b);
3084 
3085  if (!node) {
3086  /* Element was possibly relocated, try again */
3087  if (check_mask_race(h, m)) {
3088  b.release();
3089  continue;
3090  } else {
3091  return false;
3092  }
3093  }
3094 
3095  /* No need to acquire the item or item acquired */
3096  if (!result ||
3097  try_acquire_item(
3098  result, node.get(this->my_pool_uuid)->mutex, write))
3099  break;
3100 
3101  /* the wait takes really long, restart the
3102  * operation */
3103  b.release();
3104 
3105  std::this_thread::yield();
3106 
3107  m = mask().load(std::memory_order_acquire);
3108 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3109  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3110 #endif
3111  }
3112 
3113  if (result) {
3114  result->my_node = node.get_persistent_ptr(this->my_pool_uuid);
3115  result->my_hash = h;
3116  }
3117 
3118  return true;
3119 }
3120 
/**
 * Common implementation of the insert() and insert_or_assign() overloads.
 *
 * Locks the bucket selected by the key's hash and searches it. If the key
 * is absent (and the mask did not change in the meantime) a new node is
 * created from @p args via insert_new_node(). When @p result is given, the
 * item lock is acquired through it; if that lock cannot be obtained within
 * the bounded wait of try_acquire_item(), the bucket lock is released, the
 * thread yields, the mask is reloaded and the loop runs again. Finally
 * check_growth() is called with the mask and the size reported by the
 * insertion.
 *
 * @param key key to look up / insert.
 * @param result accessor to fill, or nullptr; if given it must be empty.
 * @param write true to lock the item for writing, false for reading.
 * @param args arguments forwarded to insert_new_node() to build the node.
 *
 * @return true if this call inserted a new node, false if the key already
 *	existed.
 */
3121 template <typename Key, typename T, typename Hash, typename KeyEqual,
3122  typename MutexType, typename ScopedLockType>
3123 template <typename K, typename... Args>
3124 bool
3125 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3126  ScopedLockType>::internal_insert(const K &key,
3127  const_accessor *result,
3128  bool write,
3129  Args &&... args)
3130 {
3131  assert(!result || !result->my_node);
3132 
3133  hashcode_type m = mask().load(std::memory_order_acquire);
3134 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3135  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3136 #endif
3137 
 /* The mask must have the form 2^k - 1. */
3138  assert((m & (m + 1)) == 0);
3139 
3140  hashcode_type const h = hasher{}(key);
3141 
3142  persistent_node_ptr_t node;
3143  size_t new_size = 0;
3144  bool inserted = false;
3145 
3146  while (true) {
3147  /* get bucket and acquire the lock */
3148  bucket_accessor b(
3149  this, h & m,
3150  scoped_lock_traits_type::initial_rw_state(true));
3151  node = get_node<true>(key, b);
3152 
3153  if (!node) {
3154  /* Element was possibly relocated, try again */
3155  if (check_mask_race(h, m)) {
3156  b.release();
3157  continue;
3158  }
3159 
3160  /* insert and set flag to grow the container */
3161  new_size = insert_new_node(b.get(), node,
3162  std::forward<Args>(args)...);
3163  inserted = true;
3164  }
3165 
3166  /* No need to acquire the item or item acquired */
3167  if (!result ||
3168  try_acquire_item(
3169  result, node.get(this->my_pool_uuid)->mutex, write))
3170  break;
3171 
3172  /* the wait takes really long, restart the
3173  * operation */
3174  b.release();
3175 
3176  std::this_thread::yield();
3177 
3178  m = mask().load(std::memory_order_acquire);
3179 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3180  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3181 #endif
3182  }
3183 
3184  if (result) {
3185  result->my_node = node.get_persistent_ptr(this->my_pool_uuid);
3186  result->my_hash = h;
3187  }
3188 
3189  check_growth(m, new_size);
3190 
3191  return inserted;
3192 }
3193 
3194 template <typename Key, typename T, typename Hash, typename KeyEqual,
3195  typename MutexType, typename ScopedLockType>
3196 template <typename K>
3197 bool
3198 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3199  ScopedLockType>::internal_erase(const K &key)
3200 {
3201  node_ptr_t n;
3202  hashcode_type const h = hasher{}(key);
3203  hashcode_type m = mask().load(std::memory_order_acquire);
3204 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
3205  ANNOTATE_HAPPENS_AFTER(&(this->my_mask));
3206 #endif
3207 
3208  pool_base pop = get_pool_base();
3209 
3210 restart : {
3211  /* lock scope */
3212  /* get bucket */
3213  bucket_accessor b(this, h & m,
3214  scoped_lock_traits_type::initial_rw_state(true));
3215 
3216 search:
3217  node_ptr_t *p = &b->node_list;
3218  n = *p;
3219 
3220  while (n &&
3221  !key_equal{}(key,
3222  detail::static_persistent_pool_pointer_cast<node>(
3223  n)(this->my_pool_uuid)
3224  ->item.first)) {
3225  p = &n(this->my_pool_uuid)->next;
3226  n = *p;
3227  }
3228 
3229  if (!n) {
3230  /* not found, but mask could be changed */
3231  if (check_mask_race(h, m))
3232  goto restart;
3233 
3234  return false;
3235  } else if (!b.is_writer() &&
3236  !scoped_lock_traits_type::upgrade_to_writer(b)) {
3237  if (check_mask_race(h, m)) /* contended upgrade, check mask */
3238  goto restart;
3239 
3240  goto search;
3241  }
3242 
3243  persistent_ptr<node> del = n(this->my_pool_uuid);
3244 
3245  {
3246  /* We cannot remove this element immediately because
3247  * other threads might work with this element via
3248  * accessors. The item_locker required to wait while
3249  * other threads use the node. */
3250  const_accessor acc;
3251  if (!try_acquire_item(&acc, del->mutex, true)) {
3252  /* the wait takes really long, restart the operation */
3253  b.release();
3254 
3255  std::this_thread::yield();
3256 
3257  m = mask().load(std::memory_order_acquire);
3258 
3259  goto restart;
3260  }
3261  }
3262 
3263  assert(pmemobj_tx_stage() == TX_STAGE_NONE);
3264 
3265  auto &size_diff = this->thread_size_diff();
3266 
3267  /* Only one thread can delete it due to write lock on the bucket
3268  */
3269  flat_transaction::run(pop, [&] {
3270  *p = del->next;
3271  delete_node(del);
3272 
3273  --size_diff;
3274  });
3275 
3276  --(this->my_size);
3277 }
3278 
3279  return true;
3280 }
3281 
3282 template <typename Key, typename T, typename Hash, typename KeyEqual,
3283  typename MutexType, typename ScopedLockType>
3284 void
3287 {
3288  internal_swap(table);
3289 }
3290 
3291 template <typename Key, typename T, typename Hash, typename KeyEqual,
3292  typename MutexType, typename ScopedLockType>
3293 void
3295  size_type sz)
3296 {
3297  concurrent_hash_map_internal::check_outside_tx();
3298 
3299  reserve(sz);
3300  hashcode_type m = mask();
3301 
3302  /* only the last segment should be scanned for rehashing size or first
3303  * index of the last segment */
3304  hashcode_type b = (m + 1) >> 1;
3305 
3306  /* zero or power of 2 */
3307  assert((b & (b - 1)) == 0);
3308 
3309  for (; b <= m; ++b) {
3310  bucket *bp = get_bucket(b);
3311 
3312  concurrent_hash_map_internal::assert_not_locked<mutex_t,
3313  scoped_t>(
3314  bp->mutex);
3315  /* XXX Need to investigate if this statement is needed */
3316  if (bp->is_rehashed(std::memory_order_relaxed) == false)
3317  rehash_bucket<true>(bp, b);
3318  }
3319 }
3320 
3321 template <typename Key, typename T, typename Hash, typename KeyEqual,
3322  typename MutexType, typename ScopedLockType>
3323 void
3325 {
3326  hashcode_type m = mask();
3327 
3328  assert((m & (m + 1)) == 0);
3329 
3330 #ifndef NDEBUG
3331  /* check consistency */
3332  for (segment_index_t b = 0; b <= m; ++b) {
3333  bucket *bp = get_bucket(b);
3334  concurrent_hash_map_internal::assert_not_locked<mutex_t,
3335  scoped_t>(
3336  bp->mutex);
3337  }
3338 #endif
3339 
3340  pool_base pop = get_pool_base();
3341  { /* transaction scope */
3342 
3343  flat_transaction::manual tx(pop);
3344 
3345  assert(this->tls_ptr != nullptr);
3346  this->tls_ptr->clear();
3347 
3348  this->on_init_size = 0;
3349 
3350  segment_index_t s = segment_traits_t::segment_index_of(m);
3351 
3352  assert(s + 1 == this->block_table_size ||
3353  !segment_facade_t(this->my_table, s + 1).is_valid());
3354 
3355  do {
3356  clear_segment(s);
3357  } while (s-- > 0);
3358 
3359  /*
3360  * As clear can only be called
3361  * from one thread, and there can be an outer
3362  * transaction we must make sure that mask and size
3363  * changes are transactional
3364  */
3365  flat_transaction::snapshot((size_t *)&this->my_mask);
3366  flat_transaction::snapshot((size_t *)&this->my_size);
3367 
3368  mask().store(embedded_buckets - 1, std::memory_order_relaxed);
3369  this->my_size = 0;
3370 
3372  }
3373 }
3374 
3375 template <typename Key, typename T, typename Hash, typename KeyEqual,
3376  typename MutexType, typename ScopedLockType>
3377 void
3378 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3379  ScopedLockType>::clear_segment(segment_index_t s)
3380 {
3381  segment_facade_t segment(this->my_table, s);
3382 
3383  assert(segment.is_valid());
3384 
3385  size_type sz = segment.size();
3386  for (segment_index_t i = 0; i < sz; ++i) {
3387  for (node_ptr_t n = segment[i].node_list; n;
3388  n = segment[i].node_list) {
3389  segment[i].node_list = n(this->my_pool_uuid)->next;
3390  delete_node(n);
3391  }
3392  }
3393 
3394  if (s >= segment_traits_t::embedded_segments)
3395  segment.disable();
3396 }
3397 
3398 template <typename Key, typename T, typename Hash, typename KeyEqual,
3399  typename MutexType, typename ScopedLockType>
3400 void
3402  internal_copy(const concurrent_hash_map &source)
3403 {
3404  auto pop = get_pool_base();
3405 
3406  reserve(source.size());
3407  internal_copy(source.begin(), source.end());
3408 }
3409 
3410 template <typename Key, typename T, typename Hash, typename KeyEqual,
3411  typename MutexType, typename ScopedLockType>
3412 template <typename I>
3413 void
3414 concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3415  ScopedLockType>::internal_copy(I first, I last)
3416 {
3417  hashcode_type m = mask();
3418 
3419  for (; first != last; ++first) {
3420  hashcode_type h = hasher{}(first->first);
3421  bucket *b = get_bucket(h & m);
3422 
3423  assert(b->is_rehashed(std::memory_order_relaxed));
3424 
3425  detail::persistent_pool_ptr<node> p;
3426  insert_new_node(b, p, *first);
3427  }
3428 }
3429 
3430 template <typename Key, typename T, typename Hash, typename KeyEqual,
3431  typename MutexType, typename ScopedLockType>
3432 inline bool
3433 operator==(const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3434  ScopedLockType> &a,
3435  const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3436  ScopedLockType> &b)
3437 {
3438  if (a.size() != b.size())
3439  return false;
3440 
3441  typename concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3442  ScopedLockType>::const_iterator
3443  i(a.begin()),
3444  i_end(a.end());
3445 
3446  typename concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3447  ScopedLockType>::const_iterator j,
3448  j_end(b.end());
3449 
3450  for (; i != i_end; ++i) {
3451  j = b.equal_range(i->first).first;
3452 
3453  if (j == j_end || !(i->second == j->second))
3454  return false;
3455  }
3456 
3457  return true;
3458 }
3459 
3460 template <typename Key, typename T, typename Hash, typename KeyEqual,
3461  typename MutexType, typename ScopedLockType>
3462 inline bool
3463 operator!=(const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3464  ScopedLockType> &a,
3465  const concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType,
3466  ScopedLockType> &b)
3467 {
3468  return !(a == b);
3469 }
3470 
3471 template <typename Key, typename T, typename Hash, typename KeyEqual,
3472  typename MutexType, typename ScopedLockType>
3473 inline void
3474 swap(concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType> &a,
3475  concurrent_hash_map<Key, T, Hash, KeyEqual, MutexType, ScopedLockType> &b)
3476 {
3477  a.swap(b);
3478 }
3479 
3480 } /* namespace obj */
3481 } /* namespace pmem */
3482 
3483 #endif /* PMEMOBJ_CONCURRENT_HASH_MAP_HPP */
pmem::obj::concurrent_hash_map::clear
void clear()
Clear hash map content. Not thread safe.
Definition: concurrent_hash_map.hpp:3324
pmem::obj::concurrent_hash_map::insert
bool insert(value_type &&value)
Insert item by copying if there is no such key present already.
Definition: concurrent_hash_map.hpp:2676
pmem::obj::operator+
persistent_ptr< T > operator+(persistent_ptr< T > const &lhs, std::ptrdiff_t s)
Addition operator for persistent pointers.
Definition: persistent_ptr.hpp:808
pmem::obj::mutex
Persistent memory resident mutex implementation.
Definition: mutex.hpp:31
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map(concurrent_hash_map &&table)
Move constructor.
Definition: concurrent_hash_map.hpp:2124
pmem::obj::concurrent_hash_map::erase
bool erase(const Key &key)
Remove element with corresponding key.
Definition: concurrent_hash_map.hpp:2812
pmem::obj::concurrent_hash_map::bucket_accessor
Bucket accessor is used to find, rehash, acquire a lock on, and access a bucket.
Definition: concurrent_hash_map.hpp:1724
pmem::obj::concurrent_hash_map::const_accessor::release
void release()
Release accessor.
Definition: concurrent_hash_map.hpp:2013
pmem::obj::concurrent_hash_map::bucket_accessor::is_writer
bool is_writer() const
Check whether bucket is locked for write.
Definition: concurrent_hash_map.hpp:1780
pmem::obj::concurrent_hash_map::count
size_type count(const Key &key) const
Definition: concurrent_hash_map.hpp:2429
pmem::obj::concurrent_hash_map::end
iterator end()
Definition: concurrent_hash_map.hpp:2353
pmem::obj::p::get_ro
const T & get_ro() const noexcept
Retrieves read-only const reference of the object.
Definition: p.hpp:128
pmem::obj::concurrent_hash_map::insert
bool insert(accessor &result, value_type &&value)
Insert item by copying if there is no such key present already and acquire a write lock on the item.
Definition: concurrent_hash_map.hpp:2659
pmem
Persistent memory namespace.
Definition: allocation_flag.hpp:15
pmem::obj::concurrent_hash_map::~concurrent_hash_map
~concurrent_hash_map()
free_data should be called before concurrent_hash_map destructor is called.
Definition: concurrent_hash_map.hpp:2323
pmem::obj::concurrent_hash_map::operator=
concurrent_hash_map & operator=(const concurrent_hash_map &table)
Assignment. Not thread safe.
Definition: concurrent_hash_map.hpp:2233
pmem::obj::begin
pmem::obj::array< T, N >::iterator begin(pmem::obj::array< T, N > &a)
Non-member begin.
Definition: array.hpp:800
pmem::obj::concurrent_hash_map::const_accessor::operator*
const_reference operator*() const
Definition: concurrent_hash_map.hpp:2026
pmem::obj::concurrent_hash_map::accessor
Allows write access to elements and combines data access, locking, and garbage collection.
Definition: concurrent_hash_map.hpp:2070
common.hpp
Commonly used functionality.
pmem::obj::concurrent_hash_map::insert
bool insert(const_accessor &result, const value_type &value)
Insert item by copying if there is no such key present already and acquire a read lock on the item.
Definition: concurrent_hash_map.hpp:2593
template_helpers.hpp
Commonly used SFINAE helpers.
pmem::obj::concurrent_hash_map::insert_or_assign
bool insert_or_assign(key_type &&key, M &&obj)
Inserts item if there is no such key present already, assigns provided value otherwise.
Definition: concurrent_hash_map.hpp:2750
pmem::obj::concurrent_hash_map::runtime_initialize
void runtime_initialize()
Initialize persistent concurrent hash map after process restart.
Definition: concurrent_hash_map.hpp:2165
pmem::obj::concurrent_hash_map::serial_bucket_accessor::operator->
bucket * operator->() const
Overloaded arrow operator.
Definition: concurrent_hash_map.hpp:1863
pmem::obj::concurrent_hash_map::insert
bool insert(const_accessor &result, value_type &&value)
Insert item by copying if there is no such key present already and acquire a read lock on the item.
Definition: concurrent_hash_map.hpp:2641
pmem::obj::operator++
p< T > & operator++(p< T > &pp)
Prefix increment operator overload.
Definition: pext.hpp:48
pmem::obj::concurrent_hash_map::find
bool find(accessor &result, const K &key)
Find item and acquire a write lock on the item.
Definition: concurrent_hash_map.hpp:2543
pmem::obj::operator==
bool operator==(standard_alloc_policy< T > const &, standard_alloc_policy< T2 > const &)
Determines if memory from another allocator can be deallocated from this one.
Definition: allocator.hpp:420
pmem::obj::concurrent_hash_map::serial_bucket_accessor::get
bucket * get() const
Get bucket pointer.
Definition: concurrent_hash_map.hpp:1854
pmem::obj::p
Resides on pmem class.
Definition: p.hpp:35
pmem::obj::defrag
Defrag class.
Definition: defrag.hpp:83
pmem::obj::concurrent_hash_map::free_data
void free_data()
Destroys the concurrent_hash_map.
Definition: concurrent_hash_map.hpp:2302
pmem::obj::concurrent_hash_map::mutex_vector::push_and_try_lock
bucket * push_and_try_lock(concurrent_hash_map *base, hashcode_type h)
Save pointer to the lock in the vector and lock it.
Definition: concurrent_hash_map.hpp:2933
pmem::obj::concurrent_hash_map::const_accessor::~const_accessor
~const_accessor()
Destroy result after releasing the underlying reference.
Definition: concurrent_hash_map.hpp:2054
pmem::obj::concurrent_hash_map::bucket_count
size_type bucket_count() const
Definition: concurrent_hash_map.hpp:2409
defrag.hpp
Defragmentation class.
pmem::obj::concurrent_hash_map::size
size_type size() const
Definition: concurrent_hash_map.hpp:2382
make_persistent.hpp
Persistent_ptr transactional allocation functions for objects.
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map(std::initializer_list< value_type > il)
Construct table with initializer list.
Definition: concurrent_hash_map.hpp:2147
pmem::obj::concurrent_hash_map::mutex_vector
Vector of locks to be unlocked at the destruction time.
Definition: concurrent_hash_map.hpp:2927
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map(size_type n)
Construct empty table with n preallocated buckets.
Definition: concurrent_hash_map.hpp:2102
pmem::obj::concurrent_hash_map::count
size_type count(const K &key) const
This overload only participates in overload resolution if the qualified-id Hash::transparent_key_equal is valid and denotes a type.
Definition: concurrent_hash_map.hpp:2454
pmem::obj::concurrent_hash_map::insert
bool insert(accessor &result, const Key &key)
Insert item (if not already present) and acquire a write lock on the item.
Definition: concurrent_hash_map.hpp:2576
pmem::obj::concurrent_hash_map::find
bool find(accessor &result, const Key &key)
Find item and acquire a write lock on the item.
Definition: concurrent_hash_map.hpp:2515
pmem::obj::operator-
persistent_ptr< T > operator-(persistent_ptr< T > const &lhs, std::ptrdiff_t s)
Subtraction operator for persistent pointers.
Definition: persistent_ptr.hpp:822
pmem::obj::swap
void swap(pmem::obj::array< T, N > &lhs, pmem::obj::array< T, N > &rhs)
Non-member swap function.
Definition: array.hpp:880
pmem::obj::operator+=
p< T > & operator+=(p< T > &lhs, const p< Y > &rhs)
Addition assignment operator overload.
Definition: pext.hpp:94
pmem::obj::concurrent_hash_map::insert
void insert(std::initializer_list< value_type > il)
Insert initializer list.
Definition: concurrent_hash_map.hpp:2705
pmem::obj::operator!=
bool operator!=(const allocator< T, P, Tr > &lhs, const OtherAllocator &rhs)
Determines if memory from another allocator can be deallocated from this one.
Definition: allocator.hpp:536
pmem::obj::defrag::run
pobj_defrag_result run()
Starts defragmentation with previously stored pointers.
Definition: defrag.hpp:188
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map(const concurrent_hash_map &table)
Copy constructor.
Definition: concurrent_hash_map.hpp:2112
pmem::obj::concurrent_hash_map::max_size
size_type max_size() const
Upper bound on size.
Definition: concurrent_hash_map.hpp:2400
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map()
Construct empty table.
Definition: concurrent_hash_map.hpp:2093
pmem::obj::operator-=
p< T > & operator-=(p< T > &lhs, const p< Y > &rhs)
Subtraction assignment operator overload.
Definition: pext.hpp:116
transaction.hpp
C++ pmemobj transactions.
pmem::obj::concurrent_hash_map::begin
const_iterator begin() const
Definition: concurrent_hash_map.hpp:2363
pmem::obj::concurrent_hash_map::bucket_accessor::acquire
void acquire(concurrent_hash_map *base, const hashcode_type h, bool writer=false)
Find a bucket by masked hashcode, optionally rehash, and acquire the lock.
Definition: concurrent_hash_map.hpp:1754
pmem::obj::concurrent_hash_map::insert_or_assign
bool insert_or_assign(const key_type &key, M &&obj)
Inserts item if there is no such key present already, assigns provided value otherwise.
Definition: concurrent_hash_map.hpp:2722
pmem::obj::concurrent_hash_map::erase
bool erase(const K &key)
Remove element with corresponding key.
Definition: concurrent_hash_map.hpp:2906
pmem::obj::persistent_ptr< node >
shared_mutex.hpp
Pmem-resident shared mutex.
pmem::obj::concurrent_hash_map::insert_or_assign
bool insert_or_assign(K &&key, M &&obj)
Inserts item if there is no such key-comparable type present already, assigns provided value otherwise.
Definition: concurrent_hash_map.hpp:2784
pmem::detail::transaction_base< true >::commit
static void commit()
Manually commit a transaction.
Definition: transaction.hpp:325
pmem::obj::concurrent_hash_map::insert
bool insert(accessor &result, const value_type &value)
Insert item by copying if there is no such key present already and acquire a write lock on the item.
Definition: concurrent_hash_map.hpp:2610
p.hpp
Resides on pmem property template.
pmem::obj::concurrent_hash_map::const_accessor::operator->
const_pointer operator->() const
Definition: concurrent_hash_map.hpp:2036
pmem::obj::concurrent_hash_map::operator=
concurrent_hash_map & operator=(std::initializer_list< value_type > il)
Assignment. Not thread safe.
Definition: concurrent_hash_map.hpp:2255
pmem::obj::concurrent_hash_map::accessor::operator->
pointer operator->() const
Return pointer to associated value in hash table.
Definition: concurrent_hash_map.hpp:2084
pmem::obj::get
T & get(pmem::obj::array< T, N > &a)
Non-member get function.
Definition: array.hpp:890
pmem::layout_error
Custom layout error class.
Definition: pexceptions.hpp:183
pmem::obj::concurrent_hash_map::bucket_accessor::get
bucket * get() const
Get bucket pointer.
Definition: concurrent_hash_map.hpp:1790
pmem::obj::concurrent_hash_map::const_accessor::value_type
const typename concurrent_hash_map::value_type value_type
Type of value.
Definition: concurrent_hash_map.hpp:1994
pmem::obj::concurrent_hash_map::serial_bucket_accessor::is_writer
bool is_writer() const
This method is added for consistency with bucket_accessor class.
Definition: concurrent_hash_map.hpp:1844
pmem::transaction_scope_error
Custom transaction error class.
Definition: pexceptions.hpp:163
pmem::obj::concurrent_hash_map::defragment
pobj_defrag_result defragment(double start_percent=0, double amount_percent=100)
Defragment the given (by 'start_percent' and 'amount_percent') part of buckets of the hash map.
Definition: concurrent_hash_map.hpp:2838
pmem::obj::concurrent_hash_map::internal_copy
void internal_copy(const concurrent_hash_map &source)
Copy "source" to *this, where *this must start out empty.
Definition: concurrent_hash_map.hpp:3402
pmem::obj::concurrent_hash_map::empty
bool empty() const
Definition: concurrent_hash_map.hpp:2391
pmem::obj::defrag::add
std::enable_if< is_defragmentable< T >(), void >::type add(T &t)
Stores address of the referenced object to the defragmentation queue.
Definition: defrag.hpp:112
pmem::obj::concurrent_hash_map::find
bool find(const_accessor &result, const K &key) const
Find item and acquire a read lock on the item.
Definition: concurrent_hash_map.hpp:2498
pmem::obj::concurrent_hash_map::insert
bool insert(const value_type &value)
Insert item by copying if there is no such key present already.
Definition: concurrent_hash_map.hpp:2626
pmem::obj::concurrent_hash_map::concurrent_hash_map
concurrent_hash_map(I first, I last)
Construct table by copying an iteration range.
Definition: concurrent_hash_map.hpp:2135
pmem::obj::pool_base
The non-template pool base class.
Definition: pool.hpp:46
atomic_backoff.hpp
Atomic backoff, for time delay.
pmem::obj::concurrent_hash_map::insert
bool insert(const_accessor &result, const Key &key)
Insert item (if not already present) and acquire a read lock on the item.
Definition: concurrent_hash_map.hpp:2559
pmem::obj::concurrent_hash_map::const_accessor
Combines data access, locking, and garbage collection.
Definition: concurrent_hash_map.hpp:1982
pmem::obj::concurrent_hash_map::begin
iterator begin()
Definition: concurrent_hash_map.hpp:2343
persistent_ptr.hpp
Persistent smart pointer.
pmem::obj::concurrent_hash_map::bucket_accessor::operator->
bucket * operator->() const
Overloaded arrow operator.
Definition: concurrent_hash_map.hpp:1799
pmem::obj::concurrent_hash_map::accessor::operator*
reference operator*() const
Return reference to associated value in hash table.
Definition: concurrent_hash_map.hpp:2076
pmem::obj::flat_transaction::manual
typename detail::transaction_base< true >::manual manual
C++ manual scope transaction class.
Definition: transaction.hpp:756
pmem::obj::concurrent_hash_map::swap
void swap(concurrent_hash_map &table)
Swap two instances.
Definition: concurrent_hash_map.hpp:3285
pmem::obj::concurrent_hash_map::insert
void insert(I first, I last)
Insert range [first, last)
Definition: concurrent_hash_map.hpp:2691
pmem::obj::operator--
p< T > & operator--(p< T > &pp)
Prefix decrement operator overload.
Definition: pext.hpp:59
pmem::obj::concurrent_hash_map::const_accessor::const_accessor
const_accessor()
Create empty result.
Definition: concurrent_hash_map.hpp:2046
pmem::obj::concurrent_hash_map::find
bool find(const_accessor &result, const Key &key) const
Find item and acquire a read lock on the item.
Definition: concurrent_hash_map.hpp:2469
pmem::obj::flat_transaction::run
static void run(obj::pool_base &pool, std::function< void()> tx, Locks &... locks)
Execute a closure-like transaction and lock locks.
Definition: transaction.hpp:817
pmem::detail::transaction_base< true >::snapshot
static void snapshot(const T *addr, size_t num=1)
Takes a “snapshot” of given elements of type T number (1 by default), located at the given address ptr in the virtual memory space and saves it to the undo log.
Definition: transaction.hpp:428
pmem::obj::concurrent_hash_map
Persistent memory aware implementation of Intel TBB concurrent_hash_map.
Definition: concurrent_hash_map.hpp:1628
enumerable_thread_specific.hpp
A persistent version of thread-local storage.
pmem::obj::concurrent_hash_map::rehash
void rehash(size_type n=0)
Rehashes and optionally resizes the whole table.
Definition: concurrent_hash_map.hpp:3294
mutex.hpp
Pmem-resident mutex.
pmem::obj::concurrent_hash_map::serial_bucket_accessor
Serial bucket accessor used to access a bucket in serial operations.
Definition: concurrent_hash_map.hpp:1808
pmem::obj::concurrent_hash_map::const_accessor::empty
bool empty() const
Definition: concurrent_hash_map.hpp:2001
pmem::obj::concurrent_hash_map::end
const_iterator end() const
Definition: concurrent_hash_map.hpp:2373
pmem::obj::concurrent_hash_map::defrag_save_nodes
void defrag_save_nodes(bucket *b, pmem::obj::defrag &defrag)
Internal method used by defragment().
Definition: concurrent_hash_map.hpp:3014