diff --git a/src/main/java/ch/hepia/mq/MessageManager.java b/src/main/java/ch/hepia/mq/MessageManager.java index 534d03451c888d54264c2e3acab57316431c115d..12242ca1cf3a45a3882db358a7990cea7dae3d5a 100644 --- a/src/main/java/ch/hepia/mq/MessageManager.java +++ b/src/main/java/ch/hepia/mq/MessageManager.java @@ -1,54 +1,87 @@ package ch.hepia.mq; -import java.io.ByteArrayOutputStream; +import ch.hepia.events.*; + import java.io.Serializable; -import java.io.ObjectOutputStream; -import java.io.ObjectInputStream; -import java.io.ByteArrayInputStream; +import java.util.function.Consumer; +import java.util.function.Predicate; -public class MessageManager extends MessageQueue{ - private enum MessageType{ - JoinedJourney, - LeftJourney +public class MessageManager extends MessageQueue { + private enum MessageType { + JoinedJourney, LeftJourney } - private final class Message implements Serializable{ + + private final class Message implements Serializable { private static final long serialVersionUID = 0xAEF34565673L; private final MessageType type; private byte[] data; - public <T extends Serializable> Message(MessageType type, T object){ + public <T extends Serializable> Message(MessageType type, T object) { this.type = type; - - ObjectOutputStream oos = null; - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - try { - oos = new ObjectOutputStream(byteStream); - oos.writeObject(object); - oos.flush(); - this.data = byteStream.toByteArray(); - } catch (final Exception e) { - e.printStackTrace(); - } + this.data = MessageQueue.serialize(object); } - public MessageType getMessageType(){ + public MessageType getMessageType() { return this.type; } - public <T> T getData(){ - ByteArrayInputStream inputStream = new ByteArrayInputStream(this.data); - T result = null; - try { - ObjectInputStream ois = new ObjectInputStream(inputStream); - result = (T) ois.readObject(); - } catch (final Exception e) { - e.printStackTrace(); - } - return result; + public <T> T getData() { + return MessageQueue.unserialize(this.data); } } - - public MessageManager(String host, String username, String password, String exchange) throws Exception{ + + public MessageManager(String host, String username, String password, String exchange) throws Exception { super(host, username, password, exchange); } + + /* + Private functions + */ + private <T extends Serializable> void sendEvent(MessageType type, T event){ + Message m = new Message(type, event); + try { + this.sendBytes(MessageQueue.serialize(m)); + } catch (final Exception e) { + e.printStackTrace(); + } + } + + /* + Public functions + */ + public void suscribeJoinedJourney(Consumer<JoinedJourney> eventHandler){ + this.conditionalSuscribeJoinedJourney(eventHandler, (event) -> true); + } + + public void suscribeLeftJourney(Consumer<LeftJourney> eventHandler){ + this.conditionalSuscribeLeftJourney(eventHandler, (event) -> true); + } + + public void conditionalSuscribeJoinedJourney(Consumer<JoinedJourney> eventHandler, Predicate<JoinedJourney> condition){ + Consumer<byte[]> consumer = (bytes) -> { + Message receivedMessage = MessageQueue.unserialize(bytes); + eventHandler.accept(receivedMessage.getData()); + }; + Predicate<byte[]> predicate = (bytes) -> { + Message receivedMessage = MessageQueue.unserialize(bytes); + if(receivedMessage.type == MessageType.JoinedJourney){ + return condition.test(receivedMessage.getData()); + }else{ + return false; + } + }; + this.addConditionalConsumer(consumer, predicate); + } + + public void conditionalSuscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition){ + + } + + public void sendJoinedJourney(JoinedJourney event) { + this.sendEvent(MessageType.JoinedJourney, event); + } + + public void sendLeftJourney(LeftJourney event) { + this.sendEvent(MessageType.LeftJourney, event); + } } \ No newline at end of file diff --git a/src/main/java/ch/hepia/mq/MessageQueue.java b/src/main/java/ch/hepia/mq/MessageQueue.java index 4d600171d5c1e72e09bea33dec71db86a43f6a92..3f7e9c145ca38260954ff62bb63287ab5528da77 100644 --- a/src/main/java/ch/hepia/mq/MessageQueue.java +++ b/src/main/java/ch/hepia/mq/MessageQueue.java @@ -1,8 +1,16 @@ package ch.hepia.mq; -import java.util.ArrayList; -import java.util.List; +import java.io.Serializable; +import java.util.Map; +import java.util.TreeMap; +import java.util.Map.Entry; import java.util.function.Consumer; +import java.util.function.Predicate; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; +import java.io.ByteArrayInputStream; +import java.io.Serializable; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Channel; @@ -13,10 +21,10 @@ public abstract class MessageQueue { private final String exchange; private final Connection mqConnection; private final Channel mqChannel; - private final List<Consumer<byte[]>> consumers; + private final Map<Consumer<byte[]>, Predicate<byte[]>> consumers; public MessageQueue(String host, String username, String password, String exchange) throws Exception { - this.consumers = new ArrayList<>(); + this.consumers = new TreeMap<>(); this.exchange = exchange; ConnectionFactory queueConnector = new ConnectionFactory(); queueConnector.setHost(host); @@ -28,16 +36,23 @@ public abstract class MessageQueue { String queueName = this.mqChannel.queueDeclare().getQueue(); this.mqChannel.queueBind(queueName, this.exchange, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { - for (Consumer<byte[]> consumer : this.consumers) { - consumer.accept(delivery.getBody()); + this.consumers.forEach(((consumer, predicate) -> { + if(predicate.test(delivery.getBody())){ + consumer.accept(delivery.getBody()); + } } + )); }; this.mqChannel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } - protected void addConsumer(Consumer<byte[]> messageHandler) { - this.consumers.add(messageHandler); + protected void addConsumer(Consumer<byte[]> messageHandler){ + this.addConditionalConsumer(messageHandler, (bytes) -> true); + } + + protected void addConditionalConsumer(Consumer<byte[]> messageHandler, Predicate<byte[]> condition) { + this.consumers.put(messageHandler, condition); } protected void sendBytes(byte[] bytes) throws Exception { @@ -48,4 +63,33 @@ public abstract class MessageQueue { this.mqChannel.close(); this.mqConnection.close(); } + + public static <T extends Serializable> byte[] serialize(T object){ + ObjectOutputStream oos = null; + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + try { + oos = new ObjectOutputStream(byteStream); + oos.writeObject(object); + oos.flush(); + return byteStream.toByteArray(); + } catch (final Exception e) { + e.printStackTrace(); + }finally{ + //Todo + System.exit(1); + } + return null; + } + + public static <T extends Serializable> T unserialize(byte[] serializedData){ + ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedData); + T result = null; + try { + ObjectInputStream ois = new ObjectInputStream(inputStream); + result = (T) ois.readObject(); + } catch (final Exception e) { + e.printStackTrace(); + } + return result; + } } \ No newline at end of file