channelarchiver.py

# -*- coding: utf-8 -*-
try:
    from xmlrpc.client import Server
except ImportError:  # Python 2
    from xmlrpclib import Server
from collections import defaultdict
from itertools import groupby

from . import codes
from . import utils
from .models import ChannelData, ArchiveProperties, Limits
from .exceptions import ChannelNotFound, ChannelKeyMismatch

class Archiver(object):
    """Class for interacting with an EPICS Channel Access Archiver."""

    def __init__(self, host):
        """
        Args:
            host (str): URL to your archiver's ArchiveDataServer.cgi. Will
                look something like:
                http://cr01arc01/cgi-bin/ArchiveDataServer.cgi
        """
        super(Archiver, self).__init__()
        self.server = Server(host)
        self.archiver = self.server.archiver
        self.archives_for_channel = defaultdict(list)

    def scan_archives(self, channels=None):
        """
        Determine which archives contain the specified channels. This
        can be called prior to calling .get() with scan_archives=False
        to speed up data retrieval.

        Args:
            channels (Optional[List[str]]): The channel names to scan for.
                If omitted, all channels will be scanned for.
        """
        if channels is None:
            channels = []
        elif isinstance(channels, utils.StrType):
            channels = [channels]
        channel_pattern = "|".join(channels)
        # Track which channels have had their cached archive list replaced
        # during this scan, so later matches append instead of overwriting.
        list_emptied_for_channel = defaultdict(bool)
        for archive in self.archiver.archives():
            archive_key = archive["key"]
            archives = self.archiver.names(archive_key, channel_pattern)
            for archive_details in archives:
                channel = archive_details["name"]
                start_time = utils.datetime_from_sec_and_nano(
                    archive_details["start_sec"],
                    archive_details["start_nano"],
                    utils.utc,
                )
                end_time = utils.datetime_from_sec_and_nano(
                    archive_details["end_sec"], archive_details["end_nano"], utils.utc
                )
                properties = ArchiveProperties(archive_key, start_time, end_time)
                if list_emptied_for_channel[channel]:
                    self.archives_for_channel[channel].append(properties)
                else:
                    self.archives_for_channel[channel][:] = [properties]
                    list_emptied_for_channel[channel] = True
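
    # A minimal scan-then-get sketch of the pattern described above (the
    # channel name is hypothetical):
    #
    #   archiver.scan_archives('SR00:BEAM_CURRENT')
    #   data = archiver.get('SR00:BEAM_CURRENT', '2023-01-01T00:00:00',
    #                       '2023-01-02T00:00:00', scan_archives=False)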

    def _parse_values(self, archive_data, tz):
        channel_data = ChannelData(
            channel=archive_data["name"],
            data_type=archive_data["type"],
            elements=archive_data["count"],
        )
        meta_data = archive_data["meta"]
        if meta_data["type"] == 0:
            # Enumerated channel: metadata carries the state strings.
            channel_data.states = meta_data["states"]
        else:
            # Numeric channel: metadata carries limits, precision and units.
            channel_data.display_limits = Limits(
                meta_data["disp_low"], meta_data["disp_high"]
            )
            channel_data.alarm_limits = Limits(
                meta_data["alarm_low"], meta_data["alarm_high"]
            )
            channel_data.warn_limits = Limits(
                meta_data["warn_low"], meta_data["warn_high"]
            )
            channel_data.display_precision = meta_data["prec"]
            channel_data.units = meta_data["units"]
        statuses = []
        severities = []
        times = []
        values = []
        for sample in archive_data["values"]:
            # Unwrap scalar samples from their single-element lists.
            if channel_data.elements == 1:
                values.append(sample["value"][0])
            else:
                values.append(sample["value"])
            statuses.append(sample["stat"])
            severities.append(sample["sevr"])
            times.append(
                utils.datetime_from_sec_and_nano(sample["secs"], sample["nano"], tz)
            )
        channel_data.values = values
        channel_data.times = times
        channel_data.statuses = statuses
        channel_data.severities = severities
        return channel_data
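
    # For reference, archive_data above is the structure the data server
    # returns over XML-RPC; an illustrative numeric example (all values
    # abridged and hypothetical):
    #
    #   {'name': 'SR00:BEAM_CURRENT', 'type': 3, 'count': 1,
    #    'meta': {'type': 1, 'disp_low': 0.0, 'disp_high': 200.0,
    #             'alarm_low': 0.0, 'alarm_high': 200.0,
    #             'warn_low': 0.0, 'warn_high': 200.0,
    #             'prec': 2, 'units': 'mA'},
    #    'values': [{'value': [150.1], 'stat': 0, 'sevr': 0,
    #                'secs': 1672531200, 'nano': 0}]}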

    def get(
        self,
        channels,
        start,
        end,
        limit=1000,
        interpolation="linear",
        scan_archives=True,
        archive_keys=None,
        tz=None,
    ):
        """
        Retrieves archived data.

        Args:
            channels (str or List[str]): The channels to get data for.
            start (str or datetime): Start time as a datetime or ISO 8601
                formatted string. If no timezone is specified, the local
                timezone is assumed.
            end (str or datetime): End time.
            limit (Optional[int]): Number of data points to aim to retrieve.
                The actual number returned may differ depending on the
                number of points in the archive, the interpolation method
                and the maximum allowed points set by the archiver.
            interpolation (Optional[str]): Method of interpolating the data.
                Should be one of 'raw', 'spreadsheet', 'averaged',
                'plot-binning' or 'linear'.
            scan_archives (Optional[bool]): Whether to perform a scan to
                determine which archives the channels are on. If False,
                .scan_archives() should have been called prior to calling
                .get(). Default: True.
            archive_keys (Optional[List[int]]): The keys of the archives to
                get data from. Should be the same length as channels. If
                omitted, the archives with the greatest coverage of the
                requested time interval will be used.
            tz (Optional[tzinfo]): The timezone that datetimes should be
                returned in. If omitted, the timezone of start will be used.

        Returns:
            ChannelData objects. If the channels parameter was a string, a
            single ChannelData object will be returned. If channels was a
            list of strings, a list of ChannelData objects will be returned.
        """
        received_str = isinstance(channels, utils.StrType)
        if received_str:
            channels = [channels]
            if archive_keys is not None:
                archive_keys = [archive_keys]
        if isinstance(start, utils.StrType):
            start = utils.datetime_from_isoformat(start)
        if isinstance(end, utils.StrType):
            end = utils.datetime_from_isoformat(end)
        if isinstance(interpolation, utils.StrType):
            interpolation = codes.interpolation[interpolation]
        if start.tzinfo is None:
            start = utils.localize_datetime(start, utils.local_tz)
        if end.tzinfo is None:
            end = utils.localize_datetime(end, utils.local_tz)
        if tz is None:
            tz = start.tzinfo
        # Convert datetimes to seconds and nanoseconds for the archiver request.
        start_sec, start_nano = utils.sec_and_nano_from_datetime(start)
        end_sec, end_nano = utils.sec_and_nano_from_datetime(end)
        if archive_keys is None:
            if scan_archives:
                self.scan_archives(channels)
            channels_for_key = defaultdict(list)
            for channel in channels:
                # Pick the archive whose coverage of this channel best
                # overlaps the requested time interval.
                greatest_overlap = None
                key_with_greatest_overlap = None
                archives = self.archives_for_channel[channel]
                for archive_key, archive_start, archive_end in archives:
                    overlap = utils.overlap_between_intervals(
                        start, end, archive_start, archive_end
                    )
                    if greatest_overlap is None or overlap > greatest_overlap:
                        key_with_greatest_overlap = archive_key
                        greatest_overlap = overlap
                if key_with_greatest_overlap is None:
                    raise ChannelNotFound(
                        "Channel {0} not found in any archive "
                        "(a scan may be needed).".format(channel)
                    )
                channels_for_key[key_with_greatest_overlap].append(channel)
        else:
            # Group by archive key so we can request multiple channels
            # with a single query.
            if len(channels) != len(archive_keys):
                raise ChannelKeyMismatch(
                    "Number of archive keys must equal number of channels."
                )
            key_for_channel = dict(zip(channels, archive_keys))
            grouping_func = key_for_channel.get
            groups = groupby(sorted(channels, key=grouping_func), key=grouping_func)
            channels_for_key = {key: list(channels) for key, channels in groups}
        return_data = [None] * len(channels)
        for archive_key, channels_on_archive in channels_for_key.items():
            data = self.archiver.values(
                archive_key,
                channels_on_archive,
                start_sec,
                start_nano,
                end_sec,
                end_nano,
                limit,
                interpolation,
            )
            for archive_data in data:
                channel_data = self._parse_values(archive_data, tz)
                channel_data.archive_key = archive_key
                channel_data.interpolation = interpolation
                index = channels.index(channel_data.channel)
                return_data[index] = channel_data
        return return_data if not received_str else return_data[0]
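
# A minimal end-to-end usage sketch (the host URL matches the example in the
# __init__ docstring; the channel names are hypothetical):
#
#   archiver = Archiver('http://cr01arc01/cgi-bin/ArchiveDataServer.cgi')
#   data = archiver.get(['SR00:BEAM_CURRENT', 'SR00:LIFETIME'],
#                       '2023-01-01T00:00:00', '2023-01-02T00:00:00',
#                       limit=500, interpolation='raw')
#   for channel_data in data:
#       print(channel_data.channel, len(channel_data.values))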