本文内容来自于官网文档示例: AKKA Persistence
Akka持久化可以使有状态的actor能够保持其内部状态,以便在启动、JVM崩溃后重新启动、或在集群中迁移时,恢复它们的内部状态。 Akka持久性关键点在于,只有对actor内部状态的更改才会被持久化,而不会直接保持其当前状态(可选快照除外)。 这些更改只会追加到存储,没有任何修改,这允许非常高的事务速率和高效的复制。 通过加载持久化的数据Stateful actors可以重建内部状态。 这可以是所有修改的完整历史记录,也可以从一个快照开始,这可以显着减少恢复时间。 Akka持久性还提供至少一次消息传递语义的点对点通信。【翻译与官方文档】
接下来将官方文档的示例运行一遍
Akka持久性扩展依赖一些内置持久性插件,包括基于内存堆的日志,基于本地文件系统的快照存储和基于LevelDB的日志。
基于LevelDB的插件将需要以下附加的依赖声明:
<dependency> <groupId>org.iq80.leveldb</groupId> <artifactId>leveldb</artifactId> <version>0.7</version> </dependency> <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.8</version> </dependency>不过本文没有使用leveldb。
这里需要配置持久化使用到的配置信息journal
默认情况下,持久化actor或视图将使用在reference.conf配置资源的以下部分中配置的“default”日志和快照存储插件。注意,在这种情况下,actor或视图只覆盖persistenceId方法。当持久化actor或视图覆盖journalPluginId和snapshotPluginId方法时,actor或视图将由这些特定的持久性插件提供服务,而不是默认值。reference.conf:
akka { loglevel = "INFO" } akka.persistence.journal.plugin = "akka.persistence.journal.inmem" # Absolute path to the default snapshot store plugin configuration entry. akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"main方法
package akka.serializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class MainTest { public static final Logger log = LoggerFactory.getLogger(System.class); public static void main(String... args) throws Exception { final ActorSystem actorSystem = ActorSystem.create("actor-server"); final ActorRef handler = actorSystem.actorOf(Props.create(EventHandler. class)); // 订阅 actorSystem.eventStream().subscribe(handler , Evt.class); Thread.sleep(5000); final ActorRef actorRef = actorSystem.actorOf(Props.create(ExamplePersistentActor. class), "eventsourcing-processor" ); actorRef.tell( new Cmd("CMD 1" ), null); actorRef.tell( new Cmd("CMD 2" ), null); actorRef.tell( new Cmd("CMD 3" ), null); actorRef.tell( "snap", null );//发送保存快照命令 actorRef.tell( new Cmd("CMD 4" ), null); actorRef.tell( new Cmd("CMD 5" ), null); actorRef.tell( "print", null ); Thread.sleep(5000); log.info( "Actor System Shutdown Starting..." ); actorSystem.shutdown(); } }运行结果:
[INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {} [INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover -- msg instanceof SnapshotOffer [INFO] [01/17/2017 16:16:02.560] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {"instance":{"$ref":"@"}} [INFO] [01/17/2017 16:16:02.565] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 1"} [INFO] [01/17/2017 16:16:02.591] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 2"} [INFO] [01/17/2017 16:16:02.592] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 3"} [INFO] [01/17/2017 16:16:02.593] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 1-37","uuid":"fe0fca08-a415-425d-b618-c730b117ca7d"} [INFO] [01/17/2017 16:16:02.594] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "snap" [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 2-39","uuid":"76fbf9e8-befd-40f4-a4fe-49b5698c7c7d"} [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 3-41","uuid":"3b9f70c2-83c5-47d3-9f0a-4710905c3cfe"} [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 4"} [INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 4-43","uuid":"f8bb1e7e-716e-4f3c-9d81-e4570e0501c0"} [INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 5"} [INFO] [01/17/2017 16:16:02.597] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 5-45","uuid":"574f9cdf-59f3-421c-8337-df8a50c95ee6"} state: [CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 1-12, CMD 1-13, CMD 2-14, CMD 2-15, CMD 3-16, CMD 3-17, CMD 1-18, CMD 1-19, CMD 2-20, CMD 2-21, CMD 3-22, CMD 3-23, CMD 1-24, CMD 1-25, CMD 2-26, CMD 2-27, CMD 3-28, CMD 3-29, CMD 1-30, CMD 1-31, CMD 2-32, CMD 2-33, CMD 3-34, CMD 3-35, CMD 1-36, CMD 1-37, CMD 2-38, CMD 2-39, CMD 3-40, CMD 3-41, CMD 4-42, CMD 4-43, CMD 5-44, CMD 5-45] [INFO] [01/17/2017 16:16:02.598] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "print" [INFO] [01/17/2017 16:16:02.613] [actor-server-akka.actor.default-dispatcher-15] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {} 2017-01-17 16:16:06.835 [main ] INFO System - Actor System Shutdown Starting... Process finished with exit code 0这里注意:如果再次执行该程序,系统会首先恢复上次处理器的状态。本程序中会首先恢复快照中的状态然后是其他的状态。 快照使得处理器的状态持久化更加高效。
假如现在运行结果是酱紫的: state: [ CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 4-12, CMD 4-13, CMD 5-14, CMD 5-15] 在运行一次,结果就是酱紫的: state: [ CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 1-12, CMD 1-13, CMD 2-14, CMD 2-15, CMD 3-16, CMD 3-17, CMD 4-18, CMD 4-19, CMD 5-20, CMD 5-21]由于我们采用的是默认的持久化策略,会持久化到本地磁盘,进入项目统计目录下,会看到文件夹snapshots: