23
Parlons un peu de SEDA…
Bon encore un vague acronyme…Ça veut dire quoi au fait ? Staged Event Driven Architecture.
OK, encore un nouveau paradigme dans la mouvance DDD, TDD, MDA … ? Ben pas vraiment nouveau en fait, ça fait 10 ans que ça existe et c’est Matt Welsh qui lance le mouvement dans la année 2000 avec une publication (cf. http://www.eecs.harvard.edu/~mdw/papers/events.pdf).
Heu en 2 mots c’est quoi le principe ? Bon l’idée c’est de créer des architectures scalables. OK…comment ?
C’est la que ça devient intéressant …
Le principe est distinguer des « étapes ». Basiquement, on peut imaginer ces étapes comme étant les couches de notre architectures… Le principe est de faire communiquer ces couches de façon non-bloquantes en les faisant communiquer par des canaux dans lesquelles on publie des messages. De cette façon celui qui publie le message n’attend pas le retour et peut continuer son travail il sera avertit lorsque la réponse sera prête.
Comment ça marche ?
Dans les architectures en couches, chacune des couches délèguent aux couches inférieures un certain nombre de traitements. Dans une architectures synchrone le demandeur (stage 1) attend la réponse de l’exécuteur (stage 2) avant de continuer son travail. Ici les choses sont différentes, le demandeur envoie des demandes d’exécutions à l’exécuteur qui va les traiter à son rythme pendant que le demandeur poursuivra son travail en parallèle le temps d’obtenir d’obtenir la réponse demandée. C’est le concept présenté par le pattern request-reply des EAI (cf. http://www.eaipatterns.com/RequestReply.html).
Voyons cela en image:
Architecture synchrone
Ici, lorsque la couche A fait un appel à la couche B, celle-ce reste bloquée en attente de réponse.
Architecture asynchrone
Ici, la couche A publie une requête dans une canal de requête (request) et continue son cycle de vie pendant que la couche B traite en parallèle la requête. Une fois le traitement terminé, la couche B va publier la réponse dans un canal de réponse (reply). Le canal peut être une file d’attente JMS, une queue, une base de donnée, etc…
Les principaux intérêt de ce pattern résident dans la possibilité d’exécuter des tâches en parallèle et d’adapter facilement les producteurs/consommateurs en fonction de l’état des files d’attentes ce qui rend ce type d’architecture hautement scalable:
Aujourd’hui de nombreux acteurs proposent des implémentations de ce concept (Mule, Spring Integration, service Mix, …).
Je propose une petite librairie qui utilise des bloquing queue pour établir ses canaux de communication. C’est une classe abstraite qu’il suffit d’étendre pour permettre de paralléliser de façon asynchrone un traitement synchrone.
Exemple d’un appel synchrone
Prenons l’exemple d’une classe SampleSynchone qui fait appel à un service VeryLongLegacyProcess donnant la météo en fonction du jour passé en paramètre. Le temps d’exécution du service est de 5 secondes.
La classe appelante choisi de prendre un parapluie ou une casquette en fonction du temps. Une première implémentation donnerait le code suivant:
La classe SampleSynchone:
public class SampleSynchrone {
VeryLongLegacyProcess veryLongProcess = new VeryLongLegacyProcess();
public void howDoIDress(EDay day) throws InterruptedException{
EWeather weather = veryLongProcess.whatTheWeather(day); //1 seconde
String clothe = "";
if (EWeather.RAINNY.equals(weather)){
clothe = "umbrella";
}else{
clothe = "cap";
}
System.out.println(day+" I will take my "+clothe);
}
}
La classe Main qui va appeler le traitement:
public class Main {
public static void main (String[] args){
System.out.println("current time:"+System.currentTimeMillis());
LayerA layerA = new LayerA();
layerA.howDoIDress(EDay.MONDAY);
layerA.howDoIDress(EDay.TUESDAY);
layerA.howDoIDress(EDay.WEDNESDAY);
layerA.howDoIDress(EDay.THURSDAY);
layerA.howDoIDress(EDay.FRIDAY);
}
Execution
current time:1272183941674 MONDAY I will take my umbrella(current time:1272183946682) TUESDAY I will take my cap(current time:1272183951682) WEDNESDAY I will take my umbrella(current time:1272183956683) THURSDAY I will take my cap(current time:1272183961683) FRIDAY I will take my umbrella(current time:1272183966683) job execution: 25 009 ms
=> L’exécution du traitement pour chacun des jours de la semaine prend 25 secondes (5 fois 5 secondes).
Exemple Asynchrone
Nous allons maintenant essayer de réduire la fenêtre temporelle de la classe main en parallélisant les appels au service en étendant la classe Parallelizable.
La classe parallelisable:
public class Sample extends Parallelizable<EDay, EWeather>{
VeryLongLegacyProcess veryLongLegacyProcess = new VeryLongLegacyProcess();
public Sample(int nbExecutors) throws InstantiationException,
IllegalAccessException, IllegalArgumentException,
SecurityException, InvocationTargetException, NoSuchMethodException {
super(nbExecutors);
}
@Override
public void callback(Reply<EDay, EWeather> reply) {
String clothe = "";
if (EWeather.RAINNY.equals(reply.getOut())){//using results for next process
clothe = "umbrella";
}else{
clothe = "cap";
}
System.out.println(reply.getIn()+" I will take my "+clothe);//using input parameters     Â
}
@Override
public EWeather processRequest(EDay in) throws InterruptedException {
return veryLongLegacyProcess.whatTheWeather((EDay)in);
}
}
La classe Main qui va appeler le traitement:
public class Main { public static void main (String[] args){System.out.println("current time:"+System.currentTimeMillis());Sample sample = new Sample(5);sample.request(EDay.MONDAY);sample.request(EDay.TUESDAY);sample.request(EDay.WEDNESDAY);sample.request(EDay.THURSDAY);sample.request(EDay.FRIDAY);}}
Execution
current time:1272184056048 WEDNESDAY I will take my umbrella(current time:1272184061059) FRIDAY I will take my umbrella(current time:1272184061059) THURSDAY I will take my cap(current time:1272184061059) TUESDAY I will take my cap(current time:1272184061059) MONDAY I will take my umbrella(current time:1272184061059)job execution: 5 011 ms
La classe Parallelizable va créer un pool de threads exécuteurs, donnée en paramètre au niveau du constructeur (Sample sample = new Sample(5);). Ces threads seront mis en écoute à la première requête publiée dans le canal de requête (avec la méthode request (sample.request(EDay.MONDAY)). Chacun des thread va dépiler les messages envoyés et les traiter. La réponse sera ensuite publiée dans un canal de réponse, dépilée et envoyée dans la méthode callback qui permettra de terminer le traitement en choisissant le vêtement approprié.
=> L’exécution du traitement pour chacun des jours de la semaine prend cette fois 5 secondes du fait de leur exécution parallèle.
L’exemple, la librairie et les sources sont publiés sous la licence Apache V2 sur Source Forge https://sourceforge.net/projects/parallelizable/