akka学习教程(十一) akka持久化

    xiaoxiao2021-03-26  4

    akka系列文章目录

    akka学习教程(十四) akka分布式实战akka学习教程(十三) akka分布式akka学习教程(十二) Spring与Akka的集成akka学习教程(十一) akka持久化akka学习教程(十) agentakka学习教程(九) STM软件事务内存akka学习教程(八) Actor中的Future-询问模式akka学习教程(七) 内置状态转换Procedureakka学习教程(六) 路由器Routerakka学习教程(五) inbox消息收件箱akka学习教程(四) actor生命周期akka学习教程(三) 不可变对象akka学习教程(二)HelloWordakka学习教程(一)简介

    本文内容来自于官网文档示例: AKKA Persistence

    Akka持久化简介

    Akka持久化可以使有状态的actor能够保持其内部状态,以便在启动、JVM崩溃后重新启动、或在集群中迁移时,恢复它们的内部状态。 Akka持久性关键点在于,只有对actor内部状态的更改才会被持久化,而不会直接保持其当前状态(可选快照除外)。 这些更改只会追加到存储,没有任何修改,这允许非常高的事务速率和高效的复制。 通过加载持久化的数据Stateful actors可以重建内部状态。 这可以是所有修改的完整历史记录,也可以从一个快照开始,这可以显着减少恢复时间。 Akka持久性还提供至少一次消息传递语义的点对点通信。【翻译与官方文档】

    接下来将官方文档的示例运行一遍

    依赖与配置

    引入persistence依赖

    <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-persistence_2.11</artifactId> <version>2.4.16</version> </dependency>

    Akka持久性扩展依赖一些内置持久性插件,包括基于内存堆的日志,基于本地文件系统的快照存储和基于LevelDB的日志。

    基于LevelDB的插件将需要以下附加的依赖声明:

    <dependency> <groupId>org.iq80.leveldb</groupId> <artifactId>leveldb</artifactId> <version>0.7</version> </dependency> <dependency> <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> <version>1.8</version> </dependency>

    不过本文没有使用leveldb。

    添加配置文件

    这里需要配置持久化使用到的配置信息journal

    默认情况下,持久化actor或视图将使用在reference.conf配置资源的以下部分中配置的“default”日志和快照存储插件。注意,在这种情况下,actor或视图只覆盖persistenceId方法。当持久化actor或视图覆盖journalPluginId和snapshotPluginId方法时,actor或视图将由这些特定的持久性插件提供服务,而不是默认值。

    reference.conf:

    akka { loglevel = "INFO" } akka.persistence.journal.plugin = "akka.persistence.journal.inmem" # Absolute path to the default snapshot store plugin configuration entry. akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

    上代码

    package akka.serializable; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.persistence.SnapshotOffer; import akka.persistence.UntypedPersistentActor; import com.alibaba.fastjson.JSON; import java.io.Serializable; import java.util.ArrayList; import java.util.UUID; import static java.util.Arrays.asList; class Cmd implements Serializable { private static final long serialVersionUID = 1L; private final String data; public Cmd(String data) { this.data = data; } public String getData() { return data; } } class Evt implements Serializable { private static final long serialVersionUID = 1L; private final String data; private final String uuid; public Evt(String data, String uuid) { this.data = data; this.uuid = uuid; } public String getUuid() { return uuid; } public String getData() { return data; } } class ExampleState implements Serializable { private static final long serialVersionUID = 1L; private final ArrayList<String> events; public ExampleState() { this(new ArrayList<String>()); } public ExampleState(ArrayList<String> events) { this.events = events; } public ExampleState copy() { return new ExampleState(new ArrayList<String>(events)); } public void update(Evt evt) { events.add(evt.getData()); } public int size() { return events.size(); } @Override public String toString() { return events.toString(); } } class ExamplePersistentActor extends UntypedPersistentActor { LoggingAdapter log = Logging.getLogger(getContext().system (), this ); @Override public String persistenceId() { return "sample-id-1"; } private ExampleState state = new ExampleState(); public int getNumEvents() { return state.size(); } /** * Called on restart. Loads from Snapshot first, and then replays Journal Events to update state. * @param msg */ @Override public void onReceiveRecover(Object msg) { log.info("onReceiveRecover: " + JSON.toJSONString(msg)); if (msg instanceof Evt) { log.info("onReceiveRecover -- msg instanceof Event"); log.info("event --- " + ((Evt) msg).getData()); state.update((Evt) msg); } else if (msg instanceof SnapshotOffer) { log.info("onReceiveRecover -- msg instanceof SnapshotOffer"); state = (ExampleState)((SnapshotOffer)msg).snapshot(); } else { unhandled(msg); } } /** * Called on Command dispatch * @param msg */ @Override public void onReceiveCommand(Object msg) { log.info("onReceiveCommand: " + JSON.toJSONString(msg)); if (msg instanceof Cmd) { final String data = ((Cmd)msg).getData(); // generate an event we will persist after being enriched with a uuid final Evt evt1 = new Evt(data + "-" + getNumEvents(), UUID.randomUUID().toString()); final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1), UUID.randomUUID().toString()); // persist event and THEN update the state of the processor persistAll(asList(evt1, evt2), evt -> { state.update(evt); if (evt.equals(evt2)) { // broadcast event on eventstream 发布该事件 getContext().system().eventStream().publish(evt); } }); } else if (msg.equals("snap")) { // IMPORTANT: create a copy of snapshot // because ExampleState is mutable !!! saveSnapshot(state.copy()); } else if (msg.equals("print")) { System.out.println("state: " + state); } else { unhandled(msg); } } } class EventHandler extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public void onReceive(Object msg ) throws Exception { log.info( "Handled Event: " + JSON.toJSONString(msg)); } }

    main方法

    package akka.serializable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; public class MainTest { public static final Logger log = LoggerFactory.getLogger(System.class); public static void main(String... args) throws Exception { final ActorSystem actorSystem = ActorSystem.create("actor-server"); final ActorRef handler = actorSystem.actorOf(Props.create(EventHandler. class)); // 订阅 actorSystem.eventStream().subscribe(handler , Evt.class); Thread.sleep(5000); final ActorRef actorRef = actorSystem.actorOf(Props.create(ExamplePersistentActor. class), "eventsourcing-processor" ); actorRef.tell( new Cmd("CMD 1" ), null); actorRef.tell( new Cmd("CMD 2" ), null); actorRef.tell( new Cmd("CMD 3" ), null); actorRef.tell( "snap", null );//发送保存快照命令 actorRef.tell( new Cmd("CMD 4" ), null); actorRef.tell( new Cmd("CMD 5" ), null); actorRef.tell( "print", null ); Thread.sleep(5000); log.info( "Actor System Shutdown Starting..." ); actorSystem.shutdown(); } }

    运行结果:

    [INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {} [INFO] [01/17/2017 16:16:02.526] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover -- msg instanceof SnapshotOffer [INFO] [01/17/2017 16:16:02.560] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveRecover: {"instance":{"$ref":"@"}} [INFO] [01/17/2017 16:16:02.565] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 1"} [INFO] [01/17/2017 16:16:02.591] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 2"} [INFO] [01/17/2017 16:16:02.592] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 3"} [INFO] [01/17/2017 16:16:02.593] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 1-37","uuid":"fe0fca08-a415-425d-b618-c730b117ca7d"} [INFO] [01/17/2017 16:16:02.594] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "snap" [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 2-39","uuid":"76fbf9e8-befd-40f4-a4fe-49b5698c7c7d"} [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 3-41","uuid":"3b9f70c2-83c5-47d3-9f0a-4710905c3cfe"} [INFO] [01/17/2017 16:16:02.595] [actor-server-akka.actor.default-dispatcher-6] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 4"} [INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 4-43","uuid":"f8bb1e7e-716e-4f3c-9d81-e4570e0501c0"} [INFO] [01/17/2017 16:16:02.596] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {"data":"CMD 5"} [INFO] [01/17/2017 16:16:02.597] [actor-server-akka.actor.default-dispatcher-5] [akka://actor-server/user/$a] Handled Event: {"data":"CMD 5-45","uuid":"574f9cdf-59f3-421c-8337-df8a50c95ee6"} state: [CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 1-12, CMD 1-13, CMD 2-14, CMD 2-15, CMD 3-16, CMD 3-17, CMD 1-18, CMD 1-19, CMD 2-20, CMD 2-21, CMD 3-22, CMD 3-23, CMD 1-24, CMD 1-25, CMD 2-26, CMD 2-27, CMD 3-28, CMD 3-29, CMD 1-30, CMD 1-31, CMD 2-32, CMD 2-33, CMD 3-34, CMD 3-35, CMD 1-36, CMD 1-37, CMD 2-38, CMD 2-39, CMD 3-40, CMD 3-41, CMD 4-42, CMD 4-43, CMD 5-44, CMD 5-45] [INFO] [01/17/2017 16:16:02.598] [actor-server-akka.actor.default-dispatcher-8] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: "print" [INFO] [01/17/2017 16:16:02.613] [actor-server-akka.actor.default-dispatcher-15] [akka://actor-server/user/eventsourcing-processor] onReceiveCommand: {} 2017-01-17 16:16:06.835 [main ] INFO System - Actor System Shutdown Starting... Process finished with exit code 0

    这里注意:如果再次执行该程序,系统会首先恢复上次处理器的状态。本程序中会首先恢复快照中的状态然后是其他的状态。 快照使得处理器的状态持久化更加高效。

    假如现在运行结果是酱紫的: state: [ CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 4-12, CMD 4-13, CMD 5-14, CMD 5-15] 在运行一次,结果就是酱紫的: state: [ CMD 1-0, CMD 1-1, CMD 2-2, CMD 2-3, CMD 3-4, CMD 3-5, CMD 1-6, CMD 1-7, CMD 2-8, CMD 2-9, CMD 3-10, CMD 3-11, CMD 1-12, CMD 1-13, CMD 2-14, CMD 2-15, CMD 3-16, CMD 3-17, CMD 4-18, CMD 4-19, CMD 5-20, CMD 5-21]

    由于我们采用的是默认的持久化策略,会持久化到本地磁盘,进入项目统计目录下,会看到文件夹snapshots:

    参考资料

    书籍《java高并发程序设计》AKKA官方文档
    转载请注明原文地址: https://ju.6miu.com/read-600218.html

    最新回复(0)