Cet article est une traduction de « The introduction to Reactive Programming you've been missing » posté par André Staltz. Même s'il est un peu long, j'ai apprécié le niveau et le style de l'article, je le traduis donc ici pour en faire profiter les personnes plus à l'aise en français qu'en l'anglais. Dans le tutoriel, la première personne représente donc André Staltz. Les parties apparaissant comme ce paragraphe (en vert) sont des ajouts et commentaires de ma part.

L'introduction qu'il vous manquait pour enfin comprendre la programmation réactive

Alors, comme ça, vous êtes curieux au sujet du nouveau truc à la mode appelé Programmation Réactive Fonctionnelle (PRF) ? L'article utilise le terme « fonctionnelle » mais pour certains puristes, l'article est plutôt au sujet de la programmation réactive normale.

Il est difficile voire très difficile d'apprendre la PRF, principalement à cause du manque de bonnes ressources d'apprentissage. Quand j'ai commencé, j'ai d'abord cherché des tutoriaux. Je n'ai trouvé que des guides pratiques très introductifs et bas niveau où on ne construisait jamais une architecture complète. J'ai aussi regardé du coté des documentations des bibliothèques qui n'aident pas vraiment… Je veux dire, sérieux, regardez ça: même si on finit par pouvoir comprendre avec le temps

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element's index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

Ah la vache ! WAT !?

J'ai lu deux livres : un qui ne présentait qu'une vue d'ensemble et un autre qui expliquait plutôt comment utiliser une bibliothèque précise. J'ai fini par apprendre la programmation réactive à la dure : en essayant de faire des choses avec. À mon boulot à Futurice, j'ai été amené à appliquer la PR dans de vrais projets avec l'aide de certains de mes collègues quand j'avais des problèmes.

Un peu de background de mon coté. Bien que pratiquant principalement la programmation impérative et objet, j'ai appris la programmation fonctionnelle avec « Caml », assez tôt. J'y suis revenu avec les cours de Martin Oderski (programmation fonctionnelle en Scala, …). Entre temps j'ai beaucoup travaillé avec sur des projets de perception par ordinateur temps réel. Dans ce contexte, les approches type Dataflow (« boîtes et flux de données ») sont adaptées et en elles sont en fait très proches de la programmation réactive de l'article.

La partie la plus dure dans l'apprentissage est d'apprendre à penser en PRF. Il faut principalement arriver à lâcher ses habitudes de programmation impérative et accepter d'essayer de se plonger dans le nouveau paradigme. Je n'ai pas trouvé de bon guide en ligne pour aider dans cette transition vers la programmation réactive et fonctionnelle. Je pense que cela mérite un tutoriel pratique pour aider les gens à démarrer. Les documentations existantes (de bibliothèques) devraient alors pouvoir éclairer la suite du périple. J'espère que ce tutoriel vous aidera.

Qu'est ce que la programmation réactive (PR) ?

Il y a plein d'explications et de définitions sur internet. Celle de wikipedia est trop théorique et générique, comme d'habitude. La réponse typique sur Stackoverflow n'est évidemment pas adaptée au débutants. Le « Reactive Manifesto » est le genre de chose faite pour être montrée à votre manager. Du coté de Microsoft, les « Reactive Extensions (Rx) » sont définies comme « Observables + LINQ + Schedulers », ce qui est lourd et troublant pour ceux qui ne connaissent pas les technologies Microsoft. Des termes basiques comme « réactive » et « propagation des changements » ne sont pas vraiment différents des architectures typiques MVC/MV* : évidemment que nos vues réagissent aux changements du modèle… si ce n'étais pas le cas, rien ne serait affiché.

Tant de blabla, simplifions tout ça !

Programmation réactive = programmation avec des flux de données asynchrones.

Dans un sens, il n'y a rien de nouveau. Les bus d'événements et les événements type clics de souris sont simplement des flux d'événements asynchrones que l'on peut observer pour y réagir. La PR, c'est la même idée mais survitaminée. Il devient possible de créer des flux à partir de tout et n'importe quoi, pas seulement de clics. Ces flux sont légers et présents de partout. Tout peut alors être un flux : les variables, les entrées utilisateurs, les propriétés, les caches, les structures de données, etc. Par exemple, imaginez que votre fil Twitter soit un flux de la même façon que les clics souris. Il est alors possible d'écouter/observer ce flux et réagir aux changements.

