174 lines
5.0 KiB
Python
174 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""!
|
|
Author: J.A. de Jong - ASCEE
|
|
|
|
Description:
|
|
|
|
Data Acquisition (DAQ) device descriptors, and the DAQ devices themselves
|
|
|
|
"""
|
|
from dataclasses import dataclass, field
|
|
from dataclasses_json import dataclass_json
|
|
import numpy as np
|
|
from ..lasp_common import lasp_shelve, Qty
|
|
|
|
|
|
@dataclass
class DeviceInfo:
    """
    Plain record describing a single DAQ device.

    The order of the fields below is the positional order of the generated
    constructor, so it must not be changed. The per-field notes are inferred
    from the field names only and should be confirmed against the code that
    fills in this record.
    """
    # Integer identifying the backend API (presumably an index) - confirm
    api: int
    # Integer index of the device (presumably within its API) - confirm
    index: int
    # Flag, presumably set once the device capabilities were queried - confirm
    probed: bool
    # Device name
    name: str

    # Channel counts, presumably the maximum the device offers - confirm
    outputchannels: int
    inputchannels: int
    duplexchannels: int

    # Lists of supported sample rates / sample formats; element types are
    # not visible from this file
    samplerates: list
    sampleformats: list
    # Preferred sample rate (NOTE(review): value or index? - confirm)
    prefsamplerate: int
|
|
|
|
|
|
|
|
@dataclass_json
@dataclass
class DAQChannel:
    """
    Settings of a single DAQ channel.

    The class is decorated with dataclass_json in addition to dataclass.
    Field order is the positional order of the generated constructor.
    """
    # Whether this channel is switched on
    channel_enabled: bool
    # Name given to the channel
    channel_name: str
    # Sensitivity of the channel (NOTE(review): physical units are not
    # visible from this file - confirm)
    sensitivity: float
    # Quantity object (Qty from lasp_common) attached to the channel
    unit: Qty
|
|
|
|
|
|
@dataclass_json
@dataclass
class DAQConfiguration:
    """
    Initialize a device descriptor

    Args:
        api: integer identifying the API used for the device
            (NOTE(review): exact meaning is not visible here - confirm).
        duplex_mode: Set device to duplex mode, if possible

        input_device_name: ASCII name with which to open the device when
            connected
        output_device_name: ASCII name with which to open the device when
            connected

        en_input_sample_format: enabled input sample format, stored as a
            string.
        en_output_sample_format: enabled output sample format, stored as a
            string.
        en_input_rate: index of enabled input sampling frequency [Hz]
            in the list of frequencies.
        en_output_rate: index in the list of possible output sampling
            frequencies.

        input_channel_configs: list of input channel configurations. The
            methods of this class read the attributes `channel_enabled`,
            `channel_name` and `sensitivity` of each entry (the attributes
            DAQChannel provides).
        output_channel_configs: list of output channel configurations. The
            methods of this class read the attributes `channel_enabled` and
            `sensitivity` of each entry.
        monitor_gen: If set to true, add monitor channel to recording.

        outputDelayBlocks: number of blocks to delay output stream when added
            to input for monitoring the output synchronously with input.
        nFramesPerBlock: presumably the number of frames in one block -
            TODO confirm.
    """
    api: int
    duplex_mode: bool

    input_device_name: str
    output_device_name: str

    en_input_sample_format: str
    en_output_sample_format: str
    en_input_rate: int
    en_output_rate: int

    input_channel_configs: list
    output_channel_configs: list
    monitor_gen: bool

    outputDelayBlocks: int
    nFramesPerBlock: int

    @staticmethod
    def _firstEnabledIndex(channels):
        # Position of the first entry with channel_enabled set, or -1.
        return next(
            (i for i, ch in enumerate(channels) if ch.channel_enabled), -1)

    @staticmethod
    def _lastEnabledIndex(channels):
        # Position of the last entry with channel_enabled set, or -1.
        return max(
            (i for i, ch in enumerate(channels) if ch.channel_enabled),
            default=-1)

    @staticmethod
    def _enabledChannels(channels):
        # The entries with channel_enabled set, in their original order.
        return [ch for ch in channels if ch.channel_enabled]

    def firstEnabledInputChannelNumber(self):
        """
        Returns the channel number of the first enabled channel. Returns -1 if
        no channels are enabled.
        """
        return self._firstEnabledIndex(self.input_channel_configs)

    def firstEnabledOutputChannelNumber(self):
        """
        Returns the channel number of the first enabled output channel.
        Returns -1 if no channels are enabled.
        """
        return self._firstEnabledIndex(self.output_channel_configs)

    def lastEnabledInputChannelNumber(self):
        """
        Returns the channel number of the last enabled input channel. Returns
        -1 if no channels are enabled.
        """
        return self._lastEnabledIndex(self.input_channel_configs)

    def lastEnabledOutputChannelNumber(self):
        """
        Returns the channel number of the last enabled output channel. Returns
        -1 if no channels are enabled.
        """
        # A leftover debug print of every channel used to live in this loop;
        # it has been removed so that a query no longer writes to stdout.
        return self._lastEnabledIndex(self.output_channel_configs)

    def getEnabledInputChannels(self):
        """
        Returns a new list with the enabled input channel configurations, in
        the order in which they are stored.
        """
        return self._enabledChannels(self.input_channel_configs)

    def getEnabledInputChannelNames(self):
        """
        Returns the list of channel names of the enabled input channels.
        """
        return [ch.channel_name for ch in self.getEnabledInputChannels()]

    def getEnabledInputChannelSensitivities(self):
        """
        Returns the sensitivities of the enabled input channels, each
        converted to float.
        """
        return [float(channel.sensitivity) for channel in
                self.getEnabledInputChannels()]

    def getEnabledOutputChannels(self):
        """
        Returns a new list with the enabled output channel configurations, in
        the order in which they are stored.
        """
        return self._enabledChannels(self.output_channel_configs)

    def getEnabledOutputChannelSensitivities(self):
        """
        Returns the sensitivities of the enabled output channels, each
        converted to float.
        """
        return [float(channel.sensitivity) for channel in
                self.getEnabledOutputChannels()]

    @staticmethod
    def loadConfigs():
        """
        Returns a list of currently available configurations

        The value is whatever is stored under the key 'daqconfigs' in the
        lasp_shelve store; an empty dict is returned when nothing is stored.
        saveConfig() stores a mapping of name -> configuration there.
        """
        with lasp_shelve() as sh:
            return sh.load('daqconfigs', {})

    def saveConfig(self, name):
        """
        Store this configuration under the given name, replacing any
        configuration previously stored under that name.

        Args:
            name: key under which to store this configuration
        """
        with lasp_shelve() as sh:
            # Read through the handle that is already open instead of calling
            # loadConfigs(), which would open the store a second time while
            # this handle is still held.
            cur_configs = sh.load('daqconfigs', {})
            cur_configs[name] = self
            sh.store('daqconfigs', cur_configs)

    @staticmethod
    def deleteConfig(name):
        """
        Remove the configuration stored under the given name.

        Args:
            name: key of the configuration to remove

        Raises:
            KeyError: if no configuration is stored under that name
        """
        with lasp_shelve() as sh:
            # Same handle re-use as in saveConfig(): no nested open.
            cur_configs = sh.load('daqconfigs', {})
            del cur_configs[name]
            sh.store('daqconfigs', cur_configs)
|
|
|