diff --git a/lasp/lasp_avstream.py b/lasp/lasp_avstream.py index 03a8071..0ec6108 100644 --- a/lasp/lasp_avstream.py +++ b/lasp/lasp_avstream.py @@ -231,7 +231,12 @@ class AvStreamProcess(mp.Process): self.siggen_activated = Atomic(False) while True: - msg, data = self.pipe.recv() + try: + msg, data = self.pipe.recv() + except OSError: + logging.error("Error with pipe, terminating process") + self.stopAllStreams() + self.terminate() logging.debug(f"Streamprocess obtained message {msg}") if msg == StreamMsg.activateSiggen: @@ -419,18 +424,28 @@ class AvStreamProcess(mp.Process): # explanation. def sendInQueues(self, msg, *data): # logging.debug('sendInQueues()') - for q in self.indata_qlist: - # Fan out the input data to all queues in the queue list - q.put((msg, data)) + try: + for q in self.indata_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + except ValueError: + logging.error("Error with data queue, terminating process") + self.stopAllStreams() + self.terminate() def sendAllQueues(self, msg, *data): """ Destined for all queues, including capture data queues """ self.sendInQueues(msg, *data) - for q in self.msg_qlist: - # Fan out the input data to all queues in the queue list - q.put((msg, data)) + try: + for q in self.msg_qlist: + # Fan out the input data to all queues in the queue list + q.put((msg, data)) + except ValueError: + logging.error("Error with data queue, terminating process") + self.stopAllStreams() + self.terminate() @dataclass diff --git a/lasp/lasp_siggen.py b/lasp/lasp_siggen.py index 1f73647..7a0a2b0 100644 --- a/lasp/lasp_siggen.py +++ b/lasp/lasp_siggen.py @@ -181,7 +181,13 @@ class SiggenProcess(mp.Process): if np.issubdtype(dtype, np.integer): bitdepth_fixed = dtype.itemsize * 8 signal *= 2 ** (bitdepth_fixed - 1) - 1 - self.dataq.put(signal.astype(dtype)) + try: + self.dataq.put(signal.astype(dtype)) + except ValueError: + # As of Python 3.8, a value error on a Queue means that the oter + # end of the process died. + logging.error("Error with data queue, terminating process") + self.terminate() def newEqualizer(self, eqdata): """ @@ -207,6 +213,13 @@ class SiggenProcess(mp.Process): eq.setLevels(eq_levels) return eq + def sendPipe(self, msgtype, msg): + try: + self.pipe.send((msgtype, msg)) + except OSError: + logging.error("Error with pipe, terminating process") + self.terminate() + def run(self): # The main function of the actual process # First things first @@ -215,23 +228,28 @@ class SiggenProcess(mp.Process): try: self.siggen = self.newSiggen(self.siggendata) except Exception as e: - self.pipe.send((SiggenMessage.error, str(e))) + self.sendPipe(SiggenMessage.error, str(e)) try: self.eq = self.newEqualizer(self.siggendata.eqdata) except Exception as e: - self.pipe.send((SiggenMessage.error, str(e))) + self.sendPipe(SiggenMessage.error, str(e)) # Pre-generate blocks of signal data while self.dataq.qsize() < self.nblocks_buffer: self.generate() - self.pipe.send((SiggenMessage.ready, None)) + self.sendPipe(SiggenMessage.ready, None) while True: # Wait here for a while, to check for messages to consume if self.pipe.poll(timeout=QUEUE_BUFFER_TIME / 4): - msg, data = self.pipe.recv() + try: + msg, data = self.pipe.recv() + except OSError: + logging.error("Error with pipe, terminating process") + self.terminate() + if msg == SiggenMessage.endProcess: logging.debug("Signal generator caught 'endProcess' message. Exiting.") return 0 @@ -254,7 +272,7 @@ class SiggenProcess(mp.Process): try: self.generate() except SiggenWorkerDone: - self.pipe.send(SiggenMessage.done) + self.sendPipe(SiggenMessage.done, None) return 0 return 1