我有一个
java.io.File序列流.我使用flatMapConcat创建新的文件源,类似于:
def func(files: List[File]) =
Source(files)
.map(f => openFile(f))
.flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
.grouped(10)
.via(SomeFlow)
.runWith(Sink.ignore)
流结束后是否有一种简单的方法可以关闭每个文件? SomePublisher()无法关闭它.
最佳答案 因此,如果我理解正确,您可以执行以下操作:对于每个文件,您可以创建数据库对象以打开文件.因此,由于您在代码中打开数据库连接,因此您有责任关闭它.由于您正在处理有限的文件列表,因此您可以按顺序存储所有数据库连接,运行流并在流结束后关闭所有连接.
另一种方法是让自己的发布者获得文件名,打开数据库连接,从中流,关闭数据库连接.第二个选项将允许您从无限的文件列表中流式传输.
如果你想要我的代码片段给我你的功能的完整来源,我会更新它.