Clojure / Java:在Amazon S3数据流上执行复杂操作时最小化带宽消耗的最有效方法

我正在使用BufferedReader执行对象的流式读取.

我需要用这个对象做两件事:

>将其传递给SuperCSV csv阅读器
>获取原始线并将它们保持在(Clojure)懒惰序列中

目前,我不得不使用两个不同的BufferedReaders:一个作为SuperCSV CSV读取器类的参数,另一个用于初始化延迟的原始行序列.我实际上两次下载S3对象,这是昂贵的($)和慢.

我的一位同事指出,我正在寻找类似于Unix“tee”命令的东西.一个BufferedReader,它可以以某种方式被“拆分”,下载一大块数据,并将一个副本传递给延迟序列和csv阅读器功能将是有用的.

我目前正在调查是否可以将惰性序列包装在BufferedReader中并将其传递给super csv.当将非常大的延迟序列传递给多个消费者时,我遇到了一些Java堆空间问题,所以我有点担心使用这个解决方案.

另一种解决方案是在本地下载文件,然后在此文件上打开两个流.这消除了流式传输背后的原始动机:允许在数据开始到达时立即开始处理文件.

最后的解决方案,也是我认为只有在没有其他工作的情况下才能实现的解决方案是实现我自己的CSV读取器,它返回解析的CSV和原始未解析的行.如果您使用的是一个非常可靠的CSV读取器,它可以返回已解析的CSV数据的Java Hash和原始未解析的行,请告诉我们!

谢谢!

最佳答案 我倾向于从网络中创建一系列线路,然后将其交给需要在该seq上工作的许多进程;持久性数据结构很酷.如果需要将一系列字符串转换为可以移交给SuperCSV api的Reader,这似乎有效:

(import '[java.io Reader StringReader])

(defn concat-reader
  "Returns a Reader that reads from a sequence of strings."
  [lines]
  (let [srs (atom (map #(StringReader. %) lines))]
    (proxy [Reader] []
      (read 
        ([] 
          (let [c (.read (first @srs))]
            (if (and (neg? c) (swap! srs next))
              (.read this)
              c)))
        ([cbuf] 
          (.read this cbuf 0 (count cbuf)))
        ([cbuf off len]
          (let [actual (.read (first @srs) cbuf off len)]
            (if (and (neg? actual) (swap! srs next))
              (.read this cbuf off len)
              actual))))
      (close [] ))))

例如.

user=> (def r (concat-reader ["foo" "bar"]))
#'user/r
user=> (def cbuf (char-array 2))
#'user/cbuf
user=> (.read r cbuf)
2
user=> (seq cbuf)
(\f \o)
user=> (char (.read r))
\o
user=> (char (.read r))
\b
点赞