Terminate processes when the pipes are broken, or the queues are raising exceptions. Helps with cleaning up orphaned processes when the software crashes.

This commit is contained in:
Anne de Jong 2021-10-16 13:29:42 +02:00
parent 6f782f237e
commit 921c1b067f
2 changed files with 46 additions and 13 deletions

View File

@ -231,7 +231,12 @@ class AvStreamProcess(mp.Process):
self.siggen_activated = Atomic(False) self.siggen_activated = Atomic(False)
while True: while True:
try:
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
except OSError:
logging.error("Error with pipe, terminating process")
self.stopAllStreams()
self.terminate()
logging.debug(f"Streamprocess obtained message {msg}") logging.debug(f"Streamprocess obtained message {msg}")
if msg == StreamMsg.activateSiggen: if msg == StreamMsg.activateSiggen:
@ -419,18 +424,28 @@ class AvStreamProcess(mp.Process):
# explanation. # explanation.
def sendInQueues(self, msg, *data): def sendInQueues(self, msg, *data):
# logging.debug('sendInQueues()') # logging.debug('sendInQueues()')
try:
for q in self.indata_qlist: for q in self.indata_qlist:
# Fan out the input data to all queues in the queue list # Fan out the input data to all queues in the queue list
q.put((msg, data)) q.put((msg, data))
except ValueError:
logging.error("Error with data queue, terminating process")
self.stopAllStreams()
self.terminate()
def sendAllQueues(self, msg, *data): def sendAllQueues(self, msg, *data):
""" """
Destined for all queues, including capture data queues Destined for all queues, including capture data queues
""" """
self.sendInQueues(msg, *data) self.sendInQueues(msg, *data)
try:
for q in self.msg_qlist: for q in self.msg_qlist:
# Fan out the input data to all queues in the queue list # Fan out the input data to all queues in the queue list
q.put((msg, data)) q.put((msg, data))
except ValueError:
logging.error("Error with data queue, terminating process")
self.stopAllStreams()
self.terminate()
@dataclass @dataclass

View File

@ -181,7 +181,13 @@ class SiggenProcess(mp.Process):
if np.issubdtype(dtype, np.integer): if np.issubdtype(dtype, np.integer):
bitdepth_fixed = dtype.itemsize * 8 bitdepth_fixed = dtype.itemsize * 8
signal *= 2 ** (bitdepth_fixed - 1) - 1 signal *= 2 ** (bitdepth_fixed - 1) - 1
try:
self.dataq.put(signal.astype(dtype)) self.dataq.put(signal.astype(dtype))
except ValueError:
# As of Python 3.8, a value error on a Queue means that the oter
# end of the process died.
logging.error("Error with data queue, terminating process")
self.terminate()
def newEqualizer(self, eqdata): def newEqualizer(self, eqdata):
""" """
@ -207,6 +213,13 @@ class SiggenProcess(mp.Process):
eq.setLevels(eq_levels) eq.setLevels(eq_levels)
return eq return eq
def sendPipe(self, msgtype, msg):
try:
self.pipe.send((msgtype, msg))
except OSError:
logging.error("Error with pipe, terminating process")
self.terminate()
def run(self): def run(self):
# The main function of the actual process # The main function of the actual process
# First things first # First things first
@ -215,23 +228,28 @@ class SiggenProcess(mp.Process):
try: try:
self.siggen = self.newSiggen(self.siggendata) self.siggen = self.newSiggen(self.siggendata)
except Exception as e: except Exception as e:
self.pipe.send((SiggenMessage.error, str(e))) self.sendPipe(SiggenMessage.error, str(e))
try: try:
self.eq = self.newEqualizer(self.siggendata.eqdata) self.eq = self.newEqualizer(self.siggendata.eqdata)
except Exception as e: except Exception as e:
self.pipe.send((SiggenMessage.error, str(e))) self.sendPipe(SiggenMessage.error, str(e))
# Pre-generate blocks of signal data # Pre-generate blocks of signal data
while self.dataq.qsize() < self.nblocks_buffer: while self.dataq.qsize() < self.nblocks_buffer:
self.generate() self.generate()
self.pipe.send((SiggenMessage.ready, None)) self.sendPipe(SiggenMessage.ready, None)
while True: while True:
# Wait here for a while, to check for messages to consume # Wait here for a while, to check for messages to consume
if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4):
try:
msg, data = self.pipe.recv() msg, data = self.pipe.recv()
except OSError:
logging.error("Error with pipe, terminating process")
self.terminate()
if msg == SiggenMessage.endProcess: if msg == SiggenMessage.endProcess:
logging.debug("Signal generator caught 'endProcess' message. Exiting.") logging.debug("Signal generator caught 'endProcess' message. Exiting.")
return 0 return 0
@ -254,7 +272,7 @@ class SiggenProcess(mp.Process):
try: try:
self.generate() self.generate()
except SiggenWorkerDone: except SiggenWorkerDone:
self.pipe.send(SiggenMessage.done) self.sendPipe(SiggenMessage.done, None)
return 0 return 0
return 1 return 1