En plus de ça, il y a à votre disposition des boîtes à outils puissantes pour combiner, créer, filtrer tous ces flux. C'est là que la partie « fonctionnelle » se combine bien avec la PR. Un flux peut être utilisé comme entrée d'un autre. Plusieurs flux peuvent aussi être combinés pour en produire un nouveau. Deux flux peuvent être fusionnés. Un flux peut être filtré pour produire un nouveau flux avec uniquement les événements qui vous intéressent. Vous pouvez transformer chaque événement d'un flux pour en produire un nouveau.

Si les flux sont si importants pour la PR, il faut s'y plonger, en prenant l'exemple du flux d'événements souris.

représentation des flux

Un flux est une séquence d'événements en cours de production et ordonnés dans le temps. Trois types de signaux peuvent être émis par un flux : une valeur (d'un certain type), une erreur, ou un signal « fin ». Le signal de « fin » est par exemple envoyé quand l'utilisateur ferme la fenêtre qui émettait les événements souris.

L'idée est de traiter ces événements de manière asynchrone, en définissant une fonction à exécuter pour chaque valeur émise, une autre fonction à exécuter en cas d'erreur et une dernière pour le cas du signal de « fin ». Dans certains cas particuliers, les deux dernières sont inutiles et il est suffisant de se concentrer sur la fonction qui traite les valeurs. Écouter un flux (listening), se dit aussi « souscrire au flux » (subscribing). En fait les fonction que l'on définit sont des observateurs et le sujet (objet observé) est le flux. C'est exactement le patron de conception observateur.

Pour simplifier l'écriture du tutoriel, nous adopterons parfois une notation textuelle ASCII pour les diagrammes. Par exemple le diagramme précédent peut être représenté aussi comme ça :

--a---b-c---d---X---|->

a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline

Cette histoire d'observateur est peut être déjà très familière pour vous et pour ne pas vous ennuyer, passons à quelque chose de nouveau : nous allons créer un nouveaux flux d'événements en transformant le flux de clics

