Shared Memory Dictionary utilizing Posix IPC semaphores and shared memory segments and offering permanent disk storage of data if required.
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

test_shm_dict.py 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. # -*- coding: utf-8 -*-
  2. # Standard library imports
  3. try:
  4. from collections.abc import MutableMapping
  5. except ImportError:
  6. from collections import MutableMapping
  7. import mmap
  8. import mock
  9. import os
  10. from random import SystemRandom
  11. import re
  12. from string import ascii_letters as str_ascii_letters, digits as str_digits
  13. # Related third party imports (If you used pip/apt/yum to install)
  14. import posix_ipc
  15. import pytest
  16. # Local application/library specific imports (Look ma! I wrote it myself!)
  17. import shm_dict
  18. from shm_dict import SHMDict
  19. from shm_dict import __version__
  20. __author__ = "Nate Bohman"
  21. __credits__ = ["Nate Bohman"]
  22. __license__ = "LGPL-3"
  23. __maintainer__ = "Nate Bohman"
  24. __email__ = "natrinicle@natrinicle.com"
  25. __status__ = "Production"
  26. # Global variables
  27. KEY_SEQUENCE = 0
  28. B64_INVALID_CHARS = re.compile(r"[^a-zA-Z0-9+/='-_]")
  29. str_ascii = "".join([str_ascii_letters, str_digits])
  30. def rand_string(num_chars, use_chars=str_ascii):
  31. """Generate a random string of chars from use_chars.
  32. :type num_chars: int
  33. :param num_chars: Desired length of random string.
  34. :return: A string of length num_chars composed of random characters from use_chars.
  35. """
  36. return "".join(SystemRandom().choice(use_chars) for _ in range(num_chars))
  37. @pytest.fixture()
  38. def dict_key():
  39. global KEY_SEQUENCE
  40. next_key = "KEY{}".format(KEY_SEQUENCE)
  41. KEY_SEQUENCE += 1
  42. return next_key
  43. @pytest.fixture
  44. def os_path_mock(monkeypatch, tmpdir):
  45. """Disable looking up actual paths."""
  46. def _return_tmp_file(arg):
  47. return str(
  48. os.path.join(str(tmpdir), "".join(["pytest_shm_dict_", rand_string(10)]))
  49. )
  50. os_path_mock = mock.Mock()
  51. attrs = {
  52. "expanduser.side_effect": _return_tmp_file,
  53. "realpath.side_effect": _return_tmp_file,
  54. "abspath.side_effect": _return_tmp_file,
  55. }
  56. os_path_mock.configure_mock(**attrs)
  57. return os_path_mock
  58. class TestSHMDict(object):
  59. _tmpfile_prefix = "pytest_shm_dict_"
  60. _tmpfile_rand = ""
  61. per_shm_dict = None
  62. vol_shm_dict = None
  63. def create_per_shm_dict(self, temp_dir):
  64. """Create a persistent shared memory dictionary for testing."""
  65. self.per_shm_dict = SHMDict(
  66. self.dict_filename(temp_dir), persist=True, lock_timeout=0
  67. )
  68. return self.per_shm_dict
  69. def create_vol_shm_dict(self):
  70. """Create a volatile shared memory dictionary for testing."""
  71. self.vol_shm_dict = SHMDict("PyTestSHMDict", lock_timeout=0)
  72. return self.vol_shm_dict
  73. def dict_filename(self, temp_dir):
  74. return os.path.join(
  75. str(temp_dir), "".join([self._tmpfile_prefix, self._tmpfile_rand])
  76. )
  77. def setup_method(self, method):
  78. # Set up one time random string for temp filename and make
  79. # sure the temp test file doesn't already exist
  80. self._tmpfile_rand = rand_string(10)
  81. def teardown_method(self, method):
  82. if self.per_shm_dict is not None:
  83. while self.per_shm_dict.semaphore.value > 0:
  84. self.per_shm_dict.semaphore.acquire()
  85. while self.per_shm_dict.semaphore.value <= 0:
  86. self.per_shm_dict.semaphore.release()
  87. del self.per_shm_dict
  88. if self.vol_shm_dict is not None:
  89. while self.vol_shm_dict.semaphore.value > 0:
  90. self.vol_shm_dict.semaphore.acquire()
  91. while self.vol_shm_dict.semaphore.value <= 0:
  92. self.vol_shm_dict.semaphore.release()
  93. del self.vol_shm_dict
  94. def test_types(self):
  95. self.create_per_shm_dict("test")
  96. self.create_vol_shm_dict()
  97. assert isinstance(self.per_shm_dict, MutableMapping)
  98. assert isinstance(self.vol_shm_dict, MutableMapping)
  99. assert isinstance(self.per_shm_dict, SHMDict)
  100. assert isinstance(self.vol_shm_dict, SHMDict)
  101. assert isinstance(self.per_shm_dict.semaphore, posix_ipc.Semaphore)
  102. assert isinstance(self.vol_shm_dict.semaphore, posix_ipc.Semaphore)
  103. assert isinstance(self.per_shm_dict.shared_mem, posix_ipc.SharedMemory)
  104. assert isinstance(self.vol_shm_dict.shared_mem, posix_ipc.SharedMemory)
  105. assert isinstance(self.per_shm_dict.map_file, mmap.mmap)
  106. assert isinstance(self.vol_shm_dict.map_file, mmap.mmap)
  107. def test_persist_filename(self, monkeypatch, os_path_mock):
  108. with monkeypatch.context() as monkey:
  109. monkey.setattr("os.path", os_path_mock)
  110. self.create_per_shm_dict("test")
  111. self.create_vol_shm_dict()
  112. # Ensure persistent shm_dict has a str representation
  113. # of a path in persist_file
  114. assert isinstance(self.per_shm_dict.persist_file, str)
  115. assert self.per_shm_dict.persist_file == os.path.abspath("test")
  116. # Ensure volatile shm_dict has no persist_file
  117. assert self.vol_shm_dict.persist_file is None
  118. def test_persist_filename_homedir(self, monkeypatch, os_path_mock):
  119. with monkeypatch.context() as monkey:
  120. monkey.setattr("os.path", os_path_mock)
  121. self.create_per_shm_dict("~/test")
  122. # Ensure persistent shm_dict has a str representation
  123. # of a path in persist_file
  124. assert isinstance(self.per_shm_dict.persist_file, str)
  125. assert self.per_shm_dict.persist_file == os.path.abspath("test")
  126. def test_safe_names(self, tmpdir):
  127. self.create_per_shm_dict(tmpdir)
  128. # Ensure both sem and shm names begin with / per
  129. # http://semanchuk.com/philip/posix_ipc
  130. assert self.per_shm_dict.safe_sem_name.startswith("/")
  131. assert self.per_shm_dict.safe_shm_name.startswith("/")
  132. # Ensure only base64 characters are used
  133. assert B64_INVALID_CHARS.search(self.per_shm_dict.safe_sem_name) is None
  134. assert B64_INVALID_CHARS.search(self.per_shm_dict.safe_shm_name) is None
  135. def test_persistent_file(self, tmpdir, dict_key):
  136. """Test that a persistent file is written to disk """
  137. test_rand_string = rand_string(10)
  138. self.create_per_shm_dict(tmpdir)
  139. self.create_vol_shm_dict()
  140. # File should be created after first dict release
  141. self.per_shm_dict[dict_key] = test_rand_string
  142. # Make sure the file exists and the contents are correct
  143. assert os.path.isfile(self.dict_filename(tmpdir))
  144. assert self.per_shm_dict.get(dict_key) == test_rand_string
  145. # Ensure the file exists after dict is deleted
  146. del self.per_shm_dict
  147. assert os.path.isfile(self.dict_filename(tmpdir))
  148. # Re-open dict from test file
  149. self.create_per_shm_dict(tmpdir)
  150. # Make sure the contents are still the same after reopening
  151. assert self.per_shm_dict.get(dict_key) == test_rand_string
  152. def test_get_set(self, tmpdir, dict_key):
  153. test_rand_string = rand_string(10)
  154. self.create_per_shm_dict(tmpdir)
  155. self.create_vol_shm_dict()
  156. # Assign value to key and make sure it gets set
  157. self.per_shm_dict[dict_key] = test_rand_string
  158. # Check that the persistent dict has the key
  159. assert self.per_shm_dict.has_key(dict_key) is True
  160. # Check the value of the key to ensure no corruption
  161. assert self.per_shm_dict[dict_key] == test_rand_string
  162. # Use update to set the key in the volatile dict
  163. # from the value in the persistent dict
  164. self.vol_shm_dict.update(self.per_shm_dict)
  165. # Check the keys and values of the volatile dict
  166. assert list(self.vol_shm_dict.keys()) == [dict_key]
  167. assert list(self.vol_shm_dict.values()) == [test_rand_string]
  168. assert list(self.vol_shm_dict.items()) == [(dict_key, test_rand_string)]
  169. for key in iter(self.vol_shm_dict):
  170. assert self.vol_shm_dict[key] == test_rand_string
  171. # Test popping a key from the dictionary
  172. assert self.vol_shm_dict.pop(dict_key) == test_rand_string
  173. assert (dict_key in self.vol_shm_dict) == False
  174. # Delete key and make sure it's deleted
  175. del self.per_shm_dict[dict_key]
  176. assert self.per_shm_dict.get(dict_key) is None
  177. def test_copy(self, tmpdir, dict_key):
  178. test_rand_string = rand_string(10)
  179. self.create_per_shm_dict(tmpdir)
  180. # Assign value to a key and then copy to a
  181. # testing dictionary object.
  182. self.per_shm_dict[dict_key] = test_rand_string
  183. dict_copy = self.per_shm_dict.copy()
  184. # Delete key from persistent dict and make sure
  185. # it's deleted only from the persistent dict as
  186. # the dict copy should be a new dict and not a
  187. # pointer to the persistent dict.
  188. assert self.per_shm_dict[dict_key] == test_rand_string
  189. del self.per_shm_dict[dict_key]
  190. assert self.per_shm_dict.get(dict_key) is None
  191. assert dict_copy[dict_key] == test_rand_string
  192. del dict_copy[dict_key]
  193. assert dict_copy.get(dict_key) is None
  194. def test_equality(self, tmpdir, dict_key):
  195. test_rand_string = rand_string(10)
  196. self.create_per_shm_dict(tmpdir)
  197. self.create_vol_shm_dict()
  198. # Assign value to a key and then copy the pointer
  199. # to the volatile dict to another object.
  200. self.vol_shm_dict[dict_key] = test_rand_string
  201. self.per_shm_dict[dict_key] = test_rand_string
  202. dict_dup = self.vol_shm_dict
  203. assert self.vol_shm_dict == dict_dup
  204. assert (self.vol_shm_dict == {dict_key: test_rand_string}) == False
  205. assert self.vol_shm_dict != {dict_key: test_rand_string}
  206. assert self.vol_shm_dict != self.per_shm_dict
  207. def test_len(self, dict_key):
  208. self.create_vol_shm_dict()
  209. # Make sure dict starts out empty
  210. assert len(self.vol_shm_dict) == 0
  211. # Add 1 key to the dict and make sure dict has 1 key
  212. self.vol_shm_dict[dict_key] = rand_string(10)
  213. assert len(self.vol_shm_dict) == 1
  214. def test_clear(self, dict_key):
  215. test_rand_string = rand_string(10)
  216. self.create_vol_shm_dict()
  217. # Assign value to key and make sure it gets set
  218. self.vol_shm_dict[dict_key] = test_rand_string
  219. assert self.vol_shm_dict[dict_key] == test_rand_string
  220. # Clean dict and ensure it's empty again
  221. self.vol_shm_dict.clear()
  222. assert len(self.vol_shm_dict) == 0
  223. def test_lock(self):
  224. self.create_vol_shm_dict()
  225. # Simulate another thread/process having a lock
  226. # with the semaphore value at 0 and the internal
  227. # semaphore not set to true.
  228. while self.vol_shm_dict.semaphore.value > 0:
  229. self.vol_shm_dict.semaphore.acquire()
  230. with pytest.raises(posix_ipc.BusyError, match=r".*Semaphore is busy.*"):
  231. repr(self.vol_shm_dict)
  232. # Set auto_unlock to True to ensure the semaphore
  233. # is automatically released
  234. self.vol_shm_dict.auto_unlock = True
  235. repr(self.vol_shm_dict)