内存消耗java文件流

我正在通过
java nio接口读取文件,直接读取到流.这将启动异步http请求并在将来处理这些请求.每10,000条记录,我将此结果上传到服务器并清除记录,这样可以清除内存消耗.

我从字节数组开始,它不断地保留在内存中. http客户端(commons CloseableHttpAsyncClient)触发请求异步,因此这些请求在开始时一次性触发.

有没有办法限制lambda流的方式,我可以限制同时处理的行数?从而控制我的记忆力.

new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
    .lines()
    .map(line -> CsvLine.create(line))
    .filter(line -> !line.isHeader())
    .forEach(line -> getResult(line, new FutureCallback<HttpResponse>() {
        @Override
        public void completed(HttpResponse response) {
            try {
                result.addLine(response);
            } catch (IOException e) {
                LOGGER.error("IOException, cannot write to server", e);
                todo.set(-1); // finish in error
            } finally {
                todo.decrementAndGet();
            }
       }

       @Override
       public void failed(Exception ex) {
           handleError();
       }

       @Override
       public void cancelled() {
           handleError();
       }
    }
));

最佳答案 您可以尝试使用信号量来限制流,以便一次只有特定的最大异步请求.它可能看起来像这样:

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file)))
.lines()
        .map(line -> CsvLine.create(line))
        .filter(line -> !line.isHeader())
        .forEach(line -> {
            try {
                if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    handleTimeout();
                } else {
                    getResult(line, new FutureCallback<HttpResponse>() {
                        @Override
                        public void completed(HttpResponse response) {
                            try {
                                result.addLine(response);
                            } catch (IOException e) {
                                LOGGER.error("IOException, cannot write to server", e);
                                todo.set(-1); // finish in error
                            } finally {
                                todo.decrementAndGet();
                                semaphore.release();
                            }
                        }

                        @Override
                        public void failed(Exception ex) {
                            handleError();
                            semaphore.release();
                        }

                        @Override
                        public void cancelled() {
                            handleError();
                            semaphore.release();
                        }
                    }
                    );
                }
            } catch (InterruptedException e) {
                // handle appropriately
            }

        });
点赞