Skip to content
Snippets Groups Projects
Verified Commit 78b097e5 authored by orestis.malaspin's avatar orestis.malaspin
Browse files

added example

parent 4d824ef7
No related branches found
No related tags found
No related merge requests found
Pipeline #38734 passed
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "libc"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
]
[[package]]
name = "reactor_executor"
version = "0.1.0"
dependencies = [
"mio",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[package]
name = "reactor_executor"
version = "0.1.0"
edition = "2021"
[dependencies]
mio = { version = "1.0.3", features = ["net", "os-poll"] }
use crate::runtime::Waker;
// Future trait looks like standard Future except for the missing context
// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
//}
// We now also have the `Waker` as an argument of the `Future` trait. We see that in the standard
// library the Waker is hidden in a Context which we do not need here
pub trait Future {
type Output;
// TODO:
// Add the Waker into poll
fn poll(&mut self) -> PollState<Self::Output>;
}
// Similar to the Poll #[derive(Debug)]
// pub enum Poll<T> {
// Ready(T),
// Pending,
// }
pub enum PollState<T> {
Ready(T),
NotReady,
}
// Exercise adapt the join_all function to accept the Waker structs
use mio::Interest;
use crate::future::{Future, PollState};
use crate::runtime::{self, reactor, Waker};
use std::io::{ErrorKind, Read, Write};
// Simple helper function to write GET requests
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}
pub struct Http;
impl Http {
pub fn get(path: &str) -> impl Future<Output = String> {
HttpGetFuture::new(path)
}
}
// The id of the `Future` is also a needed information to register it for the `Waker` and `Poll`
struct HttpGetFuture {
// Option since we will not conect upon creation
stream: Option<mio::net::TcpStream>,
// buffer to read from the TcpStream
buffer: Vec<u8>,
// The GET request we will construct
path: String,
// NEW: The id of our Future
id: usize,
}
impl HttpGetFuture {
fn new(path: &str) -> Self {
unimplemented!();
// TODO:
// get the id from the next_id of the reactor
let id = 1;
Self {
stream: None,
buffer: vec![],
path: String::from(path),
id,
}
}
// Writes the request to a non bocking stream. Here we are using mio t connect to the socket
// We just create the stream and do not perform any request here
fn write_request(&mut self) {
let stream = std::net::TcpStream::connect("localhost:8080").unwrap();
stream.set_nonblocking(true).unwrap();
let mut stream = mio::net::TcpStream::from_std(stream);
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
self.stream = Some(stream);
}
}
impl Future for HttpGetFuture {
type Output = String;
// TODO:
// Add the waker into poll()
fn poll(&mut self) -> PollState<Self::Output> {
if self.stream.is_none() {
println!("First poll - start operation");
self.write_request();
//TODO:
// From the reactor()
// 1. Registering the Tcp stream as a source of events in the poll instance
// 2. We also set the `Waker` responsible for handling the `Future` with the self.id
}
let mut buf = vec![0u8; 4096];
// We loop until everything is read and return Ready when done
loop {
// We listen on the socket and read
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
// TODO:
// The future is over we deregister our interest from this source of events for
// the current id
break PollState::Ready(String::from(s));
}
// We read n bytes so we continue reading
Ok(n) => {
self.buffer.extend(&buf[0..n]);
continue;
}
// This error indicates that the data is not ready yet or that that there is more
// data we did not received yet
Err(e) if e.kind() == ErrorKind::WouldBlock => {
println!("Data not ready");
// TODO:
// We need to set the `waker` again since the data is not ready
break PollState::NotReady;
}
// This error can happen if a signal interrupted the read (which can happen) we
// handle this case by reading again
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
// Nothing we can do. We panic!
Err(e) => panic!("{e:?}"),
}
}
}
}
mod future;
mod http;
mod runtime;
use future::{Future, PollState};
use http::Http;
use runtime::Waker;
fn main() {
let mut executor = runtime::init();
executor.block_on(async_main());
}
fn async_main() -> impl Future<Output = String> {
Coroutine::new()
}
enum State {
Start,
Wait1(Box<dyn Future<Output = String>>),
Wait2(Box<dyn Future<Output = String>>),
Resolved,
}
struct Coroutine {
state: State,
}
impl Coroutine {
fn new() -> Self {
Self {
state: State::Start,
}
}
}
// We need to adapt `poll()` and the subsequent calls of the children `Future` to have an extra
// argument: the `Waker`.
impl Future for Coroutine {
type Output = String;
//TODO:
//add Waker into poll
fn poll(&mut self) -> PollState<Self::Output> {
loop {
match self.state {
State::Start => {
println!("Program starting");
let fut1 = Box::new(Http::get("/600/HelloAsyncAwait"));
self.state = State::Wait1(fut1);
}
//TODO:
//add Waker into poll
State::Wait1(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
let fut2 = Box::new(Http::get("/400/HelloAsyncAwait"));
self.state = State::Wait2(fut2);
}
PollState::NotReady => break PollState::NotReady,
},
//TODO:
//add Waker into poll
State::Wait2(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
self.state = State::Resolved;
break PollState::Ready(String::new());
}
PollState::NotReady => break PollState::NotReady,
},
State::Resolved => panic!("Polled a resolved future"),
}
}
}
}
mod executor;
mod reactor;
pub use executor::{Executor, Waker};
pub use reactor::reactor;
pub fn init() -> Executor {
reactor::start();
Executor::new()
}
use std::{
cell::{Cell, RefCell},
collections::HashMap,
sync::{Arc, Mutex},
thread::{self, Thread},
};
use crate::future::{Future, PollState};
#[derive(Clone)]
pub struct Waker {
thread: Thread,
id: usize,
ready_queue: Arc<Mutex<Vec<usize>>>,
}
impl Waker {
pub fn wake(&self) {
// TODO:
// 1. push id into the ready_queue
// 2. unpark the thread
unimplemented!()
}
}
type Task = Box<dyn Future<Output = String>>;
thread_local! {
static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default();
}
#[derive(Default)]
struct ExecutorCore {
tasks: RefCell<HashMap<usize, Task>>,
ready_queue: Arc<Mutex<Vec<usize>>>,
next_id: Cell<usize>,
}
pub fn spawn<F>(future: F)
where
F: Future<Output = String> + 'static,
{
unimplemented!();
// TODO:
CURRENT_EXECUTOR.with(|e| {
// 1. insert (next_id and future)
// 2. push id into ready_queue
// 3. add one to next_id
});
}
pub struct Executor;
impl Executor {
pub fn new() -> Self {
Self {}
}
fn pop_ready(&self) -> Option<usize> {
// TODO:
// pop from ready queue, CURRENT_EXECUTOR.with
// dont forget the lock()
unimplemented!()
}
fn get_future(&self, id: usize) -> Option<Task> {
// TODO:
// remove &id from tasks thanks to borrow_mut
unimplemented!()
}
fn get_waker(&self, id: usize) -> Waker {
// TODO:
// create a new waker from: id, thread::current(), and a clone from ready_queue
unimplemented!()
}
// Inserts a new task with a given id into the tasks HashMap
fn insert_task(&self, id: usize, task: Task) {
// TODO:
//insert a pair id, task into tasks
unimplemented!()
}
// Counts how many tasks are left to be executed
fn task_count(&self) -> usize {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len())
}
pub fn block_on<F>(&mut self, future: F)
where
F: Future<Output = String> + 'static,
{
// TODO:
// 1. spawn a future
loop {
// TODO:
// 1. while there are something left into ready_queue, pop_ready the id
// 2. get the future by id
// 3. get a waker by id
// 4. poll the future
// 5. if NotReady => insert the (id, future)
// 6. else continue
//
let task_count = self.task_count();
let name = String::from(thread::current().name().unwrap_or_default());
// TODO: if not finished park()
// otherwise break
if task_count > 0 {
println!("{name}: {task_count}, pending tasks. Sleep until notified.");
} else {
println!("{name}: All tasks finished");
}
}
}
}
use std::{
collections::HashMap,
sync::{atomic::AtomicUsize, Arc, Mutex, OnceLock},
};
use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
use crate::runtime::Waker;
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
static REACTOR: OnceLock<Reactor> = OnceLock::new();
pub fn reactor() -> &'static Reactor {
REACTOR.get().expect("Called outside a runtime context")
}
pub struct Reactor {
wakers: Wakers,
registry: Registry,
next_id: AtomicUsize,
}
impl Reactor {
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
// TODO:
// register on the registry for stream, id, and interest
unimplemented!()
}
pub fn set_waker(&self, waker: &Waker, id: usize) {
// TODO:
// insert a pair id, waker into wakers
unimplemented!()
}
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
//TODO:
// 1. remove the if from the wakers (don't forget to lock/map)
// 2. deregister the stream from the registry
unimplemented!()
}
pub fn next_id(&self) -> usize {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
}
fn event_loop(mut poll: Poll, wakers: Wakers) {
unimplemented!();
let mut events = Events::with_capacity(100);
loop {
// TODO:
// 1. poll event with None timeout
// 2. for_each event get the .token()
// 3. lock the wakers and the the waker by id
// 4. wake() the waker
}
}
pub fn start() {
unimplemented!();
// TODO:
// 1. create a new empty wakers HashMap<usize, Waker>
// 2. create a new Poll
// 3. create a try_clone poll.registry()
// 4. initialize a new next_id to 1
// 5. create a new Reactor
REACTOR.set(reactor).ok().expect("Rector already running");
// 6. move the event loop of poll and wakers into a new system thread:
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment