clojure – The right way to make sure a clj-http connection manager is shut down after all requests have finished

I have some code that combines clj-http, core.async facilities, and an atom. It creates a few threads to fetch and parse a bunch of pages:

(defn fetch-page
  ([url] (fetch-page url nil))
  ([url conn-manager]
    (-> (http.client/get url {:connection-manager conn-manager})
        :body hickory/parse hickory/as-hickory)))

(defn- create-worker
  [url-chan result conn-manager]
  (async/thread
    (loop [url (async/<!! url-chan)]
      (when url
        (swap! result assoc url (fetch-page url conn-manager))
        (recur (async/<!! url-chan))))))

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))]
    ; wait for workers to finish and shut conn-manager down
    (dotimes [_ n-cpus] (async/alts!! workers))
    (http.conn-mgr/shutdown-manager conn-manager)

    (mapv #(get @pages %) urls)))

My idea is to use multiple threads to cut down the time spent fetching and parsing the pages, but I don't want to overload the server by firing off lots of requests at once – that's why I'm using a connection manager. I don't know whether my approach is correct; suggestions are welcome. The current problem is that the last requests fail because the connection manager is shut down before they terminate: `Exception in thread "async-thread-macro-15" java.lang.IllegalStateException: Connection pool shut down`.

Main question: how do I shut the connection manager down at the right moment (and why does my current code fail to do that)? Side quest: is my approach right at all? If not, what could I do to fetch and parse multiple pages at once without overloading the server?

Thanks!

Best answer: The problem is that `async/alts!!` returns the first result (and will keep doing so, since the `workers` vector never changes – once a worker's channel has delivered its result and closed, every subsequent read from it returns `nil` immediately). I think building a single channel with `async/merge` and then reading from it repeatedly should work.
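A small sketch of the failure mode (the channel names are illustrative, not from the original code): a channel produced by `async/thread` delivers its result once and then closes, so repeated `alts!!` calls over the same unchanged vector don't wait for the remaining workers.

```clojure
(require '[clojure.core.async :as async])

;; Two "workers": one fast, one slow.
(let [c1 (async/thread (Thread/sleep 50) :fast)
      c2 (async/thread (Thread/sleep 500) :slow)]
  ;; First alts!! blocks until c1 delivers its result.
  (println (async/alts!! [c1 c2]))  ;; => [:fast c1]
  ;; c1 is now closed, so the second alts!! returns [nil c1]
  ;; immediately instead of waiting for c2 to finish.
  (println (async/alts!! [c1 c2])))
```

This is exactly why `(dotimes [_ n-cpus] (async/alts!! workers))` can return before all workers are done, letting `shutdown-manager` run while requests are still in flight.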

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))
        all-workers (async/merge workers)]
    ; wait for workers to finish and shut conn-manager down
    (dotimes [_ n-cpus] (async/<!! all-workers))
    (http.conn-mgr/shutdown-manager conn-manager)

    (mapv #(get @pages %) urls)))

Alternatively, you can loop and keep shrinking the worker set, so that you only wait on workers that haven't finished yet.

(defn fetch-pages
  [urls]
  (let [url-chan (async/to-chan urls)
        pages (atom (reduce (fn [m u] (assoc m u nil)) {} urls))
        conn-manager (http.conn-mgr/make-reusable-conn-manager {})
        workers (mapv (fn [_] (create-worker url-chan pages conn-manager))
                      (range n-cpus))]
    ; wait for workers to finish and shut conn-manager down
    (loop [workers workers]
      (when (seq workers)
        (let [[_ finished-worker] (async/alts!! workers)]
          (recur (filterv #(not= finished-worker %) workers)))))

    (http.conn-mgr/shutdown-manager conn-manager)    
    (mapv #(get @pages %) urls)))
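For completeness, the snippets above reference `n-cpus` and the aliases `http.client`, `http.conn-mgr`, `hickory`, and `async` without defining them. A hypothetical setup (the namespace name and the choice of `n-cpus` are assumptions, not from the original post) might look like:

```clojure
;; Hypothetical namespace setup matching the aliases used above.
(ns scraper.core
  (:require [clj-http.client :as http.client]
            [clj-http.conn-mgr :as http.conn-mgr]
            [clojure.core.async :as async]
            [hickory.core :as hickory]))

;; n-cpus is referenced but never defined in the snippets; one
;; reasonable choice is the number of available processors.
(def n-cpus (.availableProcessors (Runtime/getRuntime)))

;; Usage sketch (URLs are placeholders):
;; (fetch-pages ["https://example.com/a" "https://example.com/b"])
```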