Skip to content
Snippets Groups Projects
Verified Commit 359b56f5 authored by Michaël El Kharroubi's avatar Michaël El Kharroubi :satellite:
Browse files

MessageManager ready for testing

parent 8cd1579a
No related branches found
No related tags found
1 merge request!21*poof* final version
......@@ -9,65 +9,9 @@ import java.util.function.Consumer;
import java.util.*;
import ch.hepia.DummyClass;
import ch.hepia.mq.Message;
import ch.hepia.mq.MessageQueue;
public class TesterClass {
public static void main(String[] argv) throws Exception{
//Old
MessageQueue myMQ = new MessageQueue("redgrave.science", "frog", "poney1234", "broadcaster");
myMQ.addConsumer((bytes) -> {
System.out.println(new String(bytes));
});
myMQ.addConsumer((bytes) -> {
System.out.println("2ème handler-"+new String(bytes));
});
String message;
while(true){
message = (new Scanner(System.in)).nextLine();
if(message.equals("exit")){
break;
}
myMQ.sendBytes(message.getBytes());
}
myMQ.close();
/*
final DummyClass dc = new DummyClass();
final DummyClass out_dc;
System.out.println("");
ObjectOutputStream oos = null;
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
ByteArrayInputStream inputStream;
try {
oos = new ObjectOutputStream(byteStream);
oos.writeObject(dc);
oos.flush();
inputStream = new ByteArrayInputStream(byteStream.toByteArray());
ois = new ObjectInputStream(inputStream);
out_dc = (DummyClass)ois.readObject();
System.out.println(dc.getProp0() + "" + Integer.toString(dc.prop1));
System.out.println(out_dc.getProp0() + "" + Integer.toString(out_dc.prop1));
} catch (final Exception e) {
e.printStackTrace();
} finally {
try {
if (oos != null) {
oos.flush();
oos.close();
}
if (ois != null) {
ois.close();
}
} catch (final java.io.IOException e) {
e.printStackTrace();
}
}
*/
}
}
\ No newline at end of file
......@@ -35,9 +35,9 @@ public class MessageManager extends MessageQueue {
}
/*
Private functions
*/
private <T extends Serializable> void sendEvent(MessageType type, T event){
* Private functions
*/
private <T extends Serializable> void sendEvent(MessageType type, T event) {
Message m = new Message(type, event);
try {
this.sendBytes(MessageQueue.serialize(m));
......@@ -46,35 +46,38 @@ public class MessageManager extends MessageQueue {
}
}
private <T extends Serializable> void conditionalSubscribe(MessageType type, Consumer<T> eventHandler,
Predicate<T> condition) {
Consumer<byte[]> consumer = (bytes) -> {
Message receivedMessage = MessageQueue.unserialize(bytes);
if (receivedMessage.type == type) {
T event = receivedMessage.getData();
if (condition.test(event)) {
eventHandler.accept(event);
}
}
};
this.addConsumer(consumer);
}
/*
Public functions
*/
public void suscribeJoinedJourney(Consumer<JoinedJourney> eventHandler){
this.conditionalSuscribeJoinedJourney(eventHandler, (event) -> true);
* Public functions
*/
public void subscribeJoinedJourney(Consumer<JoinedJourney> eventHandler) {
this.conditionalSubscribeJoinedJourney(eventHandler, (event) -> true);
}
public void suscribeLeftJourney(Consumer<LeftJourney> eventHandler){
this.conditionalSuscribeLeftJourney(eventHandler, (event) -> true);
public void subscribeLeftJourney(Consumer<LeftJourney> eventHandler) {
this.conditionalSubscribeLeftJourney(eventHandler, (event) -> true);
}
public void conditionalSuscribeJoinedJourney(Consumer<JoinedJourney> eventHandler, Predicate<JoinedJourney> condition){
Consumer<byte[]> consumer = (bytes) -> {
Message receivedMessage = MessageQueue.unserialize(bytes);
eventHandler.accept(receivedMessage.getData());
};
Predicate<byte[]> predicate = (bytes) -> {
Message receivedMessage = MessageQueue.unserialize(bytes);
if(receivedMessage.type == MessageType.JoinedJourney){
return condition.test(receivedMessage.getData());
}else{
return false;
}
};
this.addConditionalConsumer(consumer, predicate);
public void conditionalSubscribeJoinedJourney(Consumer<JoinedJourney> eventHandler,
Predicate<JoinedJourney> condition) {
this.conditionalSubscribe(MessageType.JoinedJourney, eventHandler, condition);
}
public void conditionalSuscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition){
public void conditionalSubscribeLeftJourney(Consumer<LeftJourney> eventHandler, Predicate<LeftJourney> condition) {
this.conditionalSubscribe(MessageType.LeftJourney, eventHandler, condition);
}
public void sendJoinedJourney(JoinedJourney event) {
......
package ch.hepia.mq;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
......@@ -21,10 +18,10 @@ public abstract class MessageQueue {
private final String exchange;
private final Connection mqConnection;
private final Channel mqChannel;
private final Map<Consumer<byte[]>, Predicate<byte[]>> consumers;
private final List<Consumer<byte[]>> consumers;
public MessageQueue(String host, String username, String password, String exchange) throws Exception {
this.consumers = new TreeMap<>();
this.consumers = new ArrayList<>();
this.exchange = exchange;
ConnectionFactory queueConnector = new ConnectionFactory();
queueConnector.setHost(host);
......@@ -36,23 +33,16 @@ public abstract class MessageQueue {
String queueName = this.mqChannel.queueDeclare().getQueue();
this.mqChannel.queueBind(queueName, this.exchange, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
this.consumers.forEach(((consumer, predicate) -> {
if(predicate.test(delivery.getBody())){
consumer.accept(delivery.getBody());
}
for (Consumer<byte[]> consumer : this.consumers) {
consumer.accept(delivery.getBody());
}
));
};
this.mqChannel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
protected void addConsumer(Consumer<byte[]> messageHandler){
this.addConditionalConsumer(messageHandler, (bytes) -> true);
}
protected void addConditionalConsumer(Consumer<byte[]> messageHandler, Predicate<byte[]> condition) {
this.consumers.put(messageHandler, condition);
protected void addConsumer(Consumer<byte[]> messageHandler) {
this.consumers.add(messageHandler);
}
protected void sendBytes(byte[] bytes) throws Exception {
......@@ -75,7 +65,7 @@ public abstract class MessageQueue {
} catch (final Exception e) {
e.printStackTrace();
}finally{
//Todo
// TODO
System.exit(1);
}
return null;
......@@ -89,6 +79,9 @@ public abstract class MessageQueue {
result = (T) ois.readObject();
} catch (final Exception e) {
e.printStackTrace();
}finally{
// TODO
System.exit(1);
}
return result;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment