我正在分别使用
akka-stream和
RxJava对Scala与Java Reactive Spec实现进行比较.我的用例是一个简单的grep:给定一个目录,一个文件过滤器和一个搜索文本,我在该目录中查找具有该文本的所有匹配文件.然后我流式传输(filename – >匹配行)对.
这适用于Java,但对于Scala,没有打印任何内容.没有例外但也没有输出.
测试数据是从互联网上下载的,但正如您所看到的,代码也可以轻松地使用任何本地目录进行测试.
斯卡拉:
object Transformer {
implicit val system = ActorSystem("transformer")
implicit val materializer = ActorMaterializer()
implicit val executionContext: ExecutionContext = {
implicitly
}
import collection.JavaConverters._
def run(path: String, text: String, fileFilter: String) = {
Source.fromIterator { () =>
Files.newDirectoryStream(Paths.get(path), fileFilter).iterator().asScala
}.map(p => {
val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
(p, lines)
})
.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
}
}
Java的:
public class Transformer {
public static void run(String path, String text, String fileFilter) {
Observable.from(files(path, fileFilter)).flatMap(p -> {
try {
return Observable.from((Iterable<Map.Entry<String, List<String>>>) Files.lines(p)
.filter(line -> line.contains(text))
.map(String::trim)
.collect(collectingAndThen(groupingBy(pp -> p.toAbsolutePath().toString()), Map::entrySet)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).toBlocking().forEach(e -> System.out.printf("%s -> %s.%n", e.getKey(), e.getValue()));
}
private static Iterable<Path> files(String path, String fileFilter) {
try {
return Files.newDirectoryStream(Paths.get(path), fileFilter);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
使用Scala测试进行单元测试:
class TransformerSpec extends FlatSpec with Matchers {
"Transformer" should "extract temperature" in {
Transformer.run(NoaaClient.currentConditionsPath(), "temp_f", "*.xml")
}
"Java Transformer" should "extract temperature" in {
JavaTransformer.run(JavaNoaaClient.currentConditionsPath(false), "temp_f", "*.xml")
}
}
最佳答案 Dang,我忘了Source返回Future,这意味着流程从未运行过. @MrWiggles的评论给了我一个提示.以下Scala代码生成与Java版本相同的结果.
注意:我的问题中的代码没有关闭DirectoryStream,对于包含大量文件的目录,导致java.io.IOException:系统中打开的文件过多.下面的代码正确关闭了资源.
def run(path: String, text: String, fileFilter: String) = {
val files = Files.newDirectoryStream(Paths.get(path), fileFilter)
val future = Source(files.asScala.toList).map(p => {
val lines = io.Source.fromFile(p.toFile).getLines().filter(_.contains(text)).map(_.trim).to[ImmutableList]
(p, lines)
})
.filter(!_._2.isEmpty)
.runWith(Sink.foreach(e => println(s"${e._1} -> ${e._2}")))
Await.result(future, 10.seconds)
files.close
true // for testing
}