PMDK C++ bindings  1.13.0-git107.g7e59f08f
This is the C++ bindings documentation for PMDK's libpmemobj.
enumerable_thread_specific.hpp
1 // SPDX-License-Identifier: BSD-3-Clause
2 /* Copyright 2019-2020, Intel Corporation */
3 
9 #ifndef LIBPMEMOBJ_CPP_ENUMERABLE_THREAD_SPECIFIC_HPP
10 #define LIBPMEMOBJ_CPP_ENUMERABLE_THREAD_SPECIFIC_HPP
11 
12 #include <libpmemobj++/container/segment_vector.hpp>
13 #include <libpmemobj++/detail/common.hpp>
14 #include <libpmemobj++/mutex.hpp>
15 #include <libpmemobj++/shared_mutex.hpp>
16 
17 #include <cassert>
18 #include <deque>
19 #include <mutex>
20 #include <numeric>
21 #include <thread>
22 #include <unordered_map>
23 
24 namespace pmem
25 {
26 namespace detail
27 {
28 
35 struct id_manager {
36  id_manager();
37 
38  id_manager(const id_manager &) = delete;
39  id_manager &operator=(const id_manager &) = delete;
40 
41  size_t get();
42  void release(size_t id);
43 
44 private:
45  static constexpr size_t initial_queue_capacity = 1024;
46 
47  std::mutex mutex;
48  std::size_t queue_capacity;
49  std::deque<size_t> queue;
50 };
51 
53 struct thread_id_type {
54  thread_id_type();
55 
56  thread_id_type(const thread_id_type &) = delete;
57  thread_id_type &operator=(const thread_id_type &) = delete;
58 
59  ~thread_id_type();
60 
61  size_t get();
62 
63 private:
64  size_t id;
65 
66  static id_manager &get_id_manager();
67 };
68 
87 template <typename T, typename Mutex = obj::shared_mutex,
88  typename Storage =
89  obj::segment_vector<T, obj::exponential_size_array_policy<>>>
90 class enumerable_thread_specific {
91  using storage_type = Storage;
92  using mutex_type = Mutex;
93 
94 public:
95  /* traits */
96  using value_type = T;
97  using size_type = typename storage_type::size_type;
98  using difference_type = typename storage_type::difference_type;
99  using reference = value_type &;
100  using const_reference = const value_type &;
101  using pointer = value_type *;
102  using const_pointer = const value_type *;
103  using iterator = typename storage_type::iterator;
104  using const_iterator = typename storage_type::const_iterator;
105 
106  /* initialization */
107  template <typename Handler>
108  void initialize(Handler handler = [](reference) {});
109 
110  /* ctors & dtor */
111  enumerable_thread_specific();
112  ~enumerable_thread_specific() = default;
113 
114  /* access */
115  reference local();
116 
117  /* size */
118  bool empty() const;
119  void clear();
120  size_type size() const;
121 
122  /* iterators */
123  iterator begin();
124  iterator end();
125  const_iterator begin() const;
126  const_iterator end() const;
127 
128 private:
129  /* private helper methods */
130  obj::pool_base get_pool() const noexcept;
131  void set_cached_size(size_t s);
132  size_t get_cached_size();
133 
134  mutex_type _mutex;
135  storage_type _storage;
136 
137  obj::p<std::atomic<size_t>> _storage_size;
138 };
139 
140 inline id_manager::id_manager()
141  : queue_capacity(initial_queue_capacity), queue(initial_queue_capacity, 0)
142 {
143  /* Convert 0, 0, 0, ..., 0 into 0, 1, 2, ..., N */
144  std::iota(queue.begin(), queue.end(), 0);
145 }
146 
152 inline size_t
153 id_manager::get()
154 {
155  std::unique_lock<std::mutex> lock(mutex);
156 
157  if (queue.empty())
158  queue.push_front(queue_capacity++);
159 
160  auto front = queue.front();
161  queue.pop_front();
162 
163  return front;
164 }
165 
171 inline void
172 id_manager::release(size_t id)
173 {
174  std::unique_lock<std::mutex> lock(mutex);
175 
176  queue.push_front(id);
177 }
178 
182 inline id_manager &
183 thread_id_type::get_id_manager()
184 {
185  static id_manager manager;
186  return manager;
187 }
188 
194 inline thread_id_type::thread_id_type()
195 {
196  auto &manager = get_id_manager();
197 
198  /*
199  * Drd had a bug related to static object initialization and reported
200  * conflicting load on std::mutex access inside id_manager.
201  */
202 #if LIBPMEMOBJ_CPP_VG_DRD_ENABLED
203  ANNOTATE_BENIGN_RACE_SIZED(
204  &manager, sizeof(std::mutex),
205  "https://bugs.kde.org/show_bug.cgi?id=416286");
206 #endif
207 
208  id = manager.get();
209 }
210 
216 inline thread_id_type::~thread_id_type()
217 {
218  get_id_manager().release(id);
219 }
220 
224 inline size_t
225 thread_id_type::get()
226 {
227  return id;
228 }
229 
233 template <typename T, typename Mutex, typename Storage>
234 enumerable_thread_specific<T, Mutex, Storage>::enumerable_thread_specific()
235 {
236  _storage_size.get_rw() = 0;
237 }
238 
242 template <typename T, typename Mutex, typename Storage>
243 void
244 enumerable_thread_specific<T, Mutex, Storage>::set_cached_size(size_t s)
245 {
246  auto pop = get_pool();
247 
248  /* Helgrind does not understand std::atomic */
249 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
250  VALGRIND_HG_DISABLE_CHECKING(&_storage_size, sizeof(_storage_size));
251 #endif
252 
253 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED || LIBPMEMOBJ_CPP_VG_DRD_ENABLED
254  ANNOTATE_HAPPENS_BEFORE(&_storage_size);
255 #endif
256 
257  _storage_size.get_rw().store(s);
258  pop.persist(_storage_size);
259 }
260 
264 template <typename T, typename Mutex, typename Storage>
265 size_t
266 enumerable_thread_specific<T, Mutex, Storage>::get_cached_size()
267 {
268  auto s = _storage_size.get_ro().load();
269 
270 #if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED || LIBPMEMOBJ_CPP_VG_DRD_ENABLED
271  ANNOTATE_HAPPENS_AFTER(&_storage_size);
272 #endif
273 
274  return s;
275 }
276 
284 template <typename T, typename Mutex, typename Storage>
285 template <typename Handler>
286 void
287 enumerable_thread_specific<T, Mutex, Storage>::initialize(Handler handler)
288 {
289  for (reference e : *this) {
290  handler(e);
291  }
292  clear();
293 }
294 
303 template <typename T, typename Mutex, typename Storage>
304 typename enumerable_thread_specific<T, Mutex, Storage>::reference
305 enumerable_thread_specific<T, Mutex, Storage>::local()
306 {
307  assert(pmemobj_tx_stage() != TX_STAGE_WORK);
308 
309  static thread_local thread_id_type tid;
310  auto index = tid.get();
311 
312  auto cached_size = get_cached_size();
313 
314  if (index >= cached_size) {
315  std::unique_lock<mutex_type> lock(_mutex);
316 
317  /* Size of the storage could have changed before we obtained the
318  * lock. That's why we read size once again. */
319  auto size = _storage.size();
320 
321  if (index >= size) {
322  _storage.resize(index + 1);
323  set_cached_size(index + 1);
324  } else if (size != cached_size) {
325  set_cached_size(size);
326  }
327  }
328 
329  /*
330  * Because _storage can only grow (unless clear() was called which
331  * should not happen simultaneously with this operation), index must be
332  * less than _storage.size().
333  */
334  return _storage[index];
335 }
336 
343 template <typename T, typename Mutex, typename Storage>
344 void
345 enumerable_thread_specific<T, Mutex, Storage>::clear()
346 {
347  auto pop = get_pool();
348 
349  obj::flat_transaction::run(pop, [&] {
350  _storage_size.get_rw() = 0;
351  _storage.clear();
352  });
353 }
354 
360 template <typename T, typename Mutex, typename Storage>
361 typename enumerable_thread_specific<T, Mutex, Storage>::size_type
362 enumerable_thread_specific<T, Mutex, Storage>::size() const
363 {
364  return _storage.size();
365 }
366 
372 template <typename T, typename Mutex, typename Storage>
373 bool
374 enumerable_thread_specific<T, Mutex, Storage>::empty() const
375 {
376  return _storage.size() == 0;
377 }
378 
384 template <typename T, typename Mutex, typename Storage>
385 typename enumerable_thread_specific<T, Mutex, Storage>::iterator
386 enumerable_thread_specific<T, Mutex, Storage>::begin()
387 {
388  return _storage.begin();
389 }
390 
396 template <typename T, typename Mutex, typename Storage>
397 typename enumerable_thread_specific<T, Mutex, Storage>::iterator
398 enumerable_thread_specific<T, Mutex, Storage>::end()
399 {
400  return _storage.end();
401 }
402 
408 template <typename T, typename Mutex, typename Storage>
409 typename enumerable_thread_specific<T, Mutex, Storage>::const_iterator
410 enumerable_thread_specific<T, Mutex, Storage>::begin() const
411 {
412  return _storage.begin();
413 }
414 
420 template <typename T, typename Mutex, typename Storage>
421 typename enumerable_thread_specific<T, Mutex, Storage>::const_iterator
422 enumerable_thread_specific<T, Mutex, Storage>::end() const
423 {
424  return _storage.end();
425 }
426 
434 template <typename T, typename Mutex, typename Storage>
435 obj::pool_base
436 enumerable_thread_specific<T, Mutex, Storage>::get_pool() const noexcept
437 {
438  auto pop = pmemobj_pool_by_ptr(this);
439  assert(pop != nullptr);
440  return obj::pool_base(pop);
441 }
442 
443 } /* namespace detail */
444 } /* namespace pmem */
445 
446 #endif
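
The sketch below is not part of the header; it only illustrates how the class above might be used. enumerable_thread_specific lives in the pmem::detail namespace and is an implementation detail of the library's concurrent containers, so including the detail header directly, the pool path "/mnt/pmem/ets_pool", the layout name "ets_example" and the root structure are all assumptions made for this example.

/* Illustrative sketch: per-thread persistent counters. Assumes the pool
 * already exists (e.g. created earlier with pmem::obj::pool<root>::create()). */
#include <libpmemobj++/detail/enumerable_thread_specific.hpp>
#include <libpmemobj++/p.hpp>
#include <libpmemobj++/persistent_ptr.hpp>
#include <libpmemobj++/pool.hpp>

#include <iostream>
#include <thread>
#include <vector>

struct root {
	/* One persistent counter slot per thread id. */
	pmem::detail::enumerable_thread_specific<pmem::obj::p<size_t>> counters;
};

int
main()
{
	auto pop = pmem::obj::pool<root>::open("/mnt/pmem/ets_pool", "ets_example");
	auto r = pop.root();

	/* After a restart: visit values left behind by previous threads, then clear. */
	r->counters.initialize([](pmem::obj::p<size_t> &c) {
		std::cout << "recovered counter: " << c.get_ro() << "\n";
	});

	std::vector<std::thread> threads;
	for (int i = 0; i < 4; ++i) {
		threads.emplace_back([&] {
			/* local() hands each thread its own slot, growing the storage on demand. */
			auto &counter = r->counters.local();
			counter.get_rw() += 1;
			pop.persist(counter);
		});
	}
	for (auto &t : threads)
		t.join();

	std::cout << "slots in use: " << r->counters.size() << "\n";
	pop.close();
	return 0;
}

Because local() only grows the storage and thread ids are recycled by id_manager, the container stays roughly as large as the peak number of concurrently live threads rather than the total number of threads ever created.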
Class for storing thread local data.
Definition: enumerable_thread_specific.hpp:90
const_iterator end() const
Returns a const_iterator to the element after the last.
Definition: enumerable_thread_specific.hpp:422
iterator end()
Returns an iterator to the element after the last.
Definition: enumerable_thread_specific.hpp:398
void initialize(Handler handler=[](reference) {})
Initialization method.
Definition: enumerable_thread_specific.hpp:287
enumerable_thread_specific()
Constructor.
Definition: enumerable_thread_specific.hpp:234
size_type size() const
Returns the number of elements stored in the container.
Definition: enumerable_thread_specific.hpp:362
const_iterator begin() const
Returns a const_iterator to the beginning.
Definition: enumerable_thread_specific.hpp:410
bool empty() const
Determines whether the container is empty.
Definition: enumerable_thread_specific.hpp:374
void clear()
Removes all elements from the container.
Definition: enumerable_thread_specific.hpp:345
iterator begin()
Returns an iterator to the beginning.
Definition: enumerable_thread_specific.hpp:386
reference local()
Returns data reference for the current thread.
Definition: enumerable_thread_specific.hpp:305
static void run(obj::pool_base &pool, std::function< void()> tx, Locks &... locks)
Execute a closure-like transaction and lock locks.
Definition: transaction.hpp:810
T & get_rw()
Retrieves read-write reference of the object.
Definition: p.hpp:114
const T & get_ro() const noexcept
Retrieves read-only const reference of the object.
Definition: p.hpp:129
The non-template pool base class.
Definition: pool.hpp:51
Persistent version of segment vector with std::vector compatible interface.
Definition: segment_vector.hpp:506
Persistent memory resident shared_mutex implementation.
Definition: shared_mutex.hpp:32
Commonly used functionality.
Pmem-resident mutex.
Persistent memory namespace.
Definition: allocation_flag.hpp:15
A persistent version of segment vector implementation.
Pmem-resident shared mutex.
This structure is used for assigning unique thread ids so that those ids will be reused in case of thread exit.
Definition: enumerable_thread_specific.hpp:35
void release(size_t id)
Releases thread id so that it can be reused by other threads.
Definition: enumerable_thread_specific.hpp:172
size_t get()
Obtain unique thread id.
Definition: enumerable_thread_specific.hpp:153
RAII-style structure for holding thread id.
Definition: enumerable_thread_specific.hpp:53
~thread_id_type()
thread_id_type destructor.
Definition: enumerable_thread_specific.hpp:216
size_t get()
Obtain current thread id.
Definition: enumerable_thread_specific.hpp:225
thread_id_type()
thread_id_type constructor.
Definition: enumerable_thread_specific.hpp:194