SE VUOI PRENDERE LA CERTIFICAZIONE PER QUESTO CORSO CLICCA QUI
Chiusura Sicura e Pulizia
Il codice nel Listato 20-20 sta rispondendo alle richieste in modo asincrono attraverso l’uso di un pool di thread, come previsto. Otteniamo alcuni avvertimenti sui campi workers, id e thread che non stiamo utilizzando in modo diretto e ci ricordano che non stiamo pulendo nulla. Quando usiamo il metodo meno elegante ctrl-c per interrompere il thread principale, tutti gli altri thread vengono interrotti immediatamente, anche se sono nel mezzo del servizio di una richiesta.
Quindi, implementeremo il trait Drop per chiamare join su ciascuno dei thread nel pool in modo che possano completare le richieste su cui stanno lavorando prima di chiudersi. Poi implementeremo un modo per dire ai thread che dovrebbero smettere di accettare nuove richieste e spegnersi. Per vedere questo codice in azione, modificheremo il nostro server per accettare solo due richieste prima di spegnere il suo pool di thread.
Implementazione del Trait Drop su ThreadPool
Iniziamo con l’implementazione di Drop sul nostro pool di thread. Quando il pool viene rilasciato, tutti i nostri thread dovrebbero unirsi per assicurarsi di completare il loro lavoro. Il Listato 20-22 mostra un primo tentativo di implementazione di Drop; questo codice non funzionerà ancora.
rust
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Arresto del worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
Innanzitutto, iteriamo attraverso ciascuno dei worker del pool di thread. Usiamo &mut per questo perché self è un riferimento mutabile, e abbiamo anche bisogno di poter mutare worker. Per ogni worker, stampiamo un messaggio che indica che questo particolare worker sta chiudendo e poi chiamiamo join sul thread di quel worker. Se la chiamata a join fallisce, usiamo unwrap per far panneggiare Rust e passare a uno spegnimento non delicato.
Ecco l’errore che otteniamo quando compiliamo questo codice:
arduino
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
L’errore ci dice che non possiamo chiamare join perché abbiamo solo un mutabile riferimento di ciascun worker e join richiede il possesso del suo argomento. Per risolvere questo problema, dobbiamo spostare il thread dall’istanza di Worker che possiede il thread in modo che join possa consumare il thread. Abbiamo fatto questo nel Listato 17-15: se Worker contiene un Option<thread::JoinHandle<()>> invece, possiamo chiamare il metodo take sull’Option per spostare il valore fuori dalla variante Some e lasciare al suo posto una variante None. In altre parole, un Worker che è in esecuzione avrà una variante Some in thread, e quando vogliamo pulire un Worker, sostituiremo Some con None in modo che il Worker non abbia un thread su cui eseguire.
Quindi sappiamo che vogliamo aggiornare la definizione di Worker in questo modo:
rust
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
Ora facciamo leva sul compilatore per trovare gli altri posti che devono cambiare. Verificando questo codice, otteniamo due errori:
sql
error[E0599]: no method named `join` found for enum `Option` in the current scope
css
error[E0308]: mismatched types
Affrontiamo il secondo errore, che punta al codice alla fine di Worker::new; dobbiamo avvolgere il valore del thread in Some quando creiamo un nuovo Worker. Apporta le seguenti modifiche per risolvere questo errore:
rust
Worker {
id,
thread: Some(thread),
}
Il primo errore è nella nostra implementazione di Drop. Abbiamo accennato in precedenza che intendevamo chiamare take sul valore Option per spostare il thread fuori dal worker. Le seguenti modifiche lo faranno:
rust
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Arresto del worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
Come discusso nel Capitolo 17, il metodo take su Option prende la variante Some e lascia al suo posto None. Usiamo if let per destrutturare il Some e ottenere il thread; quindi chiamiamo join sul thread. Se il thread di un worker è già None, sappiamo che quel worker ha già avuto il suo thread pulito, quindi in quel caso non succede nulla.
Segnalazione ai Thread di Smettere di Ascoltare le Richieste
Con tutte le modifiche apportate, il nostro codice si compila senza alcun avvertimento. Tuttavia, la brutta notizia è che questo codice non funziona ancora come vorremmo. La chiave è la logica nelle chiusure eseguite dai thread delle istanze Worker: al momento, chiamiamo join, ma questo non spegnerà i thread perché fanno un loop all’infinito alla ricerca di lavori. Se proviamo a rilasciare il nostro ThreadPool con la nostra attuale implementazione di drop, il thread principale bloccherà per sempre in attesa che il primo thread finisca.
Per risolvere questo problema, avremo bisogno di un cambiamento nell’implementazione di drop di ThreadPool e quindi un cambiamento nel loop di Worker.
Innanzitutto, cambieremo l’implementazione di drop di ThreadPool per rilasciare esplicitamente il mittente prima di attendere che i thread finiscano. Il Listato 20-23 mostra le modifiche a ThreadPool per rilasciare esplicitamente il mittente. Usiamo la stessa tecnica di Option e take come abbiamo fatto con il thread per poter spostare il mittente fuori da ThreadPool:
rust
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
// --snip-- ThreadPool {
workers,
sender: Some(sender),
}
} pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap();
}
} impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take()); for worker in &mut self.workers {
println!("Arresto del worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
Rilasciare il mittente chiude il canale, indicando che non verranno inviati altri messaggi. Quando ciò accade, tutte le chiamate a recv che fanno i worker nel loop infinito restituiranno un errore. Nel Listato 20-24, cambiamo il loop di Worker per uscire dal loop in modo delicato in quel caso, il che significa che i thread finiranno quando l’implementazione di drop di ThreadPool li chiama con join.
rust
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv(); match message {
Ok(job) => {
println!("Worker {id} got a job; executing."); job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Per vedere questo codice in azione, modifichiamo main per accettare solo due richieste prima di spegnere gentilmente il server, come mostrato nel Listato 20-25.
rust
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) {
let stream = stream.unwrap(); pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
Non vorresti che un server web del mondo reale si spegnesse dopo aver servito solo due richieste. Questo codice mostra solo che la chiusura delicata e la pulizia funzionano correttamente.
Il metodo take è definito nel trait Iterator e limita l’iterazione ai primi due elementi al massimo. ThreadPool uscirà dallo scope alla fine di main, e l’implementazione di drop verrà eseguita.
Avvia il server con cargo run, e fai tre richieste. La terza richiesta dovrebbe dare errore, e nel terminale dovresti vedere un output simile a questo:
less
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
Potresti vedere un ordine diverso dei worker e dei messaggi stampati. Possiamo vedere come funziona questo codice dai messaggi: i worker 0 e 3 hanno ricevuto le prime due richieste. Il server ha smesso di accettare connessioni dopo la seconda connessione, e l’implementazione di Drop su ThreadPool inizia ad eseguire prima che il worker 3 inizi anche solo il suo lavoro. Rilasciare il mittente disconnette tutti i worker e dice loro di spegnersi. I worker stampano ciascuno un messaggio quando si disconnettono, e poi il pool di thread chiama join per attendere che ciascun thread del worker finisca.
Nota un aspetto interessante di questa particolare esecuzione: ThreadPool ha rilasciato il mittente, e prima che qualsiasi worker ricevesse un errore, abbiamo provato a unire il worker 0. Il worker 0 non aveva ancora ricevuto un errore da recv, quindi il thread principale è bloccato in attesa che il worker 0 finisca. Nel frattempo, il worker 3 ha ricevuto un lavoro e quindi tutti i thread hanno ricevuto un errore. Quando il worker 0 ha finito, il thread principale ha atteso che il resto dei worker finisse. A quel punto, erano tutti usciti dai loro loop e si erano fermati.
Congratulazioni! Abbiamo completato il nostro progetto; abbiamo un server web di base che utilizza un pool di thread per rispondere in modo asincrono. Siamo in grado di eseguire una chiusura delicata del server, che pulisce tutti i thread nel pool.
Ecco il codice completo per riferimento:
rust
// Contenuto di src/main.rs
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration; fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) {
let stream = stream.unwrap(); pool.execute(|| {
handle_connection(stream);
});
} println!("Shutting down.");
} fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
}; let contents = fs::read_to_string(filename).unwrap(); let response = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
status_line,
contents.len(),
contents
); stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
} // Contenuto di src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
}; pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
} type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
} ThreadPool {
workers,
sender: Some(sender),
}
} pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap();
}
} impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take()); for worker in &mut self.workers {
println!("Arresto del worker {}", worker.id); if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
} struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
} impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv(); match message {
Ok(job) => {
println!("Worker {id} got a job; executing."); job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Potremmo fare di più qui! Se vuoi continuare a migliorare questo progetto, ecco alcune idee:
- Aggiungi più documentazione a ThreadPool e ai suoi metodi pubblici.
- Aggiungi test sulla funzionalità della libreria.
- Cambia le chiamate a unwrap in un sistema di gestione degli errori più robusto.
- Usa ThreadPool per eseguire un compito diverso dal servire richieste web.
- Trova una crate di pool di thread su crates.io e implementa un server web simile usando la crate. Poi confronta la sua API e la sua robustezza con il pool di thread che abbiamo implementato.
Sommario
Ben fatto! Sei arrivato alla fine del corso! Vogliamo ringraziarti per esserti unito a noi in questo tour di Rust. Ora sei pronto per implementare i tuoi progetti Rust e aiutare gli altri con i loro progetti. Tieni presente che c’è una comunità accogliente di altri Rustaceans che sarebbe felice di aiutarti con qualsiasi sfida tu incontri nel tuo viaggio con Rust.