Commençons par faire un flux qui compte le nombre de fois que le bouton a été clické, au total. Dans les bibliothèque de programmation réactive fonctionnelle, il existe plein de fonctions pour manipuler les flux : map (pour transformer chaque valeur), filter (pour filtrer certaines valeurs), scan (pour appliquer successivement une opération), etc. Quand on appelle une de ces fonctions, genre fluxDeClics.map(f), cela renvoie un nouveau flux qui découle du premier. Le flux d'origine n'est pas modifié du tout. Cette propriété de non-modification est appelée « immutabilité » et est une des bases de la programmation fonctionnelle. Comme un nouveau flux est créé, il est possible de chaîner les appels, par exemple : fluxDeClics.map(f).scan(g)

  clickStream: ---c----c--c----c------c-->
               vvvvv map(c devient 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->

L'appel map(f) transforme chaque élément en calculant x = f(valeur) à chaque valeur reçue (ces « x » étant envoyés dans le nouveau flux créé). Dans notre cas, nous voulons simplement transformer chaque clic souris en la valeur « 1 ». L'appel scan(g) agrège successivement les valeurs reçues, en calculant accumulation = g(accumulation, valeur) à chaque valeur reçue et en envoyant les valeurs successive de « accumulation » dans le nouveau flux créé. Dans notre exemple, la fonction « g » est simplement une addition. Au final le flux créé contient le nombre total de clics, avec une mise à jour à chaque nouveau clic.

Pour illustrer la puissance de la PR, supposons que l'on veut faire un flux de « double clics ». Pour que ce soit encore plus intéressant, on veut aussi considérer les triples clics et autres clics multiples. Prenez un instant et imaginez comment vous feriez ça en programmation impérative traditionnelle. En général, c'est vraiment tordu et ça implique au moins d'avoir des variables pour garder l'état et de manipuler avec des horloges et des intervalles de temps.

En programmation réactive fonctionnelle, c'est en fait super simple. Le code fait au final 4 lignes, mais mettons ça de coté pour l'instant. Nous allons utilisez des diagrammes parce que c'est quand même bien pratique, que ce soit pour les débutants ou pour les experts.

flux de multi-clics

Les boites grises sont des fonctions qui transforment un flux en un autre. Tout d'abord, on groupe les clics dans des listes, un groupe étant créé dès qu'il y a un silence de 250 millisecondes (c'est ce que fait stream.throttle(250ms)). Pas de panique si vous ne comprenez pas les détails tout de suite, c'est juste une démo pour l'instant. Le résultat est un flux de listes sur lequel on applique map() pour transformer chaque liste en un entier qui est la longueur de la liste. Au final, on ignore les 1 (simples clics) en utilisant un filtre filter(x >= 2). Ça y est ! Seulement 3 opérations pour produire le flux que l'on veut. On peut maintenant souscrire à ce flux et réagir aux événements comme on le souhaite.

J'espère que vous appréciez la beauté de cette approche. Cet exemple est juste la partie visible de l'iceberg. D'un coté, les mêmes opérations peuvent être appliquées à différents types de flux, comme par exemple un flux de réponses d'un service Web. D'un autre coté, il y a beaucoup plus de fonctions disponibles que celles que l'on a vues pour l'instant.

« Pourquoi envisager d'utiliser la programmation réactive fonctionnelle ? »

La PR fonctionnelle élève le niveau d'abstraction de votre code et, au lieu de se battre avec des détails d'implantation, vous pouvez vous concentrer sur les interdépendances entre les événements de votre logique métier. Un code de PR fonctionnelle est très souvent plus concis, ce qui peut avoir certains avantages.

Le gain est plus flagrant dans les applications web modernes qui sont fortement interactive et reçoivent de nombreux événements venant des interactions utilisateur et des services distants. Il y a seulement 10 ans, la navigation web était quasiment limitée à faire une requête à un serveur distant et rendre la réponse dans le navigateur. Les applications webs ont évolué et sont maintenant très dynamiques : l'édition d'un formulaire peut déclencher automatiquement une sauvegarde sur le serveur distant, un « like/+1/RT » peut se répercuter directement sur l'écran de tous les utilisateurs connectés, etc.

De nos jours les applis web gèrent de nombreux types d'évènements qui permettent de proposer une expérience utilisateur plus riche. En tant que développeurs, il nous faut des outils pour gérer tout ça et la PRF est la/une réponse au problème.

Apprendre à penser en PRF, par l'exemple

Plongeons dans le vif du sujet, un exemple réel qui servira de guide pour apprendre à penser en PRF. Procédons pas à pas : stop aux exemples synthétiques et aux concepts définis à moitié. À la fin de ce tutoriel, nous aurons écrit du code qui fonctionne tout en sachant ce que chaque ligne fait.

J'ai choisis Javascript et RxJS comme outils pour une bonne raison : Javascript est le language le plus connu en ce moment, et la the famille de bibliothèque Rx* est disponible dans à peu près tous les langages (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy, etc) Quel que soit vos outils de prédilection, suivre ce tutoriel devrait vous être bénéfique.

Implantation du conteneur « Suggestions » de Twitter

Dans Twitter, un des éléments de l'interface web propose des suggestions de nouvelles personnes à suivre:

suggestions twitter

Nous allons donc essayer d'imiter cet élément, en se concentrant sur certaines fonctionnalités :

  • Au début, afficher trois suggestions chargées à partir de l'API web.
  • Quand on clique sur « Refresh », 3 nouvelles suggestions sont chargées et remplacent celles présentées avant.
  • Quand on clique sur un des « x » sur une ligne, cette suggestion et remplacée par une nouvelle.
  • Chaque ligne affiche l'avatar et un lien vers la page du compte suggéré.

Nous laisserons de coté les autres fonctionnalités que l'on peut considérer comme mineures. Au lieu de construire cette boîte de suggestion pour Twitter (qui a récemment fermé son API publique), construisons la pour les comptes Github. Il y a une API pour obtenir les utilisateurs de Github.

Le code final complet de ce tutoriel est disponible sur http://jsfiddle.net/staltz/8jFJH/48/ si vous voulez y jeter un œil. Pour rester proche du code donné dans le lien, le code du tutoriel n'est pas traduit.

Requêtes et Réponses

Comment approcher ce problème en PRF ? Pour commencer, (presque) tout peut être un flux, les flux sont partout. C'est la philosophie de la PRF. Commençons par la fonctionnalité la plus simple: « Au début, afficher trois suggestions chargées à partir de l'API web ». Évidemment, c'est aussi simple que (1) faire une requête, (2) recevoir la réponse, (3) faire le rendu de la réponse. Nous allons donc représenter nos requêtes par un flux. Cela peut sembler inutile au premier abord, il faut bien commencer par le début non ?

Au démarrage, il suffit de faire une seule requête au serveur, du coup, si on modélise les requêtes à réaliser comme un flux, ce sera un flux qui ne génère qu'une valeur. Plus tard, en fait, nous aurons plusieurs requêtes, mais pour l'instant, il n'y a en qu'une.

--a------|->

Où « a » est la chaîne de caractères 'https://api.github.com/users'.

C'est un flux d'URLs à requêter. Chaque événement dans un flux contient deux choses : le « quand » et le « quoi ». Le « quand » correspond au moment où l'événement est émis, ici le moment où la requête doit être exécutée. Le « quoi » correspond à la valeur émise, ici l'URL.

Il est très facile avec Rx* de créer un flux à partir d'une seule valeur comme c'est le cas ici. La nom « officiel » pour un flux est « Observable », puisqu'un flux peut être observé, mais je trouve ce nom un peut tordu, alors nous allons continuer à utiliser « flux ».

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

Pour l'instant, c'est juste un flux de chaînes de caractères, qui ne fait rien d'autre. Il va donc falloir faire quelque chose de des valeurs émises par le flux. Pour cela, on va s'abonner/souscrire au flux :

requestStream.subscribe(function(requestUrl) {
  // execute the request
  jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

On utilise ici jQuery pour faire une requête asynchrone et traiter la réponse dans une callback (on suppose que vous connaissez déjà). Mais attendez un peu : on a pas dit que la PRF est faite pour gérer les données asynchrones !? Est-ce que les réponses aux requêtes ne pourraient elles pas être vues comme un flux ? D'un point de vue conceptuel ça semble totalement adapté, essayons donc d'implanter ça.

requestStream.subscribe(function(requestUrl) {
  // execute the request
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .then(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .done(function() { observer.onCompleted(); });
  });

  responseStream.subscribe(function(response) {
    // do something with the response
  });
}

La fonction Rx.Observable.create() crée un flux qui informera chaque observateur de chaque nouvel événement (onNext()) et des erreurs (onError()). Nous venons juste d'encapsuler une « promesse » de réponse (promise) jQuery. Cela veut dire qu'une promesse est un observable ?

impressionné ?

Exactement !

Un observable est un peu une super-promesse. Dans Rx*, il est très facile de convertir un promesse en observable avec var stream = Rx.Observable.fromPromise(promise), essayons donc d'utiliser cette possibilité. En fait, « observable » n'est pas exactement conforme à la convention Promises/A+, mais d'un point de vue conceptuel, il n'y a aucun problème. Une promesse n'est qu'un observable avec une seule valeur émise. Les flux de la PRF vont au delà des promesses en permettant d'émettre plusieurs valeurs.

C'est plutôt sympa et cela illustre que la PRF est aussi puissante que les promesses. Si vous croyez en la mode des promesses, c'est une motivation de plus pour être motivés par la PRF.

Retournons à nos moutons. Si vous avez été attentifs, vous avez pu voir que notre code contient un subscribe() dans un subscribe(), c'est un peu un enfer de callback. De plus, la création de responseStream dépend de requestStream. Comme vu avant, la PR fonctionnelle permet de facilement créer des nouveaux flux à partir de flux existants, et c'est ce que nous devrions donc faire.

La seule fonction de base que vous êtes censés vraiment connaître à ce niveau c'est map(f), qui prend chaque valeur dans le flux, lui applique la fonction f() et produit un flux avec ces valeurs. Si nous appliquons ce principe à nos flux de requêtes et de réponses, nous pouvons « mapper » nos URLs de requête en promesses de réponse (promesses déguisées en flux).

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Ici nous avons créé un monstre, un « méta-flux », c'est à dire un flux de flux. Pas de panique : un méta-flux est un flux dont chaque valeur émise est elle même un flux. On peut voir ces flux comme des pointeurs, chaque valeur émise par le méta-flux est une sorte de pointeur vers un autre flux. Dans notre exemple, chaque URL requête est transformée en une promesse réponse, déguisée en un flux de réponse (émettant une unique réponse).

méta-flux de réponses

Utiliser un méta-flux peut être troublant et ça ne semble pas vraiment nous rendre la vie plus facile. Au final, on veut juste un flux de réponses où chaque valeur est un objet JSON, pas une promesse d'objet JSON. J'ai l'honneur de vous présenter Mr Flatmap : c'est l'équivalent de map() mais qui aplatit (flattens) un méta-flux, c'est à dire qu'une valeur sera émise dans le flux résultants à chaque fois qu'une valeur est émise dans n'importe lequel des sous-flux d'entrées (les branches dans le schéma). Flatmap n'est pas un patch/fix et les méta-flux ne sont pas des bugs : ce sont de vrais outils pour gérer les réponses asynchrones en PR fonctionnelle.

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });


flux de réponses

Cool. Au final, comme notre flux de réponses est défini à partir du flux de requêtes, si plus tard, notre flux de requête contient de nouvelles URLs, nous obtiendrons automatiquement les réponses correspondantes dans notre flux de réponses.

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(minuscules : requêtes ; majuscules : réponses correspondantes)

Nous avons enfin notre flux de réponses, il ne reste plus qu'à faire un rendu de ce que nous recevons.

responseStream.subscribe(function(response) {
  // render response to the DOM however you wish
});

Voici le code que nous avons jusqu'à présent en mettant tout bout à bout.

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // render "response" to the DOM however you wish
});

Le bouton « Rafraîchir »

Je ne l'ai pas précisé mais l'API fournit des réponses JSON contentant systématiquement 100 utilisateurs (à recommander). Il est possible de donner un décalage (à partir de quel indice on veut les utilisateurs) mais il n'est pas possible de spécifier combien d'utilisateurs on veut. Du coup, on utilise juste 3 utilisateurs pour les recommandations et on gâche les 97 autres. Pour l'instant, on va ignorer ce problème, on verra plus tard comment mettre ces réponses en cache.

À chaque fois que l'utilisateur clique sur le bouton rafraîchir, le flux de requête doit émettre un nouvelle URL, de façon à ce que, au final, on obtienne la réponse. Nous devons donc faire deux choses. Il nous faut un flux d'événements de clics sur le bouton rafraîchir (devise : les flux sont partout) et il faut que l'on modifie le flux de requêtes pour qu'il dépende du flux de clics. Heureusement, RxJS permet très facilement de créer un observable (flux) à partir de d'événements d'interface.

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

Étant donné que les événements de clics ne contiennent pas d'URL, il nous faut transformer le flux pour produire des URLs. Nous allons donc « mapper » les clics vers une URL pour l'API contenant à chaque fois un décalage choisi au hasard.

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Comme je suis un peu simplet et que je n'ai pas de tests automatisés, je viens de casser une fonctionnalité que nous avions auparavant. Nous ne faisons plus de requête au départ, la requête n'est faite que lorsque l'utilisateur clique sur « rafraîchir ». Oups. Il nous faut les deux comportements : une requête doit être émise quand la page est ouverte mais aussi à chaque clic sur « rafraîchir ».

