我目前正在编写自己的Akka SyncWriteJournal API插件来实现与HSQLDB的连接.
问题是我不理解方法doAsyncReplayMessages的要求.它声明它需要返回一个未来,并且所有消息都应该由replayCallback调用.
假设我有一个返回消息列表的查询:List< Message>消息.任何人都可以提供如何使用replayCallback的最小示例(有解释),以及使用该列表正确实现该方法的Future吗? replayCallback和Future如何协同工作以及doAsyncReplayMessages方法应该返回什么?
谢谢!
-编辑-
在一些评论的帮助下,我提出了一个不完整的实现,但结合了提出的想法:
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
final Procedure<PersistentRepr> replayCallback) {
final ExecutionContext ec = context().system().dispatcher();
final Future<Void> future = Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
final List<Message> messages = getMessages();
for (int i = 0; i < feedbackList.size(); i++) {
replayCallback.apply(
new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
}
return null;
}
}, ec);
return future;
}
你可能会看到它错过了一些我仍然缺少的关键概念. PersistentImpl需要一个参数Seq< String>确认哪个仍为空.也许更重要的是我返回null,因为未来期望Void作为返回类型,我不确定如何实现.它目前抛出一个NPE:
[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
最佳答案 您可以简单地将阻塞操作包装在Future中,例如:Future {fetchStuff()}.
您可以参考dnvriend/akka-persistence-jdbc: JdbcSyncWriteJournal以获得同步日志的完整实现.