diff --git a/07_stackless_coroutines.md b/07_stackless_coroutines.md index 417ac3d77c34a874ca5a1360b97c89b493a9a402..f69b19110bacf0740b1fcf839342a21554e09136 100644 --- a/07_stackless_coroutines.md +++ b/07_stackless_coroutines.md @@ -30,10 +30,274 @@ patat: * Notre syntaxe `async/await` à nous (nommé `coroutine/wait` ben oui on peut pas réutiliser `async/await`) * Un préprocesseur qui va transformer notre syntaxe en machine d'états -## Ce qu'on va pas faire +### Ce qu'on va pas faire * Gérer les erreurs (erreur => panique) * Pas de généricité * Pas de macros (meilleure lisibilité) +## Avec une syntaxe `async/await` + +```rust +async fn async_main() { + println!("Program starting") + let txt = Http::get("/1000/HelloWorld").await; + println!("{txt}"); + let txt2 = Http::("500/HelloWorld2").await; + println!("{txt2}"); +} +``` + +## Objetif haut niveau + +* On veut créer une tâche qu'on peut mettre en pause et redémarrer +* On veut la modéliser sous la forme d'une machine d'états +* On veut une syntaxe proche du async/await + +## Ce que va faire notre programme + +1. Afficher un message lorsque la tâche démarre +2. Faire une requête `GET` à un serveur +3. Attemdre la réponse de la requête +4. Afficher la réponse du serveur +5. Faire une deuxième requête `GET` +6. Attendre la deuxième réponse du serveur +7. Afficher la deuxième réponse +8. Quitter le programme + +Remarque: pas d'exécuteur ou de réacteur pour le moment + +## Notre propre `Future` + +```rust +pub trait Future { + type Output; + + fn poll(&mut self) -> PollState<Self::Output>; +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; +} +pub enum PollState<T> { + Ready(T), + NotReady, +// Pending +} +``` + +* Presque les mêmes définitions que `Future` et `Poll` de la librairie standard +* Il manque que le contexte pour `poll()` +* `NotReay` -> `Pending` pour la librairie standard +* Tout l'enjeu pour chaque partie de notre code sera d'implémenter le trait `Future` + +## La coroutine 1/ + +```rust +struct Coroutine { + state: CoroutineState, +} +enum CoroutineState { + Start, + Wait1(Box<dyn Future<Output = String>>), + Wait2(Box<dyn Future<Output = String>>), + Resolved, +} +``` + +* La machine d'état état: `Start` -> `Wait1` -> `Wait2` -> `Resolved` +* Rappel: + 1. Démarrage + 2. Reqûete no1 et attente de la réponse + 3. Requête no2 et attente de la réponse + 4. Fin +* Coroutine est "que" l'état (plus tard sera plus que ça) + +## La coroutine /3 + +```rust +fn async_main() -> impl Future<Output = ()> { + Coroutine { + state: CoroutineState::Start, + } +} +fn main() { + // We call the new Coroutine that returns a Future + let mut future = async_main(); + // We poll the Future until completion writing Schedule other tasks each time the Future is in + // NotReady State. We alo wait for 100 milliseconds to not be overwhelmed by messages + loop { + match future.poll() { + PollState::NotReady => { + println!("Schedule other tasks"); + } + PollState::Ready(_) => break, + } + thread::sleep(Duration::from_millis(100)); + } +} +``` + +* On démarre la coroutine avec l'état à `Start` +* La coroutine implémnte le trait `Future` +* On `poll()` la `Coroutine` tant qu'elle est `NotReady` où on pourrait exécuter d'autres tâches +* On attend `100`ms pour pas avoir trop de messages + +## La coroutine 4/ + +```rust +impl Future for Coroutine { + type Output = (); + fn poll(&mut self) -> PollState<Self::Output> { + loop { + match self.state { + CoroutineState::Start => { + println!("Program starting"); + self.state = CoroutineState::Wait1(Box::new(Http::get("/600/HelloWorld1"))); + } + CoroutineState::Wait1(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + self.state = CoroutineState::Wait2(Box::new(Http::get("/400/HelloWorld2"))); + } + PollState::NotReady => break PollState::NotReady, + }, + CoroutineState::Wait2(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + self.state = CoroutineState::Resolved; + break PollState::Ready(()); + } + PollState::NotReady => break PollState::NotReady, + }, + CoroutineState::Resolved => panic!("Polled a resolved Future which is illegal!"), + } + } + } +} +``` + +* La `Coroutine` retourne rien +* Boucle infinie pour faire avancer notre état `Start` -> `Wait1` -> `Wait2` -> `Resovled` +* `Wait1/2` contiennent les requêtes `GET` enfant sur lesquelles ont attend pour pouvoir progresser +* On `poll()` les `Future` enfants de `Coroutine` +* Si on tente te `poll()` depuis l'état `Resolved` on `panic!()` + +## La requête `GET` 1/ + +```rust +fn get_req(path: &str) -> String { + format!( + "GET {path} HTTP/1.1\r\n\ + Host: localhost\r\n\ + Connection: close\r\n\ + \r\n" + ) +} +pub struct Http; +impl Http { + pub fn get(path: &str) -> impl Future<Output = String> { + HttpGetFuture::new(path) + } +} +``` + +* `Http::get()` va retourner un `Future` qui contient la requête + +## La requête `GET` 2/ + +```rust +struct HttpGetFuture { + // Option since we will not conect upon creation + stream: Option<mio::net::TcpStream>, + // buffer to read from the TcpStream + buffer: Vec<u8>, + // The GET request we will construct + path: String, +} +``` + +* `stream` n'est pas connecté lors de la création de la requête (`Option`) +* `buffer` de lecture sur le stream +* `path` la requête + +## La requête `GET` 2/ + +```rust +impl HttpGetFuture { + fn new(path: &str) -> Self { + Self { + stream: None, + buffer: vec![], + path: String::from(path), + } + } + fn write_request(&mut self) { + let stream = std::net::TcpStream::connect("localhost:8080").unwrap(); + stream.set_nonblocking(true).unwrap(); + let mut stream = mio::net::TcpStream::from_std(stream); + stream.write_all(get_req(&self.path).as_bytes()).unwrap(); + self.stream = Some(stream); + } +} +``` + +* Création de `HttpGetFuture` avec des valeurs par défaut (pas de requête encore) +* `write_request()` ne sera utilisée que plus tard (dans `poll()`) +* la lecture est non-bloquante + +## La requête `GET` 3/ + +```rust +impl Future for HttpGetFuture { + type Output = String; + fn poll(&mut self) -> PollState<Self::Output> { + if self.stream.is_none() { + println!("First poll - start operation"); + self.write_request(); + return PollState::NotReady; + } + let mut buf = vec![0u8; 4096]; + loop { + match self.stream.as_mut().unwrap().read(&mut buf) { + Ok(0) => { + let s = String::from_utf8_lossy(&self.buffer); + break PollState::Ready(String::from(s)); + } + Ok(n) => { + self.buffer.extend(&buf[0..n]); + continue; + } + Err(e) if e.kind() == ErrorKind::WouldBlock => { + break PollState::NotReady; + } + Err(e) if e.kind() == ErrorKind::Interrupted => { + continue; + } + Err(e) => panic!("{e:?}"), + } + } + } +} +``` + +* Le premier `poll()` écrit la requête au serveur et devient `NotReady` +* Ensuite on `loop` jusqu'à ce que: + 1. On ait finit de lire depuis le `stream` (on lit `0` octects) + 2. On a pas finit de lire depuis le `stream` (on lit `n` octects et on les met dans le `buffer`) et on continue + 3. Les données sont pas prêtes (`WouldBlock`) + 4. On a un signal d'interruption (on tente de relire) + 5. Tout autre retour, on `panic!()` +* Cette `Future` est *lazy*, on ne fait quelque chose que lorsqu'on a besoin du résultat (un `poll()` est *nécessaire* pour faire la requête) + +## La requête `GET` 4/ + +* Trois états différents + 1. `stream == None` => `NotStarted` + 2. `stream == Some() && read == Err(WuldBlock)` => `Pending` + 3. `stream == Some() && read == Ok(0)` => `Resolved` +* Pas de modélisation explicite ici + +## Conclusion + +* Les coroutines sans pile, ne sont pas pré-emptables (on peut pas suspendre leur exécution à l'import quel moment) +* En l'absence d'une pile, on a pas les informations nécessaire pour arrêter le processus n'importe où +* On peut suspendre l'exécution qu'aux points `wait` diff --git a/codes/coroutines/src/main.rs b/codes/coroutines/src/main.rs index ff6f4687f64cd6829dd8a4f740a18a274d13302c..511193962fe3ee0c3c4b1f6c293b90da4abd73a7 100644 --- a/codes/coroutines/src/main.rs +++ b/codes/coroutines/src/main.rs @@ -103,7 +103,7 @@ enum CoroutineState { Resolved, } -// +// We just create a new coroutine fn async_main() -> impl Future<Output = ()> { Coroutine::new() }