diff --git a/src/main/java/ch/hepia/TesterClass.java b/src/main/java/ch/hepia/TesterClass.java index a431038516849ec856c7476bff43412a553a550a..18722155130aa5460cb412186ef4adf05c7ffce2 100644 --- a/src/main/java/ch/hepia/TesterClass.java +++ b/src/main/java/ch/hepia/TesterClass.java @@ -9,65 +9,9 @@ import java.util.function.Consumer; import java.util.*; import ch.hepia.DummyClass; -import ch.hepia.mq.Message; import ch.hepia.mq.MessageQueue; public class TesterClass { public static void main(String[] argv) throws Exception{ - //Old - MessageQueue myMQ = new MessageQueue("redgrave.science", "frog", "poney1234", "broadcaster"); - - myMQ.addConsumer((bytes) -> { - System.out.println(new String(bytes)); - }); - myMQ.addConsumer((bytes) -> { - System.out.println("2ème handler-"+new String(bytes)); - }); - String message; - while(true){ - message = (new Scanner(System.in)).nextLine(); - if(message.equals("exit")){ - break; - } - myMQ.sendBytes(message.getBytes()); - } - myMQ.close(); - /* - final DummyClass dc = new DummyClass(); - final DummyClass out_dc; - System.out.println(""); - - ObjectOutputStream oos = null; - - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - ByteArrayInputStream inputStream; - try { - oos = new ObjectOutputStream(byteStream); - oos.writeObject(dc); - oos.flush(); - - inputStream = new ByteArrayInputStream(byteStream.toByteArray()); - - ois = new ObjectInputStream(inputStream); - - out_dc = (DummyClass)ois.readObject(); - System.out.println(dc.getProp0() + "" + Integer.toString(dc.prop1)); - System.out.println(out_dc.getProp0() + "" + Integer.toString(out_dc.prop1)); - } catch (final Exception e) { - e.printStackTrace(); - } finally { - try { - if (oos != null) { - oos.flush(); - oos.close(); - } - if (ois != null) { - ois.close(); - } - } catch (final java.io.IOException e) { - e.printStackTrace(); - } - } - */ } } \ No newline at end of file diff --git a/src/main/java/ch/hepia/mq/MessageManager.java b/src/main/java/ch/hepia/mq/MessageManager.java index 12242ca1cf3a45a3882db358a7990cea7dae3d5a..0bc3c17ca7c476a8332e5e3f11692474c339e359 100644 --- a/src/main/java/ch/hepia/mq/MessageManager.java +++ b/src/main/java/ch/hepia/mq/MessageManager.java @@ -35,9 +35,9 @@ public class MessageManager extends MessageQueue { } /* - Private functions - */ - private <T extends Serializable> void sendEvent(MessageType type, T event){ + * Private functions + */ + private <T extends Serializable> void sendEvent(MessageType type, T event) { Message m = new Message(type, event); try { this.sendBytes(MessageQueue.serialize(m)); @@ -46,35 +46,38 @@ public class MessageManager extends MessageQueue { } } + private <T extends Serializable> void conditionalSubscribe(MessageType type, Consumer<T> eventHandler, + Predicate<T> condition) { + Consumer<byte[]> consumer = (bytes) -> { + Message receivedMessage = MessageQueue.unserialize(bytes); + if (receivedMessage.type == type) { + T event = receivedMessage.getData(); + if (condition.test(event)) { + eventHandler.accept(event); + } + } + }; + this.addConsumer(consumer); + } + /* - Public functions - */ - public void suscribeJoinedJourney(Consumer<JoinedJourney> eventHandler){ - this.conditionalSuscribeJoinedJourney(eventHandler, (event) -> true); + * Public functions + */ + public void subscribeJoinedJourney(Consumer<JoinedJourney> eventHandler) { + this.conditionalSubscribeJoinedJourney(eventHandler, (event) -> true); } - public void suscribeLeftJourney(Consumer<LeftJourney> eventHandler){ - this.conditionalSuscribeLeftJourney(eventHandler, (event) -> true); + public void subscribeLeftJourney(Consumer<LeftJourney> eventHandler) { + this.conditionalSubscribeLeftJourney(eventHandler, (event) -> true); } - public void conditionalSuscribeJoinedJourney(Consumer<JoinedJourney> eventHandler, Predicate<JoinedJourney> condition){ - Consumer<byte[]> consumer = (bytes) -> { - Message receivedMessage = MessageQueue.unserialize(bytes); - eventHandler.accept(receivedMessage.getData()); - }; - Predicate<byte[]> predicate = (bytes) -> { - Message receivedMessage = MessageQueue.unserialize(bytes); - if(receivedMessage.type == MessageType.JoinedJourney){ - return condition.test(receivedMessage.getData()); - }else{ - return false; - } - }; - this.addConditionalConsumer(consumer, predicate); + public void conditionalSubscribeJoinedJourney(Consumer<JoinedJourney> eventHandler, + Predicate<JoinedJourney> condition) { + this.conditionalSubscribe(MessageType.JoinedJourney, eventHandler, condition); } - public void conditionalSuscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition){ - + public void conditionalSubscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition) { + this.conditionalSubscribe(MessageType.LeftJourney, eventHandler, condition); } public void sendJoinedJourney(JoinedJourney event) { diff --git a/src/main/java/ch/hepia/mq/MessageQueue.java b/src/main/java/ch/hepia/mq/MessageQueue.java index 3f7e9c145ca38260954ff62bb63287ab5528da77..e898fb70d28a3ff108c8d1aa950da825575c7cc5 100644 --- a/src/main/java/ch/hepia/mq/MessageQueue.java +++ b/src/main/java/ch/hepia/mq/MessageQueue.java @@ -1,11 +1,8 @@ package ch.hepia.mq; -import java.io.Serializable; -import java.util.Map; -import java.util.TreeMap; -import java.util.Map.Entry; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; -import java.util.function.Predicate; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.io.ObjectInputStream; @@ -21,10 +18,10 @@ public abstract class MessageQueue { private final String exchange; private final Connection mqConnection; private final Channel mqChannel; - private final Map<Consumer<byte[]>, Predicate<byte[]>> consumers; + private final List<Consumer<byte[]>> consumers; public MessageQueue(String host, String username, String password, String exchange) throws Exception { - this.consumers = new TreeMap<>(); + this.consumers = new ArrayList<>(); this.exchange = exchange; ConnectionFactory queueConnector = new ConnectionFactory(); queueConnector.setHost(host); @@ -36,23 +33,16 @@ public abstract class MessageQueue { String queueName = this.mqChannel.queueDeclare().getQueue(); this.mqChannel.queueBind(queueName, this.exchange, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { - this.consumers.forEach(((consumer, predicate) -> { - if(predicate.test(delivery.getBody())){ - consumer.accept(delivery.getBody()); - } + for (Consumer<byte[]> consumer : this.consumers) { + consumer.accept(delivery.getBody()); } - )); }; this.mqChannel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } - protected void addConsumer(Consumer<byte[]> messageHandler){ - this.addConditionalConsumer(messageHandler, (bytes) -> true); - } - - protected void addConditionalConsumer(Consumer<byte[]> messageHandler, Predicate<byte[]> condition) { - this.consumers.put(messageHandler, condition); + protected void addConsumer(Consumer<byte[]> messageHandler) { + this.consumers.add(messageHandler); } protected void sendBytes(byte[] bytes) throws Exception { @@ -75,7 +65,7 @@ public abstract class MessageQueue { } catch (final Exception e) { e.printStackTrace(); }finally{ - //Todo + // TODO System.exit(1); } return null; @@ -89,6 +79,9 @@ public abstract class MessageQueue { result = (T) ois.readObject(); } catch (final Exception e) { e.printStackTrace(); + }finally{ + // TODO + System.exit(1); } return result; }