0

I'm working on optimizing a PyQt application (to control tensile testing machine) that's sometimes slow. The application uses two QThreads, both running in a loop with time.sleep inside.

  1. One QThread (AnalogReaderThread) continuously reads analog values, performs calculations and controls motors.
  2. The other QThread (VideoThread) grabs video frames and tracks markers. The coordinates of these markers need to be sent to the first thread.

The typical cycle time is about 0.1 seconds. These two QThreads work asynchronously at different frequencies, and I'm considering how best to organize communication between them.

I have two potential solutions in mind:

  1. The VideoThread sends a signal (though signal/slot mechanism) every cycle with marker coordinates, which are stored in temporary variables in the first thread and accessed when needed.
  2. The VideoThread has a getter that returns the last known marker coordinates.

I've made two minimal examples that outline my implementation. Please let me know your ideas:

Signal/slots used for asynchronous communication:

import sys
import time
import random
import numpy as np
from PyQt5.QtCore import QThread, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QApplication, QLabel, QMainWindow, QVBoxLayout, QWidget

class AnalogReaderThread(QThread):
    update_analog_data = pyqtSignal(float, float)

    def __init__(self, thread):
        super().__init__()
        self._thread = thread
        self._thread.update_image_data.connect(self.update_image_data)
        self._mean_brightness = 0
    def run(self):
        while True:
            analog_data = self.read_analog_data()
            self.update_analog_data.emit(analog_data, self._mean_brightness)
            time.sleep(1)

    def read_analog_data(self):
        return random.uniform(0, 100)
    
    def update_image_data(self, mean_brightness):
        self._mean_brightness = mean_brightness

class VideoThread(QThread):
    update_image_data = pyqtSignal(float)

    def run(self):
        while True:
            image_data = self.read_image_data()
            mean_brightness = self.process_image(image_data)
            self.update_image_data.emit(mean_brightness)
            time.sleep(0.5)

    def read_image_data(self):
        # Generate a random image (100x100 pixels, 3 color channels)
        image = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
        return image

    def process_image(self, image_data):
        # Convert to grayscale
        grayscale = np.mean(image_data, axis=2)
        # Calculate mean brightness
        mean_brightness = np.mean(grayscale)
        return mean_brightness

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        self.setWindowTitle("Multi-QThread Example")
        self.setGeometry(100, 100, 400, 300)

        self.analog_label = QLabel("Analog data: --", self)
        self.image_label = QLabel("Mean brightness: --", self)

        layout = QVBoxLayout()
        layout.addWidget(self.analog_label)
        layout.addWidget(self.image_label)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.video_thread = VideoThread()
        self.video_thread.start()

        self.analog_thread = AnalogReaderThread(self.video_thread)
        self.analog_thread.update_analog_data.connect(self.update_analog_label)
        self.analog_thread.start()

    @pyqtSlot(float, float)
    def update_analog_label(self, data, mean_brightness):
        self.analog_label.setText(f"Analog data: {data:.2f}")
        self.image_label.setText(f"Mean brightness: {mean_brightness:.2f}")

        

if __name__ == "__main__":
    app = QApplication(sys.argv)
    mainWin = MainWindow()
    mainWin.show()
    sys.exit(app.exec_())

And getters:

import sys
import time
import random
import numpy as np
from PyQt5.QtCore import QThread, pyqtSignal, pyqtSlot, QMutex
from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel, QVBoxLayout, QWidget

class AnalogReaderThread(QThread):
    update_analog_data = pyqtSignal(float, float)

    def __init__(self, video_thread):
        super().__init__()
        self._video_thread = video_thread

    def run(self):
        while True:
            analog_data = self.read_analog_data()
            mean_brightness = self._video_thread.get_mean_brightness()
            self.update_analog_data.emit(analog_data, mean_brightness)
            time.sleep(1)

    def read_analog_data(self):
        return random.uniform(0, 100)

class VideoThread(QThread):
    update_image_data = pyqtSignal(float)

    def __init__(self):
        super().__init__()
        self._mean_brightness = 0
        self._mutex = QMutex()

    def run(self):
        while True:
            image_data = self.read_image_data()
            mean_brightness = self.process_image(image_data)
            self._mutex.lock()
            self._mean_brightness = mean_brightness
            self._mutex.unlock()
            self.update_image_data.emit(self._mean_brightness)
            time.sleep(0.5)

    def read_image_data(self):
        # Generate a random image (100x100 pixels, 3 color channels)
        image = np.random.randint(0, 256, (100, 100, 3), dtype=np.uint8)
        return image

    def process_image(self, image_data):
        # Convert to grayscale
        grayscale = np.mean(image_data, axis=2)
        # Calculate mean brightness
        mean_brightness = np.mean(grayscale)
        return mean_brightness

    def get_mean_brightness(self):
        self._mutex.lock()
        value = self._mean_brightness
        self._mutex.unlock()
        return value

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        self.setWindowTitle("Multi-QThread Example")
        self.setGeometry(100, 100, 400, 300)

        self.analog_label = QLabel("Analog data: --", self)
        self.image_label = QLabel("Mean brightness: --", self)

        layout = QVBoxLayout()
        layout.addWidget(self.analog_label)
        layout.addWidget(self.image_label)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.video_thread = VideoThread()
        self.video_thread.start()

        self.analog_thread = AnalogReaderThread(self.video_thread)
        self.analog_thread.update_analog_data.connect(self.update_analog_label)
        self.analog_thread.start()

    @pyqtSlot(float, float)
    def update_analog_label(self, data, mean_brightness):
        self.analog_label.setText(f"Analog data: {data:.2f}")
        self.image_label.setText(f"Mean brightness: {mean_brightness:.2f}")

if __name__ == "__main__":
    app = QApplication(sys.argv)
    mainWin = MainWindow()
    mainWin.show()
    sys.exit(app.exec_())
3
  • 1
    While this partially improves your original post, it's still inadequate, as we don't provide code reviewing or evaluation. As said, you should explain the possible differences you get in results (including performance) or impressions of the outcome, test results, and possible doubts about your understanding of those two approaches. If they both work, and work fine for you for your current needs, only you can decide which is better. Note that I don't think you need a lock for _mean_brightness, as you're not modifying its contents, you always overwrite its reference. Commented Jul 8 at 13:33
  • @Vittor It's unspecific. This does not fit on SO. What about posting on Code Review or Software Engineering? Keep in mind that you need to specify the detailed description on the current limitation on the latency and the desired goal there, to get a good answer.
    – relent95
    Commented Jul 9 at 5:06
  • if you're reading analog values from serial port, you can get rid of one thread and just use timer to send requests and readyRead() signal to receive responces Commented Jul 10 at 14:23

0

Browse other questions tagged or ask your own question.