SE VUOI PRENDERE LA CERTIFICAZIONE PER QUESTO CORSO CLICCA QUI
Trasformiamo il nostro Server Single-Threaded in un Server Multithreaded
Attualmente, il server elabora ogni richiesta una alla volta, il che significa che non processerà una seconda connessione fino a quando la prima non avrà finito di essere elaborata. Se il server ricevesse sempre più richieste, questa esecuzione seriale sarebbe sempre meno ottimale. Se il server ricevesse una richiesta che richiede molto tempo per essere elaborata, le richieste successive dovranno attendere fino a quando la richiesta lunga non sarà stata completata, anche se le nuove richieste potrebbero essere elaborate rapidamente. Dovremo risolvere questo problema, ma prima vedremo il problema in azione.
Simulare una Richiesta Lenta nell’Implementazione Attuale del Server
Vedremo come una richiesta di elaborazione lenta può influenzare altre richieste fatte alla nostra attuale implementazione del server. Il codice seguente implementa la gestione di una richiesta a /sleep con una risposta simulata lenta che farà dormire il server per 5 secondi prima di rispondere.
rust
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip-- fn handle_connection(mut stream: TcpStream) {
// --snip-- let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
}
Abbiamo cambiato da if
a match
ora che abbiamo tre casi. Dobbiamo corrispondere esplicitamente a una fetta di request_line
per fare il pattern matching contro i valori letterali della stringa; match
non fa automaticamente riferimenti e dereferenziazioni come fa il metodo di uguaglianza.
Il primo caso è lo stesso del blocco if
dal Listato 20-9. Il secondo caso corrisponde a una richiesta a /sleep. Quando viene ricevuta quella richiesta, il server dormirà per 5 secondi prima di visualizzare la pagina HTML di successo. Il terzo caso è lo stesso del blocco else
dal Listato 20-9.
Puoi vedere quanto sia primitivo il nostro server: le librerie reali gestirebbero il riconoscimento di più richieste in un modo molto meno verboso!
Avvia il server usando cargo run
. Quindi apri due finestre del browser: una per http://127.0.0.1:7878/ e l’altra per http://127.0.0.1:7878/sleep. Se inserisci l’URI / un paio di volte, come prima, vedrai che risponde rapidamente. Ma se inserisci /sleep e poi carichi /, vedrai che / attende finché sleep non ha dormito per tutti i suoi 5 secondi prima di caricarsi.
Esistono diverse tecniche che potremmo usare per evitare che le richieste si accumulino dietro a una richiesta lenta; quella che implementeremo è un pool di thread.
Aumentare la Capacità con un Pool di Thread
Un pool di thread è un gruppo di thread creati e pronti per gestire un compito. Quando il programma riceve un nuovo compito, assegna uno dei thread nel pool al compito e quel thread elaborerà il compito. I thread rimanenti nel pool sono disponibili per gestire eventuali altri compiti che arrivano mentre il primo thread sta elaborando. Quando il primo thread ha finito di elaborare il compito, viene restituito al pool di thread inattivi, pronto per gestire un nuovo compito. Un pool di thread ti consente di elaborare le connessioni contemporaneamente, aumentando la capacità del tuo server.
Limiteremo il numero di thread nel pool a un numero piccolo per proteggerci dagli attacchi di Denial of Service (DoS); se il nostro programma dovesse creare un nuovo thread per ogni richiesta mentre arrivano, qualcuno che effettua 10 milioni di richieste al nostro server potrebbe creare caos usando tutte le risorse del nostro server e rallentando l’elaborazione delle richieste fino a fermarla.
Piuttosto che spawnare thread illimitati, quindi, avremo un numero fisso di thread in attesa nel pool. Le richieste in arrivo vengono inviate al pool per l’elaborazione. Il pool manterrà una coda di richieste in arrivo. Ciascuno dei thread nel pool prenderà una richiesta da questa coda, gestirà la richiesta e quindi chiederà alla coda un’altra richiesta. Con questo design, possiamo elaborare fino a N richieste contemporaneamente, dove N è il numero di thread. Se ogni thread sta rispondendo a una richiesta a lungo termine, le richieste successive possono ancora accumularsi nella coda, ma abbiamo aumentato il numero di richieste a lungo termine che possiamo gestire prima di raggiungere quel punto.
Questa tecnica è solo una delle tante modalità per migliorare la capacità di un server web. Altre opzioni che potresti esplorare sono il modello fork/join, il modello di I/O asincrono single-threaded o il modello di I/O asincrono multi-threaded. Se sei interessato a questo argomento, puoi leggere di più su altre soluzioni e provare a implementarle; con un linguaggio di basso livello come Rust, tutte queste opzioni sono possibili.
Prima di iniziare a implementare un pool di thread, parliamo di come dovrebbe essere utilizzare il pool. Quando cerchi di progettare un codice, scrivere prima l’interfaccia del cliente può aiutare a guidare la progettazione. Scrivi l’API del codice in modo che sia strutturata nel modo in cui vuoi chiamarla; quindi implementa la funzionalità all’interno di quella struttura anziché implementare la funzionalità e quindi progettare l’API pubblica.
Similmente a come abbiamo usato lo sviluppo guidato dai test nel progetto nel Capitolo 12, useremo lo sviluppo guidato dal compilatore qui. Scriveremo il codice che chiama le funzioni desiderate, e quindi guarderemo gli errori dal compilatore per determinare cosa dobbiamo cambiare per far funzionare il codice. Prima di farlo, però, esploreremo la tecnica che non useremo come punto di partenza.
Creazione di un Thread per Ogni Richiesta
Prima di tutto, esploriamo come potrebbe apparire il nostro codice se creasse un nuovo thread per ogni connessione. Come accennato in precedenza, questo non è il nostro piano finale a causa dei problemi con la possibile creazione di un numero illimitato di thread, ma è un punto di partenza per ottenere prima un server multithreaded funzionante. Poi aggiungeremo il pool di thread come miglioramento, e contrastare le due soluzioni sarà più facile. Il Listato 20-11 mostra le modifiche da apportare a main per avviare un nuovo thread per gestire ogni flusso all’interno del ciclo for.
rust
use std::thread;
use std::net::TcpListener; fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
Come hai imparato nel Capitolo 16, thread::spawn
creerà un nuovo thread e quindi eseguirà il codice nel closure nel nuovo thread. Se esegui questo codice e carichi /sleep nel tuo browser, quindi / in altre due schede del browser, vedrai effettivamente che le richieste a / non devono attendere che /sleep finisca. Tuttavia, come abbiamo detto, questo alla fine sovracaricherà il sistema perché stai creando nuovi thread senza alcun limite.
Creazione di un Numero Finito di Thread
Vogliamo che il nostro pool di thread funzioni in modo simile e familiare, in modo che passare dai thread a un pool di thread non richieda grandi modifiche al codice che utilizza la nostra API. Il Listato 20-12 mostra l’interfaccia ipotetica per una struttura ThreadPool che vogliamo utilizzare invece di thread::spawn.
rust
use hello::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4); for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
Non funziona ancora, ma controlliamolo di nuovo per ottenere il prossimo errore che dobbiamo risolvere.
shell
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
Questo errore indica che dobbiamo creare una funzione associata chiamata new per ThreadPool. Sappiamo anche che new deve avere un parametro che può accettare 4 come argomento e dovrebbe restituire un’istanza di ThreadPool. Implementiamo quindi la funzione new più semplice che avrà queste caratteristiche:
rust
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Controlliamolo di nuovo:
shell
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| ^^^^^^^ method not found in `ThreadPool`
Ora l’errore si verifica perché non abbiamo un metodo execute su ThreadPool. Ricordiamo dalla sezione “Creazione di un Numero Finito di Thread” che abbiamo deciso che il nostro pool di thread dovrebbe avere un’interfaccia simile a thread::spawn. Inoltre, implementeremo la funzione execute in modo che prenda il closure che gli viene dato e lo dia a un thread inattivo nel pool per eseguirlo.
Definiamo il metodo execute su ThreadPool per prendere un closure come parametro. Ricordiamo dalla sezione “Moving Captured Values Out of the Closure and the Fn Traits” nel Capitolo 13 che possiamo prendere chiusure come parametri con tre diversi tratti: Fn, FnMut e FnOnce. Dobbiamo decidere quale tipo di chiusura utilizzare qui. Sappiamo che alla fine faremo qualcosa di simile all’implementazione di thread::spawn della libreria standard, quindi possiamo guardare quali vincoli ha la firma di thread::spawn sul suo parametro. La documentazione ci mostra quanto segue:
rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Il parametro di tipo F è quello che ci interessa qui; il parametro di tipo T è relativo al valore di ritorno, e non ci interessa quello. Possiamo vedere che spawn utilizza FnOnce come vincolo di tratto su F. Questo è probabilmente quello che vogliamo anche noi, perché alla fine passeremo l’argomento che riceviamo in execute a spawn. Possiamo essere ancora più sicuri che FnOnce sia il tratto che vogliamo usare perché il thread per eseguire una richiesta eseguirà solo quella chiusura della richiesta una sola volta, che corrisponde all’Once in FnOnce.
Il parametro di tipo F ha anche il vincolo di tratto Send e il vincolo di durata ‘static, che sono utili nella nostra situazione: abbiamo bisogno di Send per trasferire la chiusura da un thread a un altro e ‘static perché non sappiamo quanto tempo il thread impiegherà ad eseguire. Creiamo quindi un metodo execute su ThreadPool che prenda un parametro generico di tipo F con questi vincoli:
rust
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Usiamo ancora () dopo FnOnce perché questo FnOnce rappresenta una chiusura che non prende parametri e restituisce il tipo unit (). Proprio come le definizioni di funzioni, il tipo di ritorno può essere omesso dalla firma, ma anche se non abbiamo parametri, abbiamo ancora bisogno delle parentesi.
Ancora una volta, questa è l’implementazione più semplice del metodo execute: non fa nulla, ma stiamo cercando solo di compilare il nostro codice. Controlliamolo di nuovo:
shell
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
Compila! Ma nota che se provi cargo run e fai una richiesta nel browser, vedrai gli errori nel browser che abbiamo visto all’inizio del capitolo. La nostra libreria non sta effettivamente chiamando il closure passato a execute!
Convalida del Numero di Thread in new
Non stiamo facendo nulla con i parametri di new ed execute. Implementiamo i corpi di queste funzioni con il comportamento desiderato. Per iniziare, pensiamo a new. In precedenza abbiamo scelto un tipo non negativo per il parametro size, perché un pool con un numero negativo di thread non ha senso. Tuttavia, un pool con zero thread non ha senso, eppure zero è un usize perfettamente valido. Aggiungeremo del codice per verificare che size sia maggiore di zero prima di restituire un’istanza di ThreadPool e far panicare il programma se riceve uno zero utilizzando il macro assert!, come mostrato nel Listato 20-13.
rust
impl ThreadPool {
/// Crea un nuovo ThreadPool.
///
/// La dimensione è il numero di thread nel pool.
///
/// # Panics
///
/// La funzione `new` farà panicare se la dimensione è zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); ThreadPool
}
// --snip--
}
Abbiamo anche aggiunto della documentazione per il nostro ThreadPool con commenti doc. Nota che abbiamo seguito le buone pratiche di documentazione aggiungendo una sezione che specifica le situazioni in cui la nostra funzione può fare panic, come discusso nel Capitolo 14. Prova a eseguire cargo doc –open e fare clic sulla struttura ThreadPool per vedere com’è fatta la documentazione generata per new!
Invece di aggiungere il macro assert! come abbiamo fatto qui, potremmo cambiare new in build e restituire un Result come abbiamo fatto con Config::build nel progetto I/O nel Listato 12-9. Ma abbiamo deciso in questo caso che cercare di creare un pool di thread senza thread dovrebbe essere un errore irreversibile. Se sei ambizioso, prova a scrivere una funzione chiamata build con la seguente firma per confrontarla con la funzione new:
rust
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Creazione dello Spazio per Conservare i Thread
Ora che abbiamo un modo per sapere che abbiamo un numero valido di thread da conservare nel pool, possiamo creare quei thread e conservarli nella struttura ThreadPool prima di restituire la struttura. Ma come “conserviamo” un thread? Diamo un’altra occhiata alla firma di thread::spawn:
rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
La funzione spawn restituisce un JoinHandle<T>, dove T è il tipo che la chiusura restituisce. Proviamo anche a usare JoinHandle e vediamo cosa succede. Nel nostro caso, le chiusure che passiamo al pool di thread gestiranno la connessione e non restituiranno nulla, quindi T sarà il tipo unit ().
Il codice nel Listato 20-14 compilerà ma non creerà ancora nessun thread. Abbiamo cambiato la definizione di ThreadPool per contenere un vettore di istanze di thread::JoinHandle<()>, inizializzato il vettore con una capacità di size, impostato un ciclo for che eseguirà del codice per creare i thread e restituito un’istanza di ThreadPool che li contenga.
rust
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
} impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size {
// crea alcuni thread e conservali nel vettore
}
ThreadPool { threads }
}
// --snip--
}
Abbiamo portato std::thread nel contesto del crate della libreria, perché stiamo usando thread::JoinHandle come tipo degli elementi nel vettore in ThreadPool.
Una volta ricevuta una dimensione valida, il nostro ThreadPool crea un nuovo vettore che può contenere size elementi. La funzione with_capacity svolge lo stesso compito di Vec::new ma con una differenza importante: prealloca spazio nel vettore. Poiché sappiamo di dover memorizzare size elementi nel vettore, fare questa allocazione anticipata è leggermente più efficiente che utilizzare Vec::new, che si ridimensiona man mano che vengono inseriti gli elementi.
Quando esegui nuovamente cargo check, dovrebbe avere successo.
Una Struttura Lavoratore Responsabile dell’Invio del Codice dal ThreadPool a un Thread
Abbiamo lasciato un commento nel ciclo for nel Listato 20-14 riguardante la creazione di thread. Qui, vedremo come creare effettivamente i thread. La libreria standard fornisce thread::spawn come modo per creare thread, e thread::spawn si aspetta di ricevere del codice che il thread dovrebbe eseguire non appena viene creato il thread. Tuttavia, nel nostro caso, vogliamo creare i thread e farli attendere del codice che invieremo in seguito. L’implementazione della libreria standard dei thread non include alcun modo per farlo; dobbiamo implementarlo manualmente.
Implementeremo questo comportamento introducendo una nuova struttura dati tra ThreadPool e i thread che gestirà questo nuovo comportamento. Chiameremo questa struttura dati Worker, che è un termine comune nelle implementazioni di pooling. Il Worker prende il codice che deve essere eseguito e lo esegue nel thread del Worker. Pensate alle persone che lavorano in cucina in un ristorante: i lavoratori aspettano fino a quando arrivano gli ordini dai clienti e poi sono responsabili di prendere quegli ordini e soddisfarli.
Invece di memorizzare un vettore di istanze di JoinHandle<()> nel pool di thread, memorizzeremo le istanze della struttura Worker. Ogni Worker memorizzerà una singola istanza di JoinHandle<()>. Poi implementeremo un metodo su Worker che prenderà una chiusura di codice da eseguire e la invierà al thread già in esecuzione per l’esecuzione. Daremo anche a ciascun lavoratore un id in modo da poter distinguere tra i diversi lavoratori nel pool durante la registrazione o il debug.
Ecco il nuovo processo che avverrà quando creeremo un ThreadPool. Implementeremo il codice che invia la chiusura al thread dopo aver impostato Worker in questo modo:
python
Definire una struttura Worker che contiene un id e un JoinHandle<()>.
Cambiare ThreadPool per contenere un vettore di istanze Worker.
Definire una funzione Worker::new che prende un numero di id e restituisce un'istanza Worker che contiene l'id e un thread spawnato con una chiusura vuota.
In ThreadPool::new, utilizzare il contatore del ciclo for per generare un id, creare un nuovo Worker con quell'id e conservare il worker nel vettore.
Se sei pronto per una sfida, prova a implementare queste modifiche da solo prima di guardare il codice nel Listato 20-15.
Pronto? Ecco il Listato 20-15 con un modo per apportare le modifiche precedenti.
rust
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
} impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
} ThreadPool { workers }
}
// --snip--
} struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
} impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
Abbiamo cambiato il nome del campo su ThreadPool da threads a workers perché ora contiene istanze Worker invece di istanze JoinHandle<()>. Usiamo il contatore nel ciclo for come argomento per Worker::new, e conserviamo ogni nuovo Worker nel vettore chiamato workers.
Il codice esterno (come il nostro server in src/main.rs) non ha bisogno di conoscere i dettagli implementativi relativi all’uso di una struttura Worker all’interno di ThreadPool, quindi rendiamo la struttura Worker e la sua funzione new private. La funzione Worker::new utilizza l’id che le abbiamo dato e memorizza un’istanza JoinHandle<()> che viene creata spawnando un nuovo thread utilizzando una chiusura vuota.
arduino
Nota: Se il sistema operativo non può creare un thread perché non ci sono abbastanza risorse di sistema, thread::spawn farà panic. Questo causerà il panic di tutto il nostro server, anche se la creazione di alcuni thread potrebbe avere successo. Per semplicità, questo comportamento va bene, ma in un'implementazione di pool di thread in produzione, probabilmente vorresti usare std::thread::Builder e il suo metodo spawn che restituisce Result invece.
Invio delle Richieste ai Thread tramite Canali
Ora affronteremo il problema successivo: le chiusure date a thread::spawn non fanno assolutamente nulla. Attualmente, otteniamo la chiusura che vogliamo eseguire nel metodo execute. Ma dobbiamo fornire a thread::spawn una chiusura da eseguire quando creiamo ogni Worker durante la creazione del ThreadPool.
Vogliamo che le strutture Worker che abbiamo appena creato prendano il codice da eseguire da una coda mantenuta nel ThreadPool e inviino quel codice al proprio thread per l’esecuzione.
I canali che abbiamo imparato nel Capitolo 16 – un modo semplice per comunicare tra due thread – sarebbero perfetti per questo caso d’uso. Useremo un canale per fungere da coda di lavori, e execute invierà un lavoro dal ThreadPool alle istanze Worker, che invieranno il lavoro al proprio thread. Ecco il piano:
- Il ThreadPool creerà un canale e conserverà il mittente.
- Ogni Worker conserverà il ricevitore.
- Creeremo una nuova struttura Job che conterrà le chiusure che vogliamo inviare attraverso il canale.
- Il metodo execute invierà il lavoro che vuole eseguire attraverso il mittente.
- Nel suo thread, il Worker eseguirà un loop sul proprio ricevitore ed eseguirà le chiusure di tutti i lavori che riceve.
Iniziamo creando un canale in ThreadPool::new e conservando il mittente nell’istanza del ThreadPool, come mostrato nel Listato 20-16. La struttura Job per ora non contiene nulla ma sarà il tipo di elemento che stiamo inviando attraverso il canale.
rust
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
} struct Job; impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
}
Nel metodo ThreadPool::new, creiamo il nostro nuovo canale e facciamo sì che il pool conservi il mittente. Questo verrà compilato correttamente.
Proviamo a passare un ricevitore del canale a ciascun Worker mentre il thread pool crea il canale. Sappiamo che vogliamo usare il ricevitore nel thread che i lavoratori spawnano, quindi faremo riferimento al parametro ricevitore nella chiusura. Il codice nel Listato 20-17 non compilerà ancora.
rust
use std::{sync::mpsc, thread};
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id, receiver));
} ThreadPool { workers, sender }
}
} impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Abbiamo apportato alcune piccole e semplici modifiche: passiamo il ricevitore in Worker::new, e poi lo usiamo all’interno della chiusura.
Quando proviamo a controllare questo codice, otteniamo questo errore:
rust
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
Il codice sta cercando di passare il ricevitore a più istanze Worker. Questo non funzionerà, come ricorderai dal Capitolo 16: l’implementazione del canale che Rust fornisce è multi-produttore, single-consumer. Questo significa che non possiamo semplicemente clonare l’estremità consumatrice del canale per correggere questo codice. Inoltre, non vogliamo inviare un messaggio più volte a più consumatori; vogliamo una lista di messaggi con più lavoratori in modo che ciascun messaggio venga elaborato una volta.
Inoltre, prendere un lavoro dalla coda del canale comporta la mutazione del ricevitore, quindi i thread devono avere un modo sicuro per condividere e modificare il ricevitore; in caso contrario, potremmo ottenere delle condizioni di gara (come discusso nel Capitolo 16).
Ricorda i puntatori intelligenti thread-safe discussi nel Capitolo 16: per condividere la proprietà tra più thread e consentire ai thread di mutare il valore, dobbiamo usare Arc<Mutex<T>>. Il tipo Arc permetterà a più lavoratori di possedere il ricevitore, e Mutex garantirà che solo un lavoratore ottenga un lavoro dal ricevitore alla volta. Il Listato 20-18 mostra le modifiche che dobbiamo apportare.
rust
use std::{
sync::{mpsc, Arc, Mutex},
thread,
}; impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
} ThreadPool { workers, sender }
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
}
}
Nel metodo ThreadPool::new, mettiamo il ricevitore in un Arc e un Mutex. Per ogni nuovo Worker, cloniamo l’Arc per aumentare il conteggio dei riferimenti in modo che i lavoratori possano condividere la proprietà del ricevitore.
Con queste modifiche, il codice viene compilato! Stiamo arrivando là!
Implementazione del Metodo execute
Finalmente implementiamo il metodo execute su ThreadPool. Cambieremo anche Job da una struttura a un alias di tipo per un oggetto di tipo trait che contiene il tipo di chiusura che execute riceve. Come discusso nella sezione “Creazione di Sinonimi di Tipo con Alias di Tipo” del Capitolo 19, gli alias di tipo ci consentono di rendere più brevi i tipi lunghi per facilità d’uso. Guarda il Listato 20-19.
rust
use std::{sync::mpsc, thread};
type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool {
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
Dopo aver creato una nuova istanza Job utilizzando la chiusura che otteniamo in execute, inviamo quel lavoro alla fine di invio del canale. Stiamo chiamando unwrap su send nel caso in cui l’invio fallisca. Questo potrebbe accadere se, ad esempio, interrompiamo tutti i nostri thread dall’esecuzione, il che significa che l’estremità ricevente ha smesso di ricevere nuovi messaggi. Al momento, non possiamo fermare i nostri thread dall’esecuzione: i nostri thread continuano a eseguire fintanto che il pool esiste. Il motivo per cui usiamo unwrap è che sappiamo che il caso di fallimento non si verificherà, ma il compilatore non lo sa.
Ma non siamo ancora del tutto finiti! Nel worker, la nostra chiusura passata a thread::spawn fa ancora riferimento solo all’estremità ricevente del canale. Invece, abbiamo bisogno che la chiusura faccia un loop all’infinito, chiedendo all’estremità ricevente del canale un lavoro ed eseguendo il lavoro quando ne riceve uno. Facciamo il cambiamento mostrato nel Listato 20-20 a Worker::new.
rust
use std::{
sync::{mpsc, Arc, Mutex},
thread,
}; impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job();
});
Worker { id, thread }
}
}
Qui, chiamiamo prima lock sul ricevitore per acquisire il mutex, e poi chiamiamo unwrap per causare un panico su eventuali errori. L’acquisizione di un blocco potrebbe fallire se il mutex è in uno stato velenoso, cosa che può accadere se un altro thread ha provocato un panico mentre deteneva il blocco anziché rilasciarlo. In questa situazione, chiamare unwrap per far panneggiare questo thread è l’azione corretta da compiere. Sentiti libero di cambiare questo unwrap in un expect con un messaggio di errore che ha un significato per te.
Se otteniamo il blocco sul mutex, chiamiamo recv per ricevere un Job dal canale. Anche qui un unwrap finale supera eventuali errori, che potrebbero verificarsi se il thread che tiene il mittente si è arrestato, simile a come il metodo send restituisce Err se il ricevitore si ferma.
La chiamata a recv è bloccante, quindi se non c’è ancora un lavoro, il thread corrente attende fino a quando non diventa disponibile un lavoro. Il Mutex<T> garantisce che solo un thread Worker alla volta stia cercando di richiedere un lavoro.
Il nostro pool di thread è ora in uno stato di funzionamento! Dai un cargo run e fai alcune richieste:
less
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^ warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Successo! Ora abbiamo un pool di thread che esegue le connessioni in modo asincrono. Non vengono mai creati più di quattro thread, quindi il nostro sistema non sarà sovraccarico se il server riceve molte richieste. Se facciamo una richiesta a /sleep, il server sarà in grado di servire altre richieste facendo eseguire un altro thread.
r
Nota: se apri /sleep in più finestre del browser contemporaneamente, potrebbero caricarsi una alla volta in intervalli di 5 secondi. Alcuni browser web eseguono più istanze della stessa richiesta sequenzialmente per motivi di caching. Questo limite non è causato dal nostro server web.
Dopo aver appreso del ciclo while let nel Capitolo 18, potresti chiederti perché non abbiamo scritto il codice del thread worker come mostrato nel Listato 20-21.
rust
use std::{
sync::{mpsc, Arc, Mutex},
thread,
}; impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing."); job();
}
});
Worker { id, thread }
}
}
Questo codice viene compilato ed eseguito ma non produce il comportamento desiderato: una richiesta lenta causerà comunque l’attesa delle altre richieste da elaborare. Il motivo è un po’ sottile: la struttura Mutex non ha un metodo di sblocco pubblico perché il possesso del blocco si basa sulla durata di MutexGuard<T> all’interno di LockResult<MutexGuard<T>> che il metodo lock restituisce. Al momento della compilazione, il controllo degli affitti può quindi far rispettare la regola che una risorsa protetta da un Mutex non può essere accessa a meno che non si tenga il blocco. Tuttavia, questa implementazione può anche comportare il mantenimento del blocco per un tempo più lungo del previsto se non si tiene conto della durata di MutexGuard<T>.
Il codice nel Listato 20-20 che utilizza let job = receiver.lock().unwrap().recv().unwrap(); funziona perché con let, tutti i valori temporanei utilizzati nell’espressione sul lato destro del segno uguale vengono immediatamente eliminati quando termina l’istruzione let. Tuttavia, while let (e if let e match) non elimina i valori temporanei fino alla fine del blocco associato. Nel Listato 20-21, il blocco rimane tenuto per la durata della chiamata a job(), il che significa che altri worker non possono ricevere lavori.