2
\$\begingroup\$

Background:

The program reads 1000000 lines in the file. Every four line will be parsed into an object in a vector. If it has 2 objects with the same name, it will drop 1 object and increment one of the field in another object. After that, the vector is required to sort out. Finally, it writes to output file.

The main code

#include <cmath>

#include <algorithm>
#include <fstream>
#include <iostream>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

std::queue<FASTQentry> shared_queue;
bool queue_closed = false;
std::mutex shared_queue_mutex;

int main(int argc, char* argv[]) {
  std::ifstream input_file;
  std::ofstream output_file;

  std::size_t total_reads = 0;
  std::vector<FASTQentry> fqs;

  // parse input fastq file
  // fqs = parse_fastq_file(input_file, total_reads);

  std::thread readerThread(parse_fastq_file_mutli, std::ref(input_file), std::ref(total_reads));
  std::thread processThread(count_frequencies_and_merge_qualities_multi, std::ref(fqs));

  readerThread.join();
  processThread.join();

  sort(fqs.begin(), fqs.end(), sort_by_freq_quality);

  for (auto& out_entry : fqs) {
    output_file << out_entry;
  }

  return 0;
}

Class

class FASTQentry {
 public:
  FASTQentry() = default;
  FASTQentry(const std::string& hdr, const std::string& seq, const std::string& plus, const std::vector<double>& probs) {
    header = hdr;
    sequence = seq;
    this->plus = plus;
    probas = probs;
    freq = 1;
  }

  std::string header;
  std::string sequence;
  std::string plus;
  std::vector<double> probas;
  int freq;

  friend std::ostream& operator<<(std::ostream& o, const FASTQentry& fq_entry);
  friend std::istream& operator>>(std::istream& is, FASTQentry& fq_entry);
};

Parse File Function

void parse_fastq_file_mutli(std::ifstream& infile, std::size_t& total_reads) {
  while (!infile.eof()) {
    FASTQentry fq;
    fq.freq = 1;
    infile >> fq;
    {
      std::lock_guard<std::mutex> lock(shared_queue_mutex);
      shared_queue.push(fq);
    }
    ++total_reads;
  }

  {
    std::lock_guard<std::mutex> lock(shared_queue_mutex);
    queue_closed = true;
  }
}

std::istream& operator>>(std::istream& is, FASTQentry& fq_entry) {
  is.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
  std::getline(is, fq_entry.sequence);
  is.ignore(std::numeric_limits<std::streamsize>::max(), '\n');

  std::string quals;
  std::getline(is, quals);
  fq_entry.probas = qualities_to_probabilities(quals);
  
  return is;
}

std::vector<double> qualities_to_probabilities(const std::string& quals) {
  std::vector<double> result;
  result.reserve(quals.size());
  for (auto& q : quals) {
    result.push_back(std::pow(10, (q - 33) / -10));
  }
  return result;
}

Processing Function

void count_frequencies_and_merge_qualities_multi(std::vector<FASTQentry>& fqs) {
  std::map<std::string, int> read2fqs;

  bool is_closed;
  bool empty_entry;
  int counter = 1;

  // Check if the file has been closed or if there are no more entries
  {
    std::lock_guard<std::mutex> lock(shared_queue_mutex);
    is_closed = queue_closed;
    empty_entry = shared_queue.empty();
  }

  while (!is_closed || !empty_entry) {
    if (!empty_entry) {
      FASTQentry fq;
      {
        std::lock_guard<std::mutex> lock(shared_queue_mutex);
        fq = shared_queue.front();
        shared_queue.pop();
      }

      if (read2fqs.count(fq.sequence) > 0) {
        int index = read2fqs[fq.sequence] - 1;
        fqs[index].freq += 1;

        for (int i = 0; i < fqs[index].probas.size(); ++i) {
          fqs[index].probas[i] += fq.probas[i];
        }
      } else {
        read2fqs[fq.sequence] = counter;
        fq.header = "@seq_" + std::to_string(counter) + "_x";
        fq.plus = "+";
        fqs.push_back(fq);
        counter += 1;
      }
    }

    // Check if the file has been closed or if there are no more entries
    {
      std::lock_guard<std::mutex> lock(shared_queue_mutex);
      is_closed = queue_closed;
      empty_entry = shared_queue.empty();
    }
  }
}

