Background:
The program reads 1000000 lines in the file. Every four line will be parsed into an object in a vector. If it has 2 objects with the same name, it will drop 1 object and increment one of the field in another object. After that, the vector is required to sort out. Finally, it writes to output file.
The main code
#include <cmath>
#include <algorithm>
#include <fstream>
#include <iostream>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
std::queue<FASTQentry> shared_queue;
bool queue_closed = false;
std::mutex shared_queue_mutex;
int main(int argc, char* argv[]) {
std::ifstream input_file;
std::ofstream output_file;
std::size_t total_reads = 0;
std::vector<FASTQentry> fqs;
// parse input fastq file
// fqs = parse_fastq_file(input_file, total_reads);
std::thread readerThread(parse_fastq_file_mutli, std::ref(input_file), std::ref(total_reads));
std::thread processThread(count_frequencies_and_merge_qualities_multi, std::ref(fqs));
readerThread.join();
processThread.join();
sort(fqs.begin(), fqs.end(), sort_by_freq_quality);
for (auto& out_entry : fqs) {
output_file << out_entry;
}
return 0;
}
Class
class FASTQentry {
public:
FASTQentry() = default;
FASTQentry(const std::string& hdr, const std::string& seq, const std::string& plus, const std::vector<double>& probs) {
header = hdr;
sequence = seq;
this->plus = plus;
probas = probs;
freq = 1;
}
std::string header;
std::string sequence;
std::string plus;
std::vector<double> probas;
int freq;
friend std::ostream& operator<<(std::ostream& o, const FASTQentry& fq_entry);
friend std::istream& operator>>(std::istream& is, FASTQentry& fq_entry);
};
Parse File Function
void parse_fastq_file_mutli(std::ifstream& infile, std::size_t& total_reads) {
while (!infile.eof()) {
FASTQentry fq;
fq.freq = 1;
infile >> fq;
{
std::lock_guard<std::mutex> lock(shared_queue_mutex);
shared_queue.push(fq);
}
++total_reads;
}
{
std::lock_guard<std::mutex> lock(shared_queue_mutex);
queue_closed = true;
}
}
std::istream& operator>>(std::istream& is, FASTQentry& fq_entry) {
is.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::getline(is, fq_entry.sequence);
is.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
std::string quals;
std::getline(is, quals);
fq_entry.probas = qualities_to_probabilities(quals);
return is;
}
std::vector<double> qualities_to_probabilities(const std::string& quals) {
std::vector<double> result;
result.reserve(quals.size());
for (auto& q : quals) {
result.push_back(std::pow(10, (q - 33) / -10));
}
return result;
}
Processing Function
void count_frequencies_and_merge_qualities_multi(std::vector<FASTQentry>& fqs) {
std::map<std::string, int> read2fqs;
bool is_closed;
bool empty_entry;
int counter = 1;
// Check if the file has been closed or if there are no more entries
{
std::lock_guard<std::mutex> lock(shared_queue_mutex);
is_closed = queue_closed;
empty_entry = shared_queue.empty();
}
while (!is_closed || !empty_entry) {
if (!empty_entry) {
FASTQentry fq;
{
std::lock_guard<std::mutex> lock(shared_queue_mutex);
fq = shared_queue.front();
shared_queue.pop();
}
if (read2fqs.count(fq.sequence) > 0) {
int index = read2fqs[fq.sequence] - 1;
fqs[index].freq += 1;
for (int i = 0; i < fqs[index].probas.size(); ++i) {
fqs[index].probas[i] += fq.probas[i];
}
} else {
read2fqs[fq.sequence] = counter;
fq.header = "@seq_" + std::to_string(counter) + "_x";
fq.plus = "+";
fqs.push_back(fq);
counter += 1;
}
}
// Check if the file has been closed or if there are no more entries
{
std::lock_guard<std::mutex> lock(shared_queue_mutex);
is_closed = queue_closed;
empty_entry = shared_queue.empty();
}
}
}
Sorting Function
bool sort_by_freq_quality(FASTQentry fq1, FASTQentry fq2) {
if (fq1.freq != fq2.freq) {
return fq1.freq > fq2.freq;
} else {
// cumulative sum of quality
auto s1 = 0.;
auto s2 = 0.;
for (auto i = 0ul; i < fq1.probas.size(); ++i) {
s1 += fq1.probas[i];
}
for (auto i = 0ul; i < fq2.probas.size(); ++i) {
s2 += fq2.probas[i];
}
s1 /= fq1.freq;
s2 /= fq2.freq;
if (s1 != s2){
return s1 > s2;
}
return fq1.sequence > fq2.sequence;
}
}
Write to File
std::string probabilities_to_qualities(const std::vector<double>& probas, int freq) {
std::string result;
result.reserve(probas.size());
for (auto& p : probas) {
result.push_back(-10 * std::log10(p / freq) + 33);
}
return result;
}
I convert the code from single thread to this but I can't see much improvement. What is the bottleneck of these code?
Thanks!