我正在通过
java nio接口读取文件,直接读取到流.这将启动异步http请求并在将来处理这些请求.每10,000条记录,我将此结果上传到服务器并清除记录,这样可以清除内存消耗.
我从字节数组开始,它不断地保留在内存中. http客户端(commons CloseableHttpAsyncClient)触发请求异步,因此这些请求在开始时一次性触发.
有没有办法限制lambda流的方式,我可以限制同时处理的行数?从而控制我的记忆力.
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
.map(line -> CsvLine.create(line))
.filter(line -> !line.isHeader())
.forEach(line -> getResult(line, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
result.addLine(response);
} catch (IOException e) {
LOGGER.error("IOException, cannot write to server", e);
todo.set(-1); // finish in error
} finally {
todo.decrementAndGet();
}
}
@Override
public void failed(Exception ex) {
handleError();
}
@Override
public void cancelled() {
handleError();
}
}
));
最佳答案 您可以尝试使用信号量来限制流,以便一次只有特定的最大异步请求.它可能看起来像这样:
Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
.map(line -> CsvLine.create(line))
.filter(line -> !line.isHeader())
.forEach(line -> {
try {
if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) {
handleTimeout();
} else {
getResult(line, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
try {
result.addLine(response);
} catch (IOException e) {
LOGGER.error("IOException, cannot write to server", e);
todo.set(-1); // finish in error
} finally {
todo.decrementAndGet();
semaphore.release();
}
}
@Override
public void failed(Exception ex) {
handleError();
semaphore.release();
}
@Override
public void cancelled() {
handleError();
semaphore.release();
}
}
);
}
} catch (InterruptedException e) {
// handle appropriately
}
});