Nous savons déjà comment faire un flux pour chacun de ces cas :

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });
var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');

Maintenant il faudrait que l'on fusionne ces flux, comment faire ? Tada, voici la fonction merge(). Une petite illustration de ce que « merge » fait :

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

Maintenant, tout devient plus facile :

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

On peut aussi écrire ça plus simplement, sans les variables intermédiaires, c'est un peu plus lisible :

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .merge(Rx.Observable.returnValue('https://api.github.com/users'));

Encore plus court et plus lisible :

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');

La fonction startWith() fait exactement ce que vous imaginez. Quel que soit le flux sur lequel elle est appelée, le flux produit par .startWith(x) émettra la valeur « x » en tout premier, puis le reste du flux d'origine. Mais tout cela n'est pas très DRY, je répète l'URL de l'API. Une solution est de déplacer le startWith() sur le refreshClickStream, pour en fait générer un clic virtuel au démarrage.

var requestStream = refreshClickStream.startWith('this is a virtual click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Très bien. Si vous jeter un œil au code au dessus (au moment où l'on a cassé les tests automatiques), la seule différence est que l'on vient d'ajouter le startWith().

Modélisation des 3 suggestions avec les flux

Jusque là, nous avons modifié qu'un seul élément de l'interface graphique au moment de faire le rendu dans « responseStream.subscribe ». Avec le bouton rafraîchir, il y a maintenant un problème : quand l'utilisateur clique sur « rafraîchir », les 3 suggestions déjà affichées ne sont pas effacées. Les nouvelles suggestions apparaissent uniquement quand les réponses arrivent, mais pour que l'interface soit plus réactive, nous devons effacer les suggestions courantes dès que l'utilisateur clique sur « rafraîchir ».

