New Saltstack Salt formula

saltgraph.py 6.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. # -*- coding: utf-8 -*-
  2. '''
  3. Saltgraph engine for catching returns of state runs, parsing them
  4. and passing them to flat database of latest Salt resource runs.
  5. '''
  6. # Import python libs
  7. from __future__ import absolute_import
  8. import datetime
  9. import json
  10. import logging
  11. # Import salt libs
  12. import salt.utils.event
  13. # Import third party libs
  14. try:
  15. import psycopg2
  16. import psycopg2.extras
  17. HAS_POSTGRES = True
  18. except ImportError:
  19. HAS_POSTGRES = False
  20. __virtualname__ = 'saltgraph'
  21. log = logging.getLogger(__name__)
  22. def __virtual__():
  23. if not HAS_POSTGRES:
  24. return False, 'Could not import saltgraph engine. python-psycopg2 is not installed.'
  25. return __virtualname__
  26. def _get_conn(options={}):
  27. '''
  28. Return a postgres connection.
  29. '''
  30. host = options.get('host', '127.0.0.1')
  31. user = options.get('user', 'salt')
  32. passwd = options.get('passwd', 'salt')
  33. datab = options.get('db', 'salt')
  34. port = options.get('port', 5432)
  35. return psycopg2.connect(
  36. host=host,
  37. user=user,
  38. password=passwd,
  39. database=datab,
  40. port=port)
  41. def _close_conn(conn):
  42. '''
  43. Close the Postgres connection
  44. '''
  45. conn.commit()
  46. conn.close()
  47. def _get_lowstate_data(options={}):
  48. '''
  49. TODO: document this method
  50. '''
  51. conn = _get_conn(options)
  52. cur = conn.cursor()
  53. try:
  54. # you can only do this on Salt Masters minion
  55. lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True})
  56. except:
  57. lowstate_req = {}
  58. for minion, lowstate_ret in lowstate_req.items():
  59. if lowstate_ret.get('retcode') != 0:
  60. continue
  61. for resource in lowstate_ret.get('ret', []):
  62. low_sql = '''INSERT INTO salt_resources
  63. (id, resource_id, host, service, module, fun, status)
  64. VALUES (%s, %s, %s, %s, %s, %s, %s)
  65. ON CONFLICT (id) DO UPDATE
  66. SET resource_id = excluded.resource_id,
  67. host = excluded.host,
  68. service = excluded.service,
  69. module = excluded.module,
  70. fun = excluded.fun,
  71. alter_time = current_timestamp'''
  72. rid = "%s|%s" % (minion, resource.get('__id__'))
  73. cur.execute(
  74. low_sql, (
  75. rid,
  76. resource.get('__id__'),
  77. minion,
  78. resource.get('__sls__'),
  79. resource.get('state') if 'state' in resource else resource.get('module'),
  80. resource.get('fun'),
  81. 'unknown'
  82. )
  83. )
  84. conn.commit()
  85. if lowstate_req:
  86. meta_sql = '''INSERT INTO salt_resources_meta
  87. (id, options)
  88. VALUES (%s, %s)
  89. ON CONFLICT (id) DO UPDATE
  90. SET options = excluded.options,
  91. alter_time = current_timestamp'''
  92. cur.execute(
  93. meta_sql, (
  94. 'lowstate_data',
  95. '{}'
  96. )
  97. )
  98. _close_conn(conn)
  99. def _up_to_date(options={}):
  100. '''
  101. TODO: document this method
  102. '''
  103. conn = _get_conn(options)
  104. cur = conn.cursor()
  105. #cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  106. ret = False
  107. # if lowstate data are older than 1 day, refresh them
  108. cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
  109. alter_time = cur.fetchone()
  110. if alter_time:
  111. now = datetime.datetime.utcnow()
  112. day = datetime.timedelta(days=1)
  113. time_diff = now - alter_time[0].replace(tzinfo=None)
  114. if time_diff < day:
  115. ret = True
  116. else:
  117. skip = False
  118. _close_conn(conn)
  119. return ret
  120. def _update_resources(event, options):
  121. '''
  122. TODO: document this method
  123. '''
  124. conn = _get_conn(options)
  125. cur = conn.cursor()
  126. cur.execute('SELECT id FROM salt_resources')
  127. resources_db = [res[0] for res in cur.fetchall()]
  128. resources = event.get('return', {}).values()
  129. for resource in resources:
  130. rid = '%s|%s' % (event.get('id'), resource.get('__id__'))
  131. if rid in resources_db:
  132. status = 'unknown'
  133. if resource.get('result', None) is not None:
  134. status = 'success' if resource.get('result') else 'failed'
  135. resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
  136. WHERE id = %s'''
  137. cur.execute(
  138. resource_sql, (
  139. status,
  140. repr(resource),
  141. rid
  142. )
  143. )
  144. conn.commit()
  145. _close_conn(conn)
  146. def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs):
  147. '''
  148. Listen to events and parse Salt state returns
  149. '''
  150. if __opts__['__role'] == 'master':
  151. event_bus = salt.utils.event.get_master_event(
  152. __opts__,
  153. __opts__['sock_dir'],
  154. listen=True)
  155. else:
  156. event_bus = salt.utils.event.get_event(
  157. 'minion',
  158. transport=__opts__['transport'],
  159. opts=__opts__,
  160. sock_dir=__opts__['sock_dir'],
  161. listen=True)
  162. log.debug('Saltgraph engine started')
  163. while True:
  164. event = event_bus.get_event()
  165. supported_funcs = ['state.sls', 'state.apply', 'state.highstate']
  166. if event and event.get('fun', None) in supported_funcs:
  167. test = 'test=true' in [arg.lower() for arg in event.get('fun_args', [])]
  168. if not test:
  169. options = {
  170. 'host': host,
  171. 'user': user,
  172. 'passwd': password,
  173. 'db': database,
  174. 'port': port
  175. }
  176. is_reclass = [arg for arg in event.get('fun_args', []) if arg.startswith('reclass')]
  177. if is_reclass or not _up_to_date(options):
  178. _get_lowstate_data(options)
  179. _update_resources(event, options)