Rust多线程编程和异步编程
多线程编程和异步编程
多线程编程和异步编程都是提高程序性能和响应能力的重要手段,在具体应用时需要结合任务特点和系统架构进行选择(在某些情况下两者也可以结合使用,发挥各自的优势)。
二者定义:
-
多线程编程指使用多个线程并行执行任务的编程方式。线程是操作系统分配CPU时间片的基本单位。
-
异步编程指程序不需要等待某个操作完成就可以继续执行其他操作的编程方式。
执行机制:
-
多线程编程依赖于操作系统在多个线程之间切换执行,利用多核CPU提高并行计算能力。
-
异步编程通过事件驱动、回调函数、Promises或async/await等机制实现,避免程序因某个操作阻塞而停滞不前。
应用场景:
- 多线程编程适用于CPU密集型任务,如数学计算、图像处理等。
- 异步编程适用于I/O密集型任务,如网络请求、文件读写等,可以提高资源利用率和响应速度。
难度:
-
多线程编程涉及线程同步、死锁等复杂问题,编程难度较高。
-
异步编程虽然编程复杂度较低,但需要掌握事件驱动、回调等编程模式。
thread::spawn与Rust的多线程编程
下面是Rust实现一段简单代码:
执行fn1(耗时3s),
执行fn2(耗时6s),
执行fn3(耗时4s)。
这三个func由先到后依次串行执行,最后打印出总的耗时
use std::time::{Duration, Instant}; use std::thread; fn fn1() { thread::sleep(Duration::from_secs(3)); } fn fn2() { thread::sleep(Duration::from_secs(6)); } fn fn3() { thread::sleep(Duration::from_secs(4)); } fn main() { let start = Instant::now(); println!("开始执行 fn1"); fn1(); println!("fn1 执行完毕"); println!("-------"); println!("开始执行 fn2"); fn2(); println!("fn2 执行完毕"); println!("-------"); println!("开始执行 fn3"); fn3(); println!("fn3 执行完毕"); println!("-------"); let duration = start.elapsed(); println!("总耗时: {:?}", duration); }
输出:
开始执行 fn1 fn1 执行完毕 ------- 开始执行 fn2 fn2 执行完毕 ------- 开始执行 fn3 fn3 执行完毕 ------- 总耗时: 13.011184791s
很多情况下,几个func之间并没有依赖关系,完全可以同时执行,即 将串行改为并行,这样耗时就不再是三者耗时之和,而是其中耗时最大的那个func执行所需的时间。
use std::time::{Duration, Instant}; use std::thread; fn fn1() { thread::sleep(Duration::from_secs(3)); } fn fn2() { thread::sleep(Duration::from_secs(6)); } fn fn3() { thread::sleep(Duration::from_secs(4)); } fn main() { let start = Instant::now(); let handle1 = thread::spawn(|| { println!("开始执行 fn1"); fn1(); println!("fn1 执行完毕"); }); let handle2 = thread::spawn(|| { println!("开始执行 fn2"); fn2(); println!("fn2 执行完毕"); }); let handle3 = thread::spawn(|| { println!("开始执行 fn3"); fn3(); println!("fn3 执行完毕"); }); // 等待所有线程完成 handle1.join().unwrap(); handle2.join().unwrap(); handle3.join().unwrap(); let duration = start.elapsed(); println!("总耗时: {:?}", duration); }
输出:
开始执行 fn1 开始执行 fn2 开始执行 fn3 fn1 执行完毕 fn3 执行完毕 fn2 执行完毕 总耗时: 6.00545525s
使用thread::spawn为每个func创建一个新线程。 每个thread::spawn返回一个JoinHandle,可以用它来等待线程完成。
在主线程中,我们使用join()方法等待所有子线程完成。
由于这三个func是并行执行的,总耗时将接近最长的单个函数耗时(在此例中是fn2的6秒),而不是三个func耗时的总和。
不难用Go语言实现以上同样的操作
package main import ( "fmt" "sync" "time" ) func fn1() { time.Sleep(3 * time.Second) } func fn2() { time.Sleep(6 * time.Second) } func fn3() { time.Sleep(4 * time.Second) } func main() { start := time.Now() var wg sync.WaitGroup wg.Add(3) go func() { defer wg.Done() fmt.Println("开始执行 fn1") fn1() fmt.Println("fn1 执行完毕") }() go func() { defer wg.Done() fmt.Println("开始执行 fn2") fn2() fmt.Println("fn2 执行完毕") }() go func() { defer wg.Done() fmt.Println("开始执行 fn3") fn3() fmt.Println("fn3 执行完毕") }() wg.Wait() duration := time.Since(start) fmt.Printf("总耗时: %v\n", duration) }
输出:
开始执行 fn1 开始执行 fn3 开始执行 fn2 fn1 执行完毕 fn3 执行完毕 fn2 执行完毕 总耗时: 6s
Rust中的thread::spawn类似Go中的go关键字,创建多个线程(或协程),返回的JoinHandle则类似Go中的sync.WaitGroup或channel,用来优雅等待 子线程(或协程)执行结束。
我们知道,和线程本身的内存资源占用及线程调度所需的开销相比,goroutine的最大优势是轻量级,可以开成千上万个。
在Go中很常见在for循环中使用go关键字,启动若干的协程,同时调用某个子方法或闭包函数。
例如,有这样需求: 我想请求Alexa排名前5万的网站,拿到每个网站的http状态码。并维护一个map,以网站域名为key,http状态码为value。
func main() { // 读取Alexa排名前5万的网站 websites, err := readWebsites("top-50000.txt") if err != nil { fmt.Println("Error reading websites:", err) return } results := make(map[string]int) var mutex sync.Mutex var wg sync.WaitGroup for _, site := range websites { wg.Add(1) go func(site string) { defer wg.Done() status := getStatus(site) mutex.Lock() results[site] = status mutex.Unlock() }(site) } wg.Wait() // 打印结果 for site, status := range results { fmt.Printf("%s: %d\n", site, status) } } func getStatus(site string) int { time.Sleep(3e9) return 200 }
以上代码没有进行协程数量的控制,而getStatus中有3s的休眠期,程序运行后的3s内,将产生几万的goroutine。
简化以上场景,实际编码运行一下,不请求前5万的网站了,而模拟打印从1-5万的数字,并在每次打印前sleep 3s
package main import ( "fmt" "sync" "time" ) func printWithDelay(num int, wg *sync.WaitGroup) { defer wg.Done() time.Sleep(3 * time.Second) fmt.Println(num) } func main() { start := time.Now() var wg sync.WaitGroup for i := 1; i wg.Add(1) go printWithDelay(i, &wg) } wg.Wait() duration := time.Since(start) fmt.Printf("Total time: %v\n", duration) } Duration, Instant}; use std::sync::{Arc, Mutex}; fn print_with_delay(num: u32) { thread::sleep(Duration::from_secs(3)); println!("{}", num); } fn main() { let start = Instant::now(); let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for i in 1..=50000 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { print_with_delay(i); let mut count = counter.lock().unwrap(); *count += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } let duration = start.elapsed(); println!("Total time: {:?}", duration); println!("Total numbers printed: {}", *counter.lock().unwrap()); } code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" } note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace Duration, Instant}; use rayon::prelude::*; fn print_with_delay(num: u32) { std::thread::sleep(Duration::from_secs(2)); println!("{}", num); } fn main() { let start = Instant::now(); // 配置线程池,这里使用系统的逻辑核心数量作为线程数 rayon::ThreadPoolBuilder::new() .num_threads(num_cpus::get()) .build_global() .unwrap(); (1..=50000).into_par_iter().for_each(|i| { print_with_delay(i); }); let duration = start.elapsed(); println!("Total time: {:?}", duration); } Context, Poll}; use std::time::{Duration, Instant}; use futures::executor::block_on; use futures::future::{join_all, BoxFuture}; struct Sleep { duration: Duration, start: Option type Output = (); fn poll(mut self: Pin
-
-