This commit is contained in:
Christoph Stahl 2023-05-17 13:43:32 +02:00
parent b0c7abc923
commit 160bf32174
7 changed files with 182 additions and 83 deletions

View File

@ -24,11 +24,13 @@ public class AkkaMainSystem extends AbstractBehavior<AkkaMainSystem.Create> {
private Behavior<Create> onCreate(Create command) { private Behavior<Create> onCreate(Create command) {
//#create-actors //#create-actors
ActorRef<ExampleActor.Message> a = this.getContext().spawn(ExampleActor.create("Alice"), "alice"); ActorRef<Queue.Message> queue = this.getContext().spawn(Queue.create(), "queue");
ActorRef<ExampleTimerActor.Message> b = this.getContext().spawn(ExampleTimerActor.create(), "timeractor"); ActorRef<Producer.Message> producer = this.getContext().spawn(Producer.create(queue), "producer");
ActorRef<ConsumerFactory.Message> cFactory = this.getContext().spawn(ConsumerFactory.create(queue), "cFactory");
//#create-actors //#create-actors
a.tell(new ExampleActor.ExampleMessage(this.getContext().getSelf(),"Test123")); producer.tell(new Producer.Start(90));
cFactory.tell(new ConsumerFactory.Start(100));
return this; return this;
} }
} }

View File

@ -0,0 +1,46 @@
package com.example;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
public class Consumer extends AbstractBehavior<Consumer.Message> {
public interface Message {};
public record Reply(int i) implements Message { }
public enum Error implements Message {
EMPTY_QUEUE
}
public static Behavior<Message> create(ActorRef<Queue.Message> queue) {
return Behaviors.setup(context -> new Consumer(context, queue));
}
private final ActorRef<Queue.Message> queue;
private Consumer(ActorContext<Message> context, ActorRef<Queue.Message> queue) {
super(context);
this.queue = queue;
this.queue.tell(new Queue.Get(this.getContext().getSelf()));
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(Reply.class, this::onReply)
.onMessage(Error.class, this::onError)
.build();
}
private Behavior<Message> onError(Error msg) {
this.getContext().getLog().info("ERROR: {}", msg.toString());
return this;
}
private Behavior<Message> onReply(Reply msg) {
this.getContext().getLog().info("Got: {}", msg.i);
return this;
}
}

View File

@ -0,0 +1,40 @@
package com.example;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
public class ConsumerFactory extends AbstractBehavior<ConsumerFactory.Message> {
public interface Message {};
public record Start(int amount) implements Message { }
public static Behavior<Message> create(ActorRef<Queue.Message> queue) {
return Behaviors.setup(context -> new ConsumerFactory(context, queue));
}
private final ActorRef<Queue.Message> queue;
private ConsumerFactory(ActorContext<Message> context, ActorRef<Queue.Message> queue) {
super(context);
this.queue = queue;
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(Start.class, this::onStart)
.build();
}
private Behavior<Message> onStart(Start msg) {
for(int i=0; i < msg.amount; i++) {
this.getContext().spawnAnonymous(Consumer.create(this.queue));
}
return this;
}
}

View File

@ -1,35 +0,0 @@
package com.example;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
public class ExampleActor extends AbstractBehavior<ExampleActor.Message> {
public interface Message {};
public record ExampleMessage(ActorRef<AkkaMainSystem.Create> someReference, String someString) implements Message { }
public static Behavior<Message> create(String name) {
return Behaviors.setup(context -> new ExampleActor(context, name));
}
private final String name;
private ExampleActor(ActorContext<Message> context, String name) {
super(context);
this.name = name;
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(ExampleMessage.class, this::onExampleMessage)
.build();
}
private Behavior<Message> onExampleMessage(ExampleMessage msg) {
getContext().getLog().info("I ({}) got a message: ExampleMessage({},{})", this.name, msg.someReference, msg.someString);
return this;
}
}

View File

@ -1,45 +0,0 @@
package com.example;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.time.Duration;
public class ExampleTimerActor extends AbstractBehavior<ExampleTimerActor.Message> {
public interface Message {};
public record ExampleMessage(String someString) implements Message { }
public static Behavior<Message> create() {
return Behaviors.setup(context -> Behaviors.withTimers(timers -> new ExampleTimerActor(context, timers)));
}
private final TimerScheduler<ExampleTimerActor.Message> timers;
private ExampleTimerActor(ActorContext<Message> context, TimerScheduler<ExampleTimerActor.Message> timers) {
super(context);
this.timers = timers;
Message msg = new ExampleMessage("test123");
this.timers.startSingleTimer(msg, msg, Duration.ofSeconds(10));
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(ExampleMessage.class, this::onExampleMessage)
.build();
}
private Behavior<Message> onExampleMessage(ExampleMessage msg) {
getContext().getLog().info("I have send myself this message after 10 Seconds: {}", msg.someString);
return this;
}
}

View File

@ -0,0 +1,40 @@
package com.example;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
public class Producer extends AbstractBehavior<Producer.Message> {
public interface Message {};
public record Start(int amount) implements Message { }
public static Behavior<Message> create(ActorRef<Queue.Message> queue) {
return Behaviors.setup(context -> new Producer(context, queue));
}
private final ActorRef<Queue.Message> queue;
private Producer(ActorContext<Message> context, ActorRef<Queue.Message> queue) {
super(context);
this.queue = queue;
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(Start.class, this::onStart)
.build();
}
private Behavior<Message> onStart(Start msg) {
for(int i = 0; i < msg.amount; i++) {
this.queue.tell(new Queue.Put(i));
}
return this;
}
}

View File

@ -0,0 +1,51 @@
package com.example;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.util.ArrayList;
public class Queue extends AbstractBehavior<Queue.Message> {
private final ArrayList<Integer> queue;
public interface Message {};
public record Get(ActorRef<Consumer.Message> replyTo) implements Message { }
public record Put(int i) implements Message { }
public static Behavior<Message> create() {
return Behaviors.setup(Queue::new);
}
private Queue(ActorContext<Message> context) {
super(context);
this.queue = new ArrayList<Integer>();
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(Get.class, this::onGet)
.onMessage(Put.class, this::onPut)
.build();
}
private Behavior<Message> onPut(Put msg) {
this.queue.add(msg.i);
return this;
}
private Behavior<Message> onGet(Get msg) {
if (this.queue.isEmpty()) {
msg.replyTo.tell(Consumer.Error.EMPTY_QUEUE);
} else {
int i = this.queue.remove(0);
msg.replyTo.tell(new Consumer.Reply(i));
}
return this;
}
}