lasp/lasp/device/lasp_daqconfig.pyx

296 lines
10 KiB
Cython

# -*- coding: utf-8 -*-
"""!
Author: J.A. de Jong - ASCEE
Description:
Data Acquisition (DAQ) device descriptors, and the DAQ devices themselves
"""
__all__ = ['DaqConfiguration', 'DaqConfigurations']
from ..lasp_common import lasp_shelve, SIQtys, Qty
from .lasp_device_common import DaqChannel
import json
cdef class DaqConfigurations:
    """
    Bundle of an input DaqConfiguration, an output DaqConfiguration and a
    duplex mode setting.

    Instances can be converted to / from JSON, and stored as named presets
    through lasp_shelve under the 'daqconfigs' key.
    """
    cdef public:
        object input_config, output_config, duplex_mode

    def __init__(self, duplex_mode, DaqConfiguration input_config,
                 DaqConfiguration output_config):
        """
        Store the duplex mode and the two configurations.

        Args:
            duplex_mode: Duplex mode setting, stored as-is.
            input_config: DaqConfiguration used for the input side.
            output_config: DaqConfiguration used for the output side.
        """
        self.duplex_mode = duplex_mode
        self.input_config = input_config
        self.output_config = output_config

    def to_json(self):
        """
        Serialize to a JSON string.

        The input and output configurations are embedded as JSON *strings*
        (the result of their own to_json()), not as nested objects.
        """
        payload = {
            'duplex_mode': self.duplex_mode,
            'input_config': self.input_config.to_json(),
            'output_config': self.output_config.to_json(),
        }
        return json.dumps(payload)

    @staticmethod
    def from_json(daq_configs_json):
        """
        Build a DaqConfigurations from a JSON string as produced by to_json().

        Args:
            daq_configs_json: JSON string with the keys 'duplex_mode',
                'input_config' and 'output_config'.
        """
        parsed = json.loads(daq_configs_json)
        in_cfg = DaqConfiguration.from_json(parsed['input_config'])
        out_cfg = DaqConfiguration.from_json(parsed['output_config'])
        return DaqConfigurations(parsed['duplex_mode'], in_cfg, out_cfg)

    @staticmethod
    def loadAllConfigs():
        """
        Return every stored configuration preset.

        Returns:
            Dictionary that maps the preset name to the corresponding
            DaqConfigurations object.
        """
        with lasp_shelve() as shelf:
            stored = shelf.load('daqconfigs', {})
            return {
                preset_name: DaqConfigurations.from_json(preset_json)
                for preset_name, preset_json in stored.items()
            }

    @staticmethod
    def loadConfigs(name: str):
        """
        Load a single preset (input config and output config) by name.

        Args:
            name: Name of the stored preset. A name that is not stored
                results in a KeyError from the lookup.
        """
        with lasp_shelve() as shelf:
            stored = shelf.load('daqconfigs', {})
            preset_json = stored[name]
            return DaqConfigurations.from_json(preset_json)

    def saveConfigs(self, name):
        """
        Store this object as a preset under the given name, replacing any
        preset that is already stored under that name.

        Args:
            name: Name to store the preset under.
        """
        with lasp_shelve() as shelf:
            stored = shelf.load('daqconfigs', {})
            serialized = self.to_json()
            stored[name] = serialized
            shelf.store('daqconfigs', stored)

    @staticmethod
    def deleteConfigs(name):
        """
        Remove the preset with the given name from the stored presets.

        Args:
            name: Name of the preset to remove. A name that is not stored
                results in a KeyError.
        """
        with lasp_shelve() as shelf:
            stored = shelf.load('daqconfigs', {})
            del stored[name]
            shelf.store('daqconfigs', stored)
def constructDaqConfig(dict_data):
    """
    Rebuild a DaqConfiguration from its dictionary representation.

    Module-level helper so that it can be handed out as the reconstruction
    callable by DaqConfiguration.__reduce__.

    Args:
        dict_data: Dictionary as accepted by DaqConfiguration.from_dict().

    Returns:
        The DaqConfiguration created from dict_data.
    """
    daq_config = DaqConfiguration.from_dict(dict_data)
    return daq_config
