I'm working on optimizing a PyQt application (to control tensile testing machine) that's sometimes slow. The application uses two QThreads, both running in a loop with time.sleep inside.
- One QThread (AnalogReaderThread) continuously reads analog values, performs calculations and controls motors.
- The other QThread (VideoThread) grabs video frames and tracks markers. The coordinates of these markers need to be sent to the first thread.
The typical cycle time is about 0.1 seconds. These two QThreads work asynchronously at different frequencies, and I'm considering how best to organize communication between them.
I have two potential solutions in mind:
- The VideoThread sends a signal (though signal/slot mechanism) every cycle with marker coordinates, which are stored in temporary variables in the first thread and accessed when needed.
- The VideoThread has a getter that returns the last known marker coordinates.
I've made two minimal examples that outline my implementation. Please let me know your ideas:
Signal/slots used for asynchronous communication:
import sys
import time
import random
import numpy as np
from PyQt5.QtCore import QThread, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QLabel, QMainWindow, QVBoxLayout, QWidget
class AnalogReaderThread(QThread):
update_analog_data = pyqtSignal(float, float)
def __init__(self, thread):
super().__init__()
self._thread = thread
self._thread.update_image_data.connect(self.update_image_data)
self._mean_brightness = 0
def run(self):
while True:
analog_data = self.read_analog_data()
self.update_analog_data.emit(analog_data, self._mean_brightness)
time.sleep(1)
def read_analog_data(self):
return random.uniform(0, 100)
def update_image_data(self, mean_brightness):
self._mean_brightness = mean_brightness
class VideoThread(QThread):
update_image_data = pyqtSignal(float)
def run(self):
while True:
image_data = self.read_image_data()
mean_brightness = self.process_image(image_data)
self.update_image_data.emit(mean_brightness)
time.sleep(0.5)
def read_image_data(self):
# Generate a random image (100x100 pixels, 3 color channels)
image = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
return image
def process_image(self, image_data):
# Convert to grayscale
grayscale = np.mean(image_data, axis=2)
# Calculate mean brightness
mean_brightness = np.mean(grayscale)
return mean_brightness
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
self.setWindowTitle("Multi-QThread Example")
self.setGeometry(100, 100, 400, 300)
self.analog_label = QLabel("Analog data: --", self)
self.image_label = QLabel("Mean brightness: --", self)
layout = QVBoxLayout()
layout.addWidget(self.analog_label)
layout.addWidget(self.image_label)
container = QWidget()
container.setLayout(layout)
self.setCentralWidget(container)
self.video_thread = VideoThread()
self.video_thread.start()
self.analog_thread = AnalogReaderThread(self.video_thread)
self.analog_thread.update_analog_data.connect(self.update_analog_label)
self.analog_thread.start()
@pyqtSlot(float, float)
def update_analog_label(self, data, mean_brightness):
self.analog_label.setText(f"Analog data: {data:.2f}")
self.image_label.setText(f"Mean brightness: {mean_brightness:.2f}")
if __name__ == "__main__":
app = QApplication(sys.argv)
mainWin = MainWindow()
mainWin.show()
sys.exit(app.exec_())
And getters:
import sys
import time
import random
import numpy as np
from PyQt5.QtCore import QThread, pyqtSignal, pyqtSlot, QMutex
from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel, QVBoxLayout, QWidget
class AnalogReaderThread(QThread):
update_analog_data = pyqtSignal(float, float)
def __init__(self, video_thread):
super().__init__()
self._video_thread = video_thread
def run(self):
while True:
analog_data = self.read_analog_data()
mean_brightness = self._video_thread.get_mean_brightness()
self.update_analog_data.emit(analog_data, mean_brightness)
time.sleep(1)
def read_analog_data(self):
return random.uniform(0, 100)
class VideoThread(QThread):
update_image_data = pyqtSignal(float)
def __init__(self):
super().__init__()
self._mean_brightness = 0
self._mutex = QMutex()
def run(self):
while True:
image_data = self.read_image_data()
mean_brightness = self.process_image(image_data)
self._mutex.lock()
self._mean_brightness = mean_brightness
self._mutex.unlock()
self.update_image_data.emit(self._mean_brightness)
time.sleep(0.5)
def read_image_data(self):
# Generate a random image (100x100 pixels, 3 color channels)
image = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
return image
def process_image(self, image_data):
# Convert to grayscale
grayscale = np.mean(image_data, axis=2)
# Calculate mean brightness
mean_brightness = np.mean(grayscale)
return mean_brightness
def get_mean_brightness(self):
self._mutex.lock()
value = self._mean_brightness
self._mutex.unlock()
return value
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
self.setWindowTitle("Multi-QThread Example")
self.setGeometry(100, 100, 400, 300)
self.analog_label = QLabel("Analog data: --", self)
self.image_label = QLabel("Mean brightness: --", self)
layout = QVBoxLayout()
layout.addWidget(self.analog_label)
layout.addWidget(self.image_label)
container = QWidget()
container.setLayout(layout)
self.setCentralWidget(container)
self.video_thread = VideoThread()
self.video_thread.start()
self.analog_thread = AnalogReaderThread(self.video_thread)
self.analog_thread.update_analog_data.connect(self.update_analog_label)
self.analog_thread.start()
@pyqtSlot(float, float)
def update_analog_label(self, data, mean_brightness):
self.analog_label.setText(f"Analog data: {data:.2f}")
self.image_label.setText(f"Mean brightness: {mean_brightness:.2f}")
if __name__ == "__main__":
app = QApplication(sys.argv)
mainWin = MainWindow()
mainWin.show()
sys.exit(app.exec_())
_mean_brightness
, as you're not modifying its contents, you always overwrite its reference.readyRead()
signal to receive responces