diff --git a/src/main/java/com/example/AkkaMainSystem.java b/src/main/java/com/example/AkkaMainSystem.java index 868351b..fbc5d90 100644 --- a/src/main/java/com/example/AkkaMainSystem.java +++ b/src/main/java/com/example/AkkaMainSystem.java @@ -24,11 +24,13 @@ public class AkkaMainSystem extends AbstractBehavior { private Behavior onCreate(Create command) { //#create-actors - ActorRef a = this.getContext().spawn(ExampleActor.create("Alice"), "alice"); - ActorRef b = this.getContext().spawn(ExampleTimerActor.create(), "timeractor"); + ActorRef queue = this.getContext().spawn(Queue.create(), "queue"); + ActorRef producer = this.getContext().spawn(Producer.create(queue), "producer"); + ActorRef cFactory = this.getContext().spawn(ConsumerFactory.create(queue), "cFactory"); //#create-actors - a.tell(new ExampleActor.ExampleMessage(this.getContext().getSelf(),"Test123")); + producer.tell(new Producer.Start(90)); + cFactory.tell(new ConsumerFactory.Start(100)); return this; } } diff --git a/src/main/java/com/example/Consumer.java b/src/main/java/com/example/Consumer.java new file mode 100644 index 0000000..451a261 --- /dev/null +++ b/src/main/java/com/example/Consumer.java @@ -0,0 +1,46 @@ +package com.example; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class Consumer extends AbstractBehavior { + + public interface Message {}; + + public record Reply(int i) implements Message { } + public enum Error implements Message { + EMPTY_QUEUE + } + + public static Behavior create(ActorRef queue) { + return Behaviors.setup(context -> new Consumer(context, queue)); + } + + private final ActorRef queue; + + private Consumer(ActorContext context, ActorRef queue) { + super(context); + this.queue = queue; + this.queue.tell(new Queue.Get(this.getContext().getSelf())); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Reply.class, this::onReply) + .onMessage(Error.class, this::onError) + .build(); + } + private Behavior onError(Error msg) { + this.getContext().getLog().info("ERROR: {}", msg.toString()); + return this; + } + private Behavior onReply(Reply msg) { + this.getContext().getLog().info("Got: {}", msg.i); + return this; + } +} diff --git a/src/main/java/com/example/ConsumerFactory.java b/src/main/java/com/example/ConsumerFactory.java new file mode 100644 index 0000000..79ca92c --- /dev/null +++ b/src/main/java/com/example/ConsumerFactory.java @@ -0,0 +1,40 @@ +package com.example; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class ConsumerFactory extends AbstractBehavior { + + public interface Message {}; + + public record Start(int amount) implements Message { } + + public static Behavior create(ActorRef queue) { + return Behaviors.setup(context -> new ConsumerFactory(context, queue)); + } + + private final ActorRef queue; + + private ConsumerFactory(ActorContext context, ActorRef queue) { + super(context); + this.queue = queue; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Start.class, this::onStart) + .build(); + } + + private Behavior onStart(Start msg) { + for(int i=0; i < msg.amount; i++) { + this.getContext().spawnAnonymous(Consumer.create(this.queue)); + } + return this; + } +} diff --git a/src/main/java/com/example/ExampleActor.java b/src/main/java/com/example/ExampleActor.java deleted file mode 100644 index cf56713..0000000 --- a/src/main/java/com/example/ExampleActor.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.example; - -import akka.actor.typed.ActorRef; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.*; - -public class ExampleActor extends AbstractBehavior { - - public interface Message {}; - - public record ExampleMessage(ActorRef someReference, String someString) implements Message { } - - public static Behavior create(String name) { - return Behaviors.setup(context -> new ExampleActor(context, name)); - } - - private final String name; - - private ExampleActor(ActorContext context, String name) { - super(context); - this.name = name; - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage(ExampleMessage.class, this::onExampleMessage) - .build(); - } - - private Behavior onExampleMessage(ExampleMessage msg) { - getContext().getLog().info("I ({}) got a message: ExampleMessage({},{})", this.name, msg.someReference, msg.someString); - return this; - } -} diff --git a/src/main/java/com/example/ExampleTimerActor.java b/src/main/java/com/example/ExampleTimerActor.java deleted file mode 100644 index 0bffbea..0000000 --- a/src/main/java/com/example/ExampleTimerActor.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.example; - -import akka.actor.typed.javadsl.TimerScheduler; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.Receive; - -import java.time.Duration; - - -public class ExampleTimerActor extends AbstractBehavior { - - public interface Message {}; - - - public record ExampleMessage(String someString) implements Message { } - - public static Behavior create() { - return Behaviors.setup(context -> Behaviors.withTimers(timers -> new ExampleTimerActor(context, timers))); - } - - private final TimerScheduler timers; - - private ExampleTimerActor(ActorContext context, TimerScheduler timers) { - super(context); - this.timers = timers; - - Message msg = new ExampleMessage("test123"); - this.timers.startSingleTimer(msg, msg, Duration.ofSeconds(10)); - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage(ExampleMessage.class, this::onExampleMessage) - .build(); - } - - private Behavior onExampleMessage(ExampleMessage msg) { - getContext().getLog().info("I have send myself this message after 10 Seconds: {}", msg.someString); - return this; - } -} diff --git a/src/main/java/com/example/Producer.java b/src/main/java/com/example/Producer.java new file mode 100644 index 0000000..5db7823 --- /dev/null +++ b/src/main/java/com/example/Producer.java @@ -0,0 +1,40 @@ +package com.example; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class Producer extends AbstractBehavior { + + public interface Message {}; + + public record Start(int amount) implements Message { } + + public static Behavior create(ActorRef queue) { + return Behaviors.setup(context -> new Producer(context, queue)); + } + + private final ActorRef queue; + + private Producer(ActorContext context, ActorRef queue) { + super(context); + this.queue = queue; + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Start.class, this::onStart) + .build(); + } + + private Behavior onStart(Start msg) { + for(int i = 0; i < msg.amount; i++) { + this.queue.tell(new Queue.Put(i)); + } + return this; + } +} diff --git a/src/main/java/com/example/Queue.java b/src/main/java/com/example/Queue.java new file mode 100644 index 0000000..672fb6d --- /dev/null +++ b/src/main/java/com/example/Queue.java @@ -0,0 +1,51 @@ +package com.example; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.ArrayList; + +public class Queue extends AbstractBehavior { + + private final ArrayList queue; + + public interface Message {}; + + public record Get(ActorRef replyTo) implements Message { } + public record Put(int i) implements Message { } + + public static Behavior create() { + return Behaviors.setup(Queue::new); + } + + private Queue(ActorContext context) { + super(context); + this.queue = new ArrayList(); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Get.class, this::onGet) + .onMessage(Put.class, this::onPut) + .build(); + } + + private Behavior onPut(Put msg) { + this.queue.add(msg.i); + return this; + } + private Behavior onGet(Get msg) { + if (this.queue.isEmpty()) { + msg.replyTo.tell(Consumer.Error.EMPTY_QUEUE); + } else { + int i = this.queue.remove(0); + msg.replyTo.tell(new Consumer.Reply(i)); + } + return this; + } +}