UserDataProcessor

class pvapy.hpc.userDataProcessor.UserDataProcessor

Bases: object

Class that serves as a base for any user implementation of a processor class suitable for usage with the streaming framework. Interface methods will be called at different stages of the processing workflow.

The following variables will be set after the processor instance is created and before processing starts:

- logger (logging.Logger) : logger object

- processorId (int) : processor id

- inputChannel (str) : input channel

- outputChannel (str) : output channel

- objectIdField (str) : name of the object id field

- pvaServer (PvaServer) : PVA Server instance for publishing output objects

- metadataQueueMap (dict) : dictionary of available PvObject queues for metadata channels

UserDataProcessor(configDict={})

Parameter:

configDict (dict) - dictionary containing configuration parameters

configure(configDict)

Method invoked on user-initiated runtime configuration changes.

Parameter:

configDict (dict) - dictionary containing configuration parameters

getOutputPvObjectType(pvObject)

Method invoked at processing startup that defines PVA structure for the output (processed) PvObject. This method is called immediately after receiving the first input channel update.

There is no need to override this method if the input and output objects have the same structure, or if the application will not publish processing output.

Parameter:

pvObject (PvObject) - input channel object

Returns:

PvObject with the same structure as generated by the process() method

getStats()

Method invoked periodically for generating application statistics.

Returns:

Dictionary containing application statistics parameters

getStatsPvaTypes()

Method invoked at processing startup. It defines user application part of the status PvObject published on the status PVA channel.

Returns:

Dictionary containing PVA types for the application statistics parameters

process(pvObject)

Method invoked every time input channel updates its PV record.

Parameter:

pvObject (PvObject) - input channel object

resetStats()

Method invoked on a user-initiated application statistics reset.

start()

Method invoked at processing startup.

stop()

Method invoked at processing shutdown.

updateOutputChannel(pvObject)

Method that can be used for publishing the processed object on the output PVA channel. It should be invoked by the user application itself as part of the process() method. Typically, there should be no need to override this method in the derived class.

Parameter:

pvObject (PvObject) - processed object
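The interface above can be illustrated with a minimal sketch of a derived class. The class name FrameCounter and the 'logEvery' configuration key are hypothetical and exist only for illustration. If pvapy is not installed, a stand-in base class that mimics the documented interface is used so that the sketch still runs; it is not the real implementation.

```python
try:
    from pvapy.hpc.userDataProcessor import UserDataProcessor
except ImportError:
    # Stand-in so the sketch runs without pvapy installed. It is NOT the real
    # base class and only mimics the interface documented above.
    class UserDataProcessor:
        def __init__(self, configDict={}):
            self.logger = None
            self.pvaServer = None
            self.outputChannel = None

        def updateOutputChannel(self, pvObject):
            pass


class FrameCounter(UserDataProcessor):
    """Hypothetical processor that counts input channel updates."""

    def __init__(self, configDict={}):
        super().__init__(configDict)
        self.nFrames = 0
        # 'logEvery' is an illustrative configuration key, not a pvapy one.
        self.logEvery = int(configDict.get('logEvery', 100))

    def configure(self, configDict):
        # Runtime reconfiguration: pick up only the keys this class knows.
        if 'logEvery' in configDict:
            self.logEvery = int(configDict['logEvery'])

    def process(self, pvObject):
        self.nFrames += 1
        # The framework sets pvaServer before processing starts; the guard
        # only keeps this sketch usable when it is run standalone.
        if getattr(self, 'pvaServer', None) is not None:
            # The object is published unchanged, so there is no need to
            # override getOutputPvObjectType().
            self.updateOutputChannel(pvObject)

    def getStats(self):
        return {'nFrames': self.nFrames}

    def resetStats(self):
        self.nFrames = 0
```

Because the output object has the same structure as the input object, this sketch relies on the default getOutputPvObjectType() behavior described above.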

AdImageProcessor

class pvapy.hpc.adImageProcessor.AdImageProcessor

Bases: UserDataProcessor

Class that can be used as a base for user implementation of an Area Detector image processor class suitable for usage with the streaming framework.

AdImageProcessor(configDict={})

Parameter:

configDict (dict) - dictionary containing configuration parameters

configure(configDict)

Method invoked on user-initiated runtime configuration changes.

Parameter:

configDict (dict) - dictionary containing configuration parameters

classmethod generateNtNdArray2D(imageId, image, nx=None, ny=None, dtype=None, compressorName=None, extraFieldsPvObject=None)

Generate new NtNdArray object from NumPy array containing 2D image data.

Parameter:

imageId (int) - Value for the ‘uniqueId’ field

Parameter:

image (numpy.array) - Image data

Parameter:

nx (int) - X dimension (number of columns), needed only for compressed image data

Parameter:

ny (int) - Y dimension (number of rows), needed only for compressed image data

Parameter:

dtype (numpy.dtype) - Array data type, needed only for compressed image data

Parameter:

compressorName (str) - Compressor name, needed only for compressed image data

Parameter:

extraFieldsPvObject (PvObject) - optional PvObject to be used for setting additional fields in the generated NtNdArray object

Returns:

NtNdArray object

classmethod getNtNdArrayDataFieldKey(image)

Get NtNdArray value (union) field key suitable for the given NumPy array.

Parameter:

image (numpy.array) - array containing image data

Returns:

NtNdArray union value field key. Possible return values are ‘ubyteValue’, ‘byteValue’, ‘ushortValue’, ‘shortValue’, ‘uintValue’, ‘intValue’, ‘ulongValue’, ‘longValue’, ‘floatValue’, or ‘doubleValue’.
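The relationship between array data types and union field keys can be sketched as a lookup table. The pairing below is an assumption based on the key names listed above and the usual NumPy type names; it is not taken from the library's implementation. The helper name field_key_for is hypothetical, and it takes a dtype name string so that the sketch runs without NumPy.

```python
# Assumed correspondence between NumPy dtype names and NtNdArray union keys.
DTYPE_TO_FIELD_KEY = {
    'uint8': 'ubyteValue',
    'int8': 'byteValue',
    'uint16': 'ushortValue',
    'int16': 'shortValue',
    'uint32': 'uintValue',
    'int32': 'intValue',
    'uint64': 'ulongValue',
    'int64': 'longValue',
    'float32': 'floatValue',
    'float64': 'doubleValue',
}


def field_key_for(dtype_name):
    """Return the union field key for a dtype name such as 'uint16'."""
    try:
        return DTYPE_TO_FIELD_KEY[dtype_name]
    except KeyError:
        raise ValueError(f'unsupported dtype: {dtype_name}') from None
```

With a NumPy array in hand, the dtype name could be obtained as str(image.dtype). In application code, getNtNdArrayDataFieldKey(image) should be preferred over any such hand-written table.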

getOutputPvObjectType(pvObject)

Method invoked at processing startup that defines PVA structure for the output (processed) PvObject. This method is called immediately after receiving the first input channel update.

There is no need to override this method if the input and output objects have the same structure, or if the application will not publish processing output.

Parameter:

pvObject (PvObject) - input channel object

Returns:

PvObject with the same structure as generated by the process() method

getStats()

Method invoked periodically for generating application statistics.

Returns:

Dictionary containing application statistics parameters

getStatsPvaTypes()

Method invoked at processing startup. It defines user application part of the status PvObject published on the status PVA channel.

Returns:

Dictionary containing PVA types for the application statistics parameters

process(pvObject)

Method invoked every time input channel updates its PV record.

Parameter:

pvObject (PvObject) - input channel object

classmethod replaceNtNdArrayImage2D(ntNdArray, imageId, image, extraFieldsPvObject=None)

Replace 2D image data in the existing NtNdArray object. This method is slightly faster than generateNtNdArray2D().

Parameter:

ntNdArray (NtNdArray) - target NtNdArray object

Parameter:

imageId (int) - Value for the ‘uniqueId’ field

Parameter:

image (numpy.array) - Image data

Parameter:

extraFieldsPvObject (PvObject) - optional PvObject to be used for setting additional fields in the generated NtNdArray object

Returns:

NtNdArray object

resetStats()

Method invoked on a user-initiated application statistics reset.

classmethod reshapeNtNdArray(ntNdArray)

Reshape NtNdArray object and return tuple with image id, NumPy image array, image dimensions, color mode and NtNdArray value (union) field key.

Parameter:

ntNdArray (NtNdArray) - NtNdArray object

Returns:

Tuple (imageId, image, nx, ny, nz, colorMode, fieldKey).

start()

Method invoked at processing startup.

stop()

Method invoked at processing shutdown.

updateOutputChannel(pvObject)

Method that can be used for publishing the processed object on the output PVA channel. It should be invoked by the user application itself as part of the process() method. Typically, there should be no need to override this method in the derived class.

Parameter:

pvObject (PvObject) - processed object

AdOutputFileProcessor

Hdf5AdImageWriter

DataDecryptor

AdImageDataDecryptor

DataEncryptor

AdImageDataEncryptor

UserMpDataProcessor

class pvapy.hpc.userMpDataProcessor.UserMpDataProcessor

Bases: object

Class that serves as a base for any user class that will be processing data from a multiprocessing queue.

The following variables will be set after the processor instance is created and before processing starts:

- logger (logging.Logger) : logger object

UserMpDataProcessor(processorId=1)

Parameter:

processorId (int) - processor id

configure(configDict)

Method invoked on user-initiated runtime configuration changes.

Parameter:

configDict (dict) - dictionary containing configuration parameters

getStats()

Method invoked periodically for generating application statistics.

Returns:

Dictionary containing application statistics parameters

process(mpqObject)

Data processing method.

Parameter:

mpqObject (object) - object received from multiprocessing queue

resetStats()

Method invoked on a user-initiated application statistics reset.

start()

Method invoked at processing startup.

stop()

Method invoked at processing shutdown.
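A minimal sketch of a derived class follows. The class name SumProcessor, the statistics keys, and the assumption that each queued object is an iterable of numbers are hypothetical and serve only as illustration. If pvapy is not installed, a stand-in base class is used so that the sketch still runs; it is not the real implementation.

```python
try:
    from pvapy.hpc.userMpDataProcessor import UserMpDataProcessor
except ImportError:
    # Stand-in so the sketch runs without pvapy installed. It is NOT the real
    # base class and only mimics the constructor documented above.
    class UserMpDataProcessor:
        def __init__(self, processorId=1):
            self.processorId = processorId
            self.logger = None


class SumProcessor(UserMpDataProcessor):
    """Hypothetical processor that sums the numbers in each queued object."""

    def __init__(self, processorId=1):
        super().__init__(processorId)
        self.nProcessed = 0
        self.total = 0

    def process(self, mpqObject):
        # Assumption: every queued object is an iterable of numbers.
        self.total += sum(mpqObject)
        self.nProcessed += 1

    def getStats(self):
        return {'nProcessed': self.nProcessed, 'total': self.total}

    def resetStats(self):
        self.nProcessed = 0
        self.total = 0
```

An instance of such a class would be handed to UserMpWorker or UserMpWorkerController, which are documented below.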

UserMpWorker

class pvapy.hpc.userMpWorker.UserMpWorker

Bases: Process

User multiprocessing worker class.

UserMpWorker(workerId, userMpDataProcessor, commandRequestQueue, commandResponseQueue, inputDataQueue, logLevel=None, logFile=None)

Parameter:

workerId (str) - Worker id.

Parameter:

userMpDataProcessor (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data.

Parameter:

commandRequestQueue (multiprocessing.Queue) - Command request queue.

Parameter:

commandResponseQueue (multiprocessing.Queue) - Command response queue.

Parameter:

inputDataQueue (multiprocessing.Queue) - Input data queue.

Parameter:

logLevel (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output.

Parameter:

logFile (str) - Log file.

property authkey
close()

Close the Process object.

This method releases resources held by the Process object. It is an error to call this method if the child process is still running.

configure(configDict)

Method invoked on user-initiated runtime configuration changes.

Parameter:

configDict (dict) - dictionary containing configuration parameters

property daemon

Return whether process is a daemon

property exitcode

Return exit code of process or None if it has yet to stop

getStats()

Method invoked periodically for generating application statistics.

Returns:

Dictionary containing application statistics parameters

property ident

Return identifier (PID) of process or None if it has yet to start

is_alive()

Return whether process is alive

join(timeout=None)

Wait until child process terminates

kill()

Terminate process; sends SIGKILL signal or uses TerminateProcess()

property name
property pid

Return identifier (PID) of process or None if it has yet to start

process(mpqObject)

Data processing method.

Parameter:

mpqObject (object) - object received from multiprocessing queue

resetStats()

Method invoked on a user-initiated application statistics reset.

run()

Data processing thread. It retrieves objects from the input queue and invokes user data processor process() method.
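The retrieve-and-process pattern described for run() can be sketched with the standard library alone. This is an illustration of the general idea, not the pvapy implementation: the real worker runs as a separate process, keeps running until it is stopped, and also services its command queues. The names Collector and consume are hypothetical.

```python
import queue


class Collector:
    """Hypothetical stand-in for a UserMpDataProcessor subclass."""

    def __init__(self):
        self.items = []

    def process(self, mpqObject):
        self.items.append(mpqObject)


def consume(inputDataQueue, processor, waitTime=0.1):
    """Process queued objects until the queue stays empty for waitTime seconds.

    Returns the number of objects that were processed.
    """
    nProcessed = 0
    while True:
        try:
            mpqObject = inputDataQueue.get(timeout=waitTime)
        except queue.Empty:
            # Nothing arrived within waitTime; this sketch simply stops here.
            break
        processor.process(mpqObject)
        nProcessed += 1
    return nProcessed
```

The same loop works with a multiprocessing.Queue, whose get() method also raises queue.Empty on timeout.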

property sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.

start()

Method invoked at processing startup.

stop()

Method invoked at processing shutdown.

Returns:

Dictionary containing application statistics parameters

terminate()

Terminate process; sends SIGTERM signal or uses TerminateProcess()

UserMpWorkerController

class pvapy.hpc.userMpWorkerController.UserMpWorkerController

Bases: HpcController

COMMAND_EXEC_DELAY = 0.1
CONFIGURE_COMMAND = 'configure'
CONTROLLER_TYPE = 'user'

Controller class for user multiprocessing worker process.

UserMpWorkerController(workerId, userMpDataProcessor, inputDataQueue, logLevel=None, logFile=None)

Parameter:

workerId (str) - Worker id.

Parameter:

userMpDataProcessor (UserMpDataProcessor) - Instance of the UserMpDataProcessor class that will be processing data.

Parameter:

inputDataQueue (multiprocessing.Queue) - Input data queue.

Parameter:

logLevel (str) - Log level; possible values: debug, info, warning, error, critical. If not provided, there will be no log output.

Parameter:

logFile (str) - Log file.

ERROR_RETURN_CODE = 1
GET_STATS_COMMAND = 'get_stats'
MIN_STATUS_UPDATE_PERIOD = 10.0
exception ProcessNotResponding(args)

Bases: Exception

RESET_STATS_COMMAND = 'reset_stats'
STOP_COMMAND = 'stop'
SUCCESS_RETURN_CODE = 0
WAIT_TIME = 1.0
_invokeCommandRequest(command, args={})
classmethod _renameDictKeys(d, keyPrefix)
configure(configDict)

Method invoked on user-initiated runtime configuration changes.

Parameter:

configDict (dict) - dictionary containing configuration parameters

getStats(statsKeyPrefix=None)

Method invoked periodically for generating application statistics.

Parameter:

statsKeyPrefix (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only

Returns:

Dictionary containing application statistics parameters
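The statsKeyPrefix constraint described above can be sketched as a small helper. How the controller actually combines the prefix with each key is not stated in this reference, so plain concatenation is an assumption, as is the restriction to ASCII characters. The function name prefix_stats_keys is hypothetical.

```python
import re

# A letter or underscore first, then alphanumeric or underscore characters.
_PREFIX_RE = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')


def prefix_stats_keys(stats, statsKeyPrefix=None):
    """Return a copy of stats with statsKeyPrefix prepended to every key."""
    if not statsKeyPrefix:
        return dict(stats)
    if not _PREFIX_RE.fullmatch(statsKeyPrefix):
        raise ValueError(f'invalid statistics key prefix: {statsKeyPrefix!r}')
    # Assumption: the prefix is joined to the key by plain concatenation.
    return {f'{statsKeyPrefix}{key}': value for key, value in stats.items()}
```

A prefix of this kind keeps statistics from several workers distinguishable when they are merged into one dictionary, for example 'worker1_nProcessed' and 'worker2_nProcessed'.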

reportStats(statsDict=None)
resetStats()

Method invoked on a user-initiated application statistics reset.

start()

Method invoked at processing startup.

stop(statsKeyPrefix=None)

Method invoked at processing shutdown.

Parameter:

statsKeyPrefix (str) - optional prefix to be used for all statistics parameter keys; the prefix should start with a letter or underscore, and consist of alphanumeric and underscore characters only

Returns:

Dictionary containing application statistics parameters