Experimental Physics and Industrial Control System
> I would like to test a streamdevice enabled IOC Application against a simulated device.
>
> Has someone already built a Python-based Device simulator?
Here (attached) is another. This one takes a file of simulated responses as a command-line
parameter (sample attached). Each time a message is received from the IOC, it sends back the next response in the file. The program itself knows nothing about the content of the messages: for each sequence of commands, you have to construct a file that contains the responses to just those commands,
in the right order. A typical command line is:
python moxascript.py dmc-in.dat
The program then waits for the first message from the IOC. More directions appear in the header of the attached program.
Jon Jacky
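As a rough illustration of the response-file format described above, the sketch below builds the text of such a file for a short command sequence. It assumes the conventions stated in the moxascript.py header and code below: each reply starts with an ack line (a single space) and ends with a completion line ($), and commands beginning with OUT get one data line in between. The function name build_script, the command strings, and the data value are hypothetical, and are not taken from the attached dmc-in.dat.

```python
def build_script(commands, data_for=None):
    """Return response-file text for a sequence of commands.

    commands: command strings, in the order the IOC will send them.
    data_for: optional dict mapping a command to its data line
              (hypothetical values; real ones come from the device).
    """
    data_for = data_for or {}
    lines = []
    for command in commands:
        lines.append(' ')                  # ack line: a single space
        if command.startswith('OUT'):      # poll command: one data line
            lines.append(data_for.get(command, '0'))
        lines.append('$')                  # completion line
    # One response per line; the simulator strips the '\n' before sending
    return '\n'.join(lines) + '\n'
```

For example, build_script(['INP 1', 'OUT 1'], {'OUT 1': '42'}) yields five lines: space, $, space, 42, $. Writing that text to a file gives something that could be passed as the first argument to moxascript.py.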
"""
moxascript.py, server used by stTest.cmd instead of actual MOXA ports.
Similar to moxa.py, but instead of user entering or selecting responses,
read responses from script named in argv[1].
scripts are in the same format as *-in.* files in
/radonc/cnts/demo/test: They have all the lines sent by SCX ctlr back
to IOC, including the ack line (space) and completion line ($).
But this program trims the \n from each line and then appends the
terminator sequence LF + CR when it sends it back.
argv[2] is optional port number, default 8080.
Based on ReceiverScript.py, which was based on echo server at
http://docs.python.org/library/socket.html
"""
import sys
import socket

def nresponse_lines(command):
    """ commands expect different numbers of response lines
    """
    if command.startswith('OUT'): # poll command
        return 3 # ack, data, complete
    else: # reset, INP commands
        return 2 # ack, complete

# command line args and defaults
script = 'tmc.dat'
if len(sys.argv) > 1:
    script = sys.argv[1]
PORT = 8080
if len(sys.argv) > 2:
    PORT = int(sys.argv[2])

# responses to SCX controllers, copied from tmc.proto
term = '\n\r' # LF then CR at the end of each response line
ack = ' ' + term # acknowledgement string, begins response
complete = '$' + term # last line of response

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Ensure we release listener socket immediately when program exits,
# to avoid socket.error: [Errno 48] Address already in use
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
rcvbuf = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
mss = s.getsockopt(socket.SOL_IP, socket.TCP_MAXSEG) # SOL_TCP returns 0
s.bind(('localhost', PORT))
s.listen(1)
conn, addr = s.accept()
print 'Connected by %s with RCVBUF %s, MAXSEG %s' % \
    (addr, rcvbuf, mss)

fd = open(script, 'r')
sn = 0 # command serial number
linelen = 120
reading_script = True
while True:
    # first receive from IOC, EPICS protocol
    command = conn.recv(linelen) # block here waiting for IOC to send
    sn += 1
    if not command:
        print 'connection lost'
        break
    print command
    #print '%d: %s'% (sn, command)
    nresponse = nresponse_lines(command)
    for iresponse in range(nresponse):
        if reading_script:
            response = fd.readline().rstrip('\n') # but keep blank for ack line
            if not response:
                reading_script = False # readline on EOF returns empty string
        if not reading_script:
            # script exhausted: fall back to responses typed by the user
            response = raw_input('moxa>')
        print response
        conn.send(response + term) # add LF CR at the end of each response
conn.close()
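The listing above is Python 2 (print statements, raw_input, and str passed to send). For readers on Python 3, the step that picks and frames the responses for one command could be sketched as a pure function, which is easy to check without a socket. This is a sketch under assumptions, not the author's code: the name frame_responses is hypothetical, ASCII content is assumed, and the interactive fallback at end of script is left out.

```python
TERM = b'\n\r'   # LF then CR, as in the listing above


def nresponse_lines(command):
    """Number of response lines a command expects (same rule as the listing)."""
    return 3 if command.startswith(b'OUT') else 2


def frame_responses(command, script):
    """Read the next responses for one command from an open text-mode script.

    Returns a list of bytes objects ready for conn.sendall(). Stops early
    if the script runs out (the original switches to typed input there).
    """
    framed = []
    for _ in range(nresponse_lines(command)):
        line = script.readline()
        if not line:                       # readline() returns '' at EOF
            break
        # Trim the file's newline, then append the device terminator
        framed.append(line.rstrip('\n').encode('ascii') + TERM)
    return framed
```

In a Python 3 port of the main loop, each element of the returned list would be passed to conn.sendall() after conn.recv() delivers a command as bytes. One difference from the listing: here a completely empty line in the script is sent as a bare terminator, whereas the original treats it as end of script.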
Attachment:
dmc-in.dat