296 lines
10 KiB
Cython
296 lines
10 KiB
Cython
# -*- coding: utf-8 -*-
|
|
"""!
|
|
Author: J.A. de Jong - ASCEE
|
|
|
|
Description:
|
|
|
|
Data Acquisition (DAQ) device descriptors, and the DAQ devices themselves
|
|
|
|
"""
|
|
__all__ = ['DaqConfiguration', 'DaqConfigurations']
|
|
from ..lasp_common import lasp_shelve, SIQtys, Qty
|
|
from .lasp_device_common import DaqChannel
|
|
import json
|
|
|
|
cdef class DaqConfigurations:
    """
    Bundle of an input and an output DaqConfiguration plus a duplex mode
    value. The bundle can be (de)serialized to JSON and stored / retrieved as
    a named preset under the 'daqconfigs' key of the lasp_shelve store.
    """
    cdef public:
        object input_config, output_config, duplex_mode

    def __init__(self, duplex_mode, DaqConfiguration input_config,
                 DaqConfiguration output_config):
        """
        Store the given duplex mode and the two configurations.

        Args:
            duplex_mode: value stored as-is (and written to JSON as-is)
            input_config: DaqConfiguration used for the input side
            output_config: DaqConfiguration used for the output side
        """
        self.duplex_mode = duplex_mode
        self.input_config = input_config
        self.output_config = output_config

    def to_json(self):
        """
        Serialize to a JSON string. The input and output configurations are
        embedded as JSON *strings* (the result of their own to_json()), which
        is the layout from_json() expects.
        """
        payload = {
            'duplex_mode': self.duplex_mode,
            'input_config': self.input_config.to_json(),
            'output_config': self.output_config.to_json(),
        }
        return json.dumps(payload)

    @staticmethod
    def from_json(daq_configs_json):
        """
        Inverse of to_json(): build a DaqConfigurations from a JSON string.

        Args:
            daq_configs_json: JSON string with the keys 'duplex_mode',
            'input_config' and 'output_config'
        """
        parsed = json.loads(daq_configs_json)
        in_cfg = DaqConfiguration.from_json(parsed['input_config'])
        out_cfg = DaqConfiguration.from_json(parsed['output_config'])
        return DaqConfigurations(parsed['duplex_mode'], in_cfg, out_cfg)

    @staticmethod
    def loadAllConfigs():
        """
        Returns a dictionary of all configuration presets. The dictionary keys
        are the names of the configurations, the values are DaqConfigurations
        objects.
        """
        with lasp_shelve() as sh:
            stored = sh.load('daqconfigs', {})
            return {
                preset_name: DaqConfigurations.from_json(preset_json)
                for preset_name, preset_json in stored.items()
            }

    @staticmethod
    def loadConfigs(name: str):
        """
        Load a single configuration preset, containing input config and output
        config.

        Args:
            name: name of the preset; a lookup of an unknown name fails with
            the exception raised by indexing the stored presets
        """
        with lasp_shelve() as sh:
            stored = sh.load('daqconfigs', {})
            return DaqConfigurations.from_json(stored[name])

    def saveConfigs(self, name):
        """
        Store this object as preset `name`, replacing any preset that is
        already stored under that name.
        """
        with lasp_shelve() as sh:
            stored = sh.load('daqconfigs', {})
            stored[name] = self.to_json()
            sh.store('daqconfigs', stored)

    @staticmethod
    def deleteConfigs(name):
        """
        Remove the preset stored under `name` and write the remaining presets
        back to the store.
        """
        with lasp_shelve() as sh:
            stored = sh.load('daqconfigs', {})
            del stored[name]
            sh.store('daqconfigs', stored)
|
|
|
|
def constructDaqConfig(dict_data):
    """
    Rebuild a DaqConfiguration from its dictionary representation.

    This module-level function is the reconstruction callable returned by
    DaqConfiguration.__reduce__.
    """
    rebuilt = DaqConfiguration.from_dict(dict_data)
    return rebuilt
