|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
-
- # Standard library imports
- import base64
-
- try:
- from collections.abc import MutableMapping
- except ImportError:
- from collections import MutableMapping
-
- from contextlib import contextmanager
- import hashlib
- import logging
- from math import ceil
- import mmap
- import os
- import pickle # nosec
- import sys
- import threading
-
- # Related third party imports (If you used pip/apt/yum to install)
- import posix_ipc
- import six
-
- # Local application/library specific imports (Look ma! I wrote it myself!)
- from ._version import __version__
-
- __author__ = "Nate Bohman"  # module author
- __credits__ = ["Nate Bohman"]  # people credited for this module
- __license__ = "LGPL-3"  # license identifier
- __maintainer__ = "Nate Bohman"  # current maintainer
- __email__ = "natrinicle@natrinicle.com"  # contact e-mail
- __status__ = "Production"  # release status label
-
- logger = logging.getLogger(__name__) # pylint: disable=invalid-name
-
-
class SHMDict(MutableMapping):
    """Python shared memory dictionary.

    The mapping is stored as a pickled ``dict`` inside a POSIX shared
    memory segment and guarded by a POSIX named semaphore; both IPC names
    are derived from ``name``. Every operation takes the lock, reloads the
    pickle into an in-process copy, works on that copy and writes it back
    if it was modified.

    .. warning::
        Data is exchanged with :mod:`pickle`. Anything able to write to the
        shared memory segment or to the persistence file can therefore run
        code in every process that loads it.
    """

    def __init__(self, name, persist=False, lock_timeout=30, auto_unlock=False):
        """Standard init method.

        No IPC object is opened here; the semaphore and the shared memory
        mapping are created lazily on first use.

        :param name: Name for shared memory and semaphore if volatile
                     or path to file if persistent.
        :param persist: True if name is the path to a file and this
                        shared memory dictionary should be written
                        out to the file for persistence between runs
                        and/or processes.
        :param lock_timeout: Time in seconds before giving up on
                             acquiring an exclusive lock to the
                             dictionary.
        :param auto_unlock: If the lock_timeout is hit, and this
                            is True, automatically bypass the
                            lock and use the dictionary anyway.
        :type name: :class:`str`
        :type persist: :class:`bool`
        :type lock_timeout: :class:`int` or :class:`float`
        :type auto_unlock: :class:`bool`
        """
        self.name = name
        self.persist_file = None
        self.lock_timeout = lock_timeout
        self.auto_unlock = auto_unlock
        self._semaphore = None
        self._map_file = None
        # Per-thread count of how many times the lock is currently held.
        # Only the constructing thread gets the attribute here; every other
        # thread goes through _lock_depth(), which defaults to 0.
        self.__thread_local = threading.local()
        self.__thread_local.depth = 0
        self.__internal_dict = None
        self.__dirty = False

        if persist is True:
            # expanduser() leaves paths that do not start with "~" untouched.
            self.persist_file = os.path.abspath(
                os.path.realpath(os.path.expanduser(self.name))
            )

        super(SHMDict, self).__init__()

    def _safe_name(self, prefix=""):
        """IPC object safe name creator.

        Semaphore and shared memory names allow up to 256 characters
        (depending on OS) and must begin with a /.

        :param prefix: A string to prepend followed by _ and
                       then the dictionary's name.
        :type prefix: :class:`str`
        :returns: ``/`` followed by the URL-safe base64 encoding of the
                  SHA-512 digest of ``prefix + "_" + name``.
        :rtype: :class:`str`
        """
        # Hash lengths
        # SHA1: 28
        # SHA256: 44
        # SHA512: 88
        sha_hash = hashlib.sha512()
        sha_hash.update("_".join([prefix, str(self.name)]).encode("utf-8"))
        # urlsafe_b64encode() returns bytes; formatting bytes directly on
        # Python 3 would embed the b'...' repr (quotes included) in the name.
        b64_encode = base64.urlsafe_b64encode(sha_hash.digest()).decode("ascii")
        return "/{}".format(b64_encode)

    @property
    def safe_sem_name(self):
        """Unique semaphore name based on the dictionary name."""
        return self._safe_name("sem")

    @property
    def safe_shm_name(self):
        """Unique shared memory segment name based on the dictionary name."""
        return self._safe_name("shm")

    @property
    def semaphore(self):
        """Create or return already existing semaphore.

        The semaphore is opened once and cached on the instance. When it
        does not exist yet it is created with an initial value of 1.
        """
        if self._semaphore is None:
            try:
                self._semaphore = posix_ipc.Semaphore(self.safe_sem_name)
            except posix_ipc.ExistentialError:
                self._semaphore = posix_ipc.Semaphore(
                    self.safe_sem_name, flags=posix_ipc.O_CREAT, initial_value=1
                )
        return self._semaphore

    @property
    def shared_mem(self):
        """Create or return already existing shared memory object.

        Every access opens a NEW ``posix_ipc.SharedMemory`` handle, so
        callers must keep the returned object and close its descriptor.
        """
        # NOTE(review): a non-zero size is passed while opening an existing
        # segment. Confirm against the posix_ipc documentation that this
        # cannot shrink data another process has already written.
        try:
            return posix_ipc.SharedMemory(
                self.safe_shm_name, size=len(pickle.dumps(self.__internal_dict))
            )
        except posix_ipc.ExistentialError:
            return posix_ipc.SharedMemory(
                self.safe_shm_name, flags=posix_ipc.O_CREX, size=posix_ipc.PAGE_SIZE
            )

    @property
    def map_file(self):
        """Create or return mmap file resizing if necessary.

        On every access the mapping is resized to the smallest whole number
        of pages that holds the protocol-2 pickle of the in-process copy.
        """
        if self._map_file is None:
            # Open the segment exactly once: reading self.shared_mem several
            # times would open a descriptor per read and leak all but one.
            segment = self.shared_mem
            try:
                self._map_file = mmap.mmap(segment.fd, segment.size)
            finally:
                segment.close_fd()

        needed = len(pickle.dumps(self.__internal_dict, 2))
        pages = int(ceil(float(needed) / mmap.PAGESIZE))
        self._map_file.resize(pages * mmap.PAGESIZE)
        return self._map_file

    def __load_dict(self):
        """Load dictionary from shared memory or file if persistent and memory empty."""
        # Read in internal data from map_file
        mapped = self.map_file
        mapped.seek(0)
        try:
            self.__internal_dict = pickle.load(mapped)  # nosec
        except (KeyError, pickle.UnpicklingError, EOFError):
            # Curtis Pullen found that Python 3.4 throws EOFError
            # instead of UnpicklingError that Python 3.6 throws
            # when attempting to unpickle an empty file.
            # NOTE(review): on failure the previously loaded copy (if any)
            # is kept, so a corrupted segment goes unnoticed.
            pass

        # If map_file is empty and persist_file is set, treat
        # self.name as filename and attempt to load from disk.
        if self.__internal_dict is None and self.persist_file is not None:
            try:
                with open(self.persist_file, "rb") as pfile:
                    self.__internal_dict = pickle.load(pfile)  # nosec
            except IOError:
                pass

        # If nothing could be loaded from either place, start
        # with a new empty dictionary.
        if self.__internal_dict is None:
            self.__internal_dict = {}

    def __save_dict(self):
        """Save dictionary into shared memory and file if persistent.

        Nothing is written unless the in-process copy was modified.
        """
        if self.__dirty is not True:
            return

        # Write out internal dict to map_file (the property has already
        # grown the mapping to fit the pickle).
        mapped = self.map_file
        mapped.seek(0)
        pickle.dump(self.__internal_dict, mapped, 2)

        if self.persist_file is not None:
            with open(self.persist_file, "wb") as pfile:
                pickle.dump(self.__internal_dict, pfile, 2)

        self.__dirty = False

    def _lock_depth(self):
        """Return how many times the calling thread currently holds the lock."""
        # Threads that did not run __init__ have no attribute yet; treat
        # that as "not held" instead of raising AttributeError.
        return getattr(self.__thread_local, "depth", 0)

    def _acquire_lock(self):
        """Acquire an exclusive dict lock.

        The lock is re-entrant per thread: only the outermost acquisition
        touches the semaphore and reloads the dictionary from memory or
        disk (if persistent); nested acquisitions just increase a counter.

        .. warning::
            MacOS has a number of shortcomings with regards to
            semaphores and shared memory segments, this method
            contains one of them.

            When the timeout is > 0, the call will wait no longer than
            timeout seconds before either returning (having acquired
            the semaphore) or raising a BusyError.
            On platforms that don't support the sem_timedwait() API,
            a timeout > 0 is treated as infinite. The call will not
            return until its wait condition is satisfied.
            Most platforms provide sem_timedwait(). macOS is a notable
            exception. The module's Boolean constant
            SEMAPHORE_TIMEOUT_SUPPORTED is True on platforms that
            support sem_timedwait().

            -- http://semanchuk.com/philip/posix_ipc/

        :raises posix_ipc.BusyError: if the timeout expires and
                                     ``auto_unlock`` is not True.
        """
        depth = self._lock_depth()
        if depth > 0:
            self.__thread_local.depth = depth + 1
            return

        try:
            self.semaphore.acquire(self.lock_timeout)
        except posix_ipc.BusyError:
            # With auto_unlock the lock is bypassed after the timeout.
            # NOTE(review): the matching release then posts a semaphore
            # that was never acquired here.
            if self.auto_unlock is not True:
                raise
        self.__thread_local.depth = 1

        try:
            self.__load_dict()
        except Exception:
            # Do not keep the lock if the data could not be loaded.
            self.__thread_local.depth = 0
            self.semaphore.release()
            raise

    def _release_lock(self):
        """Release the exclusive semaphore lock.

        Pending modifications are written at every nesting level; the
        semaphore itself is released only by the outermost release. Calling
        this without holding the lock does nothing.
        """
        depth = self._lock_depth()
        if depth == 0:
            return
        try:
            self.__save_dict()
        finally:
            # Always unwind, even if saving failed, so the lock cannot leak.
            self.__thread_local.depth = depth - 1
            if depth == 1:
                self.semaphore.release()

    @contextmanager
    def exclusive_lock(self):
        """A context manager for the lock to allow with statements for exclusive access."""
        self._acquire_lock()
        try:
            yield
        finally:
            # Release even when the body raises (e.g. KeyError on lookup).
            self._release_lock()

    def __del__(self):
        """Destroy the object nicely.

        Closes the mapping if one was opened and unlinks the shared memory
        segment and the semaphore by name, without creating them first.
        """
        # NOTE(review): unlinking removes the names for every process using
        # this dictionary, not just for this instance.
        mapped = getattr(self, "_map_file", None)
        if mapped is not None:
            mapped.close()
            self._map_file = None

        for unlink, ipc_name in (
            (posix_ipc.unlink_shared_memory, self.safe_shm_name),
            (posix_ipc.unlink_semaphore, self.safe_sem_name),
        ):
            try:
                unlink(ipc_name)
            except posix_ipc.ExistentialError:
                # Never created, or already removed by another instance.
                pass

    def __setitem__(self, key, value):
        """Set a key in the dictionary to a value."""
        with self.exclusive_lock():
            self.__internal_dict[key] = value
            self.__dirty = True

    def __getitem__(self, key):
        """Get the value of a key from the dictionary."""
        with self.exclusive_lock():
            return self.__internal_dict[key]

    def __repr__(self):
        """Represent the dictionary in a human readable format."""
        with self.exclusive_lock():
            return repr(self.__internal_dict)

    def __len__(self):
        """Return the length of the dictionary."""
        with self.exclusive_lock():
            return len(self.__internal_dict)

    def __delitem__(self, key):
        """Remove an item from the dictionary."""
        with self.exclusive_lock():
            del self.__internal_dict[key]
            self.__dirty = True

    def clear(self):
        """Completely clear the dictionary."""
        with self.exclusive_lock():
            self.__dirty = True
            return self.__internal_dict.clear()

    def copy(self):
        """Create and return a copy of the internal dictionary."""
        with self.exclusive_lock():
            return self.__internal_dict.copy()

    def has_key(self, key):
        """Return true if a key is in the internal dictionary."""
        with self.exclusive_lock():
            return key in self.__internal_dict

    def __eq__(self, other):
        """Shared memory dictionary equality check with another shared memory dictionary.

        Two instances are equal when they resolve to the same shared memory
        name; the stored contents are not compared.
        """
        return isinstance(other, SHMDict) and self.safe_shm_name == other.safe_shm_name

    def __ne__(self, other):
        """Shared memory dictionary non-equality check with another shared memory dictionary."""
        return not self.__eq__(other)

    def __contains__(self, key):
        """Check if a key exists inside the dictionary."""
        with self.exclusive_lock():
            return key in self.__internal_dict

    def __iter__(self):
        """Iterate through the dictionary keys.

        The keys are copied while the lock is held, so the returned
        iterator is not affected by later modifications.
        """
        with self.exclusive_lock():
            return iter(list(self.__internal_dict))
|