Shared Memory Dictionary utilizing POSIX IPC semaphores and shared memory segments, and offering permanent disk storage of data if required.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

361 lines
13KB

# -*- coding: utf-8 -*-

# Standard library imports
try:
    from collections.abc import MutableMapping
except ImportError:
    from collections import MutableMapping
from math import ceil
import mmap
try:
    # Prefer the standard-library mock; fall back to the separately
    # installed ``mock`` package where unittest.mock is unavailable.
    from unittest import mock
except ImportError:
    import mock
import os
import pickle
from random import SystemRandom
import re
from string import ascii_letters as str_ascii_letters, digits as str_digits

# Related third party imports (If you used pip/apt/yum to install)
import posix_ipc
import pytest

# Local application/library specific imports (Look ma! I wrote it myself!)
import shm_dict
from shm_dict import SHMDict
from shm_dict import __version__
  22. __author__ = "Nate Bohman"
  23. __credits__ = ["Nate Bohman"]
  24. __license__ = "LGPL-3"
  25. __maintainer__ = "Nate Bohman"
  26. __email__ = "natrinicle@natrinicle.com"
  27. __status__ = "Production"
  28. # Global variables
  29. KEY_SEQUENCE = 0
  30. B64_INVALID_CHARS = re.compile(r"[^a-zA-Z0-9+/='-_]")
  31. str_ascii = "".join([str_ascii_letters, str_digits])
  32. def rand_string(num_chars, use_chars=str_ascii):
  33. """Generate a random string of chars from use_chars.
  34. :type num_chars: int
  35. :param num_chars: Desired length of random string.
  36. :return: A string of length num_chars composed of random characters from use_chars.
  37. """
  38. return "".join(SystemRandom().choice(use_chars) for _ in range(num_chars))
  39. @pytest.fixture()
  40. def dict_key():
  41. global KEY_SEQUENCE
  42. next_key = "KEY{}".format(KEY_SEQUENCE)
  43. KEY_SEQUENCE += 1
  44. return next_key
  45. @pytest.fixture
  46. def os_path_mock(monkeypatch, tmpdir):
  47. """Disable looking up actual paths."""
  48. def _return_tmp_file(arg):
  49. return str(
  50. os.path.join(str(tmpdir), "".join(["pytest_shm_dict_", rand_string(10)]))
  51. )
  52. os_path_mock = mock.Mock()
  53. attrs = {
  54. "expanduser.side_effect": _return_tmp_file,
  55. "realpath.side_effect": _return_tmp_file,
  56. "abspath.side_effect": _return_tmp_file,
  57. }
  58. os_path_mock.configure_mock(**attrs)
  59. return os_path_mock
  60. class TestSHMDict(object):
  61. _tmpfile_prefix = "pytest_shm_dict_"
  62. _tmpfile_rand = ""
  63. per_shm_dict = None
  64. vol_shm_dict = None
  65. def create_per_shm_dict(self, temp_dir):
  66. """Create a persistent shared memory dictionary for testing."""
  67. self.per_shm_dict = SHMDict(
  68. self.dict_filename(temp_dir), persist=True, lock_timeout=0
  69. )
  70. return self.per_shm_dict
  71. def create_vol_shm_dict(self):
  72. """Create a volatile shared memory dictionary for testing."""
  73. self.vol_shm_dict = SHMDict("PyTestSHMDict", lock_timeout=0)
  74. return self.vol_shm_dict
  75. def dict_filename(self, temp_dir):
  76. return os.path.join(
  77. str(temp_dir), "".join([self._tmpfile_prefix, self._tmpfile_rand])
  78. )
  79. def setup_method(self, method):
  80. # Set up one time random string for temp filename and make
  81. # sure the temp test file doesn't already exist
  82. self._tmpfile_rand = rand_string(10)
  83. def teardown_method(self, method):
  84. if self.per_shm_dict is not None:
  85. while self.per_shm_dict.semaphore.value > 0:
  86. self.per_shm_dict.semaphore.acquire()
  87. while self.per_shm_dict.semaphore.value <= 0:
  88. self.per_shm_dict.semaphore.release()
  89. del self.per_shm_dict
  90. if self.vol_shm_dict is not None:
  91. while self.vol_shm_dict.semaphore.value > 0:
  92. self.vol_shm_dict.semaphore.acquire()
  93. while self.vol_shm_dict.semaphore.value <= 0:
  94. self.vol_shm_dict.semaphore.release()
  95. del self.vol_shm_dict
  96. def test_types(self):
  97. self.create_per_shm_dict("test")
  98. self.create_vol_shm_dict()
  99. assert isinstance(self.per_shm_dict, MutableMapping)
  100. assert isinstance(self.vol_shm_dict, MutableMapping)
  101. assert isinstance(self.per_shm_dict, SHMDict)
  102. assert isinstance(self.vol_shm_dict, SHMDict)
  103. assert isinstance(self.per_shm_dict.semaphore, posix_ipc.Semaphore)
  104. assert isinstance(self.vol_shm_dict.semaphore, posix_ipc.Semaphore)
  105. assert isinstance(self.per_shm_dict.shared_mem, posix_ipc.SharedMemory)
  106. assert isinstance(self.vol_shm_dict.shared_mem, posix_ipc.SharedMemory)
  107. assert isinstance(self.per_shm_dict.map_file, mmap.mmap)
  108. assert isinstance(self.vol_shm_dict.map_file, mmap.mmap)
  109. def test_persist_filename(self, monkeypatch, os_path_mock):
  110. with monkeypatch.context() as monkey:
  111. monkey.setattr("os.path", os_path_mock)
  112. self.create_per_shm_dict("test")
  113. self.create_vol_shm_dict()
  114. # Ensure persistent shm_dict has a str representation
  115. # of a path in persist_file
  116. assert isinstance(self.per_shm_dict.persist_file, str)
  117. assert self.per_shm_dict.persist_file == os.path.abspath("test")
  118. # Ensure volatile shm_dict has no persist_file
  119. assert self.vol_shm_dict.persist_file is None
  120. def test_persist_filename_homedir(self, monkeypatch, os_path_mock):
  121. with monkeypatch.context() as monkey:
  122. monkey.setattr("os.path", os_path_mock)
  123. self.create_per_shm_dict("~/test")
  124. # Ensure persistent shm_dict has a str representation
  125. # of a path in persist_file
  126. assert isinstance(self.per_shm_dict.persist_file, str)
  127. assert self.per_shm_dict.persist_file == os.path.abspath("test")
  128. def test_safe_names(self, tmpdir):
  129. self.create_per_shm_dict(tmpdir)
  130. # Ensure both sem and shm names begin with / per
  131. # http://semanchuk.com/philip/posix_ipc
  132. assert self.per_shm_dict.safe_sem_name.startswith("/")
  133. assert self.per_shm_dict.safe_shm_name.startswith("/")
  134. # Ensure only base64 characters are used
  135. assert B64_INVALID_CHARS.search(self.per_shm_dict.safe_sem_name) is None
  136. assert B64_INVALID_CHARS.search(self.per_shm_dict.safe_shm_name) is None
  137. def test_persistent_file(self, tmpdir, dict_key):
  138. """Test that a persistent file is written to disk """
  139. test_rand_string = rand_string(10)
  140. self.create_per_shm_dict(tmpdir)
  141. self.create_vol_shm_dict()
  142. # File should be created after first dict release
  143. self.per_shm_dict[dict_key] = test_rand_string
  144. # Make sure the file exists and the contents are correct
  145. assert os.path.isfile(self.dict_filename(tmpdir))
  146. assert self.per_shm_dict.get(dict_key) == test_rand_string
  147. # Ensure the file exists after dict is deleted
  148. del self.per_shm_dict
  149. assert os.path.isfile(self.dict_filename(tmpdir))
  150. # Re-open dict from test file
  151. self.create_per_shm_dict(tmpdir)
  152. # Make sure the contents are still the same after reopening
  153. assert self.per_shm_dict.get(dict_key) == test_rand_string
  154. def test_get_set(self, tmpdir, dict_key):
  155. test_rand_string = rand_string(10)
  156. self.create_per_shm_dict(tmpdir)
  157. self.create_vol_shm_dict()
  158. # Assign value to key and make sure it gets set
  159. self.per_shm_dict[dict_key] = test_rand_string
  160. # Check that the persistent dict has the key
  161. assert self.per_shm_dict.has_key(dict_key) is True
  162. # Check the value of the key to ensure no corruption
  163. assert self.per_shm_dict[dict_key] == test_rand_string
  164. # Use update to set the key in the volatile dict
  165. # from the value in the persistent dict
  166. self.vol_shm_dict.update(self.per_shm_dict)
  167. # Check the keys and values of the volatile dict
  168. assert list(self.vol_shm_dict.keys()) == [dict_key]
  169. assert list(self.vol_shm_dict.values()) == [test_rand_string]
  170. assert list(self.vol_shm_dict.items()) == [(dict_key, test_rand_string)]
  171. for key in iter(self.vol_shm_dict):
  172. assert self.vol_shm_dict[key] == test_rand_string
  173. # Test popping a key from the dictionary
  174. assert self.vol_shm_dict.pop(dict_key) == test_rand_string
  175. assert (dict_key in self.vol_shm_dict) == False
  176. # Delete key and make sure it's deleted
  177. del self.per_shm_dict[dict_key]
  178. assert self.per_shm_dict.get(dict_key) is None
  179. def test_copy(self, tmpdir, dict_key):
  180. test_rand_string = rand_string(10)
  181. self.create_per_shm_dict(tmpdir)
  182. # Assign value to a key and then copy to a
  183. # testing dictionary object.
  184. self.per_shm_dict[dict_key] = test_rand_string
  185. dict_copy = self.per_shm_dict.copy()
  186. # Delete key from persistent dict and make sure
  187. # it's deleted only from the persistent dict as
  188. # the dict copy should be a new dict and not a
  189. # pointer to the persistent dict.
  190. assert self.per_shm_dict[dict_key] == test_rand_string
  191. del self.per_shm_dict[dict_key]
  192. assert self.per_shm_dict.get(dict_key) is None
  193. assert dict_copy[dict_key] == test_rand_string
  194. del dict_copy[dict_key]
  195. assert dict_copy.get(dict_key) is None
  196. def test_equality(self, tmpdir, dict_key):
  197. test_rand_string = rand_string(10)
  198. self.create_per_shm_dict(tmpdir)
  199. self.create_vol_shm_dict()
  200. # Assign value to a key and then copy the pointer
  201. # to the volatile dict to another object.
  202. self.vol_shm_dict[dict_key] = test_rand_string
  203. self.per_shm_dict[dict_key] = test_rand_string
  204. dict_dup = self.vol_shm_dict
  205. assert self.vol_shm_dict == dict_dup
  206. assert (self.vol_shm_dict == {dict_key: test_rand_string}) == False
  207. assert self.vol_shm_dict != {dict_key: test_rand_string}
  208. assert self.vol_shm_dict != self.per_shm_dict
  209. def test_len(self, dict_key):
  210. self.create_vol_shm_dict()
  211. # Make sure dict starts out empty
  212. assert len(self.vol_shm_dict) == 0
  213. # Add 1 key to the dict and make sure dict has 1 key
  214. self.vol_shm_dict[dict_key] = rand_string(10)
  215. assert len(self.vol_shm_dict) == 1
  216. def test_large_dict(self, dict_key):
  217. test_rand_string_short = rand_string(10)
  218. test_rand_string_medium = rand_string(mmap.PAGESIZE * 2)
  219. test_rand_string_long = rand_string(mmap.PAGESIZE * 4)
  220. self.create_vol_shm_dict()
  221. # Test short, medium, and long size storage
  222. self.vol_shm_dict[dict_key] = test_rand_string_short
  223. assert self.vol_shm_dict[dict_key] == test_rand_string_short
  224. assert self.vol_shm_dict.map_file.size() == int(
  225. ceil(float(len(pickle.dumps(self.vol_shm_dict.copy(), 2))) / mmap.PAGESIZE)
  226. * mmap.PAGESIZE
  227. )
  228. self.vol_shm_dict[dict_key] = test_rand_string_medium
  229. assert self.vol_shm_dict[dict_key] == test_rand_string_medium
  230. assert self.vol_shm_dict.map_file.size() == int(
  231. ceil(float(len(pickle.dumps(self.vol_shm_dict.copy(), 2))) / mmap.PAGESIZE)
  232. * mmap.PAGESIZE
  233. )
  234. self.vol_shm_dict[dict_key] = test_rand_string_long
  235. assert self.vol_shm_dict[dict_key] == test_rand_string_long
  236. assert self.vol_shm_dict.map_file.size() == int(
  237. ceil(float(len(pickle.dumps(self.vol_shm_dict.copy(), 2))) / mmap.PAGESIZE)
  238. * mmap.PAGESIZE
  239. )
  240. # Test short + medium and short + medium + long storage
  241. # Ensures that dict resizes map_file down first and then back up
  242. self.vol_shm_dict[dict_key] = "".join(
  243. [test_rand_string_short, test_rand_string_medium]
  244. )
  245. assert self.vol_shm_dict[dict_key] == "".join(
  246. [test_rand_string_short, test_rand_string_medium]
  247. )
  248. assert self.vol_shm_dict.map_file.size() == int(
  249. ceil(float(len(pickle.dumps(self.vol_shm_dict.copy(), 2))) / mmap.PAGESIZE)
  250. * mmap.PAGESIZE
  251. )
  252. self.vol_shm_dict[dict_key] = "".join(
  253. [test_rand_string_short, test_rand_string_medium, test_rand_string_long]
  254. )
  255. assert self.vol_shm_dict[dict_key] == "".join(
  256. [test_rand_string_short, test_rand_string_medium, test_rand_string_long]
  257. )
  258. assert self.vol_shm_dict.map_file.size() == int(
  259. ceil(float(len(pickle.dumps(self.vol_shm_dict.copy(), 2))) / mmap.PAGESIZE)
  260. * mmap.PAGESIZE
  261. )
  262. def test_clear(self, dict_key):
  263. test_rand_string = rand_string(10)
  264. self.create_vol_shm_dict()
  265. # Assign value to key and make sure it gets set
  266. self.vol_shm_dict[dict_key] = test_rand_string
  267. assert self.vol_shm_dict[dict_key] == test_rand_string
  268. # Clean dict and ensure it's empty again
  269. self.vol_shm_dict.clear()
  270. assert len(self.vol_shm_dict) == 0
  271. def test_lock(self):
  272. self.create_vol_shm_dict()
  273. # Simulate another thread/process having a lock
  274. # with the semaphore value at 0 and the internal
  275. # semaphore not set to true.
  276. while self.vol_shm_dict.semaphore.value > 0:
  277. self.vol_shm_dict.semaphore.acquire()
  278. with pytest.raises(posix_ipc.BusyError, match=r".*Semaphore is busy.*"):
  279. repr(self.vol_shm_dict)
  280. # Set auto_unlock to True to ensure the semaphore
  281. # is automatically released
  282. self.vol_shm_dict.auto_unlock = True
  283. repr(self.vol_shm_dict)