4
votes

I'm trying to use the Clojure pantomime library to extract/OCR text from a large number of TIFF documents (among others).

My plan has been to use pmap to apply the mapping over a sequence of input data (from a postgres database) and then update that same postgres database with the tika/tesseract OCR output. This has been working OK; however, I notice in htop that many of the cores are idle at times.

Is there any way to fix this, and what steps can I take to determine why it may be blocking somewhere? Each unit of work processes a single TIFF file, and the threads share no state with each other.

Additional info:

  1. Some tika/tesseract processes take 3 seconds, others take up to 90 seconds. Generally speaking, tika is heavily CPU bound. I have ample memory available according to htop.
  2. postgres has no locking issues in session management, so I don't think that's holding me up.
  3. Maybe futures are waiting somewhere to be dereferenced? How can I tell where?

Any tips appreciated, thanks. Code added below.

(defn parse-a-path [{:keys [row_id file_path]}]
      ;; No try here: exceptions propagate to the caller, which logs them.
      (let [start        (System/currentTimeMillis)
            mime_type    (pm/mime-type-of file_path)
            file_content (-> file_path (extract/parse) :text)
            language     (pl/detect-language file_content)]
        {:mime_type             mime_type
         :file_content          file_content
         :language              language
         :row_id                row_id
         ;; elapsed milliseconds / 1000 = seconds
         :parse_time_in_seconds (float (/ (- (System/currentTimeMillis) start) 1000))
         :record_status         "doc parsed"}))


(defn fetch-all-batch []
      (t/info (str "Fetching lazy seq. all rows for batch.") )
      (jdbc/query (db-connection)
                  ["select
                   row_id,
                   file_path ,
                   file_extension
                   from the_table" ]))


(defn update-a-row [{:keys [row_id, file_path, file_extension] :as all-keys}]
      (let [parse-out (parse-a-path all-keys )]
        (try
          ;; the try body is an implicit `do`: run the update, then log it
          ;; (the original `doall` treated the update result as a count argument)
          (jdbc/execute!
            (db-connection)
            ["update the_table
             set
             record_last_updated        = current_timestamp ,
             file_content          = ?                 ,
             mime_type             = ?                 ,
             language              = ?                 ,
             parse_time_in_seconds = ?                 ,
             record_status         = ?
             where row_id = ? "
             (:file_content          parse-out) ,
             (:mime_type             parse-out) ,
             (:language              parse-out) ,
             (:parse_time_in_seconds parse-out) ,
             (:record_status         parse-out) ,
             row_id ])
          (t/debug (str "updated row_id " (:row_id parse-out) " (" file_extension ") "
                        " in " (:parse_time_in_seconds parse-out) " seconds." ))
          ;; log the failure instead of silently swallowing it
          (catch Exception e (t/error e)))))

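(dorun
  (pmap
    #(try
       (update-a-row %)
       ;; log the exception itself; .getNextException only exists on SQLException
       (catch Exception e (t/error e)))
    ;; call the function; passing the bare var would hand pmap a fn, not a seq
    (fetch-all-batch)))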
2
Rasterize has suggested that pmap is "broken" due to chunked sequences. (rasterize.io/blog/clojure-the-good-parts.html). - BWStearns
Please share a snippet of your code showing how you are using pmap or future. It's hard to guess whether you made a mistake or it's a limitation of those tools. - Piotrek Bzdyl
Added code snippet, thanks. - joefromct

2 Answers

3
votes

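pmap runs the mapping function in parallel on roughly (+ 2 cores) items at a time, but it preserves ordering. This means that with 8 cores about 10 items are in flight, and because results are consumed in input order, new items are only started once the oldest outstanding item has finished. A single 90-second document can therefore leave the other cores idle after they finish their own items.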
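The stall described above can be reproduced without any OCR at all. The following is a minimal sketch using only clojure.core; `fake-ocr`, `total` and the sleep durations are hypothetical stand-ins chosen for illustration, not part of the original code.

```clojure
;; Sketch: one slow item at the head of the input stalls pmap's window.
(def total 1000)
(def ncpus (.availableProcessors (Runtime/getRuntime)))
(def started (atom 0))

(defn fake-ocr
  "Hypothetical stand-in for an OCR call: item 0 is slow, the rest are fast."
  [i]
  (swap! started inc)
  (Thread/sleep (long (if (zero? i) 400 1)))
  i)

;; A list is not a chunked seq, so chunking does not widen the window here.
(def items (apply list (range total)))

;; Consume the pmap result on another thread so it can be observed mid-flight.
(def consumer (future (doall (pmap fake-ocr items))))

;; While item 0 is still sleeping, check how many items have been started.
(Thread/sleep 150)
(def started-while-head-busy @started)

;; Block until everything is done; results come back in input order.
(def results @consumer)

(println "cores:" ncpus
         "started while item 0 was still running:" started-while-head-busy
         "of" total)
```

On a typical machine the count printed should be close to the number of cores rather than anywhere near 1000, even though every other item needs only a millisecond. When run as a script, call `(shutdown-agents)` at the end so the JVM exits promptly.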

You could write your own code that uses combinations of future, delay and deref, which would be a good academic exercise. After that, you can throw out your code and start using the claypoole library, which has a set of abstractions that cover the majority of uses of future.

For this specific case, use their unordered pmap or pfor implementations (upmap and upfor), which do exactly the same thing pmap does but do not have ordering; new items are picked up as soon as any one item in the batch is finished.

In situations where IO is the main bottleneck, or where processing times can greatly vary between items of work, it is the best way to parallelize map or for operations.

Of course you should take care not to rely on any sort of ordering for the return values.

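  (require '[com.climate.claypoole :as cp])

  (cp/upmap (cp/ncpus)
    #(try
       (update-a-row %)
       ;; log the exception itself; .getNextException only exists on SQLException
       (catch Exception e (t/error e)))
    ;; call the function so upmap receives the rows, not the fn
    (fetch-all-batch))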
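If adding a dependency is not an option, the hand-rolled route mentioned above can be sketched with a plain `java.util.concurrent` thread pool. This is only an illustration of the unordered idea, not how claypoole is implemented; `unordered-pmap` is a hypothetical helper name, and the sketch assumes the whole input fits in memory because every task is submitted up front.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn unordered-pmap
  "Hypothetical helper: applies f to every item of coll on a fixed pool of
  n-threads and returns a vector of results in completion order, not input
  order."
  [n-threads f coll]
  (let [pool (Executors/newFixedThreadPool n-threads)
        ecs  (ExecutorCompletionService. pool)]
    (try
      ;; Submit every item; a free thread picks up the next queued task
      ;; as soon as it finishes its current one.
      (doseq [x coll]
        (let [^Callable task (fn [] (f x))]
          (.submit ecs task)))
      ;; .take blocks until the next task finishes, whichever one it is.
      (vec (repeatedly (count coll) #(.get (.take ecs))))
      (finally
        (.shutdown pool)))))

;; Usage: the 300 ms item no longer holds up the short ones.
(def results
  (unordered-pmap 4
                  (fn [ms] (Thread/sleep (long ms)) ms)
                  [300 10 10 10]))

(println results)
```

An exception thrown by `f` surfaces from `.get` as an `ExecutionException`, so wrapping `f` in its own try/catch, as in the snippet above, keeps one bad document from aborting the run.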
1
votes

I had a similar problem some time ago. I guess that you are making the same assumptions as me:

  • pmap calls f in parallel, but that doesn't mean the work is shared equally. As you said, some items take 3 seconds whereas others take 90 seconds. A thread that finishes in 3 seconds does NOT ask the others to share some of the remaining work, so the finished threads just sit idle until the last one finishes.

  • You didn't describe exactly what your data looks like, but I will assume that you are using some kind of lazy sequence, which is bad for parallel processing. If your process is CPU-bound and you can hold your entire input in memory, then prefer clojure.core.reducers ('map', 'filter' and especially 'fold') over the lazy map, filter and friends.
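As a rough sketch of the reducers approach, assuming the input has been realized into a vector (fold only parallelizes over foldable collections such as vectors and maps), and with `expensive` as a hypothetical CPU-bound stand-in for the real per-item work:

```clojure
(require '[clojure.core.reducers :as r])

;; Hypothetical CPU-bound stand-in for the real per-item work.
(defn expensive [n]
  (reduce + (range n)))

;; Realize the input into a vector up front so that fold can split it.
(def inputs (vec (range 1 2001)))

;; r/map is not lazy; it describes the transformation, and r/fold runs it
;; on a fork/join pool, combining the partial sums with +.
(def total
  (r/fold + (r/map expensive inputs)))

(println total)
```

Because fork/join splits the vector into partitions and idle workers steal pending partitions, uneven item costs tend to balance out better than with pmap. It suits CPU-bound work; blocking calls such as database updates are probably better kept out of the fold.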

In my case, these tips dropped the processing time from 34 seconds to a mere 8 seconds. Hope it helps.