|
|
Experimental Physics and
| ||||||||||||||||
|
|
Sure but unfortunately this high level control monitor IOC was done using the pyDevSup module (https://github.com/mdavidsaver/pyDevSup) instead of just using EPICS base and a normal subroutine. So it might not apply to you. I attached the python file in case that helps at all (aa_status maps to a boolean PV). We include that PV in our alarm handler which has definitely come in handy. Best Tynan On Wed, May 3, 2023 at 11:49 AM Manoussakis, Adamandios <[email protected]> wrote:
import devsup.ptable as PT
from devsup.hooks import addHook
import time
from threading import Thread
import signal
import channelfinder
import requests
import urllib3
urllib3.disable_warnings()
class hlcSupport(PT.TableBase):
cf_status = PT.Parameter(iointr=True)
aa_status = PT.Parameter(iointr=True)
def __init__(self, name, pv_prefix):
super().__init__(name=name)
self.cf = channelfinder.ChannelFinderClient(BaseURL="https://myhost.lbl.gov/ChannelFinder")
self.aa = "https://myhost.lbl.gov/getApplianceInfo"
self.aa_identity = "myappliance"
self.stop = False
self.sleep_time = 30
self.cf_thread = Thread(target=self.cf_mon)
self.cf_thread.start()
self.aa_thread = Thread(target=self.aa_mon)
self.aa_thread.start()
def stop_threads(self):
self.stop = True
def aa_mon(self):
while True:
if self.stop:
return
try:
response = requests.get(self.aa)
response.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Error in HTTP respsonse")
print(err)
self.aa_status.value = False
self.aa_status.notify()
time.sleep(15)
continue
except requests.exceptions.RequestException as e:
print("Couldn't connect to archiver appliance")
print(e)
self.aa_status.value = False
self.aa_status.notify()
time.sleep(15)
continue
aa_json = response.json()
if aa_json['identity'] == self.aa_identity:
self.aa_status.value = True
self.aa_status.notify()
else:
self.aa_status.value = False
self.aa_status.notify()
time.sleep(self.sleep_time)
def cf_mon(self):
while True:
if self.stop:
return
try:
heartbeat_pvs = self.cf.find(name="*HEARTBEAT")
if len(heartbeat_pvs) > 2:
self.cf_status.value = True
self.cf_status.notify()
else:
self.cf_status.value = False
self.cf_status.notify()
except requests.ConnectionError:
print("Couldn't connect to channel finder")
self.cf_status.value = False
self.cf_status.notify()
time.sleep(self.sleep_time)
def build(prefix):
sup = hlcSupport(name='hlc', pv_prefix=prefix)
signal.signal(signal.SIGINT, signal.SIG_DFL)
def stopThreads():
sup.stop_threads()
addHook('AtIocExit', stopThreads)
| ||||||||||||||||
| ANJ, 19 Mar 2026 |
·
Home
·
News
·
About
·
Talk
·
Base
·
Modules
·
Extensions
·
· Distributions · Download · Documents · Links · Licensing · |