|
|
|
|
cdef class DaqConfiguration:
    """
    Python wrapper around a C++ cppDaqConfiguration (held in self.config).

    Provides conversion from / to plain dictionaries and JSON, per-channel
    access through DaqChannel objects, and properties for the scalar
    settings of the wrapped configuration.
    """
    def __cinit__(self):
        pass

    def __str__(self):
        return str(self.to_json())

    @staticmethod
    def fromDeviceInfo(DeviceInfo devinfo):
        """
        Create a configuration by constructing a cppDaqConfiguration from the
        C++ device info wrapped by `devinfo`.
        """
        cdef:
            cppDaqConfiguration cconfig

        d = DaqConfiguration()
        cconfig = cppDaqConfiguration(devinfo.devinfo)
        d.config = cconfig
        return d

    @staticmethod
    def from_json(jsonstring):
        """
        Create a configuration from a JSON string as produced by to_json().
        """
        config_dict = json.loads(jsonstring)
        return DaqConfiguration.from_dict(config_dict)

    def __reduce__(self):
        # Pickle support: reconstruct through the dictionary representation
        return (constructDaqConfig, (self.to_dict(),))

    @staticmethod
    def from_dict(pydict):
        """
        Create a configuration from a dictionary as produced by to_dict().

        Args:
            pydict: dictionary containing all keys that to_dict() writes

        Raises:
            RuntimeError: if none of the available APIs has the API code
            stored under 'apicode'
            KeyError: if a required key is missing from pydict
        """
        cdef:
            cppDaqConfiguration config
            vector[DaqApi] apis = DaqApi.getAvailableApis()

        # to_dict() stores the API *code*, which is not necessarily equal to
        # the position of that API in the list of available APIs. Hence look
        # the API up by its code instead of (unchecked) indexing the vector.
        apicode = pydict['apicode']
        api_found = False
        for api in apis:
            if api.apicode == apicode:
                config.api = api
                api_found = True
                break
        if not api_found:
            raise RuntimeError(f'Api with code {apicode} unavailable')

        config.device_name = pydict['device_name'].encode('utf-8')
        config.eninchannels = pydict['eninchannels']
        config.enoutchannels = pydict['enoutchannels']

        config.inchannel_names = [inchname.encode('utf-8') for inchname in
                                  pydict['inchannel_names']]

        config.outchannel_names = [outchname.encode('utf-8') for outchname in
                                   pydict['outchannel_names']]

        config.inchannel_sensitivities = pydict['inchannel_sensitivities']
        config.outchannel_sensitivities = pydict['outchannel_sensitivities']

        config.sampleRateIndex = pydict['sampleRateIndex']
        config.framesPerBlockIndex = pydict['framesPerBlockIndex']
        config.dataTypeIndex = pydict['dataTypeIndex']
        config.monitorOutput = pydict['monitorOutput']
        config.inputIEPEEnabled = pydict['inputIEPEEnabled']
        config.inputACCouplingMode = pydict['inputACCouplingMode']
        config.inputRangeIndices = pydict['inputRangeIndices']
        config.inchannel_metadata = [inchmeta.encode('utf-8') for inchmeta in
                                     pydict['inchannel_metadata']]
        config.outchannel_metadata = [outchmeta.encode('utf-8') for outchmeta in
                                      pydict['outchannel_metadata']]

        pydaqcfg = DaqConfiguration()
        pydaqcfg.config = config

        return pydaqcfg

    def to_dict(self):
        """
        Return the configuration as a dictionary of plain Python values
        (byte strings are decoded as UTF-8). This is the format accepted by
        from_dict() and serialized by to_json().
        """
        return dict(
            apicode = self.config.api.apicode,
            device_name = self.config.device_name.decode('utf-8'),

            eninchannels = self.eninchannels(),
            enoutchannels = self.enoutchannels(),

            inchannel_names = [name.decode('utf-8') for name in
                               self.config.inchannel_names],

            outchannel_names = [name.decode('utf-8') for name in
                                self.config.outchannel_names],

            inchannel_sensitivities = [sens for sens in
                                       self.config.inchannel_sensitivities],
            outchannel_sensitivities = [sens for sens in
                                        self.config.outchannel_sensitivities],

            inchannel_metadata = [inchmeta.decode('utf-8') for inchmeta in
                                  self.config.inchannel_metadata],
            outchannel_metadata = [outchmeta.decode('utf-8') for outchmeta in
                                   self.config.outchannel_metadata],

            sampleRateIndex = self.config.sampleRateIndex,
            dataTypeIndex = self.config.dataTypeIndex,
            framesPerBlockIndex = self.config.framesPerBlockIndex,
            monitorOutput = self.config.monitorOutput,

            inputIEPEEnabled = self.config.inputIEPEEnabled,
            inputACCouplingMode = self.config.inputACCouplingMode,
            inputRangeIndices = self.config.inputRangeIndices,
        )

    def to_json(self):
        """
        Return the dictionary representation (see to_dict()) as JSON string.
        """
        return json.dumps(self.to_dict())

    def getInChannel(self, i:int):
        """
        Return the settings of input channel `i` as a DaqChannel. The
        bounds-checked `.at()` accessor is used for every per-channel vector.
        """
        return DaqChannel(
            channel_enabled=self.config.eninchannels.at(i),
            channel_name=self.config.inchannel_names.at(i).decode('utf-8'),
            sensitivity=self.config.inchannel_sensitivities.at(i),
            range_index=self.config.inputRangeIndices.at(i),
            ACCoupling_enabled=self.config.inputACCouplingMode.at(i),
            IEPE_enabled=self.config.inputIEPEEnabled.at(i),
            channel_metadata=self.config.inchannel_metadata.at(i).decode('utf-8'),
        )

    def getOutChannel(self, i:int):
        """
        Return the settings of output channel `i` as a DaqChannel. Only the
        enabled flag, name and metadata are filled in.
        """
        return DaqChannel(
            channel_enabled=self.config.enoutchannels.at(i),
            channel_name=self.config.outchannel_names.at(i).decode('utf-8'),
            channel_metadata=self.config.outchannel_metadata.at(i).decode('utf-8'),
        )

    def setInChannel(self, i:int, ch: DaqChannel):
        """
        Overwrite the settings of input channel `i` with those of `ch`.

        Raises:
            IndexError: if `i` is not a valid index for every per-channel
            vector that is written
        """
        # operator[] on a C++ vector is not bounds-checked, so validate the
        # index against all vectors that are written below.
        nchannels = min(
            self.config.eninchannels.size(),
            self.config.inchannel_names.size(),
            self.config.inchannel_sensitivities.size(),
            self.config.inputRangeIndices.size(),
            self.config.inputACCouplingMode.size(),
            self.config.inputIEPEEnabled.size(),
            self.config.inchannel_metadata.size(),
        )
        if not 0 <= i < nchannels:
            raise IndexError(f'Input channel index {i} out of range')

        self.config.eninchannels[i] = ch.channel_enabled
        self.config.inchannel_names[i] = ch.channel_name.encode('utf-8')
        self.config.inchannel_sensitivities[i] = ch.sensitivity
        self.config.inputRangeIndices[i] = ch.range_index
        self.config.inputACCouplingMode[i] = ch.ACCoupling_enabled
        self.config.inputIEPEEnabled[i] = ch.IEPE_enabled
        self.config.inchannel_metadata[i] = ch.channel_metadata.encode('utf-8')

    def setOutChannel(self, i:int, ch: DaqChannel):
        """
        Overwrite the enabled flag, name and metadata of output channel `i`
        with those of `ch`.

        Raises:
            IndexError: if `i` is not a valid index for every per-channel
            vector that is written
        """
        # operator[] on a C++ vector is not bounds-checked, so validate the
        # index against all vectors that are written below.
        nchannels = min(
            self.config.enoutchannels.size(),
            self.config.outchannel_names.size(),
            self.config.outchannel_metadata.size(),
        )
        if not 0 <= i < nchannels:
            raise IndexError(f'Output channel index {i} out of range')

        self.config.enoutchannels[i] = ch.channel_enabled
        self.config.outchannel_names[i] = ch.channel_name.encode('utf-8')
        self.config.outchannel_metadata[i] = ch.channel_metadata.encode('utf-8')

    def getEnabledInChannels(self, include_monitor=True):
        """
        Return a list of DaqChannel objects for all enabled input channels.

        Args:
            include_monitor: if True, and output monitoring is switched on
            while at least one output channel is enabled, the first enabled
            output channel is placed in front of the list.
        """
        inch = []
        for i, enabled in enumerate(self.config.eninchannels):
            if enabled:
                inch.append(self.getInChannel(i))
        if include_monitor:
            outch = self.getEnabledOutChannels()
            if len(outch) > 0 and self.monitorOutput:
                inch.insert(0, outch[0])

        return inch

    def getEnabledOutChannels(self):
        """
        Return a list of DaqChannel objects for all enabled output channels.
        """
        outch = []
        for i, enabled in enumerate(self.config.enoutchannels):
            if enabled:
                outch.append(self.getOutChannel(i))
        return outch

    @property
    def api(self):
        """
        Name of the configured API, as string.
        """
        return self.config.api.apiname.decode('utf-8')

    @api.setter
    def api(self, apitxt):
        """
        Select the API by name.

        Raises:
            RuntimeError: if no available API has the name `apitxt`
        """
        cdef:
            vector[DaqApi] apis = DaqApi.getAvailableApis()
        for api in apis:
            if api.apiname.decode('utf-8') == apitxt:
                self.config.api = api
                return
        raise RuntimeError(f'Api {apitxt} unavailable')

    def eninchannels(self):
        """
        Return the enabled flags of the input channels.
        """
        return self.config.eninchannels

    def enoutchannels(self):
        """
        Return the enabled flags of the output channels.
        """
        return self.config.enoutchannels

    @property
    def sampleRateIndex(self):
        return self.config.sampleRateIndex

    @sampleRateIndex.setter
    def sampleRateIndex(self, int idx):
        self.config.sampleRateIndex = idx

    @property
    def dataTypeIndex(self):
        return self.config.dataTypeIndex

    @dataTypeIndex.setter
    def dataTypeIndex(self, int idx):
        self.config.dataTypeIndex = idx

    @property
    def framesPerBlockIndex(self):
        return self.config.framesPerBlockIndex

    @framesPerBlockIndex.setter
    def framesPerBlockIndex(self, int idx):
        self.config.framesPerBlockIndex = idx

    @property
    def monitorOutput(self):
        return self.config.monitorOutput

    @monitorOutput.setter
    def monitorOutput(self, bool idx):
        self.config.monitorOutput = idx

    @property
    def device_name(self):
        """
        Name of the device, as string.
        """
        return self.config.device_name.decode('utf-8')

    @device_name.setter
    def device_name(self, idx):
        # The name is stored UTF-8 encoded in the wrapped configuration
        self.config.device_name = idx.encode('utf-8')
|
|
|