refreshClickStream.subscribe(function() {
  // clear the 3 suggestion DOM elements 
});

Pas si vite ! C'est la porte ouverte aux problèmes. Nous avons maintenant deux souscripteurs qui modifient tous les deux le DOM (l'autre étant responseStream.subscribe()), et ça ne semble pas être une bonne séparation de responsabilités. Vous vous rappelez de la devise de la PRF ?

les flux sont partout

Ok, on va donc modéliser les suggestions comme un flux, dans lequel chaque valeur émise est un objet JSON contenant les infos pour la suggestion. On va faire cela de manière séparée pour chacune des 3 suggestions. Voici le flux de suggestions JSON pour la première zone de suggestion :

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

Les autres, suggestion2Stream et suggestion3Stream, peuvent être de simples copier/coller de suggestion1Stream. Ce n'est pas DRY mais l'exemple reste plus simple grâce à ça, et je trouve que c'est un très bon exercice de se poser la question de comment éviter cette répétition.

Au lieu de faire le rendu dans responseStream.subscribe, on le fait ici :

suggestion1Stream.subscribe(function(suggestion) {
  // render the 1st suggestion to the DOM
});

Pour revenir au problème du « quand l'utilisateur clique rafraîchir, effacer immédiatement les suggestions », nous allons simplement transformer les clics sur rafraîchir en une suggestion JSON null (on pourrait aussi utiliser une constante, et par exemple l'appeler EFFACER) et l'inclure dans suggestion1Stream, comme ça :

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

Et pour le rendu, il faut maintenant interpréter ce null comme « pas d'info » et donc cacher la vue :

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

Voici la vue d'ensemble de nos flux :

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

(« N » voulant dire null)

En bonus, il devient super facile d'afficher des suggestions vides au démarrage. Il suffit de faire commencer le flux de suggestions par null avec startWith(null).

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

Ce qui nous donne :

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

Défausse d'une suggestion et mise en cache des réponses

Il nous reste une fonctionnalité à implémenter. Chaque suggestion dans l'interface doit avoir un bouton 'x' qui permet de la défausser et de la remplacer par une nouvelle suggestion. Au premier abord, on pourrait se dire qu'il suffit de faire une nouvelle requête quand l'utilisateur clique sur le bouton, ce qui donnerait :

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// and the same for close2Button and close3Button

var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // we added this
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

En fait, ça ne fonctionnerait pas vraiment. Cela fermerait et rechargerait toutes les suggestions, alors que l'on veut impacter uniquement la suggestion correspondant au bouton 'x' cliqué. Il y a plusieurs façons de résoudre ce problème, et pour que ce soit un peu plus intéressant, nous allons le résoudre en utilisant les réponses que l'on a déjà reçues. Pour rappel, l'API JSON produit systématiquement 100 utilisateurs et nous n'en utilisons que 3, nous avons donc beaucoup de données disponibles. Il n'est donc pas nécessaire de refaire une nouvelle requête.

Encore une fois, pensons avec des flux. Quand un événement 'close1' arrive, nous voulons utiliser la réponse la plus récente du responseStream et sélectionner au hasard un nouvel élément dans la liste des 100 utilisateurs. Cela donne :

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

Dans Rx*, il y a une fonction de combinaison qui s'appelle combineLatest et qui semble faire exactement ce que nous voulons. Elle prends deux flux A et B en paramètres et, à chaque fois qu'un des deux flux émets une valeur, combineLatest(f) prends les deux valeurs les plus récentes a et b émises par les deux flux (A et B) et émets c = f(a,b). C'est plus simple avec un schéma :

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          -----AB---AC--EC---ED--ID--IQ--->

(dans le cas où f est une fonction qui mets en majuscule)

Nous pouvons donc utiliser combineLatest sur close1ClickStream et responseStream, de façon à ce que dès que l'utilisateur clique sur close1, nous utilisons la dernière réponse reçue pour émettre une nouvelle valeur dans suggestion1Stream. D'un autre coté, combineLatest est symétrique : dès qu'une réponse est émise dans responseStream, elle sera combinée avec le dernier événement close1 pour produire une nouvelle suggestion. Ceci est très intéressant puisqu'on peut maintenant simplifier notre code de suggestion1Stream comme suit :

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