Sorting Function

bool sort_by_freq_quality(FASTQentry fq1, FASTQentry fq2) {
  if (fq1.freq != fq2.freq) {
    return fq1.freq > fq2.freq;
  } else {
    // cumulative sum of quality
    auto s1 = 0.;
    auto s2 = 0.;
    for (auto i = 0ul; i < fq1.probas.size(); ++i) {
      s1 += fq1.probas[i];
    }
    for (auto i = 0ul; i < fq2.probas.size(); ++i) {
      s2 += fq2.probas[i];
    }
    s1 /= fq1.freq;
    s2 /= fq2.freq;
    if (s1 != s2){
      return s1 > s2;
    }
    return fq1.sequence > fq2.sequence;
  }
}

Write to File

std::string probabilities_to_qualities(const std::vector<double>& probas, int freq) {
  std::string result;
  result.reserve(probas.size());
  for (auto& p : probas) {
    result.push_back(-10 * std::log10(p / freq) + 33);
  }
  return result;
}

I convert the code from single thread to this but I can't see much improvement. What is the bottleneck of these code?

Thanks!

\$\endgroup\$
1
  • 1
    \$\begingroup\$ "What is the bottleneck of these code?" One could guess. Or one could measure. Post the profiler results, which will reveal the hot spot. \$\endgroup\$
    – J_H
    Commented Jan 29 at 18:32

1 Answer 1

1
\$\begingroup\$

Use or make a thread-safe queue

You are using a std::queue and a separate std::mutex to ensure thread-safe access. However, it's easy to make mistakes or introduce inefficiencies. For example, between the check for shared_queue.empty() and the call to shared_queue.pop_front(), you have unlocked the mutex. This is not a problem right now, since other threads only ever push items, so it can not happen that you pop while the queue is empty. But what if you want to speed up processing and want to add a second thread running count_frequencies_and_merge_qualities_multi()?

Second, if you process the data faster than you can read from file, your data processing thread will do a busy loop, wasting CPU time, wasting power and increasing the temperature (which might cause the CPU to throttle). Ideally, you go to sleep until another thread pushes a new item to the queue.

A proper thread-safe queue class takes care of all these things. You can find examples of them here on Code Review (be sure to read the answers of course). They also make the code using them simpler. Consider being able to write:

ThreadSafeQueue<FASTQentry> shared_queue;
…
void parse_fastq_file_mutli(…) {
  while (!infile.eof()) {
    …
    infile >> fq;
    shared_queue.push(fq);
  }

  shared_queue.close();
}
…
void count_frequencies_and_merge_qualities_multi(…) {
  while (std::optional<FASTQentry> fq = shared_queue.pop()) {
    …
  }
}

Avoid code duplication

You have duplicated code in sort_by_freq_quality(). Create a function that calculates the "quality" for a single FASTQentry(), so that you can then write:

bool sort_by_freq_quality(FASTQentry fq1, FASTQentry fq2) {
  if (fq1.freq != fq2.freq) {
    return fq1.freq > fq2.freq;
  } else {
    return quality(fq1) > quality(fq2);
  }
}

Make more use of the standard library

Newer versions of the C++ standard add simpler and more useful features, such as std::ranges::sort(). But there are also features that have existed for a long time that might help simplify your code. For example, you can use std::istream_iterator along with std::copy to simplify reading the input, at least with a thread safe queue class that supports push_back():

void parse_fastq_file_mutli(std::ifstream& infile) {
  std::copy(std::istream_iterator<FASTQentry>(infile),
            std::istream_iterator<FASTQentry>(),
            std::back_inserter(shared_queue));

  shared_queue.close();
}

But you can also make use of std::accumulate to sum things and std::transform() to do conversions on ranges.

Missing error checking

Lots of things can go wrong when reading from and writing to files. Make sure you check the state of the stream to ensure it is still good. If you did encounter an error, make sure your program exits with an error message written to std::cerr and error code EXIT_FAILURE, otherwise errors might get missed and unexpected things might happen.

Avoid global variables

You already pass references to local variables in main() to the threads that you are starting, but why is the queue a global variable? You can declare that in main() and pass a reference to it as well.

\$\endgroup\$

Not the answer you're looking for? Browse other questions tagged or ask your own question.