diff --git a/07_stackless_coroutines.md b/07_stackless_coroutines.md index f69b19110bacf0740b1fcf839342a21554e09136..6138eb002a2d4ef32bfa8cffe628058d3e62aca3 100644 --- a/07_stackless_coroutines.md +++ b/07_stackless_coroutines.md @@ -295,9 +295,233 @@ impl Future for HttpGetFuture { 3. `stream == Some() && read == Ok(0)` => `Resolved` * Pas de modélisation explicite ici +## Exécution 1/ + +```console +Program starting +First poll - start operation +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +HTTP/1.1 200 OK +content-type: text/plain; charset=utf-8 +content-length: 11 +connection: close +date: Mon, 17 Mar 2025 08:03:55 GMT + +HelloWorld1 +First poll - start operation +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +Schedule other tasks +HTTP/1.1 200 OK +content-type: text/plain; charset=utf-8 +content-length: 11 +connection: close +date: Mon, 17 Mar 2025 08:03:55 GMT + +HelloWorld2 +``` + +## Exécution 2/ + +* On a bien le bon nombre de `poll()` pour chaque tâche +* On a bien les deux requêtes `HelloWorld1` et `HelloWorld2` +* Les deux tâches sont exécutées séquentiellement + +## Problème? + +* C'est pas très simple à utiliser +* Impossible à généraliser +* Verbeux +* En un mot: beurk! + +## Généralisation + +* Il faut un support du compilateur +* Nécessité d'avoir des mots clés: `coroutine/wait` +* Utilisation de l'outil `corofy`: <https://github.com/PacktPublishing/Asynchronous-Programming-in-Rust/tree/main/ch07> +* Outil très simplifié qui crée la machine d'état et les attentes + +## Utilisation de corofy + +```console +cargo install --path . +``` + +* Syntaxe `coroutine/wait` + +## Le nouveau `main` + +```rust +fn get_path(i: usize) -> String { + format!("/{}/HelloWorld{i}", i * 1000) +} +coroutine fn async_main() { + println!("Program starting"); + let txt = Http::get(&get_path(0)).wait; + println!("{txt}"); + let txt = Http::get(&get_path(1)).wait; + println!("{txt}"); + let txt = Http::get(&get_path(2)).wait; + println!("{txt}"); + let txt = Http::get(&get_path(3)).wait; + println!("{txt}"); + let txt = Http::get(&get_path(4)).wait; + println!("{txt}"); +} +fn main() { + let start = Instant::now(); + let mut future = async_main(); + loop { + match future.poll() { + PollState::NotReady => (), + PollState::Ready(_) => break, + } + } + println!("Elapsed time: {}", start.elapsed().as_millis()); +} +``` + +## Conversion 1/ + +```rust +enum State0 { + Start, + Wait1(Box<dyn Future<Output = String>>), + Wait2(Box<dyn Future<Output = String>>), + Wait3(Box<dyn Future<Output = String>>), + Wait4(Box<dyn Future<Output = String>>), + Wait5(Box<dyn Future<Output = String>>), + Resolved, +} +``` + +## Conversion 2/ + +```rust +fn poll(&mut self) -> PollState<Self::Output> { + loop { + match self.state { + State0::Start => { + println!("Program starting"); + let fut1 = Box::new( Http::get(&get_path(0))); + self.state = State0::Wait1(fut1); + } + + State0::Wait1(ref mut f1) => { + match f1.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + let fut2 = Box::new( Http::get(&get_path(1))); + self.state = State0::Wait2(fut2); + } + PollState::NotReady => break PollState::NotReady, + } + } +// ... + } + } +} +``` + +* La syntaxe `async/await` fait des choses similaires automatiquement (en beaucoup mieux) + +## Coroutines et concurrence + +* On a besoin d'un `join_all()` des `Future` +* Idée: parcourir les `Future` et les `poll()` à tour de rôle jusqu'à ce que toutes soient `Ready(_)` +* On stocke les `Future` dans une liste, ainsi que leur état (terminé ou non) + +```rust + +``` + +## Le `join_all` 1/ + +```rust +pub struct JoinAll<F: Future> { + futures: Vec<(bool, F)>, + finished_count: usize, +} +pub fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> { + let futures = futures.into_iter().map(|f| (false, f)).collect(); + JoinAll { + futures, + finished_count: 0, + } +} +``` + +* `futures` contient les `Future` et si chacune est terminée ou pas +* On veut connaître combien sont terminées ou non +* On transforme la liste de `Future` en tuple et on y va! + +## Le `join_all` 2/ + +```rust +coroutine fn async_main() { + println!("Program starting"); + let mut futures = vec![]; + for i in 0..5 { + futures.push(request(i)); + } + future::join_all(futures).wait; +} +fn main() { + let start = Instant::now(); + let mut future = async_main(); + loop { + match future.poll() { + PollState::NotReady => (), + PollState::Ready(_) => break, + } + } + println!("Elapsed time: {}", start.elapsed().as_millis()); +} +``` + +## Le `join_all()` 3/ + +```rust +impl<F: Future> Future for JoinAll<F> { + type Output = String; + fn poll(&mut self) -> PollState<Self::Output> { + for (finished, future) in self.futures.iter_mut() { + if *finished { + continue; + } + match future.poll() { + PollState::Ready(_) => { + *finished = true; + self.finished_count += 1; + } + PollState::NotReady => continue, + } + } + if self.finished_count == self.futures.len() { + PollState::Ready(String::new()) + } else { + PollState::NotReady + } + } +} +``` + +* Si `finished` => on continue +* Sinon: on `poll()` chaque `Future`: + * Si `Ready` => on marque le `Future` comme terminé + * Sinon on `continue` +* Quand tous sont terminés le `JoinAll` est `Ready`, sinon `NotReady` + ## Conclusion * Les coroutines sans pile, ne sont pas pré-emptables (on peut pas suspendre leur exécution à l'import quel moment) * En l'absence d'une pile, on a pas les informations nécessaire pour arrêter le processus n'importe où * On peut suspendre l'exécution qu'aux points `wait` -