Experimental Physics and
| |||||||||||||||||
|
Sure but unfortunately this high level control monitor IOC was done using the pyDevSup module (https://github.com/mdavidsaver/pyDevSup) instead of just using EPICS base and a normal subroutine. So it might not apply to you. I attached the python file in case that helps at all (aa_status maps to a boolean PV). We include that PV in our alarm handler which has definitely come in handy. Best Tynan On Wed, May 3, 2023 at 11:49 AM Manoussakis, Adamandios <manoussakis1 at llnl.gov> wrote:
import devsup.ptable as PT from devsup.hooks import addHook import time from threading import Thread import signal import channelfinder import requests import urllib3 urllib3.disable_warnings() class hlcSupport(PT.TableBase): cf_status = PT.Parameter(iointr=True) aa_status = PT.Parameter(iointr=True) def __init__(self, name, pv_prefix): super().__init__(name=name) self.cf = channelfinder.ChannelFinderClient(BaseURL="https://myhost.lbl.gov/ChannelFinder") self.aa = "https://myhost.lbl.gov/getApplianceInfo" self.aa_identity = "myappliance" self.stop = False self.sleep_time = 30 self.cf_thread = Thread(target=self.cf_mon) self.cf_thread.start() self.aa_thread = Thread(target=self.aa_mon) self.aa_thread.start() def stop_threads(self): self.stop = True def aa_mon(self): while True: if self.stop: return try: response = requests.get(self.aa) response.raise_for_status() except requests.exceptions.HTTPError as err: print("Error in HTTP respsonse") print(err) self.aa_status.value = False self.aa_status.notify() time.sleep(15) continue except requests.exceptions.RequestException as e: print("Couldn't connect to archiver appliance") print(e) self.aa_status.value = False self.aa_status.notify() time.sleep(15) continue aa_json = response.json() if aa_json['identity'] == self.aa_identity: self.aa_status.value = True self.aa_status.notify() else: self.aa_status.value = False self.aa_status.notify() time.sleep(self.sleep_time) def cf_mon(self): while True: if self.stop: return try: heartbeat_pvs = self.cf.find(name="*HEARTBEAT") if len(heartbeat_pvs) > 2: self.cf_status.value = True self.cf_status.notify() else: self.cf_status.value = False self.cf_status.notify() except requests.ConnectionError: print("Couldn't connect to channel finder") self.cf_status.value = False self.cf_status.notify() time.sleep(self.sleep_time) def build(prefix): sup = hlcSupport(name='hlc', pv_prefix=prefix) signal.signal(signal.SIGINT, signal.SIG_DFL) def stopThreads(): sup.stop_threads() addHook('AtIocExit', stopThreads)
| ||||||||||||||||
ANJ, 04 May 2023 |
·
Home
·
News
·
About
·
Base
·
Modules
·
Extensions
·
Distributions
·
Download
·
· Search · EPICS V4 · IRMIS · Talk · Bugs · Documents · Links · Licensing · |