Il manque une pièce au puzzle. La fonction combineLatest utilise les valeurs les plus récentes en provenance de chaque source mais si une des source n'a encore rien émis, combineLatest ne produira pas d'événements en sortie. Si vous regardez le schéma ASCII ci-dessus, vous voyez que rien n'est émis sur la sortie quand le premier flux émets « a ». C'est seulement quand le second flux émets « b » que combineLatest peut produire une sortie pour la première fois.

Il y a plusieurs façons de résoudre ce problème et on va choisir la plus simple, qui consiste à simuler un clic sur close1 au démarrage :

var suggestion1Stream = close1ClickStream.startWith('this is a virtual click') // we added this
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

En résumé

Ça, c'est fait ! Le code complet est donc celui ci :

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

var suggestion1Stream = close1ClickStream.startWith('startup click')
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

Vous pouvez le voir en action sur http://jsfiddle.net/staltz/8jFJH/48/.

Ce code est court mais dense : il y a une gestion de multiples événements avec séparation propre des responsabilités et la mise en cache des réponses. Le style fonctionnel a rendu ce code plus déclaratif que impératif : au final, on ne donne pas une séquence d'instructions à exécuter mais on décrit plutôt quels sont les relations entre les flux d'événements. Par exemple, avec la PRF, on a indiqué que suggestion1Stream est le flux du bouton « close1 » combiné avec un utilisateur pris dans la dernière réponse obtenue de l'API, et qu'il est null quand le bouton rafraîchir est cliqué ou au départ de l'application.

On peut aussi noter l'absence de contrôle de flux (if, for, while) et de callbacks si typique aux applications Javascript. On pourrait même se débarrasser du if/else dans le rendu en utilisant « filter() » (ceci n'est pas indispensable et est laissé en exercice). En PRF, nous avons a disposition de nombreux outils comme map, filter, scan, merge, combineLatest, startWith et bien d'autres pour gérer le flux de contrôle pour des programmes à base d'événements. Cette boîte à outils permet de faire plus avec moins de code.

Et maintenant, quoi ?

Si vous pensez que Rx* sera votre bibliothèque de choix, prenez un peu de temps pour vous familiariser avec son énorme liste de fonctions pour transformer, combiner et créer des Observables. Si vous préférez comprendre ces fonctions à l'aides de diagrammes, je vous conseille de regarder l'excellente documentation de RxJava qui fourmille de schémas. Dès que vous bloquez en essayant d'implémenter quelque chose, utilisez ces diagrammes, réfléchissez dessus, regardez la longue liste de fonctions, et réfléchissez à nouveau. Mon expérience tends à prouver que ce processus de résolution de problème est très efficace.

Quand vous commencerez à programmer avec Rx*, il sera indispensable de comprendre les concepts d'observables « Cold » et « Hot ». Ce sont des concepts que l'on se prend de plein fouet à un moment ou à un autre. Vous êtes prévenus, jetez y un œil avant que qu'ils ne vous mordent. Vous pouvez aussi affûter vos connaissances en apprenant la vrai programmation fonctionnelle et comment gérer les choses telles que les effets de bord qui affectent Rx.

Cependant, la PRF n'est pas limitée à Rx. Il y a Bacon.js qui est très intuitive, sans certaines bizarreries que l'on trouve dans Rx. Le langage Elm joue dans sa propre catégorie : c'est un langage de PRF qui est compilé vers du Javascript + HTML + CSS et a un debugger avec voyage dans le temps. Plutôt cool !

La PRF marche très bien pour les frontends et les application avec beaucoup d'événements. Ce n'est pourtant pas juste un truc à utiliser coté client, ça marche aussi très bien coté serveur et proche des bases de données. D'ailleurs, RxJava est un composant clé dans la gestion de la concurrence derrière l'API Netflix. La PR fonctionnelle n'est pas réservée à un type d'application ou à un langage. C'est un paradigme que vous pouvez appliquer dans n'importe quel programme qui est structuré par des évènements.

Si ce tutoriel vous a aidé, faîtes le suivre sur twitter et ailleurs.

Rappel : et article est une traduction de « The introduction to Reactive Programming you've been missing » posté par André Staltz.

Des commentaires, des remarques ? Contactez moi : click-me ;-p @nospam.com.