// Select Git revision
// Administration.html
// MessageManager.java 2.27 KiB
package ch.hepia.mq;
import ch.hepia.events.*;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
 * Typed facade over {@link MessageQueue} for journey events.
 *
 * <p>Outgoing events are wrapped in a {@link Message} tagged with a {@link MessageType},
 * serialized and handed to the queue. Incoming payloads are deserialized and dispatched to the
 * handlers that were registered for the matching message type.
 */
public class MessageManager extends MessageQueue implements Serializable {

    /**
     * Creates a manager by forwarding every argument unchanged to the {@link MessageQueue}
     * constructor.
     *
     * @param host forwarded to the superclass constructor
     * @param username forwarded to the superclass constructor
     * @param password forwarded to the superclass constructor
     * @param exchange forwarded to the superclass constructor
     * @throws Exception if the superclass constructor fails
     */
    public MessageManager(String host, String username, String password, String exchange) throws Exception {
        super(host, username, password, exchange);
    }

    /*
     * Private functions
     */

    /**
     * Wraps {@code event} in a {@link Message} of the given type, serializes it and sends the bytes.
     *
     * <p>Sending is best-effort: any exception raised while serializing or sending is printed via
     * {@link Throwable#printStackTrace()} and is not propagated to the caller.
     *
     * @param type message type used to tag the event
     * @param event payload to send
     * @param <T> concrete event type
     */
    private <T extends Serializable> void sendEvent(MessageType type, T event) {
        Message m = new Message(type, event);
        try {
            this.sendBytes(MessageQueue.serialize(m));
        } catch (final Exception e) {
            // Deliberately not rethrown so that the public send* methods stay non-throwing.
            e.printStackTrace();
        }
    }

    /**
     * Registers a byte-level consumer that deserializes each payload and forwards its data to
     * {@code eventHandler} when the message type equals {@code type} and {@code condition} accepts
     * the event.
     *
     * @param type message type the handler is interested in
     * @param eventHandler callback receiving the accepted events
     * @param condition filter evaluated on each event of the requested type
     * @param <T> concrete event type carried by messages of {@code type}
     * @throws NullPointerException if {@code eventHandler} or {@code condition} is {@code null}
     */
    private <T extends Serializable> void conditionalSubscribe(MessageType type, Consumer<T> eventHandler,
            Predicate<T> condition) {
        // Fail fast at registration time. Without these checks a null argument is only detected
        // later, inside the consumer callback, once a matching message happens to arrive.
        Objects.requireNonNull(eventHandler, "eventHandler");
        Objects.requireNonNull(condition, "condition");

        Consumer<byte[]> consumer = (bytes) -> {
            Message receivedMessage = MessageQueue.unserialize(bytes);
            // NOTE(review): unserialize is not visible from here; the null guard covers the case
            // where it yields null for a payload it cannot decode - confirm against MessageQueue.
            if (receivedMessage == null || receivedMessage.getMessageType() != type) {
                return;
            }
            T event = receivedMessage.getData();
            if (condition.test(event)) {
                eventHandler.accept(event);
            }
        };
        this.addConsumer(consumer);
    }

    /*
     * Public functions
     */

    /**
     * Registers {@code eventHandler} for all messages of type {@link MessageType#JoinedJourney}.
     *
     * @param eventHandler callback receiving each {@link JoinedJourney} event
     * @throws NullPointerException if {@code eventHandler} is {@code null}
     */
    public void subscribeJoinedJourney(Consumer<JoinedJourney> eventHandler) {
        this.conditionalSubscribeJoinedJourney(eventHandler, (event) -> true);
    }

    /**
     * Registers {@code eventHandler} for all messages of type {@link MessageType#LeftJourney}.
     *
     * @param eventHandler callback receiving each {@link LeftJourney} event
     * @throws NullPointerException if {@code eventHandler} is {@code null}
     */
    public void subscribeLeftJourney(Consumer<LeftJourney> eventHandler) {
        this.conditionalSubscribeLeftJourney(eventHandler, (event) -> true);
    }

    /**
     * Registers {@code eventHandler} for messages of type {@link MessageType#JoinedJourney} whose
     * event is accepted by {@code condition}.
     *
     * @param eventHandler callback receiving the accepted {@link JoinedJourney} events
     * @param condition filter deciding which events are forwarded to {@code eventHandler}
     * @throws NullPointerException if {@code eventHandler} or {@code condition} is {@code null}
     */
    public void conditionalSubscribeJoinedJourney(Consumer<JoinedJourney> eventHandler,
            Predicate<JoinedJourney> condition) {
        this.conditionalSubscribe(MessageType.JoinedJourney, eventHandler, condition);
    }

    /**
     * Registers {@code eventHandler} for messages of type {@link MessageType#LeftJourney} whose
     * event is accepted by {@code condition}.
     *
     * @param eventHandler callback receiving the accepted {@link LeftJourney} events
     * @param condition filter deciding which events are forwarded to {@code eventHandler}
     * @throws NullPointerException if {@code eventHandler} or {@code condition} is {@code null}
     */
    public void conditionalSubscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition) {
        this.conditionalSubscribe(MessageType.LeftJourney, eventHandler, condition);
    }

    /**
     * Sends {@code event} tagged as {@link MessageType#JoinedJourney}. Failures are printed, not thrown.
     *
     * @param event event to send
     */
    public void sendJoinedJourney(JoinedJourney event) {
        this.sendEvent(MessageType.JoinedJourney, event);
    }

    /**
     * Sends {@code event} tagged as {@link MessageType#LeftJourney}. Failures are printed, not thrown.
     *
     * @param event event to send
     */
    public void sendLeftJourney(LeftJourney event) {
        this.sendEvent(MessageType.LeftJourney, event);
    }
}