cdef class DaqConfiguration:
    """
    Python wrapper around a C++ cppDaqConfiguration, held in self.config.

    Offers conversion to / from a plain dictionary and JSON, pickling
    support (through __reduce__), per-channel access using DaqChannel
    objects, and properties for the scalar settings of the configuration.
    """

    def __cinit__(self):
        pass

    def __str__(self):
        """Return the JSON representation of the configuration."""
        return str(self.to_json())

    @staticmethod
    def fromDeviceInfo(DeviceInfo devinfo):
        """
        Create a configuration by constructing a cppDaqConfiguration from
        the C++ device info wrapped by devinfo.

        Args:
            devinfo: DeviceInfo of the device to create a configuration for.
        """
        cdef:
            cppDaqConfiguration cconfig

        d = DaqConfiguration()
        cconfig = cppDaqConfiguration(devinfo.devinfo)
        d.config = cconfig
        return d

    @staticmethod
    def from_json(jsonstring):
        """
        Create a configuration from a JSON string as produced by to_json().
        """
        config_dict = json.loads(jsonstring)
        return DaqConfiguration.from_dict(config_dict)

    def __reduce__(self):
        """
        Pickle support: reconstruct through constructDaqConfig() using the
        dictionary representation.
        """
        return (constructDaqConfig, (self.to_dict(),))

    @staticmethod
    def from_dict(pydict):
        """
        Create a configuration from a dictionary as produced by to_dict().

        Args:
            pydict: Dictionary containing all keys that to_dict() emits.

        Raises:
            KeyError: When a required key is missing from pydict.
            IndexError: When pydict['apicode'] is not a valid index in the
                list of available APIs.
        """
        cdef:
            cppDaqConfiguration config
            vector[DaqApi] apis = DaqApi.getAvailableApis()

        # Bounds-checked lookup: a stale or corrupted 'apicode' must result
        # in an exception, not in an out-of-range read of the vector.
        config.api = apis.at(pydict['apicode'])
        config.device_name = pydict['device_name'].encode('utf-8')
        config.eninchannels = pydict['eninchannels']
        config.enoutchannels = pydict['enoutchannels']

        config.inchannel_names = [inchname.encode('utf-8') for inchname in
                                  pydict['inchannel_names']]
        config.outchannel_names = [outchname.encode('utf-8') for outchname in
                                   pydict['outchannel_names']]

        config.inchannel_sensitivities = pydict['inchannel_sensitivities']
        config.outchannel_sensitivities = pydict['outchannel_sensitivities']

        config.sampleRateIndex = pydict['sampleRateIndex']
        config.framesPerBlockIndex = pydict['framesPerBlockIndex']
        config.dataTypeIndex = pydict['dataTypeIndex']
        config.monitorOutput = pydict['monitorOutput']

        config.inputIEPEEnabled = pydict['inputIEPEEnabled']
        config.inputACCouplingMode = pydict['inputACCouplingMode']
        config.inputRangeIndices = pydict['inputRangeIndices']

        config.inchannel_metadata = [inchmeta.encode('utf-8') for inchmeta in
                                     pydict['inchannel_metadata']]
        config.outchannel_metadata = [outchmeta.encode('utf-8') for outchmeta in
                                      pydict['outchannel_metadata']]

        pydaqcfg = DaqConfiguration()
        pydaqcfg.config = config
        return pydaqcfg

    def to_dict(self):
        """
        Return the configuration as a dictionary. Byte strings of the C++
        object (device name, channel names, channel metadata) are decoded
        as UTF-8. The result is what from_dict() and to_json() work with.
        """
        return dict(
            apicode = self.config.api.apicode,
            device_name = self.config.device_name.decode('utf-8'),

            eninchannels = self.eninchannels(),
            enoutchannels = self.enoutchannels(),

            inchannel_names = [name.decode('utf-8') for name in
                               self.config.inchannel_names],
            outchannel_names = [name.decode('utf-8') for name in
                                self.config.outchannel_names],

            inchannel_sensitivities = [sens for sens in
                                       self.config.inchannel_sensitivities],
            outchannel_sensitivities = [sens for sens in
                                        self.config.outchannel_sensitivities],

            inchannel_metadata = [inchmeta.decode('utf-8') for inchmeta in
                                  self.config.inchannel_metadata],
            outchannel_metadata = [outchmeta.decode('utf-8') for outchmeta in
                                   self.config.outchannel_metadata],

            sampleRateIndex = self.config.sampleRateIndex,
            dataTypeIndex = self.config.dataTypeIndex,
            framesPerBlockIndex = self.config.framesPerBlockIndex,
            monitorOutput = self.config.monitorOutput,

            inputIEPEEnabled = self.config.inputIEPEEnabled,
            inputACCouplingMode = self.config.inputACCouplingMode,
            inputRangeIndices = self.config.inputRangeIndices,
        )

    def to_json(self):
        """Return the dictionary representation serialized as JSON string."""
        return json.dumps(self.to_dict())

    def getInChannel(self, i:int):
        """
        Return the settings of input channel i as a DaqChannel.

        All lookups use the bounds-checked vector.at(), so an out-of-range
        index results in an exception (IndexError, assuming the standard
        Cython translation of std::out_of_range).
        """
        return DaqChannel(
            channel_enabled=self.config.eninchannels.at(i),
            channel_name=self.config.inchannel_names.at(i).decode('utf-8'),
            sensitivity=self.config.inchannel_sensitivities.at(i),
            range_index=self.config.inputRangeIndices.at(i),
            ACCoupling_enabled=self.config.inputACCouplingMode.at(i),
            IEPE_enabled=self.config.inputIEPEEnabled.at(i),
            channel_metadata=self.config.inchannel_metadata.at(i).decode('utf-8'),
        )

    def getOutChannel(self, i:int):
        """
        Return the settings of output channel i as a DaqChannel. Only the
        enabled flag, the name and the metadata are filled in.

        Lookups are bounds-checked in the same way as in getInChannel().
        """
        return DaqChannel(
            channel_enabled=self.config.enoutchannels.at(i),
            channel_name=self.config.outchannel_names.at(i).decode('utf-8'),
            channel_metadata=self.config.outchannel_metadata.at(i).decode('utf-8'),
        )

    def setInChannel(self, i:int, ch: DaqChannel):
        """
        Overwrite the settings of input channel i with those of ch.

        Args:
            i: Index of the input channel to update.
            ch: DaqChannel carrying the new settings.

        Raises:
            IndexError: When i is not a valid index for every per-channel
                input vector of the configuration.
        """
        # The assignments below use the unchecked operator[] of the C++
        # vectors, hence the index is validated against the shortest vector
        # first to rule out writing outside of any of them.
        nchannels = min(
            self.config.eninchannels.size(),
            self.config.inchannel_names.size(),
            self.config.inchannel_sensitivities.size(),
            self.config.inputRangeIndices.size(),
            self.config.inputACCouplingMode.size(),
            self.config.inputIEPEEnabled.size(),
            self.config.inchannel_metadata.size(),
        )
        if i < 0 or i >= nchannels:
            raise IndexError(f'Input channel index {i} out of range')

        self.config.eninchannels[i] = ch.channel_enabled
        self.config.inchannel_names[i] = ch.channel_name.encode('utf-8')
        self.config.inchannel_sensitivities[i] = ch.sensitivity
        self.config.inputRangeIndices[i] = ch.range_index
        self.config.inputACCouplingMode[i] = ch.ACCoupling_enabled
        self.config.inputIEPEEnabled[i] = ch.IEPE_enabled
        self.config.inchannel_metadata[i] = ch.channel_metadata.encode('utf-8')

    def setOutChannel(self, i:int, ch: DaqChannel):
        """
        Overwrite the enabled flag, name and metadata of output channel i
        with those of ch.

        Args:
            i: Index of the output channel to update.
            ch: DaqChannel carrying the new settings.

        Raises:
            IndexError: When i is not a valid index for every per-channel
                output vector that is written.
        """
        # Same reasoning as in setInChannel(): validate before the unchecked
        # writes.
        nchannels = min(
            self.config.enoutchannels.size(),
            self.config.outchannel_names.size(),
            self.config.outchannel_metadata.size(),
        )
        if i < 0 or i >= nchannels:
            raise IndexError(f'Output channel index {i} out of range')

        self.config.enoutchannels[i] = ch.channel_enabled
        self.config.outchannel_names[i] = ch.channel_name.encode('utf-8')
        self.config.outchannel_metadata[i] = ch.channel_metadata.encode('utf-8')

    def getEnabledInChannels(self, include_monitor=True):
        """
        Return the enabled input channels as a list of DaqChannel objects.

        Args:
            include_monitor: When True, and output monitoring is switched on
                and at least one output channel is enabled, the first enabled
                output channel is put in front of the list.
        """
        inch = []
        for i, enabled in enumerate(self.config.eninchannels):
            if enabled:
                inch.append(self.getInChannel(i))

        if include_monitor:
            outch = self.getEnabledOutChannels()
            if outch and self.monitorOutput:
                inch.insert(0, outch[0])

        return inch

    def getEnabledOutChannels(self):
        """Return the enabled output channels as list of DaqChannel objects."""
        outch = []
        for i, enabled in enumerate(self.config.enoutchannels):
            if enabled:
                outch.append(self.getOutChannel(i))
        return outch

    @property
    def api(self):
        """Name of the API of this configuration, as str."""
        return self.config.api.apiname.decode('utf-8')

    @api.setter
    def api(self, apitxt):
        """
        Select the API by name.

        Raises:
            RuntimeError: When no available API carries the name apitxt.
        """
        cdef:
            vector[DaqApi] apis = DaqApi.getAvailableApis()

        for api in apis:
            if api.apiname.decode('utf-8') == apitxt:
                self.config.api = api
                return

        raise RuntimeError(f'Api {apitxt} unavailable')

    def eninchannels(self):
        """Return the enabled flag of each input channel."""
        return self.config.eninchannels

    def enoutchannels(self):
        """Return the enabled flag of each output channel."""
        return self.config.enoutchannels

    @property
    def sampleRateIndex(self):
        """Value of the sampleRateIndex field of the configuration."""
        return self.config.sampleRateIndex

    @sampleRateIndex.setter
    def sampleRateIndex(self, int idx):
        self.config.sampleRateIndex = idx

    @property
    def dataTypeIndex(self):
        """Value of the dataTypeIndex field of the configuration."""
        return self.config.dataTypeIndex

    @dataTypeIndex.setter
    def dataTypeIndex(self, int idx):
        self.config.dataTypeIndex = idx

    @property
    def framesPerBlockIndex(self):
        """Value of the framesPerBlockIndex field of the configuration."""
        return self.config.framesPerBlockIndex

    @framesPerBlockIndex.setter
    def framesPerBlockIndex(self, int idx):
        self.config.framesPerBlockIndex = idx

    @property
    def monitorOutput(self):
        """Value of the monitorOutput flag of the configuration."""
        return self.config.monitorOutput

    @monitorOutput.setter
    def monitorOutput(self, bool enabled):
        self.config.monitorOutput = enabled

    @property
    def device_name(self):
        """Device name of the configuration, as str."""
        return self.config.device_name.decode('utf-8')

    @device_name.setter
    def device_name(self, name):
        # Stored UTF-8 encoded in the C++ configuration
        self.config.device_name = name.encode('utf-8')