\$\begingroup\$

I have a task that runs sequentially. It works fine for up to 100 tags, but with hundreds of thousands of tags it is very time-consuming. Can it be modified to run in parallel?

 (:require [content :as db-content]
           [impl.content-tag :as db-content-tag]
           [story :as db-story]
           [tag :as db-tag]
           [transaction :as transaction]
           [log :as log]
           [config :as config]))

(defn- find-story-ids-by-tag [tag-id]
 (map :id (db-content-tag/read-contents-by-tag-id (config/db-spec) tag-id)))

(defn- update-story [txn publisher-id story-id tag-id]
 (let [{:keys [published-json]} (db-content/find-by-id txn story-id)
       updated-tags (filter (fn [tag] (not= (:id tag) tag-id)) (:tags published-json))
       _ (db-story/update-published-json-without-timestamps txn publisher-id story-id (assoc published-json :tags updated-tags))]
   (log/info {:message "[TAG-DELETION] Updated Story Tags in Published JSON"
              :publisher-id publisher-id
              :tag-id tag-id
              :story-id story-id
              :updated-tags-json updated-tags})))

(defn- delete-tag [publisher-id tag-id]
 (let [associated-content-ids (find-story-ids-by-tag tag-id)]
   (transaction/with-transaction
     [txn (config/db-spec)]
     (do
       (when (seq associated-content-ids)
         (do
           (doseq [story-id associated-content-ids]
             (update-story txn publisher-id story-id tag-id))
           (db-content-tag/delete-batch-by-tag txn tag-id associated-content-ids)
           (log/info {:message "[TAG-DELETION] Deleted from Content Tag"
                      :publisher-id publisher-id
                      :tag-id tag-id
                      :story-ids associated-content-ids})))
       (db-tag/delete txn publisher-id tag-id)
       (log/info {:message "[TAG-DELETION] Deleted Tag"
                  :publisher-id publisher-id
                  :tag-id tag-id})))))

(defn run [publisher-id tag-ids]
 (comment run 123 [4 5 6])
 (try
   (do
     (log/info {:message "[TAG-DELETION] started"
                :publisher-id publisher-id
                :tag-ids tag-ids})
     (doseq [tag-id tag-ids] (if (db-tag/find-by-id (config/db-spec) publisher-id tag-id)
                               (delete-tag publisher-id tag-id)
                               (log/info {:message "[TAG-DELETION] Tag Not Found"
                                          :publisher-id publisher-id
                                          :tag-id tag-id})))
     (log/info {:message "[TAG-DELETION] completed"
                :publisher-id publisher-id
                :tag-ids tag-ids}))
   (catch Exception e
     (log/exception e {:message "[TAG-DELETION] errored"
                       :publisher-id publisher-id
                       :tag-ids tag-ids}))))
   

\$\endgroup\$
  • \$\begingroup\$ Welcome to Code Review! The current question title, which states your concerns about the code, is too general to be useful here. Please edit to the site standard, which is for the title to simply state the task accomplished by the code. Please see How to get the best value out of Code Review: Asking Questions for guidance on writing good question titles. \$\endgroup\$ Commented Jun 6, 2023 at 15:07
  • \$\begingroup\$ Do you expect a speed-up factor into the thousands from running the task in parallel? \$\endgroup\$
    – greybeard
    Commented Jun 7, 2023 at 7:35

1 Answer

\$\begingroup\$

You want to use the Claypoole library. Sample code:

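(ns tst.demo.core
  (:use tupelo.core tupelo.test)
  (:require
    [com.climate.claypoole :as pool]))

;; Stand-in for one unit of work: blocks the calling thread for 10 ms
(defn task [] (Thread/sleep 10))

;; Runs the 100 tasks one after another on the calling thread
(defn slow
  []
  (newline) (prn :slow)
  (doseq [i (range 100)]
    (task)))

;; Runs the same 100 tasks on a pool of 8 threads
(defn fast
  []
  (newline) (prn :fast)
  (pool/pdoseq 8 [i (range 100)]
    (task)))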

(verify
  (time (slow))
  (time (fast)))

with results:

time (clojure -X:test)

Running tests in #{"test"}

Testing tst._bootstrap

------------------------------------------
   Clojure 1.12.0-alpha3    Java 20.0.1
------------------------------------------

Testing tst.demo.core

:slow
"Elapsed time: 1012.908212 msecs"

:fast
"Elapsed time: 137.932203 msecs"

Ran 2 tests containing 0 assertions.
0 failures, 0 errors.
  36.50s user 0.85s system 284% cpu 13.150 total

So we see a speedup of roughly 7x (about 1013 ms down to 138 ms) by using 1 thread on each CPU core (i.e. the 8 after pdoseq). For I/O-dominated workloads, you could benefit from using many more threads (e.g. 64).
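As a rough sketch of how this might carry over to the per-tag loop in the question, the version below runs each tag on a shared pool of 64 threads. Here `delete-tag!`, `deleted`, and `run-parallel` are hypothetical stand-ins, not the question's real functions: `delete-tag!` only sleeps to simulate database latency. Real code would also need a database connection pool large enough for the thread count, and it would need to check whether two tags attached to the same story can safely be processed in concurrent transactions, since each one reads and rewrites that story's published JSON.

```clojure
(ns demo.parallel-tags
  (:require [com.climate.claypoole :as pool]))

;; Hypothetical stand-in for the question's `delete-tag`: it only simulates
;; I/O latency and records the tag id so the effect can be inspected.
(def deleted (atom #{}))

(defn delete-tag! [publisher-id tag-id]
  (Thread/sleep 10)
  (swap! deleted conj tag-id))

(defn run-parallel
  "Processes every tag id on a pool of n-threads threads.
  Blocks until all tags have been processed."
  [publisher-id tag-ids n-threads]
  ;; with-shutdown! shuts the pool down once the body exits
  (pool/with-shutdown! [tp (pool/threadpool n-threads)]
    (pool/pdoseq tp [tag-id tag-ids]
      (delete-tag! publisher-id tag-id))))

;; Example call: 640 simulated tags on 64 threads
(time (run-parallel 123 (range 640) 64))
```

Creating the pool once and passing it to `pdoseq` keeps the thread count in one place, so it can be tuned against what the database can handle.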

Built using my favorite template project.

\$\endgroup\$
