第20ç« å¼æ¥ç¼ç¨
# 第20ç« å¼æ¥ç¼ç¨
åè®¾ä½ æ£å¨ç¼åä¸ä¸ªè天æå¡å¨ãå¯¹äºæ¯ä¸ªç½ç»è¿æ¥ï¼é½æä¼ å ¥çæ°æ®å éè¦è§£æãä¼ åºçæ°æ®å éè¦ç»è£ ãå®å ¨åæ°éè¦ç®¡çãè天群ç»è®¢é éè¦è·è¸ªççãè¦åæ¶ç®¡ç许å¤è¿æ¥ï¼éè¦è¿è¡ä¸äºè§åã
çæ³æ åµä¸ï¼ä½ å¯ä»¥ä¸ºæ¯ä¸ªä¼ å ¥è¿æ¥å¯å¨ä¸ä¸ªåç¬ç线ç¨ï¼
use std::{net, thread};
let listener = net::TcpListener::bind(address)?;
for socket_result in listener.incoming() {
let socket = socket_result?;
let groups = chat_group_table.clone();
thread::spawn(|| {
log_error(serve(socket, groups));
});
}
2
3
4
5
6
7
8
9
10
å¯¹äºæ¯ä¸ªæ°è¿æ¥ï¼è¿æ®µä»£ç é½ä¼å¯å¨ä¸ä¸ªæ°çº¿ç¨æ¥è¿è¡serve彿°ï¼è¯¥å½æ°è½å¤ä¸æ³¨äºç®¡çåä¸ªè¿æ¥çéæ±ã
è¿ç§æ¹æ³å¨ä¸åè¿å±é¡ºå©æ¶é½è½å¾å¥½å°å·¥ä½ï¼ç´å°ç¨æ·æ°éçªç¶å¢å å°æ°ä¸ãä¸ä¸ªçº¿ç¨çæ å¢é¿å°100 KiBææ´å¤å¹¶ä¸ç½è§ï¼èè¿å¯è½ä¸æ¯ä½ æ³è¦çæå¡å¨å åä½¿ç¨æ¹å¼ã线ç¨å¯¹äºå¨å¤ä¸ªå¤çå¨ä¹é´åé å·¥ä½å¾æç¨ä¸å¿ è¦ï¼ä½å®ä»¬å¯¹å åçéæ±è¾å¤§ï¼å æ¤æä»¬é常éè¦ä¸äºä¸çº¿ç¨é å使ç¨çäºè¡¥æ¹å¼æ¥å解工ä½ã
ä½ å¯ä»¥ä½¿ç¨Rust弿¥ä»»å¡å¨åä¸ªçº¿ç¨æä¸ç»å·¥ä½çº¿ç¨ä¸äº¤éæ§è¡è®¸å¤ç¬ç«çæ´»å¨ã弿¥ä»»å¡ä¸çº¿ç¨ç±»ä¼¼ï¼ä½å建é度æ´å¿«ï¼å®ä»¬ä¹é´ä¼ éæ§å¶æçæçæ´é«ï¼å¹¶ä¸å åå¼éæ¯çº¿ç¨ä½ä¸ä¸ªæ°é级ãå¨å个ç¨åºä¸åæ¶è¿è¡æ°åä¸ä¸ªå¼æ¥ä»»å¡æ¯å®å ¨å¯è¡çãå½ç¶ï¼ä½ çåºç¨ç¨åºå¯è½ä»ç¶ä¼åå°å ¶ä»å ç´ çéå¶ï¼å¦ç½ç»å¸¦å®½ãæ°æ®åºé度ã计ç®è½åæå·¥ä½æ¬èº«çå åéæ±ï¼ä½ä½¿ç¨ä»»å¡æå¸¦æ¥çå åå¼éæ¯çº¿ç¨å°å¾å¤ã
ä¸è¬æ¥è¯´ï¼Rust弿¥ä»£ç çèµ·æ¥ä¸æ®éçå¤çº¿ç¨ä»£ç é常ç¸ä¼¼ï¼åªæ¯å¯è½ä¼é»å¡çæä½ï¼å¦I/Oæè·åäºæ¥éï¼éè¦ä»¥ç¨å¾®ä¸åçæ¹å¼å¤çã对è¿äºæä½è¿è¡ç¹æ®å¤çå¯ä»¥è®©Rustæ´å¤å°äºè§£ä½ ç代ç è¡ä¸ºï¼è¿ä¹æ¯å®ç°æ§è½æåçåå ãåé¢ä»£ç ç弿¥çæ¬å¦ä¸ï¼
use async_std::{net, task};
let listener = net::TcpListener::bind(address).await?;
let mut new_connections = listener.incoming();
while let Some(socket_result) = new_connections.next().await {
let socket = socket_result?;
let groups = chat_group_table.clone();
task::spawn(async {
log_error(serve(socket, groups).await);
});
}
2
3
4
5
6
7
8
9
10
11
è¿æ®µä»£ç 使ç¨äºasync_stdåºçç½ç»å任塿¨¡åï¼å¹¶å¨å¯è½é»å¡çè°ç¨åæ·»å äº.awaitã使»ä½ç»æä¸åºäºçº¿ç¨ççæ¬ç¸åã
æ¬ç« çç®æ ä¸ä» æ¯å¸®å©ä½ ç¼å弿¥ä»£ç ï¼è¿ä¼è¯¦ç»ä»ç»å ¶å·¥ä½åçï¼ä»¥ä¾¿ä½ è½å¤é¢æµå®å¨åºç¨ç¨åºä¸çæ§è½è¡¨ç°ï¼å¹¶äºè§£å ¶ææä»·å¼çåºç¨åºæ¯ã
- 为äºå±ç¤ºå¼æ¥ç¼ç¨çæºå¶ï¼æä»¬å°ååºä¸ç»æå°çè¯è¨ç¹æ§ï¼æ¶µçæææ ¸å¿æ¦å¿µï¼æªæ¥å¼ï¼futuresï¼ã弿¥å½æ°ãçå¾
表达å¼ï¼await expressionsï¼ãä»»å¡ï¼ä»¥å
block_onåspawn_localæ§è¡å¨ã - ç¶åæä»¬å°ä»ç»å¼æ¥åå
spawnæ§è¡å¨ãè¿äºå¯¹äºå®æå®é å·¥ä½è³å ³éè¦ï¼ä½ä»æ¦å¿µä¸è®²ï¼å®ä»¬åªæ¯æä»¬å颿å°çç¹æ§çåä½ãå¨è¿ä¸ªè¿ç¨ä¸ï¼æä»¬ä¼æåºä¸äºä½ å¨å¼æ¥ç¼ç¨ä¸å¯è½éå°çç¬ç¹é®é¢ï¼å¹¶è§£éå¦ä½å¤çå®ä»¬ã - 为äºå±ç¤ºææè¿äºé¨åå¦ä½ååå·¥ä½ï¼æä»¬å°éæ¥è®²è§£è天æå¡å¨å客æ·ç«¯ç宿´ä»£ç ï¼åé¢ç代ç çæ®µå°±æ¯å ¶ä¸çä¸é¨åã
- 为äºè¯´æåºæ¬çæªæ¥å¼åæ§è¡å¨æ¯å¦ä½å·¥ä½çï¼æä»¬å°ç»åº
spawn_blockingåblock_onçç®åä½å®ç¨çå®ç°ã - æåï¼æä»¬å°è§£é
Pinç±»åï¼å®ä¸æ¶åºç°å¨å¼æ¥æ¥å£ä¸ï¼ä»¥ç¡®ä¿å®å ¨å°ä½¿ç¨å¼æ¥å½æ°ååæªæ¥å¼ã
# ä»åæ¥å°å¼æ¥
èèä¸ä¸ï¼å½ä½ è°ç¨ä»¥ä¸ï¼é弿¥ï¼å®å ¨ä¼ ç»çï¼å½æ°æ¶ä¼åçä»ä¹ï¼
use std::io::prelude::*;
use std::net;
fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
let mut socket = net::TcpStream::connect((host, port))?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes())?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response)?;
Ok(response)
}
2
3
4
5
6
7
8
9
10
11
12
è¿ä¸ªå½æ°æå¼ä¸ä¸ªå°Webæå¡å¨çTCPè¿æ¥ï¼ç¨ä¸ç§è¿æ¶çåè®®åå ¶åéä¸ä¸ªåºæ¬çHTTP请æ±ï¼ç¶å读åååºãå¾20-1å±ç¤ºäºè¿ä¸ªå½æ°éæ¶é´çæ§è¡è¿ç¨ã
æ¤å¾å±ç¤ºäºå½æ°è°ç¨æ å¦ä½éçæ¶é´ä»å·¦åå³è¿è¡ãæ¯ä¸ªå½æ°è°ç¨é½æ¯ä¸ä¸ªæ¡ï¼æ¾ç½®å¨å
¶è°ç¨è
ä¹ä¸ãæ¾ç¶ï¼cheapo_request彿°å¨æ´ä¸ªæ§è¡è¿ç¨ä¸é½å¨è¿è¡ãå®è°ç¨Rustæ ååºä¸ç彿°ï¼å¦TcpStream::connect以åTcpStreamçwrite_allåread_to_stringå®ç°ãè¿äºå½æ°åä¼ä¾æ¬¡è°ç¨å
¶ä»å½æ°ï¼ä½æç»ç¨åºä¼è¿è¡ç³»ç»è°ç¨ï¼åæä½ç³»ç»ååºè¯·æ±ä»¥å®é
宿æäºæä½ï¼ä¾å¦æå¼ä¸ä¸ªTCPè¿æ¥æè¯»åä¸äºæ°æ®ã
å¾20-1 弿¥HTTP请æ±çæ§è¡è¿ç¨ï¼æ·±ç°è²åºå表示çå¾
æä½ç³»ç»çæ¶é´)
æ·±ç°è²èæ¯æ è®°äºç¨åºçå¾ æä½ç³»ç»å®æç³»ç»è°ç¨çæ¶é´ãæä»¬æ²¡æææ¯ä¾ç»å¶è¿äºæ¶é´ãå¦æææ¯ä¾ç»å¶ï¼æ´ä¸ªå¾è¡¨å°å¤§é¨åæ¯æ·±ç°è²çï¼å®é ä¸ï¼è¿ä¸ªå½æ°å ä¹ææææ¶é´é½è±å¨çå¾ æä½ç³»ç»ä¸äºãåé¢ä»£ç çæ§è¡è¿ç¨å¨ç³»ç»è°ç¨ä¹é´åªæ¯å¾çªççæ®µã
å½è¿ä¸ªå½æ°çå¾ ç³»ç»è°ç¨è¿åæ¶ï¼å®æå¨çå个线ç¨ä¼è¢«é»å¡ï¼å¨ç³»ç»è°ç¨å®æä¹åï¼å®æ æ³æ§è¡å ¶ä»ä»»ä½æä½ãä¸ä¸ªçº¿ç¨çæ 大å°è¾¾å°å åæå ç¾ååèå¹¶ä¸ç½è§ï¼æä»¥å¦æè¿æ¯æä¸ªæ´å¤§ç³»ç»çä¸é¨åï¼æè®¸å¤çº¿ç¨å¨æ§è¡ç±»ä¼¼çä»»å¡ï¼é£ä¹éå®è¿äºçº¿ç¨çèµæºåªæ¯ä¸ºäºçå¾ ï¼ä»£ä»·å¯è½ä¼å¾é«ã
为äºè§£å³è¿ä¸ªé®é¢ï¼çº¿ç¨éè¦è½å¤å¨çå¾ ç³»ç»è°ç¨å®ææ¶å»å¤çå ¶ä»å·¥ä½ãä½å¦ä½å®ç°è¿ä¸ç¹å¹¶ä¸ææ¾ãä¾å¦ï¼æä»¬ç¨äºä»å¥æ¥å读åååºç彿°ç¾åå¦ä¸ï¼
fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize>;
ä»ç±»åä¸å°±å¯ä»¥çåºï¼è¿ä¸ªå½æ°å¨ä»»å¡å®ææåºç°é误ä¹åä¸ä¼è¿åãè¿ä¸ªå½æ°æ¯åæ¥çï¼æä½å®æåè°ç¨è æä¼ç»§ç»æ§è¡ã妿æä»¬æ³å¨æä½ç³»ç»å·¥ä½æ¶è®©çº¿ç¨å»åå ¶ä»äºæ ï¼å°±éè¦ä¸ä¸ªæ°çI/Oåºï¼å®è½æä¾è¿ä¸ªå½æ°ç弿¥çæ¬ã
# æªæ¥å¼ï¼Futuresï¼
Rustæ¯æå¼æ¥æä½çæ¹æ³æ¯å¼å
¥ä¸ä¸ªç¹æ§std::future::Futureï¼
trait Future {
type Output;
// ç®åï¼å°`Pin<&mut Self>`ç解为`&mut Self`ã
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
2
3
4
5
6
7
8
9
10
Future代表ä¸ä¸ªä½ å¯ä»¥æ£æµå
¶æ¯å¦å®æçæä½ãä¸ä¸ªæªæ¥å¼çpollæ¹æ³æ°¸è¿ä¸ä¼çå¾
æä½å®æï¼å®æ»æ¯ç«å³è¿åã妿æä½å®æï¼pollè¿åPoll::Ready(output)ï¼å
¶ä¸outputæ¯æç»ç»æãå¦åï¼å®è¿åPendingãå¦ææªæ¥å¼å¼å¾å次轮询ï¼å®ä¼éè¿è°ç¨ä¸ä¸ªå¤éå¨ï¼å¨Context䏿ä¾çåè°å½æ°ï¼æ¥éç¥æä»¬ãæä»¬å°è¿ç§å¼æ¥ç¼ç¨æ¹å¼ç§°ä¸º âç®çº³å¡æ¨¡åâï¼å¯¹äºä¸ä¸ªæªæ¥å¼ï¼ä½ å¯ä¸è½åçå°±æ¯ç¨pollæ¹æ³å夿¢æµå®ï¼ç´å°å¾å°ä¸ä¸ªå¼ã
ææç°ä»£æä½ç³»ç»é½å å«å ¶ç³»ç»è°ç¨çåä½ï¼æä»¬å¯ä»¥ç¨è¿äºå使¥å®ç°è¿ç§è½®è¯¢æ¥å£ãä¾å¦ï¼å¨UnixåWindowsä¸ï¼å¦æä½ å°ç½ç»å¥æ¥å设置为éé»å¡æ¨¡å¼ï¼é£ä¹è¯»åæä½å¨å¯è½é»å¡æ¶ä¼è¿åä¸ä¸ªé误ï¼ä½ å¿ é¡»ç¨ååè¯ã
æä»¥read_to_stringç弿¥çæ¬çç¾å大è´å¦ä¸ï¼
fn read_to_string(&mut self, buf: &mut String) -> impl Future<Output = Result<usize>>;
é¤äºè¿åç±»åï¼è¿ä¸ªç¾å䏿们ä¹åå±ç¤ºçç¸åï¼å¼æ¥çæ¬è¿åä¸ä¸ªResult<usize>çæªæ¥å¼ãä½ éè¦è½®è¯¢è¿ä¸ªæªæ¥å¼ï¼ç´å°ä»ä¸å¾å°ä¸ä¸ªReady(result)ãæ¯æ¬¡è½®è¯¢æ¶ï¼è¯»åæä½ä¼å°½å¯è½å°è¿è¡ãæç»ç»æä¼åæ®éI/Oæä½ä¸æ ·ï¼ç»ä½ ä¸ä¸ªæå弿é误å¼ãè¿æ¯ä¸è¬æ¨¡å¼ï¼ä»»ä½å½æ°ç弿¥çæ¬é½ä¸åæ¥çæ¬æ¥åç¸åçåæ°ï¼ä½è¿åç±»å被Futureå
裹ã
è°ç¨è¿ä¸ªçæ¬çread_to_stringå®é
ä¸å¹¶ä¸ä¼è¯»åä»»ä½å
容ï¼å®å¯ä¸çèè´£æ¯æé å¹¶è¿åä¸ä¸ªæªæ¥å¼ï¼å½å¯¹è¿ä¸ªæªæ¥å¼è¿è¡è½®è¯¢æ¶ï¼å®ä¼æ§è¡å®é
çå·¥ä½ãè¿ä¸ªæªæ¥å¼å¿
é¡»ä¿åè°ç¨æè¯·æ±çææå¿
è¦ä¿¡æ¯ãä¾å¦ï¼è¿ä¸ªread_to_stringè¿åçæªæ¥å¼å¿
须记ä½å®è¢«è°ç¨æ¶çè¾å
¥æµï¼ä»¥åå®åºè¯¥å°ä¼ å
¥æ°æ®è¿½å å°çStringãå®é
ä¸ï¼ç±äºæªæ¥å¼ææselfåbufçå¼ç¨ï¼read_to_stringçæ£ç¡®ç¾åå¿
é¡»æ¯ï¼
fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> impl Future<Output = Result<usize>> + 'a;
è¿éæ·»å äºçå½å¨ææ 注ï¼ä»¥è¡¨æè¿åçæªæ¥å¼ççå½å¨æåªè½ä¸selfåbufæåç¨çå¼ççå½å¨æä¸æ ·é¿ã
async-stdåºæä¾äºstd䏿æI/Oåè½ç弿¥çæ¬ï¼å
æ¬ä¸ä¸ªå¸¦æread_to_stringæ¹æ³ç弿¥Readç¹æ§ãasync-stdç´§å¯éµå¾ªstdç设计ï¼å°½å¯è½å¨èªå·±çæ¥å£ä¸éç¨stdçç±»åï¼æä»¥é误ãç»æãç½ç»å°å以å大夿°å
¶ä»ç¸å
³æ°æ®å¨è¿ä¸¤ä¸ªåºä¸æ¯å
¼å®¹çãçæstdæå©äºä½ 使ç¨async-stdï¼åä¹äº¦ç¶ã
Futureç¹æ§çè§åä¹ä¸æ¯ï¼ä¸æ¦ä¸ä¸ªæªæ¥å¼è¿åäºPoll::Readyï¼å®å¯ä»¥è®¤ä¸ºèªå·±åä¹ä¸ä¼è¢«è½®è¯¢äºãæäºæªæ¥å¼å¦æè¢«è¿åº¦è½®è¯¢ï¼ä¼æ°¸è¿è¿åPoll::Pendingï¼å
¶ä»çå¯è½ä¼å¯¼è´ç¨åºå´©æºææèµ·ï¼ä¸è¿ï¼å®ä»¬ç»ä¸è½è¿åå
åæçº¿ç¨å®å
¨ï¼å¦åä¼å¯¼è´æªå®ä¹è¡ä¸ºï¼ãFutureç¹æ§ä¸çfuseéé
卿¹æ³å¯ä»¥å°ä»»ä½æªæ¥å¼è½¬æ¢ä¸ºä¸ä¸ªæ°¸è¿è¿åPoll::Pendingçæªæ¥å¼ã使æå¸¸ç¨çæ¶è´¹æªæ¥å¼çæ¹å¼é½éµå¾ªè¿ä¸ªè§åï¼æä»¥é常ä¸éè¦ä½¿ç¨fuseã
å¦æè½®è¯¢å¬èµ·æ¥æçä¸é«ï¼å«æ
å¿ãRustç弿¥æ¶æç»è¿ç²¾å¿è®¾è®¡ï¼åªè¦åread_to_stringè¿æ ·çåºæ¬I/O彿°å®ç°æ£ç¡®ï¼ä½ åªä¼å¨å¼å¾è½®è¯¢æ¶æå»è½®è¯¢ä¸ä¸ªæªæ¥å¼ãæ¯æ¬¡è°ç¨pollæ¶ï¼æä¸ªå°æ¹çæä¸ªæä½åºè¯¥è¿åReadyï¼æè
è³å°æçè¿ä¸ªç®æ åå¾è¿å±ãæä»¬å°å¨ âåºæ¬æªæ¥å¼åæ§è¡å¨ï¼ä½æ¶å¼å¾åæ¬¡è½®è¯¢æªæ¥å¼ï¼â ä¸è§£éè¿æ¯å¦ä½å®ç°çã
使¯ä½¿ç¨æªæ¥å¼ä¼¼ä¹æ¯ä¸ªææï¼å½ä½ 轮询æ¶ï¼å¦æå¾å°Poll::Pendingï¼ä½ 该æä¹åï¼ä½ å¿
é¡»ææ¶æ¾ç¹å
¶ä»å·¥ä½è®©è¿ä¸ªçº¿ç¨å»åï¼åæ¶åä¸è½å¿è®°ç¨å忥忬¡è½®è¯¢è¿ä¸ªæªæ¥å¼ãä½ çæ´ä¸ªç¨åºä¼å
满ç¨äºè·è¸ªåªäºä»»å¡å¤äºæèµ·ç¶æä»¥åå®ä»¬åå¤å¥½å该åä»ä¹ç夿代ç ãcheapo_request彿°çç®æ´æ§è¢«ç ´åäºã
å¥½æ¶æ¯æ¯ï¼å ¶å®å¹¶æ²¡æã
# 弿¥å½æ°å await 表达å¼
ä¸é¢æ¯cheapo_requeståæå¼æ¥å½æ°ççæ¬ï¼
use async_std::io::prelude::*;
use async_std::net;
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
2
3
4
5
6
7
8
9
10
11
12
è¿æ®µä»£ç 䏿们æåççæ¬éå对æ¯ï¼åºå«å¦ä¸ï¼
- 彿°ä»¥
async fnå¼å¤´ï¼è䏿¯fnã - å®ä½¿ç¨äº
async_stdåºä¸TcpStream::connectãwrite_allåread_to_stringç弿¥çæ¬ãè¿äºå½æ°é½è¿åå®ä»¬ç»æçæªæ¥å¼ï¼æ¬èç示ä¾ä½¿ç¨async_std1.7çæ¬ï¼ã - 卿¯ä¸ªè¿åæªæ¥å¼çè°ç¨ä¹åï¼ä»£ç ä¸é½æ
.awaitãè½ç¶è¿çèµ·æ¥åæ¯å¯¹å为awaitçç»æä½å段çå¼ç¨ï¼ä½å®å®é 䏿¯è¯è¨å ç½®çç¹æ®è¯æ³ï¼ç¨äºçå¾ ä¸ä¸ªæªæ¥å¼åå¤å°±ç»ªãawait表达å¼ä¼è®¡ç®åºæªæ¥å¼çæç»å¼ã彿°å°±æ¯éè¿è¿ç§æ¹å¼ä»connectãwrite_allåread_to_stringè·åç»æçã
䏿®é彿°ä¸åï¼å½ä½ è°ç¨ä¸ä¸ªå¼æ¥å½æ°æ¶ï¼å®ä¼å¨å½æ°ä½å¼å§æ§è¡ä¹åç«å³è¿åãæ¾ç¶ï¼è°ç¨çæç»è¿åå¼è¿æ²¡æè®¡ç®åºæ¥ï¼ä½ å¾å°çæ¯å ¶æç»å¼çæªæ¥å¼ãæä»¥ï¼å¦æä½ æ§è¡è¿æ®µä»£ç ï¼
let response = cheapo_request(host, port, path);
é£ä¹responseå°æ¯ä¸ä¸ªstd::io::Result<String>çæªæ¥å¼ï¼ècheapo_requestç彿°ä½è¿æ²¡æå¼å§æ§è¡ãä½ ä¸éè¦è°æ´å¼æ¥å½æ°çè¿åç±»åï¼Rustä¼èªå¨å°async fn f(...) -> Tè§ä¸ºä¸ä¸ªè¿åTçæªæ¥å¼ç彿°ï¼è䏿¯ç´æ¥è¿åTã
弿¥å½æ°è¿åçæªæ¥å¼å°è£
äºå½æ°ä½è¿è¡æéçææä¿¡æ¯ï¼å½æ°çåæ°ãå±é¨åéç空é´ççãï¼å°±å¥½åä½ æè°ç¨çæ 帧æè·ä¸ºä¸ä¸ªæ®éçRustå¼ãï¼æä»¥responseå¿
é¡»ä¿å为hostãportåpathä¼ éçå¼ï¼å 为cheapo_requestç彿°ä½è¿è¡æ¶éè¦è¿äºå¼ã
æªæ¥å¼çå
·ä½ç±»åç±ç¼è¯å¨æ ¹æ®å½æ°ä½ååæ°èªå¨çæãè¿ä¸ªç±»å没æååï¼ä½ åªç¥éå®å®ç°äºFuture<Output = R>ï¼å
¶ä¸Ræ¯å¼æ¥å½æ°çè¿åç±»åãä»è¿ä¸ªæä¹ä¸è¯´ï¼å¼æ¥å½æ°çæªæ¥å¼ç±»ä¼¼äºéå
ï¼éå
乿ç±ç¼è¯å¨çæçå¿åç±»åï¼è¿äºç±»åå®ç°äºFnOnceãFnåFnMutç¹æ§ã
å½ä½ 馿¬¡è½®è¯¢cheapo_requestè¿åçæªæ¥å¼æ¶ï¼æ§è¡ä»å½æ°ä½é¡¶é¨å¼å§ï¼ä¸ç´è¿è¡å°TcpStream::connectè¿åçæªæ¥å¼ç第ä¸ä¸ªawaitãawait表达å¼ä¼è½®è¯¢connectçæªæ¥å¼ï¼å¦æå®è¿æ²¡æåå¤å¥½ï¼å°±ä¼åå®èªå·±çè°ç¨è
è¿åPoll::Pendingï¼å¨TcpStream::connectçæªæ¥å¼è½®è¯¢è¿åPoll::Readyä¹åï¼å¯¹cheapo_requestçæªæ¥å¼çè½®è¯¢æ æ³è¶è¿ç¬¬ä¸ä¸ªawaitç»§ç»è¿è¡ãæä»¥TcpStream::connect(...).await表达å¼å¤§è´ç¸å½äºï¼
{
// 注æï¼è¿æ¯ä¼ªä»£ç ï¼ä¸æ¯ææçRust代ç
let connect_future = TcpStream::connect(...);
'retry_point:
match connect_future.poll(cx) {
Poll::Ready(value) => value,
Poll::Pending => {
// 宿䏿¬¡å¯¹`cheapo_request`çæªæ¥å¼è¿è¡`poll`æ¶ï¼ä»'retry_pointç»§ç»æ§è¡
...
return Poll::Pending;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
await表达å¼è·åæªæ¥å¼çæææï¼ç¶å对å
¶è¿è¡è½®è¯¢ãå¦ææªæ¥å¼å·²åå¤å¥½ï¼é£ä¹æªæ¥å¼çæç»å¼å°±æ¯await表达å¼çå¼ï¼æ§è¡ç»§ç»è¿è¡ãå¦åï¼å®åèªå·±çè°ç¨è
è¿åPoll::Pendingã
ä½å
³é®çæ¯ï¼ä¸ä¸æ¬¡å¯¹cheapo_requestçæªæ¥å¼è¿è¡è½®è¯¢æ¶ï¼ä¸ä¼åä»å½æ°é¡¶é¨å¼å§ï¼èæ¯å¨å½æ°ä¸å³å°è½®è¯¢connect_futureçä½ç½®ç»§ç»æ§è¡ãå¨è¯¥æªæ¥å¼åå¤å¥½ä¹åï¼æä»¬ä¸ä¼ç»§ç»æ§è¡å¼æ¥å½æ°çå
¶ä½é¨åã
éç对cheapo_requestçæªæ¥å¼çæç»è½®è¯¢ï¼å®ä¼ä»ä¸ä¸ªawaitç§»å¨å°ä¸ä¸ä¸ªawaitï¼éæ¥æ§è¡å½æ°ä½ï¼åªæå¨çå¾
çåæªæ¥å¼åå¤å¥½æ¶æä¼ç»§ç»ãå æ¤ï¼cheapo_requestçæªæ¥å¼éè¦è¢«è½®è¯¢ç次æ°åå³äºåæªæ¥å¼çè¡ä¸ºå彿°èªèº«çæ§å¶æµã
cheapo_requestçæªæ¥å¼ä¼è·è¸ªä¸ä¸æ¬¡è½®è¯¢åºè¯¥ä»åªéç»§ç»ï¼ä»¥åç»§ç»æ§è¡æéçææå±é¨ç¶æï¼åéãåæ°ã临æ¶åéçï¼ã
è½å¤å¨å½æ°ä¸é´æåæ§è¡ï¼ç¶åç¨åç»§ç»ï¼è¿æ¯å¼æ¥å½æ°ç¬æçç¹æ§ãå½ä¸ä¸ªæ®é彿°è¿åæ¶ï¼å®çæ å¸§å°±æ°¸è¿æ¶å¤±äºã
ç±äºawait表达å¼ä¾èµäºè½å¤ç»§ç»æ§è¡çè½åï¼æä»¥ä½ åªè½å¨å¼æ¥å½æ°å
é¨ä½¿ç¨å®ä»¬ã
卿°åæ¬ææ¶ï¼Rustè¿ä¸å
è®¸ç¹æ§æ¥æå¼æ¥æ¹æ³ãåªæèªç±å½æ°åç¹å®ç±»åçåºæå½æ°å¯ä»¥æ¯å¼æ¥çãåæ¶è¿ä¸ªéå¶éè¦å¯¹è¯è¨è¿è¡ä¸äºä¿®æ¹ã卿¤æé´ï¼å¦æä½ éè¦å®ä¹å
å«å¼æ¥å½æ°çç¹æ§ï¼å¯ä»¥èè使ç¨async-traitåºï¼å®æä¾äºä¸ç§åºäºå®çè§£å³æ¹æ³ã
# ä»åæ¥ä»£ç ä¸è°ç¨å¼æ¥å½æ°ï¼block_on
ä»æç§æä¹ä¸è®²ï¼å¼æ¥å½æ°åªæ¯æé¾é¢ä¸¢ç»äºè°ç¨è
ãç¡®å®ï¼å¨å¼æ¥å½æ°ä¸è·åä¸ä¸ªæªæ¥å¼å¾å®¹æï¼åªé使ç¨awaitãä½å¼æ¥å½æ°æ¬èº«è¿åçæ¯ä¸ä¸ªæªæ¥å¼ï¼æä»¥ç°å¨è°ç¨è
徿³åæ³å¯¹å
¶è¿è¡è½®è¯¢ãæç»ï¼æ»å¾æäººå®é
å»çå¾
ä¸ä¸ªå¼ã
æä»¬å¯ä»¥å¨æ®éç忥彿°ï¼æ¯å¦main彿°ï¼ä¸ï¼ä½¿ç¨async_stdçtask::block_on彿°æ¥è°ç¨cheapo_requestãtask::block_on彿°æ¥åä¸ä¸ªæªæ¥å¼ï¼å¹¶å¯¹å
¶è¿è¡è½®è¯¢ï¼ç´å°å¾å°ä¸ä¸ªç»æï¼
fn main() -> std::io::Result<()> {
use async_std::task;
let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
2
3
4
5
6
ç±äºblock_onæ¯ä¸ä¸ªåæ¥å½æ°ï¼å®ä¼äº§ç弿¥å½æ°çæç»å¼ï¼æä»¥ä½ å¯ä»¥æå®ç使¯ä»å¼æ¥ä¸çå°åæ¥ä¸ççéé
å¨ãä½å®çé»å¡ç¹æ§ä¹æå³çä½ ç»ä¸è½å¨å¼æ¥å½æ°ä¸ä½¿ç¨block_onï¼è¿ä¼é»å¡æ´ä¸ªçº¿ç¨ï¼ç´å°å¾å°ç»æãåºè¯¥ä½¿ç¨await代æ¿ã
å¾20 - 2å±ç¤ºäºmain彿°ä¸ç§å¯è½çæ§è¡è¿ç¨ã
䏿¹çæ¶é´çº¿âç®åè§å¾âå±ç¤ºäºç¨åºå¼æ¥è°ç¨çæ½è±¡è§å¾ï¼cheapo_requesté¦å
è°ç¨TcpStream::connectè·åä¸ä¸ªå¥æ¥åï¼ç¶åå¨è¯¥å¥æ¥åä¸è°ç¨write_allåread_to_stringï¼ä¹åè¿åãè¿ä¸æ¬ç« åé¢åæ¥çæ¬çcheapo_requestçæ¶é´çº¿é常ç¸ä¼¼ã
å¾20 - 2 é»å¡çå¾
弿¥å½æ°
ä½è¿äºå¼æ¥è°ç¨ä¸çæ¯ä¸ä¸ªé½æ¯ä¸ä¸ªå¤æ¥éª¤çè¿ç¨ï¼å å建ä¸ä¸ªæªæ¥å¼ï¼ç¶åå¯¹å ¶è¿è¡è½®è¯¢ï¼ç´å°å®åå¤å¥½ï¼å¨æ¤è¿ç¨ä¸å¯è½è¿ä¼åå»ºå¹¶è½®è¯¢å ¶ä»åæªæ¥å¼ã䏿¹çæ¶é´çº¿âå®ç°âå±ç¤ºäºå®ç°è¿ç§å¼æ¥è¡ä¸ºçå®é 忥è°ç¨ãè¿æ¯ä¸ä¸ªå¾å¥½çæºä¼ï¼è®©æä»¬è¯¦ç»äºè§£æ®é弿¥æ§è¡è¿ç¨ä¸å°åºåçäºä»ä¹ï¼
- é¦å
ï¼
main彿°è°ç¨cheapo_requestï¼å®è¿åå ¶æç»ç»æçæªæ¥å¼Aãç¶åmainå°è¯¥æªæ¥å¼ä¼ éç»async_std::block_onï¼block_onå¯¹å ¶è¿è¡è½®è¯¢ã - å¯¹æªæ¥å¼
Aç轮询使å¾cheapo_requestç彿°ä½å¼å§æ§è¡ãå®è°ç¨TcpStream::connect以è·åä¸ä¸ªå¥æ¥åçæªæ¥å¼Bï¼ç¶åçå¾ è¿ä¸ªæªæ¥å¼ãæ´åç¡®å°è¯´ï¼ç±äºTcpStream::connectå¯è½ä¼éå°éè¯¯ï¼æä»¥Bæ¯ä¸ä¸ªResult<TcpStream, std::io::Error>çæªæ¥å¼ã awaitä¼è½®è¯¢æªæ¥å¼Bãç±äºç½ç»è¿æ¥å°æªå»ºç«ï¼B.pollè¿åPoll::Pendingï¼ä½ä¼å®æå¨å¥æ¥ååå¤å¥½æ¶å¤éè°ç¨ä»»å¡ã- ç±äºæªæ¥å¼
Bè¿æ²¡æåå¤å¥½ï¼A.pollåå®èªå·±çè°ç¨èblock_onè¿åPoll::Pendingã - ç±äº
block_onæ äºå¯åï¼å®è¿å ¥ç¡ç ç¶æãæ¤æ¶æ´ä¸ªçº¿ç¨è¢«é»å¡ã - å½
Bçè¿æ¥åå¤å¥½ä½¿ç¨æ¶ï¼å®ä¼å¤é轮询å®çä»»å¡ãè¿ä½¿å¾block_on忬¡è¡å¨èµ·æ¥ï¼å®ä¼å次å°è¯è½®è¯¢æªæ¥å¼Aã - 对
Aç轮询导è´cheapo_requestå¨å ¶ç¬¬ä¸ä¸ªawait夿¢å¤æ§è¡ï¼å¨é£éå®å次轮询Bã - è¿æ¬¡ï¼
Bå·²ç»åå¤å¥½ï¼å¥æ¥ååå»ºå®æï¼æä»¥å®åA.pollè¿åPoll::Ready(Ok(socket))ã - 对
TcpStream::connectç弿¥è°ç¨ç°å¨å®æäºãå æ¤ï¼TcpStream::connect(...).await表达å¼çå¼ä¸ºOk(socket)ã cheapo_request彿°ä½çæ§è¡æ£å¸¸ç»§ç»ï¼ä½¿ç¨format!å®æå»ºè¯·æ±å符串ï¼å¹¶å°å ¶ä¼ éç»socket.write_allã- ç±äº
socket.write_allæ¯ä¸ä¸ªå¼æ¥å½æ°ï¼å®è¿åå ¶ç»æçæªæ¥å¼Cï¼cheapo_requestä¼çå¾ è¿ä¸ªæªæ¥å¼ã
åç»çè¿ç¨ç±»ä¼¼ãå¨å¾20 - 2æç¤ºçæ§è¡è¿ç¨ä¸ï¼socket.read_to_stringçæªæ¥å¼å¨åå¤å¥½ä¹å被轮询äºåæ¬¡ï¼æ¯æ¬¡å¤éé½ä¼ä»å¥æ¥å读åä¸äºæ°æ®ï¼ä½read_to_string被æå®è¦ä¸ç´è¯»å°è¾å
¥ç»æï¼è¿éè¦å 个æä½ã
åä¸ä¸ªä¸æè°ç¨pollç循ç¯å¬èµ·æ¥å¹¶ä¸é¾ãä½async_std::task::block_onçä»·å¼å¨äºï¼å®ç¥éå¦ä½è¿å
¥ç¡ç ç¶æï¼ç´å°æªæ¥å¼å®é
ä¸å¼å¾å次轮询ï¼è䏿¯æµªè´¹å¤ç卿¶é´åçµæ± çµéè¿è¡æ°åäº¿æ¬¡æ æä¹ç轮询è°ç¨ãåconnectåread_to_stringè¿æ ·çåºæ¬I/O彿°è¿åçæªæ¥å¼ä¼ä¿çä¼ éç»pollçContextæä¾çå¤éå¨ï¼å¹¶å¨block_onåºè¯¥å¤é并忬¡å°è¯è½®è¯¢æ¶è°ç¨å®ãæä»¬å°å¨âåºæ¬æªæ¥å¼åæ§è¡å¨ï¼ä½æ¶å¼å¾åæ¬¡è½®è¯¢æªæ¥å¼ï¼âä¸éè¿èªå·±å®ç°ä¸ä¸ªç®åçæ¬çblock_onæ¥ç¡®åå±ç¤ºè¿æ¯å¦ä½å·¥ä½çã
䏿们åé¢å±ç¤ºçåå§åæ¥çæ¬ä¸æ ·ï¼è¿ä¸ªå¼æ¥çæ¬çcheapo_requestå ä¹ææææ¶é´é½è±å¨äºçå¾
æä½å®æä¸ãå¦æææ¯ä¾ç»å¶æ¶é´è½´ï¼å¾è¡¨å ä¹å
¨æ¯æ·±ç°è²çï¼åªæå¨ç¨åºè¢«å¤éæ¶ææä¸å°æ®µè®¡ç®æ¶é´ã
è¿éæ¶åäºå¾å¤ç»èã幸è¿çæ¯ï¼ä½ é常å¯ä»¥åªä»ç®åç䏿¹æ¶é´çº¿çè§åº¦æ¥æèï¼æäºå½æ°è°ç¨æ¯åæ¥çï¼æäºæ¯å¼æ¥çï¼éè¦ä½¿ç¨awaitï¼ä½å®ä»¬é½åªæ¯å½æ°è°ç¨ãRust弿¥æ¯æçæååå³äºå¸®å©ç¨åºåå¨å®è·µä¸ä½¿ç¨ç®åè§å¾ï¼èä¸ä¼è¢«å®ç°è¿ç¨ä¸çæ¥åæä½æå¹²æ°ã
# çæå¼æ¥ä»»å¡
async_std::task::block_on彿°ä¼é»å¡ï¼ç´å°ä¸ä¸ªæªæ¥å¼åå¤å°±ç»ªãä½è®©çº¿ç¨å®å
¨é»å¡å¨åä¸ªæªæ¥å¼ä¸ï¼å¹¶ä¸æ¯åæ¥è°ç¨å¥½ï¼æ¬ç« çç®æ æ¯è®©çº¿ç¨å¨çå¾
æ¶å»åå
¶ä»å·¥ä½ã
为æ¤ï¼ä½ å¯ä»¥ä½¿ç¨async_std::task::spawn_localãè¿ä¸ªå½æ°æ¥åä¸ä¸ªæªæ¥å¼ï¼å¹¶å°å
¶æ·»å å°ä¸ä¸ªä»»å¡æ± ä¸ï¼å½block_onæé»å¡çå¾
çæªæ¥å¼æªåå¤å¥½æ¶ï¼å®ä¼å°è¯å¯¹ä»»å¡æ± ä¸çæªæ¥å¼è¿è¡è½®è¯¢ãæä»¥ï¼å¦æä½ å°ä¸å æªæ¥å¼ä¼ éç»spawn_localï¼ç¶å对æç»ç»æçæªæ¥å¼åºç¨block_onï¼é£ä¹block_onä¼å¨æè¿å±æ¶è½®è¯¢æ¯ä¸ªçæçæªæ¥å¼ï¼å¹¶åå°è¿è¡æ´ä¸ªä»»å¡æ± ï¼ç´å°ä½ çç»æåå¤å¥½ã
卿°åæ¬ææ¶ï¼spawn_localå¨async-stdä¸åªæå¯ç¨äºè¯¥åºçä¸ç¨³å®ç¹æ§æ¶æå¯ç¨ãè¦åå°è¿ä¸ç¹ï¼ä½ éè¦å¨Cargo.tomlä¸åè¿æ ·å¼ç¨async-stdï¼
async-std = { version = "1", features = ["unstable"] }
spawn_local彿°æ¯æ ååºä¸ç¨äºå¯å¨çº¿ç¨çstd::thread::spawn彿°ç弿¥çæ¬ï¼
std::thread::spawn(c)æ¥åä¸ä¸ªéåcï¼å¹¶å¯å¨ä¸ä¸ªçº¿ç¨æ¥è¿è¡å®ï¼è¿åä¸ä¸ªstd::thread::JoinHandleï¼å ¶joinæ¹æ³ä¼çå¾ çº¿ç¨å®æï¼å¹¶è¿åcçè¿åå¼ãasync_std::task::spawn_local(f)æ¥åæªæ¥å¼fï¼å¹¶å°å ¶æ·»å å°ä»»å¡æ± ä¸ï¼ä»¥ä¾¿å¨å½å线ç¨è°ç¨block_onæ¶è¿è¡è½®è¯¢ãspawn_localè¿åå®èªå·±çasync_std::task::JoinHandleç±»åï¼å®æ¬èº«ä¹æ¯ä¸ä¸ªæªæ¥å¼ï¼ä½ å¯ä»¥ä½¿ç¨awaitæ¥è·åfçæç»å¼ã
ä¾å¦ï¼å设æä»¬æ³å¹¶åå°ååºä¸ç»HTTP请æ±ãä¸é¢æ¯ç¬¬ä¸æ¬¡å°è¯ï¼
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>> {
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_request(&host, port, &path)));
}
let mut results = vec![];
for handle in handles {
results.push(handle.await);
}
results
}
2
3
4
5
6
7
8
9
10
11
12
è¿ä¸ªå½æ°å¯¹requestsä¸çæ¯ä¸ªå
ç´ è°ç¨cheapo_requestï¼å°æ¯ä¸ªè°ç¨çæªæ¥å¼ä¼ éç»spawn_localãå®å°å¾å°çJoinHandleæ¶éå°ä¸ä¸ªåéä¸ï¼ç¶åçå¾
æ¯ä¸ªJoinHandleã以任ä½é¡ºåºçå¾
JoinHandle齿²¡é®é¢ï¼å 为请æ±å·²ç»è¢«çæï¼åªè¦è¿ä¸ªçº¿ç¨è°ç¨block_on䏿 äºå¯åæ¶ï¼å®ä»¬çæªæ¥å¼å°±ä¼æ ¹æ®éè¦è¢«è½®è¯¢ãææè¯·æ±å°å¹¶åè¿è¡ã䏿¦å®ä»¬å®æï¼many_requestså°±ä¼å°ç»æè¿åç»è°ç¨è
ã
åé¢ç代ç å 乿¯æ£ç¡®çï¼ä½Rustçåç¨æ£æ¥å¨å¯¹cheapo_requestçæªæ¥å¼ççå½å¨æè¡¨ç¤ºæ
å¿§ï¼
error: `host` does not live long enough
handles.push(task::spawn_local(cheapo_request(&host, port,
&path)));
---------------^^^^^----------
----
| |
| borrowed value
does not
| live long
enough
argument requires that `host` is borrowed
for `'static`
}
- `host` dropped here while still borrowed
2
3
4
5
6
7
8
9
10
11
12
13
14
pathä¹æç±»ä¼¼çé误ã
èªç¶å°ï¼å¦ææä»¬å°å¼ç¨ä¼ éç»ä¸ä¸ªå¼æ¥å½æ°ï¼å®è¿åçæªæ¥å¼å¿ é¡»ææè¿äºå¼ç¨ï¼æä»¥è¿ä¸ªæªæ¥å¼ççå½å¨æä¸è½è¶ è¿å®æåç¨çå¼ççå½å¨æãè¿ä¸ä»»ä½ææå¼ç¨ç弿åçéå¶æ¯ä¸æ ·çã
é®é¢å¨äºspawn_localä¸è½ç¡®å®å¨hoståpath被丢å¼ä¹åï¼ä½ ä¼çå¾
ä»»å¡å®æãå®é
ä¸ï¼spawn_localåªæ¥åçå½å¨æä¸º'staticçæªæ¥å¼ï¼å ä¸ºä½ å¯è½ä¼ç®åå°å¿½ç¥å®è¿åçJoinHandleï¼è®©ä»»å¡å¨ç¨åºçå©ä½æ§è¡æ¶é´å
ç»§ç»è¿è¡ãè¿å¹¶é弿¥ä»»å¡ç¹æçé®é¢ï¼å¦æä½ è¯å¾ä½¿ç¨std::thread::spawnå¯å¨ä¸ä¸ªéå
æè·äºå±é¨åéå¼ç¨ç线ç¨ï¼ä¹ä¼å¾å°ç±»ä¼¼çé误ã
ä¸ç§è§£å³æ¹æ³æ¯å建å¦ä¸ä¸ªå¼æ¥å½æ°ï¼å®æ¥ååæ°çæ¥ææææççæ¬ï¼
async fn cheapo_owning_request(host: String, port: u16, path: String) -> std::io::Result<String> {
cheapo_request(&host, port, &path).await
}
2
3
è¿ä¸ªå½æ°æ¥åStringç±»åè䏿¯&strå¼ç¨ï¼æä»¥å®çæªæ¥å¼èªå·±æ¥æhoståpathåç¬¦ä¸²çæææï¼å¹¶ä¸å®ççå½å¨ææ¯'staticãåç¨æ£æ¥å¨å¯ä»¥çå°å®ç«å³çå¾
cheapo_requestçæªæ¥å¼ï¼å æ¤ï¼å¦æè¿ä¸ªæªæ¥å¼æ£å¨è¢«è½®è¯¢ï¼é£ä¹å®æåç¨çhoståpathåéè¯å®ä»ç¶åå¨ãè¿æ ·å°±æ²¡é®é¢äºã
使ç¨cheapo_owning_requestï¼ä½ å¯ä»¥åè¿æ ·çæææè¯·æ±ï¼
for (host, port, path) in requests {
handles.push(task::spawn_local(cheapo_owning_request(host, port, path)));
}
2
3
ä½ å¯ä»¥å¨åæ¥çmain彿°ä¸ä½¿ç¨block_onæ¥è°ç¨many_requestsï¼
let requests = vec![
("example.com".to_string(), 80, "/".to_string()),
("www.red-bean.com".to_string(), 80, "/".to_string()),
("en.wikipedia.org".to_string(), 80, "/".to_string()),
];
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("{}", response),
Err(err) => eprintln!("error: {}", err),
}
}
2
3
4
5
6
7
8
9
10
11
12
è¿æ®µä»£ç å¨block_onçè°ç¨ä¸å¹¶åè¿è¡ææä¸ä¸ªè¯·æ±ãå¨å
¶ä»è¯·æ±è¢«é»å¡æ¶ï¼æ¯ä¸ªè¯·æ±é½è½å¨ææºä¼æ¶åå¾è¿å±ï¼ææè¿äºé½å¨è°ç¨çº¿ç¨ä¸è¿è¡ãå¾20 - 3å±ç¤ºäºå¯¹cheapo_requestç䏿¬¡è°ç¨ä¸ç§å¯è½çæ§è¡è¿ç¨ã
ï¼æä»¬é¼å±ä½ èªå·±è¿è¡è¿æ®µä»£ç ï¼å¨cheapo_requestçé¡¶é¨åæ¯ä¸ªawait表达å¼ä¹åæ·»å eprintln!è°ç¨ï¼è¿æ ·ä½ å°±è½çå°æ¯æ¬¡æ§è¡ä¸è°ç¨æ¯å¦ä½ä¸åå°äº¤éè¿è¡çãï¼
å¾20 - 3 å¨å个线ç¨ä¸è¿è¡ä¸ä¸ªå¼æ¥ä»»å¡
对many_requestsçè°ç¨ï¼ä¸ºç®åèµ·è§æªå±ç¤ºï¼çæäºä¸ä¸ªå¼æ¥ä»»å¡ï¼æä»¬å°å
¶æ 记为AãBåCãblock_oné¦å
轮询Aï¼Aå¼å§è¿æ¥å°example.comã䏿¦Aè¿åPoll::Pendingï¼block_on就尿³¨æå转åä¸ä¸ä¸ªçæçä»»å¡ï¼è½®è¯¢æªæ¥å¼Bï¼æåæ¯Cï¼å®ä»¬åèªå¼å§è¿æ¥å°ç¸åºçæå¡å¨ã
彿æå¯è½®è¯¢çæªæ¥å¼é½è¿åPoll::Pendingæ¶ï¼block_onè¿å
¥ç¡ç ç¶æï¼ç´å°æä¸ªTcpStream::connectçæªæ¥å¼è¡¨æå
¶ä»»å¡å¼å¾å次轮询ã
å¨è¿æ¬¡æ§è¡ä¸ï¼en.wikipedia.orgæå¡å¨çååºæ¯å
¶ä»æå¡å¨æ´å¿«ï¼æä»¥è¯¥ä»»å¡æå
宿ãå½ä¸ä¸ªçæçä»»å¡å®ææ¶ï¼å®ä¼å°å
¶å¼ä¿åå¨JoinHandleä¸ï¼å¹¶å°å
¶æ 记为已åå¤å¥½ï¼è¿æ ·many_requestså¨çå¾
宿¶å°±å¯ä»¥ç»§ç»è¿è¡ãæç»ï¼å
¶ä»å¯¹cheapo_requestçè°ç¨è¦ä¹æåï¼è¦ä¹è¿åä¸ä¸ªé误ï¼ç¶åmany_requestsæ¬èº«å°±å¯ä»¥è¿åãæåï¼main彿°ä»block_onæ¥æ¶ç»æåéã
ææè¿äºæ§è¡é½åçå¨å个线ç¨ä¸ï¼éè¿å¯¹å®ä»¬çæªæ¥å¼è¿è¡è¿ç»è½®è¯¢ï¼ä¸æ¬¡å¯¹cheapo_requestçè°ç¨ç¸äºäº¤éè¿è¡ãä¸ä¸ªå¼æ¥è°ç¨çèµ·æ¥åæ¯ä¸ä¸ªå½æ°è°ç¨è¿è¡è³å®æï¼ä½è¿ä¸ªå¼æ¥è°ç¨å®é
䏿¯éè¿å¯¹æªæ¥å¼çpollæ¹æ³è¿è¡ä¸ç³»å忥è°ç¨æ¥å®ç°çãæ¯ä¸ªåç¬çpollè°ç¨é½è½å¿«éè¿åï¼è®©åºçº¿ç¨ï¼ä»¥ä¾¿å¦ä¸ä¸ªå¼æ¥è°ç¨å¯ä»¥è½®æµæ§è¡ã
æä»¬ç»äºå®ç°äºæ¬ç« å¼å¤´è®¾å®çç®æ ï¼è®©çº¿ç¨å¨çå¾
I/O宿æ¶å»åå
¶ä»å·¥ä½ï¼è¿æ ·çº¿ç¨çèµæºå°±ä¸ä¼è¢«é²ç½®ãæ´å¥½çæ¯ï¼å®ç°è¿ä¸ªç®æ ç代ç çèµ·æ¥é叏忮éçRust代ç ï¼æäºå½æ°è¢«æ 记为asyncï¼æäºå½æ°è°ç¨åé¢è·ç.awaitï¼æä»¬ä½¿ç¨async_stdä¸ç彿°è䏿¯stdä¸ç彿°ï¼ä½é¤æ¤ä¹å¤ï¼å®å°±æ¯æ®éçRust代ç ã
éè¦è®°ä½çæ¯ï¼å¼æ¥ä»»å¡å线ç¨ä¹é´ä¸ä¸ªéè¦çåºå«æ¯ï¼ä»ä¸ä¸ªå¼æ¥ä»»å¡åæ¢å°å¦ä¸ä¸ªå¼æ¥ä»»å¡åªåçå¨await表达å¼å¤ï¼å³å½æ£å¨çå¾
çæªæ¥å¼è¿åPoll::Pendingæ¶ãè¿æå³çï¼å¦æä½ å¨cheapo_request䏿¾å
¥ä¸ä¸ªé¿æ¶é´è¿è¡ç计ç®ï¼å¨è¿ä¸ªè®¡ç®å®æä¹åï¼ä½ ä¼ éç»spawn_localçå
¶ä»ä»»å¡é½æ²¡ææºä¼è¿è¡ãè对äºçº¿ç¨ï¼è¿ä¸ªé®é¢ä¸ä¼åºç°ï¼æä½ç³»ç»å¯ä»¥å¨ä»»ä½æ¶åæåä»»ä½çº¿ç¨ï¼å¹¶è®¾ç½®å®æ¶å¨ä»¥ç¡®ä¿æ²¡æçº¿ç¨ç¬å å¤çå¨ã
弿¥ä»£ç ä¾èµäºå
±äº«çº¿ç¨çæªæ¥å¼ç主å¨åä½ãå¦æä½ éè¦è®©é¿æ¶é´è¿è¡ç计ç®ä¸å¼æ¥ä»£ç å
±åï¼æ¬ç« åé¢ç âé¿æ¶é´è¿è¡ç计ç®ï¼yield_nowåspawn_blockingâ æè¿°äºä¸äºéæ©ã
# 弿¥å
é¤äºå¼æ¥å½æ°ï¼Rustè¿æ¯æå¼æ¥åãæ®éçåè¯å¥è¿åå
¶æåä¸ä¸ªè¡¨è¾¾å¼çå¼ï¼è弿¥åè¿åå
¶æåä¸ä¸ªè¡¨è¾¾å¼çå¼çæªæ¥å¼ãä½ å¯ä»¥å¨å¼æ¥åä¸ä½¿ç¨await表达å¼ã
弿¥åçèµ·æ¥åæ®éçåè¯å¥ï¼åªæ¯å¨åé¢å ä¸asyncå
³é®åï¼
let serve_one = async {
use async_std::net;
// çå¬è¿æ¥å¹¶æ¥åä¸ä¸ªè¿æ¥ã
let listener = net::TcpListener::bind("localhost:8087").await?;
let (mut socket, _addr) = listener.accept().await?;
// ä¸ `socket` ä¸ç客æ·ç«¯è¿è¡éä¿¡ã
...
};
2
3
4
5
6
7
8
è¿ç¨ä¸ä¸ªæªæ¥å¼åå§åäºserve_oneï¼å½å¯¹è¿ä¸ªæªæ¥å¼è¿è¡è½®è¯¢æ¶ï¼å®ä¼çå¬å¹¶å¤çå个TCPè¿æ¥ãå°±å弿¥å½æ°è°ç¨å¨å
¶æªæ¥å¼è¢«è½®è¯¢ä¹åä¸ä¼å¼å§æ§è¡ä¸æ ·ï¼è¿ä¸ªåç主ä½å¨serve_one被轮询ä¹åä¹ä¸ä¼å¼å§æ§è¡ã
å¦æä½ å¨å¼æ¥åä¸å¯¹ä¸ä¸ªé误åºç¨?æä½ç¬¦ï¼å®åªä¼ä»è¿ä¸ªåè¿åï¼è䏿¯ä»å
å«å®ç彿°è¿åãä¾å¦ï¼å¦æåé¢çbindè°ç¨è¿åä¸ä¸ªé误ï¼?æä½ç¬¦ä¼å°å
¶ä½ä¸ºserve_oneçæç»å¼è¿åãåæ ·ï¼return表达å¼ä¹æ¯ä»å¼æ¥åè¿åï¼è䏿¯ä»å¤é¨å½æ°è¿åã
妿ä¸ä¸ªå¼æ¥åå¼ç¨äºå¨å¨å´ä»£ç ä¸å®ä¹çåéï¼å®çæªæ¥å¼ä¼æè·è¿äºåéçå¼ï¼å°±åéå
䏿 ·ãå°±åmoveéå
ï¼è§ âçªååéçéå
âï¼ä¸æ ·ï¼ä½ å¯ä»¥å¨åå¼å¤´ä½¿ç¨async moveæ¥è·åæè·åéçæææï¼èä¸åªæ¯ææå®ä»¬çå¼ç¨ã
弿¥åæä¾äºä¸ç§ç®æ´çæ¹å¼ï¼å°ä½ 叿弿¥è¿è¡ç䏿®µä»£ç åç¦»åºæ¥ãä¾å¦ï¼å¨ä¸ä¸èä¸ï¼spawn_localéè¦ä¸ä¸ªçå½å¨æä¸º'staticçæªæ¥å¼ï¼æä»¥æä»¬å®ä¹äºcheapo_owning_requestå
è£
彿°ï¼ä»¥è·å¾ä¸ä¸ªæ¥æå
¶åæ°æææçæªæ¥å¼ãä½ å¯ä»¥éè¿å¨å¼æ¥åä¸è°ç¨cheapo_requestï¼èæ é使ç¨å
è£
彿°ï¼å°±è½è¾¾å°åæ ·çææï¼
pub async fn many_requests(requests: Vec<(String, u16, String)>) -> Vec<std::io::Result<String>> {
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn_local(async move {
cheapo_request(&host, port, &path).await
}));
}
...
}
2
3
4
5
6
7
8
9
10
ç±äºè¿æ¯ä¸ä¸ªasync moveåï¼å®çæªæ¥å¼åmoveéå
䏿 ·è·åStringç±»åçhoståpathå¼çæææãç¶åå®å°å¼ç¨ä¼ éç»cheapo_requestãåç¨æ£æ¥å¨å¯ä»¥çå°ï¼è¿ä¸ªåçawait表达å¼è·åäºcheapo_requestçæªæ¥å¼çæææï¼æä»¥å¯¹hoståpathçå¼ç¨ççå½å¨æä¸ä¼è¶
è¿å®ä»¬æåç¨ç被æè·åéççå½å¨æãè¿ä¸ªå¼æ¥åå®ç°äºä¸cheapo_owning_requestç¸åçåè½ï¼ä½æ ·æ¿ä»£ç æ´å°ã
ä½ å¯è½ä¼éå°ä¸ä¸ªå°é®é¢ï¼å³æ²¡æç±»ä¼¼äºå¼æ¥å½æ°åæ°åé¢ç-> Tè¿æ ·çè¯æ³æ¥æå®å¼æ¥åçè¿åç±»åãè¿å¨ä½¿ç¨?æä½ç¬¦æ¶å¯è½ä¼å¯¼è´é®é¢ï¼
let input = async_std::io::stdin();
let future = async {
let mut line = String::new();
// è¿è¿å `std::io::Result<usize>`ã
input.read_line(&mut line).await?;
println!("Read line: {}", line);
Ok(())
};
2
3
4
5
6
7
8
è¿æ®µä»£ç 伿¥éï¼
error: type annotations needed
|
42 | let future = async {
| ------ consider giving `future` a type
...
46 | input.read_line(&mut line).await?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot infer type
2
3
4
5
6
7
Rustæ æ³å¤æè¿ä¸ªå¼æ¥åçè¿åç±»ååºè¯¥æ¯ä»ä¹ãread_lineæ¹æ³è¿åResult<(), std::io::Error>ï¼ä½ç±äº?æä½ç¬¦ä½¿ç¨Fromç¹æ§å°å½åçé误类å转æ¢ä¸ºæéçä»»ä½ç±»åï¼æä»¥è¿ä¸ªå¼æ¥åçè¿åç±»åå¯ä»¥æ¯ä»»ä½å®ç°äºFrom<std::io::Error>çE对åºçResult<(), E>ã
æªæ¥çæ¬çRustå¯è½ä¼æ·»å æå®å¼æ¥åè¿åç±»åçè¯æ³ãç®åï¼ä½ å¯ä»¥éè¿æç¡®ååºåçæç»Okçç±»åæ¥è§£å³è¿ä¸ªé®é¢ï¼
let future = async {
...
Ok::<(), std::io::Error>(())
};
2
3
4
ç±äºResultæ¯ä¸ä¸ªæ³åç±»åï¼éè¦æåç±»ååé误类åä½ä¸ºåæ°ï¼æä»¥æä»¬å¯ä»¥å¨ä½¿ç¨OkæErræ¶åè¿æ ·æå®è¿äºç±»ååæ°ã
# ä»å¼æ¥åæå»ºå¼æ¥å½æ°
弿¥å为æä»¬æä¾äºå¦ä¸ç§å®ç°ä¸å¼æ¥å½æ°ç¸åææçæ¹å¼ï¼å¹¶ä¸æ´å
·çµæ´»æ§ãä¾å¦ï¼æä»¬å¯ä»¥å°cheapo_request示ä¾åæä¸ä¸ªæ®éç忥彿°ï¼è¯¥å½æ°è¿åä¸ä¸ªå¼æ¥åçæªæ¥å¼ï¼
use std::io;
use std::future::Future;
fn cheapo_request<'a>(host: &'a str, port: u16, path: &'a str) -> impl Future<Output = io::Result<String>> + 'a {
async move {
...彿°ä½...
}
}
2
3
4
5
6
7
8
å½ä½ è°ç¨è¿ä¸ªçæ¬ç彿°æ¶ï¼å®ä¼ç«å³è¿å弿¥åçå¼çæªæ¥å¼ãè¿ä¼æè·å½æ°çåæ°ï¼å¹¶ä¸è¡ä¸ºä¸å¼æ¥å½æ°è¿åçæªæ¥å¼ä¸æ ·ãç±äºæä»¬æ²¡æä½¿ç¨async fnè¯æ³ï¼æä»¥éè¦å¨è¿åç±»åä¸ååºimpl Futureï¼ä½å¯¹è°ç¨è
æ¥è¯´ï¼è¿ä¸¤ä¸ªå®ä¹æ¯ç¸å彿°ç¾åçå¯äºæ¢å®ç°ã
å½ä½ 叿å¨å½æ°è¢«è°ç¨æ¶ï¼å¨åå»ºç»æçæªæ¥å¼ä¹åç«å³è¿è¡ä¸äºè®¡ç®æ¶ï¼ç¬¬äºç§æ¹æ³ä¼å¾æç¨ãä¾å¦ï¼å¦ä¸ç§ä½¿cheapo_requestä¸spawn_localå
¼å®¹çæ¹æ³æ¯å°å®åæä¸ä¸ªåæ¥å½æ°ï¼è¿åä¸ä¸ªçå½å¨æä¸º'staticçæªæ¥å¼ï¼è¯¥æªæ¥å¼æè·å
¶åæ°çå®å
¨æ¥ææææç坿¬ï¼
fn cheapo_request(host: &str, port: u16, path: &str) -> impl Future<Output = io::Result<String>> + 'static {
let host = host.to_string();
let path = path.to_string();
async move {
...ä½¿ç¨ `&*host`ã`port` å `path`...
}
}
2
3
4
5
6
7
è¿ä¸ªçæ¬è®©å¼æ¥åå°hoståpathä½ä¸ºæ¥ææææçString弿è·ï¼è䏿¯&strå¼ç¨ãç±äºè¿ä¸ªæªæ¥å¼æ¥æè¿è¡æéçæææ°æ®ï¼æä»¥å®ççå½å¨æä¸º'staticï¼æä»¬å¨åé¢çç¾åä¸æç¡®ååºäº+ 'staticï¼ä½'staticæ¯-> implè¿åç±»åçé»è®¤çå½å¨æï¼æä»¥çç¥å®ä¹æ²¡æå½±åï¼ã
ç±äºè¿ä¸ªçæ¬çcheapo_requestè¿åçæªæ¥å¼ççå½å¨æä¸º'staticï¼æä»¬å¯ä»¥ç´æ¥å°å®ä»¬ä¼ éç»spawn_localï¼
let join_handle = async_std::task::spawn_local(
cheapo_request("areweasyncyet.rs", 80, "/")
);
...å
¶ä»å·¥ä½...
let response = join_handle.await?;
2
3
4
5
# å¨çº¿ç¨æ± ä¸çæå¼æ¥ä»»å¡
å°ç®å为æ¢ï¼æä»¬å±ç¤ºç示ä¾å ä¹é½ææ¶é´è±å¨äºçå¾
I/Oä¸ï¼ä½æäºå·¥ä½è´è½½æ¯å¤çå¨è®¡ç®åé»å¡æä½çæ··åãå½è®¡ç®éè¶³å¤å¤§ï¼å个å¤ç卿 æ³å¤çæ¶ï¼ä½ å¯ä»¥ä½¿ç¨async_std::task::spawnå°ä¸ä¸ªæªæ¥å¼çæå°ä¸ä¸ªå·¥ä½çº¿ç¨æ± ä¸ï¼è¿äºçº¿ç¨ä¸é¨ç¨äºè½®è¯¢åå¤å¥½åå¾è¿å±çæªæ¥å¼ã
async_std::task::spawnçç¨æ³ä¸async_std::task::spawn_local类似ï¼
use async_std::task;
let mut handles = vec![];
for (host, port, path) in requests {
handles.push(task::spawn(async move {
cheapo_request(&host, port, &path).await
}));
}
...
2
3
4
5
6
7
8
9
ä¸spawn_local䏿 ·ï¼spawnè¿åä¸ä¸ªJoinHandleå¼ï¼ä½ å¯ä»¥ä½¿ç¨awaitæ¥è·åæªæ¥å¼çæç»ç»æãä½ä¸spawn_localä¸åçæ¯ï¼è¿ä¸ªæªæ¥å¼ä¸éè¦çå°ä½ è°ç¨block_onæä¼è¢«è½®è¯¢ã䏿¦çº¿ç¨æ± ä¸çæä¸ªçº¿ç¨ç©ºé²ï¼å®å°±ä¼å°è¯å¯¹å
¶è¿è¡è½®è¯¢ã
å¨å®è·µä¸ï¼spawnæ¯spawn_local使ç¨å¾æ´å¹¿æ³ï¼ä»
ä»
æ¯å ä¸ºäººä»¬å¸æèªå·±çå·¥ä½è´è½½ï¼æ 论计ç®åé»å¡æä½å¦ä½æ··åï¼é½è½å¨æºå¨çèµæºä¸å¾å°åè¡¡åé
ã
使ç¨spawnæ¶è¦è®°ä½çä¸ç¹æ¯ï¼çº¿ç¨æ± ä¼å°½éä¿æå¿ç¢ç¶æï¼æä»¥ä½ çæªæ¥å¼ä¼è¢«æå
è½®å°ç线ç¨è¿è¡è½®è¯¢ãä¸ä¸ªå¼æ¥è°ç¨å¯è½å¨ä¸ä¸ªçº¿ç¨ä¸å¼å§æ§è¡ï¼å¨await表达å¼å¤é»å¡ï¼ç¶åå¨å¦ä¸ä¸ªçº¿ç¨ä¸æ¢å¤æ§è¡ãæä»¥ï¼è½ç¶å°å¼æ¥å½æ°è°ç¨ç使¯ä¸æ®µåä¸ãè¿è´¯çä»£ç æ§è¡è¿ç¨æ¯ä¸ç§åççç®åï¼å®é
ä¸ï¼å¼æ¥å½æ°åawait表达å¼çç®çå°±æ¯é¼å±ä½ è¿æ ·æ³ï¼ï¼ä½è¿ä¸ªè°ç¨å®é
ä¸å¯è½ç±è®¸å¤ä¸åççº¿ç¨æ¥æ§è¡ã
å¦æä½ ä½¿ç¨çº¿ç¨å±é¨åå¨ï¼å¯è½ä¼æè®¶å°åç°ï¼å¨await表达å¼ä¹ååå¨å¨é£éçæ°æ®ï¼å¨ä¹å被å®å
¨ä¸åçæ°æ®å代äºï¼å ä¸ºä½ çä»»å¡ç°å¨ç±çº¿ç¨æ± ä¸çå¦ä¸ä¸ªçº¿ç¨è¿è¡è½®è¯¢ãå¦æè¿æ¯ä¸ªé®é¢ï¼ä½ åºè¯¥ä½¿ç¨ä»»å¡å±é¨åå¨ï¼æå
³è¯¦ç»ä¿¡æ¯ï¼è¯·æ¥çasync-stdåºææ¡£ä¸çtask_local!å®ã
# 使¯ä½ çæªæ¥å¼å®ç°äºSendåï¼
spawnæä¸ä¸ªspawn_local没æçéå¶ãç±äºæªæ¥å¼ä¼è¢«åéå°å¦ä¸ä¸ªçº¿ç¨ä¸è¿è¡ï¼æä»¥å®å¿
é¡»å®ç°Sendæ è®°ç¹æ§ãæä»¬å¨ â线ç¨å®å
¨ï¼SendåSyncâ ä¸ä»ç»è¿Sendãåªæå½ä¸ä¸ªæªæ¥å¼å
å«çææå¼é½æ¯Sendæ¶ï¼å®ææ¯Sendï¼ææå½æ°åæ°ãå±é¨åéï¼çè³å¿å临æ¶å¼é½å¿
é¡»å¯ä»¥å®å
¨å°ç§»å¨å°å¦ä¸ä¸ªçº¿ç¨ã
ååé¢ä¸æ ·ï¼è¿ä¸ªè¦æ±å¹¶é弿¥ä»»å¡æç¹æï¼å¦æä½ è¯å¾ä½¿ç¨std::thread::spawnå¯å¨ä¸ä¸ªéå
æè·äºéSendå¼ç线ç¨ï¼ä¹ä¼å¾å°ç±»ä¼¼çé误ãä¸åä¹å¤å¨äºï¼ä¼ éç»std::thread::spawnçéå
ä¼çå¨ä¸ºè¿è¡å®èå建ç线ç¨ä¸ï¼èå¨çº¿ç¨æ± ä¸çæçæªæ¥å¼å¨æ¯æ¬¡çå¾
æ¶é½å¯è½ä»ä¸ä¸ªçº¿ç¨ç§»å¨å°å¦ä¸ä¸ªçº¿ç¨ã
è¿ä¸ªéå¶å¾å®¹æå¨ä¸ç»æé´è§¦åãä¾å¦ï¼ä¸é¢è¿æ®µä»£ç çèµ·æ¥æ²¡ä»ä¹é®é¢ï¼
use async_std::task;
use std::rc::Rc;
async fn reluctant() -> String {
let string = Rc::new("ref-counted string".to_string());
some_asynchronous_thing().await;
format!("Your splendid string: {}", string)
}
task::spawn(reluctant());
2
3
4
5
6
7
8
9
10
ä¸ä¸ªå¼æ¥å½æ°çæªæ¥å¼éè¦ä¿åè¶³å¤çä¿¡æ¯ï¼ä»¥ä¾¿å½æ°è½ä»await表达å¼å¤ç»§ç»æ§è¡ãå¨è¿ä¸ªä¾åä¸ï¼reluctantçæªæ¥å¼å¨awaitä¹åå¿
须使ç¨stringï¼æä»¥è¿ä¸ªæªæ¥å¼è³å°ææ¶ä¼å
å«ä¸ä¸ªRc<String>å¼ãç±äºRcæéä¸è½å¨å¤ä¸ªçº¿ç¨ä¹é´å®å
¨å
±äº«ï¼æä»¥è¿ä¸ªæªæ¥å¼æ¬èº«ä¸æ¯Sendãèç±äºspawnåªæ¥åSendçæªæ¥å¼ï¼Rust伿¥éï¼
error: future cannot be sent between threads safely
|
17 | task::spawn(reluctant());
| ^^^^^^^^^^^ future returned by `reluctant` is not
`Send`
|
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
`async_std::task::spawn`
|
= help: within `impl Future`, the trait `Send` is not
implemented
for `Rc<String>`
note: future is not `Send` as this value is used across an await
|
10 | let string = Rc::new("ref-counted
string".to_string());
| ------ has type `Rc<String>` which is not
`Send`
11 |
12 | some_asynchronous_thing().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
await occurs here, with `string` maybe used
later
...
15 | }
| - `string` is later dropped here
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
è¿ä¸ªé误信æ¯å¾é¿ï¼ä½å å«äºå¾å¤æç¨çç»èï¼
- å®è§£éäºä¸ºä»ä¹æªæ¥å¼éè¦æ¯
Sendï¼task::spawnè¦æ±å¦æ¤ã - å®è§£éäºåªä¸ªå¼ä¸æ¯
Sendï¼å±é¨åéstringï¼å ¶ç±»åæ¯Rc<String>ã - å®è§£éäº
string为ä»ä¹ä¼å½±åæªæ¥å¼ï¼å®å¨æå®çawait表达å¼çä½ç¨åå ã
æä¸¤ç§æ¹æ³å¯ä»¥è§£å³è¿ä¸ªé®é¢ãä¸ç§æ¯éå¶éSendå¼çä½ç¨åï¼ä½¿å
¶ä¸è¦çä»»ä½await表达å¼ï¼è¿æ ·å°±ä¸éè¦å°å
¶ä¿åå¨å½æ°çæªæ¥å¼ä¸ï¼
async fn reluctant() -> String {
let return_value = {
let string = Rc::new("ref-counted string".to_string());
format!("Your splendid string: {}", string)
// `Rc<String>` å¨è¿éè¶
åºä½ç¨å...
};
// ... å æ¤å¨æä»¬å¨è¿éæåæ¶å®ä¸åå¨ã
some_asynchronous_thing().await;
return_value
}
2
3
4
5
6
7
8
9
10
å¦ä¸ç§è§£å³æ¹æ¡æ¯ç®åå°ä½¿ç¨std::sync::Arc代æ¿RcãArc使ç¨ååæ´æ°æ¥ç®¡çå
¶å¼ç¨è®¡æ°ï¼è¿ä¼ä½¿å®ç¨å¾®æ
¢ä¸äºï¼ä½Arcæéæ¯Sendã
è½ç¶æç»ä½ ä¼å¦ä¼è¯å«å¹¶é¿å
使ç¨éSendç±»åï¼ä½ä¸å¼å§å®ä»¬å¯è½ä¼æç¹è®©äººæå¤ãï¼è³å°ï¼æ¬ä¹¦çä½è
ç»å¸¸æå°æè®¶ãï¼ä¾å¦ï¼è¾æ§çRustä»£ç ææ¶ä¼ä½¿ç¨è¿æ ·çæ³åç»æç±»åï¼
// 䏿¨èï¼
type GenericError = Box<dyn std::error::Error>;
type GenericResult<T> = Result<T, GenericError>;
2
3
è¿ä¸ªGenericErrorç±»å使ç¨è£
ç®±çç¹æ§å¯¹è±¡æ¥ææä»»ä½å®ç°äºstd::error::Errorçç±»åçå¼ãä½å®æ²¡æå¯¹å
¶è¿è¡è¿ä¸æ¥éå¶ï¼å¦ææäººæä¸ä¸ªå®ç°äºErrorçéSendç±»åï¼ä»ä»¬å¯ä»¥å°è¯¥ç±»åçè£
ç®±å¼è½¬æ¢ä¸ºGenericErrorã
ç±äºåå¨è¿ç§å¯è½æ§ï¼GenericError䏿¯Sendï¼å æ¤ä¸é¢ç代ç å°æ æ³å·¥ä½ï¼
fn some_fallible_thing() -> GenericResult<i32> {
...
}
// è¿ä¸ªå½æ°çæªæ¥å¼ä¸æ¯ `Send`...
async fn unfortunate() {
// ... å 为è¿ä¸ªè°ç¨çå¼...
match some_fallible_thing() {
Err(error) => {
report_error(error);
}
Ok(output) => {
// ... å¨è¿ä¸ª await ä¸åå¨...
use_output(output).await;
}
}
}
// ... å æ¤è¿ä¸ª `spawn` æ¯ä¸ä¸ªé误ã
async_std::task::spawn(unfortunate());
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ä¸åé¢çä¾å䏿 ·ï¼ç¼è¯å¨çé误信æ¯è§£éäºåçäºä»ä¹ï¼æåºResultç±»åæ¯é®é¢æå¨ãç±äºRust认为some_fallible_thingçç»æå¨æ´ä¸ªmatchè¯å¥ä¸é½åå¨ï¼å
æ¬await表达å¼ï¼æä»¥å®å¤å®unfortunateçæªæ¥å¼ä¸æ¯Sendãä»Rustçè§åº¦æ¥çï¼è¿ä¸ªé误æç¹è¿äºä¿å®ï¼è½ç¶GenericErrorç¡®å®ä¸è½å®å
¨å°åéå°å¦ä¸ä¸ªçº¿ç¨ï¼ä½awaitåªå¨ç»æä¸ºOkæ¶æä¼åçï¼æä»¥å½æä»¬çå¾
use_outputçæªæ¥å¼æ¶ï¼é误å¼å®é
ä¸å¹¶ä¸åå¨ã
çæ³çè§£å³æ¹æ¡æ¯ä½¿ç¨æ´ä¸¥æ ¼çæ³åé误类åï¼å°±åæä»¬å¨ âå¤çå¤ç§é误类åâ ä¸å»ºè®®ç飿 ·ï¼
type GenericError = Box<dyn std::error::Error + Send + Sync +'static>;
type GenericResult<T> = Result<T, GenericError>;
2
è¿ä¸ªç¹æ§å¯¹è±¡æç¡®è¦æ±åºå±çé误类åå®ç°Sendï¼è¿æ ·å°±æ²¡é®é¢äºã
å¦æä½ çæªæ¥å¼ä¸æ¯Sendï¼å¹¶ä¸ä¸æ¹ä¾¿ä½¿å
¶æä¸ºSendï¼é£ä¹ä½ ä»ç¶å¯ä»¥ä½¿ç¨spawn_localå¨å½å线ç¨ä¸è¿è¡å®ãå½ç¶ï¼ä½ éè¦ç¡®ä¿çº¿ç¨å¨æä¸ªæ¶å»è°ç¨block_onï¼ç»å®è¿è¡çæºä¼ï¼èä¸ä½ æ æ³ä»å¨å¤ä¸ªå¤çå¨ä¹é´åé
å·¥ä½ä¸åçã
# é¿æ¶é´è¿è¡ç计ç®ï¼yield_nowåspawn_blocking
为äºè®©ä¸ä¸ªæªæ¥å¼è½ä¸å
¶ä»ä»»å¡å¨åä¸çº¿ç¨ä¸å好å°å
±äº«èµæºï¼å
¶pollæ¹æ³åºå§ç»å°½å¯è½å¿«éå°è¿åã使¯ï¼å¦æä½ æ£å¨æ§è¡ä¸ä¸ªé¿æ¶é´ç计ç®ï¼å¯è½éè¦å¾é¿æ¶é´æè½å°è¾¾ä¸ä¸ä¸ªawaitï¼è¿ä¼ä½¿å
¶ä»å¼æ¥ä»»å¡å¨çå¾
线ç¨è½®å°å®ä»¬æ§è¡æ¶çå¾
çæ¶é´æ¯é¢ææ´é¿ã
ä¸ç§é¿å
è¿ç§æ
åµçæ¹æ³æ¯å¶å°è¿è¡çå¾
æä½ãasync_std::task::yield_now彿°è¿åä¸ä¸ªä¸é¨ä¸ºæ¤è®¾è®¡çç®åæªæ¥å¼ï¼
while computation_not_done() {
...æ§è¡ä¸ä¸ªä¸çè§æ¨¡çè®¡ç®æ¥éª¤...
async_std::task::yield_now().await;
}
2
3
4
ç¬¬ä¸æ¬¡è½®è¯¢yield_nowçæªæ¥å¼æ¶ï¼å®ä¼è¿åPoll::Pendingï¼ä½ä¼è¡¨æå¾å¿«å°±å¼å¾å次轮询ãè¿æ ·åçæææ¯ï¼ä½ ç弿¥è°ç¨ä¼è®©åºçº¿ç¨ï¼è®©å
¶ä»ä»»å¡ææºä¼è¿è¡ï¼èä¸ä½ çè°ç¨å¾å¿«ä¹ä¼å次è·å¾æ§è¡æºä¼ãç¬¬äºæ¬¡è½®è¯¢yield_nowçæªæ¥å¼æ¶ï¼å®ä¼è¿åPoll::Ready(())ï¼æ¤æ¶ä½ ç弿¥å½æ°å¯ä»¥ç»§ç»æ§è¡ã
ç¶èï¼è¿ç§æ¹æ³å¹¶ä¸æ»æ¯å¯è¡çãå¦æä½ æ£å¨ä½¿ç¨å¤é¨åºæ¥è¿è¡é¿æ¶é´è¿è¡ç计ç®ï¼æè
è°ç¨CæC++代ç ï¼å¯è½ä¸æ¹ä¾¿ä¿®æ¹è¿äºä»£ç 以使å
¶æ´éå弿¥ç¼ç¨ãæè
ï¼å¯è½å¾é¾ç¡®ä¿è®¡ç®è¿ç¨ä¸çæ¯ä¸æ¡è·¯å¾é½è½æ¶ä¸æ¶å°æ§è¡å°awaitã
对äºè¿ç§æ
åµï¼ä½ å¯ä»¥ä½¿ç¨async_std::task::spawn_blockingãè¿ä¸ªå½æ°æ¥åä¸ä¸ªéå
ï¼å¨å®èªå·±ç线ç¨ä¸å¯å¨è¯¥éå
è¿è¡ï¼å¹¶è¿åå
¶è¿åå¼çæªæ¥å¼ã弿¥ä»£ç å¯ä»¥çå¾
è¿ä¸ªæªæ¥å¼ï¼å°å
¶æå¨çº¿ç¨è®©ç»å
¶ä»ä»»å¡ï¼ç´å°è®¡ç®å®æãéè¿å°ç¹éç工使¾å¨ä¸ä¸ªåç¬ç线ç¨ä¸ï¼ä½ å¯ä»¥è®©æä½ç³»ç»è´è´£åçå°åé
å¤çå¨èµæºã
ä¾å¦ï¼å设æä»¬éè¦å°ç¨æ·æä¾çå¯ç ä¸åå¨å¨èº«ä»½éªè¯æ°æ®åºä¸çåå¸çæ¬è¿è¡æ¯å¯¹ã为äºå®å
¨èµ·è§ï¼éªè¯å¯ç éè¦è¿è¡å¤§éç计ç®ï¼è¿æ ·å³ä½¿æ»å»è
è·åäºæä»¬æ°æ®åºç坿¬ï¼ä»ä»¬ä¹æ æ³ç®åå°å°è¯æ°ä¸äº¿ä¸ªå¯è½çå¯ç æ¥è¿è¡å¹é
ãargonauticaåºæä¾äºä¸ä¸ªä¸é¨ç¨äºåå¨å¯ç çåå¸å½æ°ï¼æ£ç¡®çæçargonauticaåå¸å¼éè¦è±è´¹ç¸å½é¿çæ¶é´æ¥éªè¯ãæä»¬å¯ä»¥å¨å¼æ¥åºç¨ç¨åºä¸è¿æ ·ä½¿ç¨argonauticaï¼0.2çæ¬ï¼ï¼
async fn verify_password(password: &str, hash: &str, key: &str) -> Result<bool, argonautica::Error> {
// å¤å¶åæ°ï¼ä»¥ä¾¿éå
å¯ä»¥æ¯'staticçã
let password = password.to_string();
let hash = hash.to_string();
let key = key.to_string();
async_std::task::spawn_blocking(move || {
argonautica::Verifier::default()
.with_hash(hash)
.with_password(password)
.with_secret_key(key)
.verify()
}).await
}
2
3
4
5
6
7
8
9
10
11
12
13
妿å¨ç»å®keyï¼ä½ä¸ºæ´ä¸ªæ°æ®åºçå¯é¥ï¼çæ
åµä¸ï¼passwordä¸hashå¹é
ï¼è¯¥å½æ°å°è¿åOk(true)ãéè¿å¨ä¼ éç»spawn_blockingçéå
ä¸è¿è¡éªè¯ï¼æä»¬å°èæ¶çè®¡ç®æ¨å°äºå®èªå·±ç线ç¨ä¸ï¼ç¡®ä¿å®ä¸ä¼å½±åæä»¬å¯¹å
¶ä»ç¨æ·è¯·æ±çååºé度ã
# 弿¥è®¾è®¡çæ¯è¾
å¨å¾å¤æ¹é¢ï¼Rustç弿¥ç¼ç¨æ¹æ³ä¸å
¶ä»è¯è¨ç±»ä¼¼ãä¾å¦ï¼JavaScriptãC#åRust齿另await表达å¼ç弿¥å½æ°ãèä¸è¿äºè¯è¨é½æè¡¨ç¤ºæªå®æè®¡ç®çå¼ï¼Rustç§°å®ä»¬ä¸º âæªæ¥å¼ï¼futuresï¼âï¼JavaScriptç§°å®ä»¬ä¸º âæ¿è¯ºï¼promisesï¼âï¼C#ç§°å®ä»¬ä¸º âä»»å¡ï¼tasksï¼âï¼ä½å®ä»¬é½ä»£è¡¨ä¸ä¸ªä½ å¯è½éè¦çå¾
çå¼ã
ç¶èï¼Rust对轮询çä½¿ç¨ææä¸åãå¨JavaScriptåC#ä¸ï¼å¼æ¥å½æ°ä¸æ¦è¢«è°ç¨å°±ä¼ç«å³å¼å§è¿è¡ï¼ç³»ç»åºä¸å
ç½®äºä¸ä¸ªå
¨å±äºä»¶å¾ªç¯ï¼å½å¼æ¥å½æ°çå¾
çå¼å¯ç¨æ¶ï¼è¯¥äºä»¶å¾ªç¯ä¼æ¢å¤æèµ·ç弿¥å½æ°è°ç¨ãä½å¨Rustä¸ï¼å¼æ¥è°ç¨å¨ä½ å°å
¶ä¼ éç»block_onãspawnæspawn_localç彿°è¿è¡è½®è¯¢å¹¶æ¨å¨å·¥ä½å®æä¹åä¸ä¼æ§è¡ä»»ä½æä½ãè¿äºå½æ°è¢«ç§°ä¸ºæ§è¡å¨ï¼executorsï¼ï¼å®ä»¬æ¿æ
äºå
¶ä»è¯è¨ä¸å
¨å±äºä»¶å¾ªç¯çè§è²ã
å 为Rust让ç¨åºåéæ©æ§è¡å¨æ¥è½®è¯¢æªæ¥å¼ï¼æä»¥Rustç³»ç»ä¸ä¸éè¦å
ç½®å
¨å±äºä»¶å¾ªç¯ãasync-stdåºæä¾äºæä»¬å¨æ¬ç« ä¸ä½¿ç¨çæ§è¡å¨å½æ°ï¼èæä»¬å°å¨æ¬ç« åé¢ä½¿ç¨çtokioåºåå®ä¹äºå®èªå·±çä¸ç»ç±»ä¼¼çæ§è¡å¨å½æ°ã卿¬ç« ç»å°¾ï¼æä»¬è¿å°å®ç°èªå·±çæ§è¡å¨ãä½ å¯ä»¥å¨åä¸ä¸ªç¨åºä¸ä½¿ç¨è¿ä¸ç§æ§è¡å¨ã
# ä¸ä¸ªçæ£ç弿¥HTTP客æ·ç«¯
妿æä»¬ä¸å±ç¤ºä¸ä¸ªä½¿ç¨åéç弿¥HTTP客æ·ç«¯åºç示ä¾ï¼é£å°±å¤ªå¤±èäºï¼å 为è¿é常ç®åï¼è䏿å 个ä¸éçåºå¯ä¾éæ©ï¼å
æ¬reqweståsurfã
ä¸é¢æ¯å¯¹many_requestsçéåï¼å®æ¯åºäºcheapo_requestççæ¬æ´ç®åï¼ä½¿ç¨surfå¹¶åå°è¿è¡ä¸ç³»å请æ±ãä½ éè¦å¨Cargo.tomlæä»¶ä¸æ·»å 以ä¸ä¾èµé¡¹ï¼
[dependencies]
async-std = "1.7"
surf = "1.0"
2
3
ç¶åï¼æä»¬å¯ä»¥è¿æ ·å®ä¹many_requestsï¼
pub async fn many_requests(urls: &[String]) -> Vec<Result<String, surf::Exception>> {
let client = surf::Client::new();
let mut handles = vec![];
for url in urls {
let request = client.get(&url).recv_string();
handles.push(async_std::task::spawn(request));
}
let mut results = vec![];
for handle in handles {
results.push(handle.await);
}
results
}
fn main() {
let requests = &["http://example.com".to_string(),
"https://www.red-bean.com".to_string(),
"https://en.wikipedia.org/wiki/Main_Page".to_string()];
let results = async_std::task::block_on(many_requests(requests));
for result in results {
match result {
Ok(response) => println!("*** {}\n", response),
Err(err) => eprintln!("error: {}\n", err),
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
使ç¨å个surf::Clientæ¥ååºææè¯·æ±ï¼è¿æ ·å¦ææå ä¸ªè¯·æ±æååä¸å°æå¡å¨ï¼æä»¬å°±å¯ä»¥éç¨HTTPè¿æ¥ãèä¸è¿éä¸éè¦å¼æ¥åï¼ç±äºrecv_stringæ¯ä¸ä¸ªå¼æ¥æ¹æ³ï¼å®è¿åä¸ä¸ªSend + 'staticçæªæ¥å¼ï¼æä»¥æä»¬å¯ä»¥ç´æ¥å°å
¶æªæ¥å¼ä¼ éç»spawnã
# ä¸ä¸ªå¼æ¥å®¢æ·ç«¯åæå¡å¨
ç°å¨æ¯æ¶åææä»¬å°ç®å为æ¢è®¨è®ºçå ³é®æ¦å¿µç»åæä¸ä¸ªå¯è¿è¡çç¨åºäºãå¨å¾å¤§ç¨åº¦ä¸ï¼å¼æ¥åºç¨ç¨åºä¸æ®éçå¤çº¿ç¨åºç¨ç¨åºç±»ä¼¼ï¼ä½ä½ å¯ä»¥çæä¸ä¸ï¼è¿éææºä¼ç¼åæ´ç´§åãæ´å ·è¡¨ç°åç代ç ã
æ¬èçç¤ºä¾æ¯ä¸ä¸ªè天æå¡å¨å客æ·ç«¯ãæ¥ç宿´ä»£ç ãçæ£çè天系ç»å¾å¤æï¼æ¶åä»å®å ¨ãéæ°è¿æ¥å°éç§å管çç诸å¤é®é¢ï¼ä½æä»¬å¯¹å ¶è¿è¡äºç®åï¼åªä¿çäºä¸äºåºæ¬åè½ï¼ä»¥ä¾¿èç¦äºå 个æè¶£çç¹ã
ç¹å«å°ï¼æä»¬å¸æè½å¾å¥½å°å¤çèåï¼backpressureï¼é®é¢ãè¿æå³çï¼å¦æä¸ä¸ªå®¢æ·ç«¯çç½ç»è¿æ¥è¾æ ¢æå®å ¨æå¼è¿æ¥ï¼ç»ä¸è½å½±åå ¶ä»å®¢æ·ç«¯æèªå·±çèå¥äº¤æ¢æ¶æ¯çè½åãèä¸ï¼ç±äºä¸ä¸ªæ ¢å®¢æ·ç«¯ä¸åºè¯¥è®©æå¡å¨æ¶è大éå 忥ä¿åå ¶ä¸æå¢é¿çæ¶æ¯ç§¯åï¼æä»¬çæå¡å¨åºè¯¥ä¸¢å¼é£äºè·ä¸ä¸èå¥ç客æ·ç«¯çæ¶æ¯ï¼ä½è¦éç¥å®ä»¬æ¶æ¯æµä¸å®æ´ãï¼çæ£çè天æå¡å¨ä¼å°æ¶æ¯è®°å½å°ç£çä¸ï¼å¹¶è®©å®¢æ·ç«¯æ£ç´¢å®ä»¬éè¿çæ¶æ¯ï¼ä½æä»¬è¿éçç¥äºè¿é¨åãï¼
æä»¬ä½¿ç¨cargo new --lib async-chatå½ä»¤å¯å¨é¡¹ç®ï¼å¹¶å¨async-chat/Cargo.toml䏿·»å 以ä¸å
容ï¼
[package]
name = "async-chat"
version = "0.1.0"
authors = ["You <you@example.com>"]
edition = "2018"
[dependencies]
async-std = { version = "1.7", features = ["unstable"] }
tokio = { version = "1.0", features = ["sync"] }
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
2
3
4
5
6
7
8
9
10
11
æä»¬ä¾èµå个åºï¼
async-stdåºæ¯æä»¬å¨æ¬ç« ä¸ä¸ç´å¨ä½¿ç¨ç弿¥I/Oåè¯åå®ç¨å·¥å ·çéåãtokioåºæ¯å¦ä¸ä¸ªç±»ä¼¼äºasync-stdç弿¥åè¯éåï¼å®æ¯æå¤èãææççåºä¹ä¸ãå®è¢«å¹¿æ³ä½¿ç¨ï¼å¨è®¾è®¡åå®ç°ä¸é½éµå¾ªé«æ åï¼ä½ä½¿ç¨æ¶æ¯async-stdéè¦æ´å¤ç注æäºé¡¹ãtokioæ¯ä¸ä¸ªå¾å¤§çåºï¼ä½æä»¬åªéè¦å ¶ä¸çä¸ä¸ªç»ä»¶ï¼æä»¥Cargo.tomlä¾èµé¡¹ä¸çfeatures = ["sync"]åæ®µä¼å°tokioç²¾ç®ä¸ºæä»¬éè¦çé¨åï¼ä½¿å ¶æä¸ºä¸ä¸ªè½»é级çä¾èµé¡¹ãå¨å¼æ¥åºçæç³»ç»è¿ä¸å¤ªæççæ¶åï¼äººä»¬é¿å å¨åä¸ä¸ªç¨åºä¸åæ¶ä½¿ç¨tokioåasync-stdï¼ä½è¿ä¸¤ä¸ªé¡¹ç®ä¸ç´å¨åä½ï¼ç¡®ä¿åªè¦éµå¾ªæ¯ä¸ªåºçææ¡£è§åï¼åæ¶ä½¿ç¨å®ä»¬æ¯å¯è¡çã- æä»¬å¨ç¬¬18ç« ä¸è§è¿
serdeåserde_jsonåºãå®ä»¬ä¸ºæä»¬æä¾äºæ¹ä¾¿é«æçå·¥å ·æ¥çæåè§£æJSONï¼æä»¬çè天å议使ç¨JSONå¨ç½ç»ä¸è¡¨ç¤ºæ°æ®ãæä»¬å¸æä½¿ç¨serdeçä¸äºå¯éåè½ï¼æä»¥å¨å£°æä¾èµé¡¹æ¶éæ©äºè¿äºåè½ã
æä»¬è天åºç¨ç¨åºï¼å æ¬å®¢æ·ç«¯åæå¡å¨ï¼çæ´ä½ç»æå¦ä¸ï¼
async-chat
âââ Cargo.toml
âââ src
âââ lib.rs
âââ utils.rs
âââ bin
âââ client.rs
âââ server
âââ main.rs
âââ connection.rs
âââ group.rs
âââ group_table.rs
2
3
4
5
6
7
8
9
10
11
12
è¿ç§å
å¸å±ä½¿ç¨äºæä»¬å¨ âsrc/binç®å½â 䏿å°çCargoç¹æ§ï¼é¤äºä¸»åºsrc/lib.rsåå
¶å模åsrc/utils.rsä¹å¤ï¼å®è¿å
æ¬ä¸¤ä¸ªå¯æ§è¡æä»¶ï¼
src/bin/client.rsæ¯è天客æ·ç«¯çåæä»¶å¯æ§è¡æä»¶ãsrc/bin/serveræ¯æå¡å¨å¯æ§è¡æä»¶ï¼åå¸å¨å个æä»¶ä¸ï¼main.rså å«ä¸»å½æ°ï¼è¿æä¸ä¸ªå模åconnection.rsãgroup.rsågroup_table.rsã
卿¬ç« ä¸ï¼æä»¬å°éæ¥å±ç¤ºæ¯ä¸ªæºæä»¶çå
容ã䏿¦æææä»¶é½å°±ä½ï¼å¦æä½ å¨è¿ä¸ªé¡¹ç®ç®å½ä¸è¾å
¥cargo buildï¼å®å°ç¼è¯åºï¼å¹¶æå»ºä¸¤ä¸ªå¯æ§è¡æä»¶ãCargoä¼èªå¨å°åºä½ä¸ºä¾èµé¡¹å
å«è¿æ¥ï¼è¿ä½¿å¾å®æä¸ºæ¾ç½®å®¢æ·ç«¯åæå¡å¨å
±äº«å®ä¹çæ¹ä¾¿ä½ç½®ãåæ ·ï¼cargo check伿£æ¥æ´ä¸ªæºæä»¶æ ãè¦è¿è¡å
¶ä¸ä»»ä½ä¸ä¸ªå¯æ§è¡æä»¶ï¼ä½ å¯ä»¥ä½¿ç¨ä»¥ä¸å½ä»¤ï¼
$ cargo run --release --bin server -- localhost:8088
$ cargo run --release --bin client -- localhost:8088
2
--biné项æå®è¦è¿è¡ç坿§è¡æä»¶ï¼--é项åé¢çä»»ä½åæ°é½ä¼ä¼ éç»å¯æ§è¡æä»¶æ¬èº«ãæä»¬ç客æ·ç«¯åæå¡å¨åªéè¦ç¥éæå¡å¨çå°ååTCP端å£ã
# é误åç»æç±»å
åºçutils模åå®ä¹äºæä»¬å¨æ´ä¸ªåºç¨ç¨åºä¸ä½¿ç¨çç»æåé误类åãå¨src/utils.rsä¸ï¼
use std::error::Error;
pub type ChatError = Box<dyn Error + Send + Sync + 'static>;
pub type ChatResult<T> = Result<T, ChatError>;
2
3
4
è¿äºæ¯æä»¬å¨ âå¤çå¤ç§é误类åâ ä¸å»ºè®®ä½¿ç¨çéç¨é误类åãasync_stdãserde_jsonåtokioåºåèªå®ä¹äºå®ä»¬èªå·±çé误类åï¼ä½æ¯?æä½ç¬¦å¯ä»¥ä½¿ç¨æ ååºä¸Fromç¹æ§çå®ç°ï¼èªå¨å°å®ä»¬é½è½¬æ¢ä¸ºChatErrorï¼è¯¥å®ç°å¯ä»¥å°ä»»ä½åéçé误类å转æ¢ä¸ºBox<dyn Error + Send + Sync + 'static>ãSendåSync约æç¡®ä¿å¦æçæå°å¦ä¸ä¸ªçº¿ç¨ä¸çä»»å¡å¤±è´¥ï¼å®å¯ä»¥å®å
¨å°å°é误æ¥åç»ä¸»çº¿ç¨ã
å¨å®é
åºç¨ä¸ï¼å¯ä»¥èè使ç¨anyhowåºï¼å®æä¾äºä¸è¿äºç±»ä¼¼çErroråResultç±»åãanyhowåºä½¿ç¨æ¹ä¾¿ï¼å¹¶ä¸æä¾äºä¸äºChatErroråChatResultææ²¡æçä¸éçåè½ã
# åè®®
åºå¨lib.rsä¸å®ä¹çè¿ä¸¤ä¸ªç±»å䏿¶µçäºæ´ä¸ªè天åè®®ï¼
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub mod utils;
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum FromClient {
Join { group_name: Arc<String> },
Post {
group_name: Arc<String>,
message: Arc<String>,
},
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub enum FromServer {
Message {
group_name: Arc<String>,
message: Arc<String>,
},
Error(String),
}
#[test]
fn test_fromclient_json() {
use std::sync::Arc;
let from_client = FromClient::Post {
group_name: Arc::new("Dogs".to_string()),
message: Arc::new("Samoyeds rock!".to_string()),
};
let json = serde_json::to_string(&from_client).unwrap();
assert_eq!(json,
r#"{"Post":
{"group_name":"Dogs","message":"Samoyeds rock!"}}"#);
assert_eq!(serde_json::from_str::<FromClient>(&json).unwrap(),
from_client);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
FromClientæä¸¾è¡¨ç¤ºå®¢æ·ç«¯å¯ä»¥åéç»æå¡å¨çæ°æ®å
ï¼å®å¯ä»¥è¯·æ±å å
¥ä¸ä¸ªæ¿é´ï¼å¹¶åå
¶å å
¥ç任使¿é´åéæ¶æ¯ãFromServer表示æå¡å¨å¯ä»¥è¿åçå
容ï¼åéå°æä¸ªç¾¤ç»çæ¶æ¯åéè¯¯æ¶æ¯ã使ç¨å¼ç¨è®¡æ°çArc<String>è䏿¯æ®éçStringï¼æå©äºæå¡å¨å¨ç®¡ç群ç»åååæ¶æ¯æ¶é¿å
å¤å¶å符串ã
#[derive]屿§åè¯serdeåºä¸ºFromClientåFromServerçæSerializeåDeserializeç¹æ§çå®ç°ãè¿ä½¿æä»¬è½å¤è°ç¨serde_json::to_stringå°å®ä»¬è½¬æ¢ä¸ºJSONå¼ï¼éè¿ç½ç»åéï¼æåè°ç¨serde_json::from_strå°å®ä»¬è½¬æ¢åRustä¸çå½¢å¼ã
test_fromclient_jsonåå
æµè¯å±ç¤ºäºå
¶ç¨æ³ãæäºserdeæ´¾ççSerializeå®ç°ï¼æä»¬å¯ä»¥è°ç¨serde_json::to_stringå°ç»å®çFromClientå¼è½¬æ¢ä¸ºä»¥ä¸JSONï¼
{"Post" :{"group_name" :"Dogs","message" :"Samoyeds rock!"}}
ç¶åï¼æ´¾ççDeserializeå®ç°å°å
¶è§£æåçæçFromClientå¼ã请注æï¼FromClientä¸çArcæé对åºåå形弿²¡æå½±åï¼å¼ç¨è®¡æ°çåç¬¦ä¸²ç´æ¥ä½ä¸ºJSON对象æåå¼åºç°ã
# è·åç¨æ·è¾å ¥ï¼å¼æ¥æµ
è天客æ·ç«¯çé¦è¦ä»»å¡æ¯è¯»åç¨æ·çå½ä»¤ï¼å¹¶å°ç¸åºçæ°æ®å
åéå°æå¡å¨ã管çä¸ä¸ªåéçç¨æ·çé¢è¶
åºäºæ¬ç« çèå´ï¼æä»¥æä»¬å°åæç®åå¯è¡çäºæ
ï¼ç´æ¥ä»æ åè¾å
¥è¯»åè¡ã以ä¸ä»£ç ä½äºsrc/bin/client.rsä¸ï¼
use async_std::prelude::*;
use async_chat::utils::{self, ChatResult};
use async_std::io;
use async_std::net;
async fn send_commands(mut to_server: net::TcpStream) -> ChatResult<()> {
println!("Commands:\n\
join GROUP\n\
post GROUP MESSAGE...\n\
Type Control-D (on Unix) or Control-Z (on Windows)\
to close the connection.");
let mut command_lines =
io::BufReader::new(io::stdin()).lines();
while let Some(command_result) = command_lines.next().await {
let command = command_result?;
// æå
³`parse_command`çå®ä¹ï¼è¯·æ¥çGitHubä»åºã
let request = match parse_command(&command) {
Some(request) => request,
None => continue,
};
utils::send_as_json(&mut to_server, &request).await?;
to_server.flush().await?;
}
Ok(())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
è¿æ®µä»£ç è°ç¨async_std::io::stdinè·å客æ·ç«¯æ åè¾å
¥ç弿¥å¥æï¼å°å
¶å
è£
å¨async_std::io::BufReaderä¸è¿è¡ç¼å²ï¼ç¶åè°ç¨lineséè¡å¤çç¨æ·è¾å
¥ãå®å°è¯å°æ¯ä¸è¡è§£æä¸ºä¸æä¸ªFromClientå¼å¯¹åºçå½ä»¤ï¼å¦ææåï¼å°±å°è¯¥å¼åéå°æå¡å¨ãå¦æç¨æ·è¾å
¥æ æ³è¯å«çå½ä»¤ï¼parse_command伿å°éè¯¯æ¶æ¯å¹¶è¿åNoneï¼è¿æ ·send_commandså¯ä»¥ç»§ç»å¾ªç¯ãå¦æç¨æ·è¾å
¥æä»¶ç»ææç¤ºç¬¦ï¼é£ä¹linesæµå°è¿åNoneï¼send_commandsä¹ä¼è¿åãè¿ä¸ä½ 卿®é忥ç¨åºä¸ç¼åç代ç é常ç¸ä¼¼ï¼åªæ¯å®ä½¿ç¨äºasync_stdåºä¸ççæ¬ã
弿¥BufReaderçlinesæ¹æ³å¾æè¶£ãå®ä¸è½åæ ååºé£æ ·è¿åä¸ä¸ªè¿ä»£å¨ï¼Iterator::nextæ¹æ³æ¯ä¸ä¸ªæ®éç忥彿°ï¼æä»¥è°ç¨commands.next()ä¼é»å¡çº¿ç¨ï¼ç´å°ä¸ä¸è¡åå¤å¥½ãç¸åï¼linesè¿åä¸ä¸ªResult<String>å¼çæµãæµæ¯è¿ä»£å¨ç弿¥ç±»ä¼¼ç©ï¼å®ä»¥å¼æ¥åå¥½çæ¹å¼æéçæä¸ç³»åå¼ã以䏿¯async_std::stream模åä¸Streamç¹æ§çå®ä¹ï¼
trait Stream {
type Item;
// ç®åï¼å°`Pin<&mut Self>`ç解为`&mut Self`ã
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
2
3
4
5
ä½ å¯ä»¥å°å
¶è§ä¸ºIteratoråFutureç¹æ§çæ··åä½ãä¸è¿ä»£å¨ä¸æ ·ï¼Streamæä¸ä¸ªå
³èçItemç±»åï¼å¹¶ä½¿ç¨Optionæ¥æç¤ºåºå使¶ç»æãä½ä¸æªæ¥å¼ä¸æ ·ï¼æµå¿
须被轮询ï¼ä¸ºäºè·åä¸ä¸ä¸ªé¡¹ï¼æå¾ç¥æµå·²ç»æï¼ï¼ä½ å¿
é¡»è°ç¨poll_nextï¼ç´å°å®è¿åPoll::Readyãæµçpoll_nextå®ç°åºè¯¥å§ç»å¿«éè¿åï¼ä¸é»å¡ãå¹¶ä¸å¦æä¸ä¸ªæµè¿åPoll::Pendingï¼å®å¿
é¡»éè¿Contextéç¥è°ç¨è
使¶å¼å¾å次轮询ã
poll_nextæ¹æ³ç´æ¥ä½¿ç¨èµ·æ¥å¾éº»ç¦ï¼ä½éå¸¸ä½ ä¸éè¦è¿æ ·åãä¸è¿ä»£å¨ä¸æ ·ï¼æµæä¸ç³»åå®ç¨æ¹æ³ï¼å¦filteråmapãå
¶ä¸æä¸ä¸ªnextæ¹æ³ï¼å®è¿åæµçä¸ä¸ä¸ªOption<Self::Item>çæªæ¥å¼ãä½ ä¸å¿
æ¾å¼å°è½®è¯¢æµï¼èæ¯å¯ä»¥è°ç¨nextå¹¶çå¾
å®è¿åçæªæ¥å¼ã
å°è¿äºé¨åç»åå¨ä¸èµ·ï¼send_commandséè¿ä½¿ç¨while let循ç¯éåæµçæç弿¥æ¶è´¹è¾å
¥è¡çæµï¼
while let Some(item) = stream.next().await {
...使ç¨item...
}
2
3
ï¼æªæ¥çæ¬çRustå¯è½ä¼å¼å
¥ä¸ç§å¼æ¥çæ¬çfor循ç¯è¯æ³æ¥æ¶è´¹æµï¼å°±åæ®éforå¾ªç¯æ¶è´¹Iteratorå¼ä¸æ ·ãï¼
卿µç»æåï¼å³å®è¿åPoll::Ready(None)表示æµç»æåï¼è½®è¯¢æµï¼å°±åå¨è¿ä»£å¨è¿åNoneåè°ç¨nextï¼æè
卿ªæ¥å¼è¿åPoll::Readyåè¿è¡è½®è¯¢ä¸æ ·ï¼Streamç¹æ§æ²¡æè§å®æµåºè¯¥æä¹åï¼æäºæµå¯è½ä¼è¡¨ç°å¼å¸¸ã䏿ªæ¥å¼åè¿ä»£å¨ä¸æ ·ï¼æµæä¸ä¸ªfuseæ¹æ³ï¼å¨éè¦æ¶ç¡®ä¿è¿ç§è°ç¨çè¡ä¸ºå¯é¢æµï¼è¯¦ç»ä¿¡æ¯è¯·æ¥çææ¡£ã
å¨ä½¿ç¨æµæ¶ï¼å¡å¿
è®°ä½ä½¿ç¨async_stdçå置导å
¥ï¼
use async_std::prelude::*;
è¿æ¯å 为Streamç¹æ§çå®ç¨æ¹æ³ï¼å¦nextãmapãfilterçï¼å®é
ä¸å¹¶ä¸æ¯å¨Streamæ¬èº«ä¸å®ä¹çãç¸åï¼å®ä»¬æ¯ä¸ä¸ªåç¬çç¹æ§StreamExtçé»è®¤æ¹æ³ï¼StreamExtä¼ä¸ºææStreamèªå¨å®ç°ï¼
pub trait StreamExt : Stream {
...å°å®ç¨æ¹æ³å®ä¹ä¸ºé»è®¤æ¹æ³...
}
impl<T: Stream> StreamExt for T { }
2
3
4
5
è¿æ¯æä»¬å¨ âç¹æ§ä¸ä»äººçç±»åâ ä¸æè¿°çæ©å±ç¹æ§æ¨¡å¼çä¸ä¸ªç¤ºä¾ãasync_std::prelude模åå°StreamExtæ¹æ³å¼å
¥ä½ç¨åï¼æä»¥ä½¿ç¨å置导å
¥å¯ç¡®ä¿å
¶æ¹æ³å¨ä½ ç代ç ä¸å¯è§ã
# åéæ°æ®å
为äºå¨ç½ç»å¥æ¥åä¸ä¼ è¾æ°æ®å
ï¼æä»¬ç客æ·ç«¯åæå¡å¨ä½¿ç¨åºæ¨¡åutilsä¸çsend_as_json彿°ï¼
use async_std::prelude::*;
use serde::Serialize;
use std::marker::Unpin;
pub async fn send_as_json<S, P>(outbound: &mut S, packet: &P) -> ChatResult<()>
where
S: async_std::io::Write + Unpin,
P: Serialize,
{
let mut json = serde_json::to_string(&packet)?;
json.push('\n');
outbound.write_all(json.as_bytes()).await?;
Ok(())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
è¿ä¸ªå½æ°å°packetæå»ºä¸ºStringç±»åçJSON表示ï¼å¨æ«å°¾æ·»å ä¸ä¸ªæ¢è¡ç¬¦ï¼ç¶åå°å
¶å
¨é¨åå
¥outboundã
ä»å®çwhereåå¥ä¸ï¼ä½ å¯ä»¥çåºsend_as_jsonéå¸¸çµæ´»ãè¦åéçæ°æ®å
ç±»åPå¯ä»¥æ¯ä»»ä½å®ç°äºserde::Serializeçç±»åãè¾åºæµSå¯ä»¥æ¯ä»»ä½å®ç°äºasync_std::io::Writeçç±»åï¼async_std::io::Writeæ¯æ ååºä¸ç¨äºè¾åºæµçstd::io::Writeç¹æ§ç弿¥çæ¬ãè¿è¶³ä»¥è®©æä»¬å¨å¼æ¥TcpStreamä¸åéFromClientåFromServerå¼ãä¿æsend_as_jsonçå®ä¹ä¸ºæ³åï¼å¯ç¡®ä¿å®ä¸ä¼ä»¥æå¤çæ¹å¼ä¾èµäºæµææ°æ®å
ç±»åçç»èï¼send_as_jsonåªè½ä½¿ç¨è¿äºç¹æ§ä¸çæ¹æ³ã
Sä¸çUnpinçº¦ææ¯ä½¿ç¨write_allæ¹æ³æå¿
éçãæä»¬å°å¨æ¬ç« åé¢ä»ç»åºå®ï¼pinningï¼åéåºå®ï¼unpinningï¼ï¼ä½ç®åï¼å¨éè¦çå°æ¹ä¸ºç±»ååéæ·»å Unpin约æå°±è¶³å¤äºï¼å¦æä½ å¿è®°äºï¼Rustç¼è¯å¨ä¼æåºè¿äºæ
åµã
send_as_json没æç´æ¥å°æ°æ®å
åºååå°è¾åºæµï¼èæ¯å
å°å
¶åºåå为ä¸ä¸ªä¸´æ¶çStringï¼ç¶ååå°å
¶åå
¥outboundãserde_jsonåºç¡®å®æä¾äºç´æ¥å°å¼åºååå°è¾åºæµç彿°ï¼ä½è¿äºå½æ°åªæ¯æåæ¥æµãè¦åå
¥å¼æ¥æµï¼éè¦å¯¹serde_jsonåserdeåºä¸ä¸æ ¼å¼æ å
³çæ ¸å¿é¨åè¿è¡æ ¹æ¬æ§æ´æ¹ï¼å 为å®ä»¬æå´ç»è®¾è®¡çç¹æ§å
·æåæ¥æ¹æ³ã
䏿µä¸æ ·ï¼async_stdçI/Oç¹æ§çè®¸å¤æ¹æ³å®é
䏿¯å¨æ©å±ç¹æ§ä¸å®ä¹çï¼æä»¥å¨ä½¿ç¨å®ä»¬æ¶ï¼å¡å¿
è®°ä½ä½¿ç¨async_std::prelude::*ã
# æ¥æ¶æ°æ®å ï¼æ´å¤å¼æ¥æµ
ä¸ºäºæ¥æ¶æ°æ®å
ï¼æä»¬çæå¡å¨å客æ·ç«¯å°ä½¿ç¨utils模åä¸çè¿ä¸ªå½æ°ï¼ä»å¼æ¥ç¼å²çTCP奿¥åï¼async_std::io::BufReader<TcpStream>ï¼æ¥æ¶FromClientåFromServerå¼ï¼
use serde::de::DeserializeOwned;
pub fn receive_as_json<S, P>(inbound: S) -> impl Stream<Item = ChatResult<P>>
where
S: async_std::io::BufRead + Unpin,
P: DeserializeOwned,
{
inbound.lines()
.map(|line_result| -> ChatResult<P> {
let line = line_result?;
let parsed = serde_json::from_str::<P>(&line)?;
Ok(parsed)
})
}
2
3
4
5
6
7
8
9
10
11
12
13
14
ä¸send_as_json䏿 ·ï¼è¿ä¸ªå½æ°å¨è¾å
¥æµåæ°æ®å
ç±»å䏿¯æ³åçï¼
- æµç±»å
Så¿ é¡»å®ç°async_std::io::BufReadï¼å®æ¯std::io::BufReadç弿¥ç±»ä¼¼ç©ï¼è¡¨ç¤ºä¸ä¸ªç¼å²çè¾å ¥åèæµã - æ°æ®å
ç±»å
På¿ é¡»å®ç°DeserializeOwnedï¼å®æ¯serdeçDeserializeç¹æ§çæ´ä¸¥æ ¼çæ¬ãä¸ºäºæé«æçï¼Deserializeå¯ä»¥çæç´æ¥ä»ååºååçç¼å²åºåç¨å 容ç&strå&[u8]å¼ï¼ä»¥é¿å å¤å¶æ°æ®ãä½å¨æä»¬çæ åµä¸ï¼è¿å¹¶ä¸å¯è¡ï¼æä»¬éè¦å°ååºåååçå¼è¿åç»è°ç¨è ï¼æä»¥å®ä»¬ççå½å¨æå¿ é¡»é¿äºè§£æå®ä»¬çç¼å²åºãå®ç°DeserializeOwnedçç±»åæ»æ¯ä¸å®ååºååæ¶çç¼å²åºæ å ³ã
è°ç¨inbound.lines()ä¼ç»æä»¬ä¸ä¸ªstd::io::Result<String>å¼çæµãç¶åæä»¬ä½¿ç¨æµçmapéé
å¨å°ä¸ä¸ªéå
åºç¨äºæ¯ä¸ªé¡¹ï¼å¤çéè¯¯å¹¶å°æ¯ä¸è¡è§£æä¸ºç±»åPçå¼çJSONå½¢å¼ãè¿ç»æä»¬ä¸ä¸ªChatResult<P>å¼çæµï¼æä»¬ç´æ¥è¿åå®ãè¯¥å½æ°çè¿åç±»åæ¯ï¼
impl Stream<Item = ChatResult<P>>
è¿è¡¨ææä»¬è¿åæç§ç±»åï¼å®ä¼å¼æ¥çæä¸ç³»åChatResult<P>å¼ï¼ä½è°ç¨è
æ æ³ç¡®åç¥éå
·ä½æ¯åªç§ç±»åãç±äºæä»¬ä¼ éç»mapçéå
æ¬èº«å
·æå¿åç±»åï¼è¿æ¯receive_as_jsonå¯è½è¿åçæå
·ä½çç±»åãæ³¨æï¼receive_as_jsonæ¬èº«ä¸æ¯ä¸ä¸ªå¼æ¥å½æ°ã宿¯ä¸ä¸ªæ®é彿°ï¼è¿åä¸ä¸ªå¼æ¥å¼ï¼å³ä¸ä¸ªæµãæ¯ âå°å¤æ·»å asyncå.awaitâ æ´æ·±å
¥å°çè§£Rust弿¥æ¯æçæºå¶ï¼ä¸ºåè¿æ ·æ¸
æ°ãçµæ´»å髿çå®ä¹å¼è¾äºå¯è½æ§ï¼è¿äºå®ä¹å
åå©ç¨äºè¿é¨è¯è¨çç¹æ§ã
为äºäºè§£receive_as_jsonçä½¿ç¨æ¹å¼ï¼ä¸é¢æ¯æä»¬è天客æ·ç«¯çhandle_replies彿°ï¼å®ä½äºsrc/bin/client.rsä¸ï¼ç¨äºä»ç½ç»æ¥æ¶FromServerå¼çæµï¼å¹¶å°å
¶æå°åºæ¥ä¾ç¨æ·æ¥çï¼
use async_chat::FromServer;
async fn handle_replies(from_server: net::TcpStream) -> ChatResult<()> {
let buffered = io::BufReader::new(from_server);
let mut reply_stream = utils::receive_as_json(buffered);
while let Some(reply) = reply_stream.next().await {
match reply? {
FromServer::Message { group_name, message } => {
println!("message posted to {}: {}", group_name, message);
}
FromServer::Error(message) => {
println!("error from server: {}", message);
}
}
}
Ok(())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
è¿ä¸ªå½æ°æ¥åä¸ä¸ªä»æå¡å¨æ¥æ¶æ°æ®ç奿¥åï¼ç¨BufReaderï¼æ³¨ææ¯async_stdçæ¬ï¼å°å
¶å
è£
ï¼ç¶åå°å
¶ä¼ éç»receive_as_json以è·åä¼ å
¥çFromServerå¼çæµãç¶åå®ä½¿ç¨while let循ç¯å¤çä¼ å
¥çåå¤ï¼æ£æ¥éè¯¯ç»æï¼å¹¶å°æ¯ä¸ªæå¡å¨å夿å°åºæ¥ä¾ç¨æ·æ¥çã
# 客æ·ç«¯ç䏻彿°
æ¢ç¶æä»¬å·²ç»ä»ç»äºsend_commandsåhandle_repliesï¼ç°å¨å¯ä»¥å±ç¤ºè天客æ·ç«¯ç䏻彿°äºï¼å®ä½äºsrc/bin/client.rsä¸ï¼
use async_std::task;
fn main() -> ChatResult<()> {
let address = std::env::args().nth(1)
.expect("Usage: client ADDRESS:PORT");
task::block_on(async {
let socket = net::TcpStream::connect(address).await?;
socket.set_nodelay(true)?;
let to_server = send_commands(socket.clone());
let from_server = handle_replies(socket);
from_server.race(to_server).await?;
Ok(())
})
}
2
3
4
5
6
7
8
9
10
11
12
13
14
ä»å½ä»¤è¡è·åæå¡å¨å°ååï¼main彿°æä¸ç³»åæ³è¦è°ç¨ç弿¥å½æ°ï¼æä»¥å®å°å½æ°çå
¶ä½é¨åå
è£
å¨ä¸ä¸ªå¼æ¥åä¸ï¼å¹¶å°è¯¥åçæªæ¥å¼ä¼ éç»async_std::task::block_onæ¥è¿è¡ã
䏿¦å»ºç«è¿æ¥ï¼æä»¬å¸æsend_commandsåhandle_replies彿°åæ¶è¿è¡ï¼è¿æ ·å¨æä»¬è¾å
¥æ¶å°±è½çå°å
¶ä»äººçæ¶æ¯å°è¾¾ã妿æä»¬è¾å
¥æä»¶ç»ææç¤ºç¬¦ï¼æè
䏿å¡å¨çè¿æ¥æå¼ï¼ç¨åºåºè¯¥éåºã
æ ¹æ®æä»¬å¨æ¬ç« å ¶ä»å°æ¹çåæ³ï¼ä½ å¯è½ä¼ææçå°è¿æ ·ç代ç ï¼
let to_server = task::spawn(send_commands(socket.clone()));
let from_server = task::spawn(handle_replies(socket));
to_server.await?;
from_server.await?;
2
3
4
ä½ç±äºæä»¬çå¾
äºä¸¤ä¸ªJoinHandleï¼è¿ä¼ä½¿ç¨åºå¨ä¸¤ä¸ªä»»å¡é½å®æåæéåºãèæä»¬å¸æåªè¦å
¶ä¸ä¸ä¸ªä»»å¡å®æå°±éåºãæªæ¥å¼çraceæ¹æ³å¯ä»¥å®ç°è¿ä¸ç¹ãè°ç¨from_server.race(to_server)ä¼è¿åä¸ä¸ªæ°çæªæ¥å¼ï¼å®ä¼è½®è¯¢from_serveråto_serverï¼åªè¦å
¶ä¸ä¸ä¸ªåå¤å¥½ï¼å°±è¿åPoll::Ready(v)ãä¸¤ä¸ªæªæ¥å¼å¿
é¡»å
·æç¸åçè¾åºç±»åï¼æç»å¼æ¯æå
宿çé£ä¸ªæªæ¥å¼çå¼ãæªå®æçæªæ¥å¼å°è¢«ä¸¢å¼ã
raceæ¹æ³ä»¥å许å¤å
¶ä»ä¾¿æ·å®ç¨æ¹æ³ï¼é½å®ä¹å¨async_std::prelude::FutureExtç¹æ§ä¸ï¼async_std::prelude使æä»¬è½å¤ä½¿ç¨è¿äºæ¹æ³ã
æ¤æ¶ï¼å®¢æ·ç«¯ä»£ç ä¸å¯ä¸æä»¬è¿æ²¡æå±ç¤ºçé¨åæ¯parse_command彿°ãè¿æ¯ç¸å½ç®åçææ¬å¤ç代ç ï¼æä»¥æä»¬è¿éä¸å±ç¤ºå®çå®ä¹ã详ç»å
容请æ¥çGitä»åºä¸ç宿´ä»£ç ã
# æå¡å¨ç䏻彿°
以䏿¯æå¡å¨ä¸»æä»¶src/bin/server/main.rsçå
¨é¨å
容ï¼
use async_std::prelude::*;
use async_chat::utils::ChatResult;
use std::sync::Arc;
mod connection;
mod group;
mod group_table;
use connection::serve;
fn main() -> ChatResult<()> {
let address = std::env::args().nth(1).expect("Usage: server ADDRESS");
let chat_group_table =
Arc::new(group_table::GroupTable::new());
async_std::task::block_on(async {
// æ¤ä»£ç 卿¬ç« å¼å¤´å·²å±ç¤ºã
use async_std::{net, task};
let listener = net::TcpListener::bind(address).await?;
let mut new_connections = listener.incoming();
while let Some(socket_result) =
new_connections.next().await {
let socket = socket_result?;
let groups = chat_group_table.clone();
task::spawn(async {
log_error(serve(socket, groups).await);
});
}
Ok(())
})
}
fn log_error(result: ChatResult<()>) {
if let Err(error) = result {
eprintln!("Error: {}", error);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
æå¡å¨ç䏻彿°ä¸å®¢æ·ç«¯ç类似ï¼å®è¿è¡ä¸äºè®¾ç½®ï¼ç¶åè°ç¨block_onæ¥è¿è¡ä¸ä¸ªå¼æ¥åï¼è¯¥åæ§è¡å®é
å·¥ä½ã为äºå¤çæ¥èªå®¢æ·ç«¯çä¼ å
¥è¿æ¥ï¼å®å建ä¸ä¸ªTcpListener奿¥åï¼å
¶incomingæ¹æ³è¿åä¸ä¸ªstd::io::Result<TcpStream>å¼çæµã
å¯¹äºæ¯ä¸ªä¼ å
¥è¿æ¥ï¼æä»¬çæä¸ä¸ªå¼æ¥ä»»å¡æ¥è¿è¡connection::serve彿°ãæ¯ä¸ªä»»å¡è¿ä¼æ¥æ¶ä¸ä¸ªæåGroupTableå¼çå¼ç¨ï¼è¯¥å¼è¡¨ç¤ºæå¡å¨å½åçè天群ç»åè¡¨ï¼ææè¿æ¥éè¿Arcå¼ç¨è®¡æ°æéå
±äº«è¿ä¸ªå表ã
妿connection::serveè¿åä¸ä¸ªéè¯¯ï¼æä»¬å°ä¸æ¡æ¶æ¯è®°å½å°æ åé误è¾åºï¼ç¶å让任å¡éåºãå
¶ä»è¿æ¥ç»§ç»æ£å¸¸è¿è¡ã
# å¤çèå¤©è¿æ¥ï¼å¼æ¥äºæ¥é
ä¸é¢æ¯æå¡å¨çæ ¸å¿å½æ°ï¼src/bin/server/connection.rsä¸connection模åçserve彿°ï¼
use async_chat::{FromClient, FromServer};
use async_chat::utils::{self, ChatResult};
use async_std::prelude::*;
use async_std::io::BufReader;
use async_std::net::TcpStream;
use async_std::sync::Arc;
use crate::group_table::GroupTable;
pub async fn serve(socket: TcpStream, groups: Arc<GroupTable>) -> ChatResult<()> {
let outbound = Arc::new(Outbound::new(socket.clone()));
let buffered = BufReader::new(socket);
let mut from_client = utils::receive_as_json(buffered);
while let Some(request_result) = from_client.next().await {
let request = request_result?;
let result = match request {
FromClient::Join { group_name } => {
let group = groups.get_or_create(group_name);
group.join(outbound.clone());
Ok(())
}
FromClient::Post { group_name, message } => {
match groups.get(&group_name) {
Some(group) => {
group.post(message);
Ok(())
}
None => {
Err(format!("Group '{}' does not exist", group_name))
}
}
}
};
if let Err(message) = result {
let report = FromServer::Error(message);
outbound.send(report).await?;
}
}
Ok(())
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
è¿å 乿¯å®¢æ·ç«¯handle_replies彿°çéåï¼ä»£ç ç主è¦é¨åæ¯ä¸ä¸ªå¾ªç¯ï¼å¤çä»ç¼å²çTCPæµéè¿receive_as_jsonæå»ºçFromClientå¼çè¾å
¥æµã妿åçéè¯¯ï¼æä»¬çæä¸ä¸ªFromServer::Erroræ°æ®å
ï¼å°éè¯¯æ¶æ¯ä¼ è¾¾å客æ·ç«¯ã
é¤äºéè¯¯æ¶æ¯ï¼å®¢æ·ç«¯è¿å¸ææ¥æ¶æ¥èªå
¶å å
¥çè天群ç»çæ¶æ¯ï¼æä»¥ä¸å®¢æ·ç«¯çè¿æ¥éè¦ä¸æ¯ä¸ªç¾¤ç»å
±äº«ãæä»¬å¯ä»¥ç®åå°ç»æ¯ä¸ªç¾¤ç»ä¸ä¸ªTcpStreamçå
éï¼ä½å¦æå
¶ä¸ä¸¤ä¸ªæºè¯å¾åæ¶å奿¥ååå
¥æ°æ®å
ï¼å®ä»¬çè¾åºå¯è½ä¼äº¤éï¼å®¢æ·ç«¯æç»å¯è½ä¼æ¶å°ä¹±ç çJSONãæä»¬éè¦å®æå¯¹è¿æ¥çå®å
¨å¹¶å访é®ã
è¿æ¯éè¿Outboundç±»åæ¥ç®¡ççï¼Outboundå¨src/bin/server/connection.rsä¸å®ä¹å¦ä¸ï¼
use async_std::sync::Mutex;
pub struct Outbound(Mutex<TcpStream>);
impl Outbound {
pub fn new(to_client: TcpStream) -> Outbound {
Outbound(Mutex::new(to_client))
}
pub async fn send(&self, packet: FromServer) -> ChatResult<()> {
let mut guard = self.0.lock().await;
utils::send_as_json(&mut *guard, &packet).await?;
guard.flush().await?;
Ok(())
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
å建Outbound弿¶ï¼å®ä¼è·åä¸ä¸ªTcpStreamçæææï¼å¹¶å°å
¶å
è£
å¨ä¸ä¸ªäºæ¥éï¼Mutexï¼ä¸ï¼ä»¥ç¡®ä¿ä¸æ¬¡åªæä¸ä¸ªä»»å¡å¯ä»¥ä½¿ç¨å®ãserve彿°å°æ¯ä¸ªOutboundå
è£
å¨ä¸ä¸ªArcå¼ç¨è®¡æ°æéä¸ï¼è¿æ ·å®¢æ·ç«¯å å
¥çææç¾¤ç»é½å¯ä»¥æååä¸ä¸ªå
±äº«çOutboundå®ä¾ã
è°ç¨Outbound::sendæ¶ï¼é¦å
ä¼éå®äºæ¥éï¼è¿åä¸ä¸ªå¯è§£å¼ç¨ä¸ºå
é¨TcpStreamç宿¤å¼ï¼guard valueï¼ãæä»¬ä½¿ç¨send_as_jsonä¼ è¾packetï¼æåè°ç¨guard.flush()以确ä¿å®ä¸ä¼å¨æä¸ªç¼å²åºä¸åä¼ è¾ç¶æä¸æ»çãï¼æ®æä»¬æç¥ï¼TcpStreamå®é
ä¸ä¸ä¼ç¼å²æ°æ®ï¼ä½Writeç¹æ§å
许å
¶å®ç°è¿è¡ç¼å²ï¼æä»¥æä»¬ä¸è½åé©ãï¼
表达å¼&mut *guard让æä»¬è§£å³äºRustä¸ä¼ä¸ºæ»¡è¶³ç¹æ§è¾¹çèåºç¨è§£å¼ç¨å¼ºå¶è½¬æ¢çé®é¢ãç¸åï¼æä»¬æ¾å¼å°è§£å¼ç¨äºæ¥éç宿¤å¼ï¼ç¶ååç¨å®æä¿æ¤çTcpStreamçå¯åå¼ç¨ï¼ä»èå¾å°send_as_jsonæéç&mut TcpStreamã
注æï¼Outbound使ç¨çæ¯async_std::sync::Mutexç±»åï¼è䏿¯æ ååºä¸çMutexãè¿æä¸ä¸ªåå ã
é¦å
ï¼å¦æä¸ä¸ªä»»å¡å¨ææäºæ¥é宿¤å¼æ¶è¢«æèµ·ï¼æ ååºä¸çMutexå¯è½ä¼åºç°é®é¢ã妿è¿è¡è¯¥ä»»å¡ççº¿ç¨æ¥æ¶å°å¦ä¸ä¸ªè¯å¾éå®åä¸ä¸ªMutexçä»»å¡ï¼å°±ä¼åºç°é®é¢ï¼ä»Mutexçè§åº¦æ¥çï¼å·²ç»æ¥æå®ççº¿ç¨æ£å¨å°è¯å次éå®å®ãæ åçMutex䏿¯ä¸ºå¤çè¿ç§æ
åµè设计çï¼æä»¥å®ä¼å¯¼è´ç¨åºå´©æºææ»éï¼å®æ°¸è¿ä¸ä¼ä¸æ°å½å°æäºéï¼ãç®åæ£å¨è¿è¡ç¸å
³å·¥ä½ï¼ä»¥ä½¿Rustå¨ç¼è¯æ¶æ£æµå°è¿ä¸ªé®é¢ï¼å¹¶å¨std::sync::Mutex宿¤å¼å¨await表达å¼ä¸å卿¶ååºè¦åãç±äºOutbound::sendå¨çå¾
send_as_jsonåguard.flushçæªæ¥å¼æ¶éè¦ææéï¼æä»¥å®å¿
须使ç¨async_stdçMutexã
å
¶æ¬¡ï¼å¼æ¥Mutexçlockæ¹æ³è¿åä¸ä¸ªå®æ¤å¼çæªæ¥å¼ï¼æä»¥çå¾
éå®äºæ¥éçä»»å¡ä¼è®©åºå
¶çº¿ç¨ï¼ä¾å
¶ä»ä»»å¡ä½¿ç¨ï¼ç´å°äºæ¥éå¯ç¨ï¼å¦æäºæ¥éå·²ç»å¯ç¨ï¼lockçæªæ¥å¼ä¼ç«å³åå¤å¥½ï¼ä»»å¡æ ¹æ¬ä¸ä¼æèµ·èªèº«ï¼ãå¦ä¸æ¹é¢ï¼æ åMutexçlockæ¹æ³å¨çå¾
è·åéæ¶ä¼é»å¡æ´ä¸ªçº¿ç¨ãç±äºåé¢ç代ç å¨éè¿ç½ç»ä¼ è¾æ°æ®å
æ¶ææäºæ¥éï¼è¿å¯è½ä¼è±è´¹ç¸å½é¿çæ¶é´ã
æåï¼æ åMutexå¿
é¡»åªè½ç±éå®å®çåä¸ä¸ªçº¿ç¨è§£éã为äºå¼ºå¶æ§è¡è¿ä¸ç¹ï¼æ åäºæ¥éç宿¤ç±»å没æå®ç°Sendï¼å®ä¸è½è¢«ä¼ è¾å°å
¶ä»çº¿ç¨ãè¿æå³çææè¿æ ·ä¸ä¸ªå®æ¤å¼çæªæ¥å¼æ¬èº«æ²¡æå®ç°Sendï¼ä¸è½ä¼ éç»spawnå¨çº¿ç¨æ± ä¸è¿è¡ï¼å®åªè½ä¸block_onæspawn_localä¸èµ·ä½¿ç¨ãasync_stdçMutexç宿¤å¼å®ç°äºSendï¼æä»¥å¨çæçä»»å¡ä¸ä½¿ç¨å®æ²¡æé®é¢ã
# 群ç»è¡¨ï¼åæ¥äºæ¥é
ä½äºæ
å¹¶éç®åå° âå¨å¼æ¥ä»£ç 䏿»æ¯ä½¿ç¨async_std::sync::Mutexâãé常æ
åµä¸ï¼å¨ææäºæ¥éæ¶æ éçå¾
任使ä½ï¼ä¸ææéçæ¶é´ä¹ä¸é¿ãå¨è¿ç§æ
åµä¸ï¼æ ååºä¸çMutexå¯è½ä¼æ´é«æãæä»¬è天æå¡å¨çGroupTableç±»å就说æäºè¿ç§æ
åµã以䏿¯src/bin/server/group_table.rsç宿´å
容ï¼
use crate::group::Group;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub struct GroupTable(Mutex<HashMap<Arc<String>, Arc<Group>>>);
impl GroupTable {
pub fn new() -> GroupTable {
GroupTable(Mutex::new(HashMap::new()))
}
pub fn get(&self, name: &String) -> Option<Arc<Group>> {
self.0.lock()
.unwrap()
.get(name)
.cloned()
}
pub fn get_or_create(&self, name: Arc<String>) -> Arc<Group> {
self.0.lock()
.unwrap()
.entry(name.clone())
.or_insert_with(|| Arc::new(Group::new(name)))
.clone()
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
GroupTableåªæ¯ä¸ä¸ªç±äºæ¥éä¿æ¤çåå¸è¡¨ï¼å°è天群ç»åç§°æ å°å°å®é
ç群ç»ï¼ä¸¤è
é½ä½¿ç¨å¼ç¨è®¡æ°æéè¿è¡ç®¡çãgetåget_or_createæ¹æ³ä¼éå®äºæ¥éï¼æ§è¡ä¸äºåå¸è¡¨æä½ï¼å¯è½è¿ä¼è¿è¡ä¸äºå
ååé
ï¼ç¶åè¿åã
å¨GroupTableä¸ï¼æä»¬ä½¿ç¨æ®éçstd::sync::Mutexãè¿ä¸ªæ¨¡å䏿 ¹æ¬æ²¡æå¼æ¥ä»£ç ï¼æä»¥ä¹ä¸åå¨éè¦é¿å
çawaitæä½ãå®é
ä¸ï¼å¦ææä»¬å¨è¿é使ç¨async_std::sync::Mutexï¼å°±éè¦å°getåget_or_createåæå¼æ¥å½æ°ï¼è¿ä¼å¼å
¥åå»ºæªæ¥å¼ãæèµ·åæ¢å¤çå¼éï¼ä½å´æ²¡æä»ä¹å®é
好å¤ï¼äºæ¥éåªæ¯å¨ä¸äºå叿ä½åå¯è½çå°éå
ååé
æé´è¢«éå®ã
妿æä»¬çè天æå¡å¨æä¸ç¾ä¸ç¨æ·ï¼å¹¶ä¸GroupTableçäºæ¥éççæä¸ºäºç¶é¢ï¼å°å
¶å¼æ¥åå¹¶ä¸è½è§£å³é®é¢ãä½¿ç¨æç§ä¸é¨ä¸ºå¹¶å访é®è®¾è®¡çéåç±»åï¼è䏿¯HashMapï¼å¯è½ä¼æ´å¥½ãä¾å¦ï¼dashmapåºå°±æä¾äºè¿æ ·çç±»åã
# è天群ç»ï¼tokio ç广æéé
卿们çæå¡å¨ä¸ï¼group::Groupç±»å代表ä¸ä¸ªè天群ç»ãè¿ä¸ªç±»ååªéè¦æ¯æconnection::serveè°ç¨çä¸¤ä¸ªæ¹æ³ï¼joinç¨äºæ·»å æ°æåï¼postç¨äºå叿¶æ¯ãåå¸çæ¯æ¡æ¶æ¯é½éè¦ååç»æææåã
è¿å°±æ¯æä»¬è¦è§£å³å颿å°çèåé®é¢çå°æ¹ãè¿éæå 个ç¸äºçç¾çéæ±ï¼
- 妿ä¸ä¸ªæåè·ä¸ä¸ç¾¤ç»ä¸åå¸çæ¶æ¯ï¼æ¯å¦ç½ç»è¿æ¥ç¼æ ¢ï¼ï¼ç¾¤ç»ä¸çå ¶ä»æåä¸åºåå°å½±åã
- å³ä½¿ä¸ä¸ªæåè½åäºï¼ä¹åºè¯¥æåæ³è®©ä»ä»¬éæ°å å ¥å¯¹è¯å¹¶ç»§ç»ä»¥æç§æ¹å¼åä¸ã
- ç¨äºç¼å²æ¶æ¯çå åä¸åºæ éå¶å°å¢é¿ã
ç±äºå¨å®ç°å¤å¯¹å¤éä¿¡æ¨¡å¼æ¶è¿äºææå¾å¸¸è§ï¼tokioåºæä¾äºä¸ç§å¹¿æééç±»åï¼å®å®ç°äºä¸ç»åççæè¡¡ãtokio广æé鿝ä¸ä¸ªå¼ï¼å¨æä»¬çä¾å䏿¯èå¤©æ¶æ¯ï¼çéåï¼å®å
è®¸ä»»ææ°éçä¸åçº¿ç¨æä»»å¡åé忥æ¶å¼ãå®è¢«ç§°ä¸º â广æâ ééï¼æ¯å 为æ¯ä¸ªæ¶è´¹è
é½ä¼å¾å°åéçæ¯ä¸ªå¼çèªå·±ç坿¬ï¼å¼ç±»åå¿
é¡»å®ç°Cloneï¼ã
é常æ åµä¸ï¼å¹¿æééä¼å¨éåä¸ä¿ç䏿¡æ¶æ¯ï¼ç´å°æ¯ä¸ªæ¶è´¹è é½è·åå°èªå·±ç坿¬ã使¯ï¼å¦æéåçé¿åº¦è¶ è¿äºåå»ºæ¶æå®çæå¤§å®¹éï¼ææ§çæ¶æ¯å°±ä¼è¢«ä¸¢å¼ãä»»ä½è·ä¸ä¸çæ¶è´¹è å¨ä¸æ¬¡å°è¯è·åä¸ä¸æ¡æ¶æ¯æ¶é½ä¼æ¶å°ä¸ä¸ªé误ï¼å¹¶ä¸ééä¼è®©ä»ä»¬è·ä¸ä»ç¶å¯ç¨çææ§æ¶æ¯ã
ä¾å¦ï¼å¾20 - 4å±ç¤ºäºä¸ä¸ªæå¤§å®¹é为16个å¼ç广æééã
å¾20 - 4 ä¸ä¸ªtokio广æéé
æä¸¤ä¸ªåéè å¨éå䏿·»å æ¶æ¯ï¼åä¸ªæ¥æ¶è å¨éåä¸ååºæ¶æ¯ ââ æè æ´åç¡®å°è¯´ï¼æ¯ä»éåä¸å¤å¶æ¶æ¯ãæ¥æ¶è Bè¿æ14æ¡æ¶æ¯è¦æ¥æ¶ï¼æ¥æ¶è Cæ7æ¡ï¼æ¥æ¶è Då·²ç»å®å ¨è·ä¸ãæ¥æ¶è Aè½åäºï¼å¨å®çå°ä¹åæ11æ¡æ¶æ¯è¢«ä¸¢å¼äºãå®ä¸æ¬¡å°è¯æ¥æ¶æ¶æ¯æ¶ä¼å¤±è´¥ï¼è¿åä¸ä¸ªæç¤ºè¿ç§æ åµçé误ï¼å¹¶ä¸å®ä¼è¢«æ´æ°å°å½åéåçæ«å°¾ã
æä»¬çè天æå¡å¨å°æ¯ä¸ªè天群ç»è¡¨ç¤ºä¸ºä¸ä¸ªæºå¸¦Arc<String>å¼ç广æééï¼å群ç»å叿¶æ¯ä¼å°å
¶å¹¿æç»ææå½åæåã以䏿¯group::Groupç±»åçå®ä¹ï¼å®å¨src/bin/server/group.rsä¸ï¼
use async_std::task;
use crate::connection::Outbound;
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct Group {
name: Arc<String>,
sender: broadcast::Sender<Arc<String>>
}
impl Group {
pub fn new(name: Arc<String>) -> Group {
let (sender, _receiver) = broadcast::channel(1000);
Group { name, sender }
}
pub fn join(&self, outbound: Arc<Outbound>) {
let receiver = self.sender.subscribe();
task::spawn(handle_subscriber(self.name.clone(), receiver, outbound));
}
pub fn post(&self, message: Arc<String>) {
// åªæå¨æ²¡æè®¢é
è
æ¶ï¼è¿ä¸ªæä½æä¼è¿åé误ãè¿æ¥çè¾åºç«¯å¯è½ä¼å¨è¾å
¥ç«¯ä¹åéåºï¼ä¸¢å¼å
¶è®¢é
ï¼
// è¿å¯è½ä¼å¯¼è´è¾å
¥ç«¯è¯å¾å空群ç»åéæ¶æ¯ã
let _ignored = self.sender.send(message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Groupç»æä½å
å«è天群ç»çåç§°ï¼ä»¥åä¸ä¸ªbroadcast::Senderï¼å®ä»£è¡¨ç¾¤ç»å¹¿æééçåé端ãGroup::newæ¹æ³è°ç¨broadcast::channelå建ä¸ä¸ªæå¤§å®¹é为1000æ¡æ¶æ¯ç广æééãchannel彿°ä¼è¿åä¸ä¸ªåéè
åä¸ä¸ªæ¥æ¶è
ï¼ä½æ¤æ¶æä»¬ä¸éè¦æ¥æ¶è
ï¼å 为群ç»è¿æ²¡æä»»ä½æåã
为äºå群ç»ä¸æ·»å æ°æåï¼Group::joinæ¹æ³ä¼è°ç¨åéè
çsubscribeæ¹æ³ä¸ºééå建ä¸ä¸ªæ°çæ¥æ¶è
ãç¶åå®ä¼çæä¸ä¸ªæ°ç弿¥ä»»å¡ï¼å¨handle_subscribe彿°ä¸çæ§è¯¥æ¥æ¶è
以è·åæ¶æ¯å¹¶å°å
¶åå客æ·ç«¯ã
æäºè¿äºç»èï¼Group::postæ¹æ³å°±å¾ç®åäºï¼å®åªæ¯å°æ¶æ¯åéå°å¹¿æééãç±äºééæºå¸¦ç弿¯Arc<String>å¼ï¼ç»æ¯ä¸ªæ¥æ¶è
ä¸ä¸ªæ¶æ¯å¯æ¬åªä¼å¢å æ¶æ¯çå¼ç¨è®¡æ°ï¼èæ éä»»ä½å¤å¶æå å
ååé
ã䏿¦ææè®¢é
è
é½ä¼ è¾äºè¯¥æ¶æ¯ï¼å¼ç¨è®¡æ°å°±ä¼é为é¶ï¼æ¶æ¯å°±ä¼è¢«éæ¾ã以䏿¯handle_subscriberçå®ä¹ï¼
use async_chat::FromServer;
use tokio::sync::broadcast::error::RecvError;
async fn handle_subscriber(group_name: Arc<String>, mut receiver: broadcast::Receiver<Arc<String>>, outbound: Arc<Outbound>) {
loop {
let packet = match receiver.recv().await {
Ok(message) => FromServer::Message {
group_name: group_name.clone(),
message: message.clone(),
},
Err(RecvError::Lagged(n)) => FromServer::Error(
format!("Dropped {} messages from {}.", n, group_name)
),
Err(RecvError::Closed) => break,
};
if outbound.send(packet).await.is_err() {
break;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
è½ç¶ç»èä¸åï¼ä½è¿ä¸ªå½æ°çå½¢å¼å¾å¸¸è§ï¼å®æ¯ä¸ä¸ªå¾ªç¯ï¼ä»å¹¿æééæ¥æ¶æ¶æ¯ï¼å¹¶éè¿å
±äº«çOutboundå¼å°å
¶ä¼ è¾å客æ·ç«¯ãå¦æå¾ªç¯è·ä¸ä¸å¹¿æééçé度ï¼å®ä¼æ¶å°ä¸ä¸ªLaggedé误ï¼å¹¶å¦å®å客æ·ç«¯æ¥åã
妿尿°æ®å
åéå客æ·ç«¯å®å
¨å¤±è´¥ï¼å¯è½æ¯å ä¸ºè¿æ¥å·²ç»å
³éï¼handle_subscriberä¼éåºå¾ªç¯å¹¶è¿åï¼å¯¼è´å¼æ¥ä»»å¡éåºãè¿ä¼ä¸¢å¼å¹¿æééçæ¥æ¶è
ï¼ä½¿å
¶åæ¶å¯¹ééç订é
ã
è¿æ ·ï¼å½ä¸ä¸ªè¿æ¥æå¼æ¶ï¼ä¸æ¬¡ç¾¤ç»å°è¯åå ¶åéæ¶æ¯æ¶ï¼å®çæ¯ä¸ªç»æå身份é½ä¼è¢«æ¸ çã
æä»¬çèå¤©ç¾¤ç»æ°¸è¿ä¸ä¼å
³éï¼å 为æä»¬ä»ä¸ä»ç¾¤ç»è¡¨ä¸å é¤ç¾¤ç»ï¼ä½ä¸ºäºå®æ´èµ·è§ï¼handle_subscriberå·²ç»åå¤å¥½éè¿éåºä»»å¡æ¥å¤çClosedé误ã
请注æï¼æä»¬ä¸ºæ¯ä¸ªå®¢æ·ç«¯çæ¯ä¸ªç»æå身份é½å建äºä¸ä¸ªæ°ç弿¥ä»»å¡ãè¿æ¯å¯è¡çï¼å ä¸ºå¼æ¥ä»»å¡æ¯çº¿ç¨ä½¿ç¨çå åå°å¾å¤ï¼å¹¶ä¸å¨ä¸ä¸ªè¿ç¨ä¸ä»ä¸ä¸ªå¼æ¥ä»»å¡åæ¢å°å¦ä¸ä¸ªå¼æ¥ä»»å¡çæçé常é«ã
è¿å°±æ¯è天æå¡å¨ç宿´ä»£ç ã宿ç¹ç®éï¼èä¸async_stdãtokioåfuturesåºä¸è¿æè®¸å¤æä»·å¼çåè½ï¼æä»¬å¨æ¬ä¹¦ä¸æ æ³å
¨é¨æ¶µçï¼ä½å¸æè¿ä¸ªæ©å±ç¤ºä¾è½å¤è¯´æå¼æ¥çæç³»ç»çä¸äºåè½æ¯å¦ä½ååå·¥ä½çï¼å¼æ¥ä»»å¡ãæµã弿¥I/Oç¹æ§ãéé以å两ç§ç±»åçäºæ¥éã
# åçFutureåæ§è¡å¨ï¼ä½æ¶å¼å¾å次轮询Futureï¼
è天æå¡å¨å±ç¤ºäºæä»¬å¦ä½ä½¿ç¨è¯¸å¦TcpListenerå广æééä¹ç±»ç弿¥åè¯ç¼å代ç ï¼ä»¥åå¦ä½ä½¿ç¨block_onåspawnçæ§è¡å¨æ¥é©±å¨å®ä»¬çæ§è¡ãç°å¨æä»¬å¯ä»¥æ¥ççè¿äºæ¯å¦ä½å®ç°çãå
³é®é®é¢å¨äºï¼å½ä¸ä¸ªFutureè¿åPoll::Pendingæ¶ï¼å®å¦ä½ä¸æ§è¡å¨åè°ï¼ä»¥ä¾¿å¨æ£ç¡®çæ¶é´å次对å
¶è¿è¡è½®è¯¢ï¼
æ³æ³å½æä»¬è¿è¡è天客æ·ç«¯ä¸»å½æ°ä¸çå¦ä¸ä»£ç æ¶ä¼åçä»ä¹ï¼
task::block_on(async {
let socket = net::TcpStream::connect(address).await?;
...
})
2
3
4
block_onç¬¬ä¸æ¬¡è½®è¯¢è¿ä¸ªå¼æ¥åçFutureæ¶ï¼ç½ç»è¿æ¥å ä¹è¯å®ä¸ä¼ç«å³å°±ç»ªï¼æä»¥block_onä¼è¿å
¥ç¡ç ç¶æãä½å®åºè¯¥å¨ä»ä¹æ¶åå¤éå¢ï¼ä¸ç¥æçï¼ä¸æ¦ç½ç»è¿æ¥å°±ç»ªï¼TcpStreaméè¦åè¯block_onï¼å®åºè¯¥å次å°è¯è½®è¯¢è¿ä¸ªå¼æ¥åçFutureï¼å 为å®ç¥éè¿æ¬¡awaitå°ä¼å®æï¼å¼æ¥åçæ§è¡ä¹è½å¤ç»§ç»æ¨è¿ã
å½åblock_onè¿æ ·çæ§è¡å¨è½®è¯¢ä¸ä¸ªFutureæ¶ï¼å®å¿
é¡»ä¼ å
¥ä¸ä¸ªå为å¤éå¨ï¼wakerï¼çåè°å½æ°ã妿Futureå°æªåå¤å¥½ï¼æ ¹æ®Futureç¹æ§çè§åï¼å®ç®åå¿
é¡»è¿åPoll::Pendingï¼å¹¶å®æå¨è¯¥Futureå¼å¾å次轮询æ¶è°ç¨è¿ä¸ªå¤éå¨ã
æä»¥ï¼æå¨å®ç°çFutureé常å¦ä¸æç¤ºï¼
use std::task::Waker;
struct MyPrimitiveFuture {
...
waker: Option<Waker>,
}
impl Future for MyPrimitiveFuture {
type Output =...;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<...> {
...
if... Futureå·²åå¤å¥½... {
return Poll::Ready(final_value);
}
// ä¿åå¤éå¨ä»¥ä¾åç»ä½¿ç¨ã
self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
æ¢å¥è¯è¯´ï¼å¦æFutureçå¼å·²åå¤å¥½ï¼å°±è¿åå®ãå¦åï¼å°Contextçå¤éå¨çå
éä¿åèµ·æ¥ï¼ç¶åè¿åPoll::Pendingã
å½Futureå¼å¾å次轮询æ¶ï¼å®å¿ é¡»éè¿è°ç¨å¤é卿¥éç¥ä¸ä¸æ¬¡è½®è¯¢å®çæ§è¡å¨ï¼
// 妿æä»¬æä¸ä¸ªå¤éå¨ï¼è°ç¨å®ï¼å¹¶æ¸
空`self.waker`ã
if let Some(waker) = self.waker.take() {
waker.wake();
}
2
3
4
çæ³æ åµä¸ï¼æ§è¡å¨åFutureä¼è½®æµè¿è¡è½®è¯¢åå¤éæä½ï¼æ§è¡å¨è½®è¯¢Futureï¼ç¶åè¿å ¥ç¡ç ç¶æï¼æ¥çFutureè°ç¨å¤éå¨ï¼è¿æ ·æ§è¡å¨å°±ä¼éæ¥å¹¶å次轮询Futureã
弿¥å½æ°å弿¥åçFutureæ¬èº«å¹¶ä¸å¤çå¤éå¨ãå®ä»¬åªæ¯å°æ¥æ¶å°çContextä¼ éç»å®ä»¬æ£å¨çå¾
çåFutureï¼å°ä¿ååè°ç¨å¤éå¨çä»»å¡å§æç»åFutureã卿们çè天客æ·ç«¯ä¸ï¼å¯¹å¼æ¥åçFutureçç¬¬ä¸æ¬¡è½®è¯¢ï¼å¨çå¾
TcpStream::connectçFutureæ¶åªæ¯å°Contextä¼ éä¸å»ãåç»çè½®è¯¢åæ ·ä¼å°å®ä»¬çContextä¼ éç»è¯¥åæ¥ä¸æ¥çå¾
çä»»ä½Futureã
TcpStream::connectçFutureç轮询å¤çæ¹å¼å¦åé¢çç¤ºä¾æç¤ºï¼å®å°å¤éå¨äº¤ç»ä¸ä¸ªè¾
å©çº¿ç¨ï¼è¯¥çº¿ç¨çå¾
è¿æ¥å°±ç»ªï¼ç¶åè°ç¨å¤éå¨ã
Wakerå®ç°äºCloneåSendï¼æä»¥ä¸ä¸ªFutureæ»æ¯å¯ä»¥å建èªå·±çå¤éå¨å¯æ¬ï¼å¹¶æ ¹æ®éè¦å°å
¶åéå°å
¶ä»çº¿ç¨ãWaker::wakeæ¹æ³ä¼æ¶èå¤éå¨ãè¿æä¸ä¸ªwake_by_refæ¹æ³ä¸ä¼æ¶èå¤éå¨ï¼ä½æäºæ§è¡å¨å¯ä»¥æ´é«æå°å®ç°æ¶èçæ¬ï¼ä¸¤è
çåºå«æå¤åªæ¯ä¸æ¬¡å
éæä½ï¼ã
æ§è¡å¨è¿åº¦è½®è¯¢ä¸ä¸ªFutureå¹¶æ å±å®³ï¼åªæ¯æçè¾ä½ãç¶èï¼Futureåºè¯¥æ³¨æä» å¨è½®è¯¢è½å¤çæ£åå¾è¿å±æ¶æè°ç¨å¤éå¨ï¼èåçå¤éå轮询循ç¯å¯è½ä¼å¯¼è´æ§è¡å¨æ ¹æ¬æ æ³è¿å ¥ç¡ç ç¶æï¼è¿ä¼æµªè´¹èµæºï¼å¹¶ä¸ä½¿å¤çå¨å¯¹å ¶ä»ä»»å¡çååºæ§éä½ã
æ¢ç¶æä»¬å·²ç»å±ç¤ºäºæ§è¡å¨ååçFutureå¦ä½éä¿¡ï¼æ¥ä¸æ¥æä»¬å°èªå·±å®ç°ä¸ä¸ªåçFutureï¼ç¶åéæ¥è®²è§£block_onæ§è¡å¨çå®ç°ã
# è°ç¨å¤éå¨ï¼spawn_blocking
卿¬ç« åé¢ï¼æä»¬ä»ç»äºspawn_blocking彿°ï¼å®ä¼å¨å¦ä¸ä¸ªçº¿ç¨ä¸å¯å¨ç»å®çéå
ï¼å¹¶è¿åå
¶è¿åå¼çFutureãç°å¨æä»¬å·²ç»å
·å¤äºèªå·±å®ç°spawn_blockingæéçææè¦ç´ ã为ç®åèµ·è§ï¼æä»¬ççæ¬ä¸ºæ¯ä¸ªéå
å建ä¸ä¸ªæ°çº¿ç¨ï¼èä¸åasync_stdççæ¬é£æ ·ä½¿ç¨çº¿ç¨æ± ã
è½ç¶spawn_blockingè¿åä¸ä¸ªFutureï¼ä½æä»¬ä¸ä¼å°å
¶åæasync fnãç¸åï¼å®å°æ¯ä¸ä¸ªæ®éç忥彿°ï¼è¿åä¸ä¸ªç»æä½SpawnBlockingï¼æä»¬å°ä¸ºå
¶å®ç°Futureç¹æ§ã
æä»¬çspawn_blockingçç¾åå¦ä¸ï¼
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
2
3
4
5
ç±äºæä»¬éè¦å°éå
åéå°å¦ä¸ä¸ªçº¿ç¨å¹¶å°è¿åå¼å¸¦åï¼æä»¥éå
Fåå
¶è¿åå¼Té½å¿
é¡»å®ç°Sendãå¹¶ä¸ç±äºæä»¬ä¸ç¥é线ç¨ä¼è¿è¡å¤é¿æ¶é´ï¼å®ä»¬ççå½å¨æä¹é½å¿
é¡»æ¯'staticãè¿äºçº¦æä¸std::thread::spawnæ¬èº«ææ½å ç约æç¸åã
SpawnBlocking<T>æ¯éå
è¿åå¼çFutureãå®çå®ä¹å¦ä¸ï¼
use std::sync::{Arc, Mutex};
use std::task::Waker;
pub struct SpawnBlocking<T>(Arc<Mutex<Shared<T>>>);
struct Shared<T> {
value: Option<T>,
waker: Option<Waker>,
}
2
3
4
5
6
7
8
9
Sharedç»æä½å¿
é¡»å
å½Futureåè¿è¡éå
ç线ç¨ä¹é´çä¸ä¸ªä¼åç¹ï¼æä»¥å®ç±Arcæ¥æï¼å¹¶ç±Mutexä¿æ¤ï¼è¿é使ç¨åæ¥äºæ¥éå°±å¯ä»¥ï¼ã轮询Futureæ¶ä¼æ£æ¥valueæ¯å¦åå¨ï¼å¦æä¸åå¨åå°å¤éå¨ä¿åå¨wakerä¸ãè¿è¡éå
ç线ç¨ä¼å°å
¶è¿åå¼ä¿åå¨valueä¸ï¼ç¶å妿wakeråå¨åè°ç¨å®ã
以䏿¯spawn_blockingç宿´å®ä¹ï¼
pub fn spawn_blocking<T, F>(closure: F) -> SpawnBlocking<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
let inner = Arc::new(Mutex::new(Shared {
value: None,
waker: None,
}));
std::thread::spawn({
let inner = inner.clone();
move || {
let value = closure();
let maybe_waker = {
let mut guard = inner.lock().unwrap();
guard.value = Some(value);
guard.waker.take()
};
if let Some(waker) = maybe_waker {
waker.wake();
}
}
});
SpawnBlocking(inner)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
å建Sharedå¼åï¼è¿æ®µä»£ç ä¼çæä¸ä¸ªçº¿ç¨æ¥è¿è¡éå
ï¼å°ç»æåå¨å¨Sharedçvalueåæ®µä¸ï¼å¹¶è°ç¨å¤éå¨ï¼å¦ææçè¯ï¼ã
æä»¬å¯ä»¥ä¸ºSpawnBlockingå®ç°Futureç¹æ§å¦ä¸ï¼
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl<T: Send> Future for SpawnBlocking<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let mut guard = self.0.lock().unwrap();
if let Some(value) = guard.value.take() {
return Poll::Ready(value);
}
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
轮询SpawnBlockingæ¶ä¼æ£æ¥éå
ç弿¯å¦å·²åå¤å¥½ï¼å¦ææ¯åè·åæææå¹¶è¿åå®ãå¦åï¼Futureä»å¤äºæèµ·ç¶æï¼æä»¥å®ä¼å°Contextçå¤éå¨çå
éä¿åå¨Futureçwakeråæ®µä¸ã
䏿¦ä¸ä¸ªFutureè¿åäºPoll::Readyï¼å°±ä¸åºè¯¥å对å
¶è¿è¡è½®è¯¢ãé常使ç¨Futureçæ¹å¼ï¼å¦awaitåblock_onï¼é½éµå¾ªè¿ä¸ªè§åã妿坹SpawnBlockingçFutureè¿åº¦è½®è¯¢ï¼ä¸ä¼åçç¹å«ç³ç³çäºæ
ï¼ä½å®ä¹ä¸ä¼ç¹æå»å¤çè¿ç§æ
åµãè¿æ¯æå¨å®ç°çFutureçå
¸åæ
åµã
# å®ç°block_on
é¤äºè½å¤å®ç°åºæ¬çæªæ¥å¼ï¼æä»¬è¿å
·å¤æå»ºä¸ä¸ªç®åæ§è¡å¨æéçææè¦ç´ ã卿¬èä¸ï¼æä»¬å°ç¼åèªå·±çæ¬çblock_onãå®ä¼æ¯async_stdä¸ççæ¬ç®åå¾å¤ï¼ä¾å¦ï¼å®ä¸æ¯æspawn_localãä»»å¡å±é¨åéï¼ä¹ä¸æ¯æåµå¥è°ç¨ï¼å¨å¼æ¥ä»£ç ä¸è°ç¨block_onï¼ãä½å®è¶³ä»¥è¿è¡æä»¬çè天客æ·ç«¯åæå¡å¨ã
代ç å¦ä¸ï¼
use waker_fn::waker_fn; // Cargo.toml: waker-fn = "1.1"
use futures_lite::pin; // Cargo.toml: futures-lite = "1.11"
use crossbeam::sync::Parker; // Cargo.toml: crossbeam = "0.8"
use std::future::Future;
use std::task::{Context, Poll};
fn block_on<F: Future>(future: F) -> F::Output {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
let mut context = Context::from_waker(&waker);
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
è¿æ®µä»£ç ç¸å½ç®çï¼ä½å ¶ä¸æå¾å¤å å®¹ï¼æä»¥è®©æä»¬éé¨ååæã
let parker = Parker::new();
let unparker = parker.unparker().clone();
2
crossbeamåºçParkerç±»åæ¯ä¸ä¸ªç®åçé»å¡åè¯ï¼è°ç¨parker.park()ä¼é»å¡çº¿ç¨ï¼ç´å°å
¶ä»äººå¯¹éè¿parker.unparker()é¢å
è·åçç¸åºUnparkerè°ç¨.unpark()ãå¦æä½ å¨ä¸ä¸ªå°æªé»å¡ç线ç¨ä¸è°ç¨unparkï¼é£ä¹å®ä¸æ¬¡è°ç¨parkæ¶ä¼ç«å³è¿åï¼ä¸ä¼é»å¡ãæä»¬çblock_onä¼å¨æªæ¥å¼æªåå¤å¥½æ¶ä½¿ç¨Parkerè¿è¡çå¾
ï¼èæä»¬ä¼ éç»æªæ¥å¼çå¤éå¨ä¼è°ç¨unparkæ¥å¤é线ç¨ã
let waker = waker_fn(move || unparker.unpark());
waker_fn彿°ï¼æ¥èªåååºï¼æ ¹æ®ç»å®çéå
å建ä¸ä¸ªWakerãå¨è¿éï¼æä»¬å建äºä¸ä¸ªWakerï¼å½å®è¢«è°ç¨æ¶ï¼ä¼è°ç¨éå
move || unparker.unpark()ãä½ ä¹å¯ä»¥ä»
ä½¿ç¨æ ååºæ¥å建å¤éå¨ï¼ä½waker_fnæ´æ¹ä¾¿ä¸äºã
pin!(future);
ç»å®ä¸ä¸ªææç±»å为Fçæªæ¥å¼çåéï¼pin!å®ä¼è·åè¯¥æªæ¥å¼çæææï¼å¹¶å£°æä¸ä¸ªååçæ°åéï¼å
¶ç±»å为Pin<&mut F>ï¼å¹¶åç¨è¯¥æªæ¥å¼ãè¿ä¸ºæä»¬æä¾äºpollæ¹æ³æéçPin<&mut Self>ãåºäºä¸ä¸èå°è§£éçåå ï¼å¼æ¥å½æ°å弿¥åçæªæ¥å¼å¨è¢«è½®è¯¢ä¹åï¼å¿
é¡»éè¿Pinè¿è¡å¼ç¨ã
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
2
3
4
5
6
æåï¼è½®è¯¢å¾ªç¯é常ç®åãæä»¬å¸¦çæä»¬çå¤éå¨ä¼ éä¸ä¸ªä¸ä¸æï¼è½®è¯¢æªæ¥å¼ï¼ç´å°å®è¿åPoll::Readyã妿å®è¿åPoll::Pendingï¼æä»¬é»å¡çº¿ç¨ï¼ç´å°å¤éå¨è¢«è°ç¨ãç¶åæä»¬å次å°è¯ã
as_mutè°ç¨è®©æä»¬å¨ä¸æ¾å¼æææçæ
åµä¸è½®è¯¢æªæ¥å¼ï¼æä»¬å°å¨ä¸ä¸èä¸å¯¹æ¤è¿è¡æ´è¯¦ç»çè§£éã
# Pinning
è½ç¶å¼æ¥å½æ°å弿¥å对äºç¼åæ¸
æ°ç弿¥ä»£ç è³å
³éè¦ï¼ä½å¤çå®ä»¬çæªæ¥å¼éè¦æ ¼å¤å°å¿ãPinç±»å帮å©Rustç¡®ä¿å®ä»¬è¢«å®å
¨ä½¿ç¨ã
卿¬èä¸ï¼æä»¬å°è¯´æä¸ºä»ä¹å¼æ¥å½æ°è°ç¨å弿¥åçæªæ¥å¼ä¸è½åæ®éRustå¼é£æ ·éæå¤çãç¶åæä»¬å°å±ç¤ºPinå¦ä½ä½ä¸ºä¸ç§âå®å
¨ä½¿ç¨æ å¿âï¼ç¨äºé£äºå¯ä»¥å®å
¨ç®¡çæ¤ç±»æªæ¥å¼çæéãæåï¼æä»¬å°å±ç¤ºä¸äºä½¿ç¨Pinå¼çæ¹æ³ã
# æªæ¥å¼ç两个çå½å¨æé¶æ®µ
èèè¿ä¸ªç®åç弿¥å½æ°ï¼
use async_std::io::prelude::*;
use async_std::{io, net};
async fn fetch_string(address: &str) -> io::Result<String> {
â¶
let mut socket = net::TcpStream::connect(address).awaitâ·?;
let mut buf = String::new();
socket.read_to_string(&mut buf).await�;
Ok(buf)
}
2
3
4
5
6
7
8
9
10
è¿ä¸ªå½æ°æå¼ä¸ä¸ªå°ç»å®å°åçTCPè¿æ¥ï¼å¹¶ä»¥Stringç±»åè¿åæå¡å¨åéçä»»ä½å
å®¹ãæ è®°ä¸ºâ¶ãâ·åâ¸çç¹æ¯æ¢å¤ç¹ï¼å³å¼æ¥å½æ°ä»£ç 䏿§è¡å¯è½è¢«æåçä½ç½®ã
åè®¾ä½ åè¿æ ·è°ç¨å®ï¼ä½ä¸ä½¿ç¨awaitï¼
let response = fetch_string("localhost:6502");
ç°å¨responseæ¯ä¸ä¸ªåå¤å¨fetch_stringå¼å¤´å¼å§æ§è¡çæªæ¥å¼ï¼å¹¶å¸¦æç»å®çåæ°ãå¨å
åä¸ï¼è¿ä¸ªæªæ¥å¼å¤§è´å¦å¾20 - 5æç¤ºã
å¾20 - 5 为è°ç¨fetch_stringæå»ºçæªæ¥å¼
ç±äºæä»¬ååå建è¿ä¸ªæªæ¥å¼ï¼å®è¡¨ç¤ºæ§è¡åºè¯¥ä»æ¢å¤ç¹â¶å¼å§ï¼å³å½æ°ä½çé¡¶é¨ãå¨è¿ç§ç¶æä¸ï¼æªæ¥å¼ç»§ç»æ§è¡æéçå¯ä¸å¼æ¯å½æ°åæ°ã
ç°å¨åè®¾ä½ å¯¹responseè¿è¡äºå 次轮询ï¼å®å°è¾¾äºå½æ°ä½ä¸çè¿ä¸ªç¹ï¼
socket.read_to_string(&mut buf).await�;
è¿ä¸æ¥å设read_to_stringçç»æè¿æªåå¤å¥½ï¼æä»¥è½®è¯¢è¿åPoll::Pendingãæ¤æ¶ï¼è¿ä¸ªæªæ¥å¼å¦å¾20 - 6æç¤ºã
ä¸ä¸ªæªæ¥å¼å¿ é¡»å§ç»ä¿å䏿¬¡è½®è¯¢æ¶æ¢å¤æ§è¡æéçææä¿¡æ¯ãå¨è¿ç§æ åµä¸ï¼è¿äºä¿¡æ¯å æ¬ï¼
æ¢å¤ç¹â¸ï¼è¡¨ç¤ºæ§è¡åºè¯¥å¨çå¾ è½®è¯¢
read_to_stringçæªæ¥å¼æ¶æ¢å¤ãå¨è¯¥æ¢å¤ç¹åå¨çåéï¼
socketåbufãaddressçå¼ä¸ååå¨äºæªæ¥å¼ä¸ï¼å ä¸ºå½æ°ä¸åéè¦å®ãread_to_stringåæªæ¥å¼ï¼awaitè¡¨è¾¾å¼æ£å¨è½®è¯¢è¿ä¸ªåæªæ¥å¼ã
å¾20 - 6 åä¸ä¸ªæªæ¥å¼ï¼å¨çå¾
read_to_stringçè¿ç¨ä¸
注æï¼å¯¹read_to_stringçè°ç¨åç¨äºå¯¹socketåbufçå¼ç¨ãå¨åæ¥å½æ°ä¸ï¼ææå±é¨åéé½åå¨äºæ ä¸ï¼ä½å¨å¼æ¥å½æ°ä¸ï¼å¨awaitè¿ç¨ä¸ä»ç¶åå¨çå±é¨åéå¿
é¡»åå¨å¨æªæ¥å¼ä¸ï¼è¿æ ·å¨å次轮询æ¶å®ä»¬æå¯ç¨ãåç¨è¿æ ·ä¸ä¸ªåéçå¼ç¨ï¼å°±ç¸å½äºåç¨äºæªæ¥å¼çä¸é¨åã
ç¶èï¼Rustè¦æ±å¨å¼è¢«åç¨æ¶ä¸è½ç§»å¨å®ä»¬ãåè®¾ä½ å°è¿ä¸ªæªæ¥å¼ç§»å¨å°ä¸ä¸ªæ°çä½ç½®ï¼
let new_variable = response;
Rust没æåæ³æ¾å°æææ´»å¨çå¼ç¨å¹¶ç¸åºå°è°æ´å®ä»¬ãè¿äºå¼ç¨ä¸ä¼æåsocketåbufçæ°ä½ç½®ï¼èæ¯ç»§ç»æåç°å¨å·²æªåå§åçresponseä¸çæ§ä½ç½®ãå¦å¾20 - 7æç¤ºï¼å®ä»¬åæäºæ¬ç©ºæéã
鲿¢è¢«åç¨çå¼è¢«ç§»å¨é常æ¯åç¨æ£æ¥å¨çèè´£ãåç¨æ£æ¥å¨å°åéè§ä¸ºææææ çæ ¹ï¼ä½ä¸åå¨å¨æ ä¸çåéä¸åï¼åå¨å¨æªæ¥å¼ä¸çåéä¼éçæªæ¥å¼æ¬èº«çç§»å¨èç§»å¨ãè¿æå³ç对socketåbufçåç¨ä¸ä»
å½±åfetch_string对å
¶èªèº«åéçæä½ï¼è¿å½±åå
¶è°ç¨è
对ææè¿äºåéçæªæ¥å¼responseçå®å
¨æä½ã弿¥å½æ°çæªæ¥å¼æ¯åç¨æ£æ¥å¨çä¸ä¸ªç²ç¹ï¼å¦æRustæ³è¦ä¿è¯å
åå®å
¨ï¼å°±å¿
须以æç§æ¹å¼è§£å³è¿ä¸ªé®é¢ã
å¾20 - 7 fetch_stringçæªæ¥å¼å¨è¢«åç¨æ¶è¢«ç§»å¨ï¼Rustä¼é»æ¢è¿ç§æ
åµ)
Rust对è¿ä¸ªé®é¢çè§£å³æ¹æ¡åºäºè¿æ ·ä¸ä¸ªè§è§£ï¼æªæ¥å¼å¨ååå»ºæ¶æ»æ¯å¯ä»¥å®å ¨ç§»å¨çï¼åªæå¨è¢«è½®è¯¢åæåå¾ä¸å®å ¨ãéè¿è°ç¨å¼æ¥å½æ°ååå»ºçæªæ¥å¼åªä¿åä¸ä¸ªæ¢å¤ç¹ååæ°å¼ãè¿äºä» å¨å¼æ¥å½æ°ä½çä½ç¨åå ï¼è彿°ä½å°æªå¼å§æ§è¡ãåªæè½®è¯¢æªæ¥å¼æä¼åç¨å ¶å 容ã
ç±æ¤æä»¬å¯ä»¥çåºï¼æ¯ä¸ªæªæ¥å¼é½æä¸¤ä¸ªçå½å¨æé¶æ®µï¼
- 第ä¸é¶æ®µä»æªæ¥å¼å建æ¶å¼å§ãç±äºå½æ°ä½å°æªå¼å§æ§è¡ï¼å®çä»»ä½é¨åé½ä¸å¯è½è¢«åç¨ãæ¤æ¶ï¼å®åä»»ä½å ¶ä»Rustå¼ä¸æ ·å¯ä»¥å®å ¨ç§»å¨ã
- 第äºé¶æ®µä»æªæ¥å¼ç¬¬ä¸æ¬¡è¢«è½®è¯¢æ¶å¼å§ã䏿¦å½æ°ä½å¼å§æ§è¡ï¼å®å¯è½ä¼åç¨å¯¹åå¨å¨æªæ¥å¼ä¸çåéçå¼ç¨ï¼ç¶åè¿è¡
awaitæä½ï¼ä½¿æªæ¥å¼çè¿é¨åå¤äºè¢«åç¨ç¶æãä»ç¬¬ä¸æ¬¡è½®è¯¢ä¹åå¼å§ï¼æä»¬å¿ é¡»åè®¾æªæ¥å¼å¯è½ä¸è½å®å ¨ç§»å¨ã
第ä¸çå½å¨æé¶æ®µççµæ´»æ§ä½¿æä»¬è½å¤å°æªæ¥å¼ä¼ éç»block_onåspawnï¼å¹¶è°ç¨åraceåfuseè¿æ ·çéé
卿¹æ³ï¼è¿äºæ¹æ³é½æå¼æ¥åæªæ¥å¼ãå®é
ä¸ï¼å³ä½¿æååå»ºæªæ¥å¼ç弿¥å½æ°è°ç¨ä¹å¿
é¡»å°å
¶è¿åç»è°ç¨è
ï¼è¿ä¹æ¯ä¸æ¬¡ç§»å¨ã
è¦è¿å
¥ç¬¬äºçå½å¨æé¶æ®µï¼æªæ¥å¼å¿
须被轮询ãpollæ¹æ³è¦æ±æªæ¥å¼ä½ä¸ºPin<&mut Self>å¼ä¼ éãPinæ¯æéç±»åï¼å¦&mut Selfï¼çå
è£
å¨ï¼å®éå¶äºæéçä½¿ç¨æ¹å¼ï¼ç¡®ä¿å
¶æåç对象ï¼å¦Selfï¼åä¹ä¸è½è¢«ç§»å¨ãæä»¥å¨è½®è¯¢æªæ¥å¼ä¹åï¼ä½ å¿
é¡»å建ä¸ä¸ªæåå®çPinå
è£
æéã
è¿å°±æ¯Rustç¡®ä¿æªæ¥å¼å®å
¨ççç¥ï¼æªæ¥å¼å¨è¢«è½®è¯¢ä¹åä¸ä¼åå¾å±é©ï¼å¨æå»ºæåå®çPinå
è£
æéä¹åï¼ä½ ä¸è½è½®è¯¢æªæ¥å¼ï¼ä¸æ¦ä½ è¿æ ·åäºï¼æªæ¥å¼å°±ä¸è½å被移å¨ã
âä¸ä¸ªä¸è½ç§»å¨çå¼âå¬èµ·æ¥ä¼¼ä¹ä¸å¯è½ï¼å¨Rustä¸ç§»å¨æä½éå¤å¯è§ãæä»¬å°å¨ä¸ä¸èä¸è¯¦ç»è§£éPinå¦ä½ä¿æ¤æªæ¥å¼ã
è½ç¶æ¬èè®¨è®ºçæ¯å¼æ¥å½æ°ï¼ä½è¿éçææå 容ä¹éç¨äºå¼æ¥åãæ°å建ç弿¥åçæªæ¥å¼å°±åéå 䏿 ·ï¼åªæ¯æè·å®å°ä»å¨å´ä»£ç ä¸ä½¿ç¨çåéãåªæè½®è¯¢è¿ä¸ªæªæ¥å¼æä¼åå»ºå¯¹å ¶å 容çå¼ç¨ï¼ä½¿å ¶åå¾ä¸å®å ¨èä¸è½ç§»å¨ã
请记ä½ï¼è¿ç§ç§»å¨èå¼±æ§ä»
éäºå¼æ¥å½æ°å弿¥åçæªæ¥å¼ï¼ä»¥åå®ä»¬ç±ç¼è¯å¨çæçç¹æ®Futureå®ç°ãå¦æä½ åæä»¬å¨âè°ç¨å¤éå¨ï¼spawn_blockingâä¸ä¸ºSpawnBlockingç±»åæå¨å®ç°Future飿 ·ï¼ä¸ºèªå·±çç±»åæå¨å®ç°Futureï¼é£ä¹è¿æ ·çæªæ¥å¼å¨è¢«è½®è¯¢ä¹ååä¹åé½å¯ä»¥å®å
¨ç§»å¨ãå¨ä»»ä½æå¨å®ç°çpollä¸ï¼åç¨æ£æ¥å¨ä¼ç¡®ä¿å¨pollè¿åæ¶ï¼ä½ 对selfé¨åçä»»ä½åç¨é½å·²ç»æãåªæå¼æ¥å½æ°å弿¥åå
·æå¨å½æ°è°ç¨è¿ç¨ä¸æåæ§è¡ä¸åç¨ä»å¨è¿è¡çè½åï¼æä»¥æä»¬å¿
é¡»å°å¿å¤çå®ä»¬çæªæ¥å¼ã
# åºå®æéï¼Pinned Pointersï¼
Pinç±»åæ¯æåæªæ¥å¼çæéçå
è£
å¨ï¼å®éå¶äºæéçä½¿ç¨æ¹å¼ï¼ä»¥ç¡®ä¿æªæ¥å¼å¨è¢«è½®è¯¢åä¸è½è¢«ç§»å¨ã对äºé£äºä¸ä»æè¢«ç§»å¨çæªæ¥å¼ï¼è¿äºéå¶å¯ä»¥è§£é¤ï¼ä½å¯¹äºå®å
¨å°è½®è¯¢å¼æ¥å½æ°å弿¥åçæªæ¥å¼æ¥è¯´ï¼è¿äºéå¶è³å
³éè¦ã
è¿éæè¯´çæéï¼æ¯æä»»ä½å®ç°äºDerefï¼å¯è½è¿å®ç°äºDerefMutçç±»åãå
è£
å¨Pinä¸çæé称为åºå®æéãPin<&mut T>åPin<Box<T>>æ¯æ¯è¾å¸¸è§çä¾åã
æ ååºä¸Pinçå®ä¹å¾ç®åï¼
pub struct Pin<P> {
pointer: P,
}
2
3
注æï¼pointeråæ®µä¸æ¯pubçãè¿æå³çï¼æé æä½¿ç¨Pinçå¯ä¸æ¹å¼æ¯éè¿è¯¥ç±»åæä¾çç²¾å¿éæ©çæ¹æ³ã
对äºå¼æ¥å½æ°æå¼æ¥åçæªæ¥å¼ï¼åªæå ç§æ¹æ³å¯ä»¥è·åæåå®çåºå®æéï¼
futures-liteåºä¸çpin!å®ï¼ç¨ä¸ä¸ªç±»å为Pin<&mut T>çæ°åéé®è½ä¸ä¸ªç±»å为Tçåéãæ°åéæååå§åéçå¼ï¼è¯¥å¼å·²è¢«ç§»å¨å°æ ä¸çä¸ä¸ªå¿å临æ¶ä½ç½®ãå½åéè¶ åºä½ç¨åæ¶ï¼è¯¥å¼ä¼è¢«ä¸¢å¼ãæä»¬å¨block_onçå®ç°ä¸ä½¿ç¨pin!宿¥åºå®æä»¬æ³è¦è½®è¯¢çæªæ¥å¼ã- æ ååºç
Box::pinæé 彿°è·åä»»ä½ç±»åTçå¼çæææï¼å°å ¶ç§»å¨å°å ä¸ï¼å¹¶è¿åä¸ä¸ªPin<Box<T>>ã Pin<Box<T>>å®ç°äºFrom<Box<T>>ï¼æä»¥Pin::from(boxed)è·åboxedçæææï¼å¹¶è¿åä¸ä¸ªæåå ä¸ç¸åTçåºå®è£ ç®±æéã
è·åæåè¿äºæªæ¥å¼çåºå®æéçæ¯ç§æ¹æ³é½æå³çæ¾å¼å¯¹æªæ¥å¼çæææï¼è䏿²¡æåæ³åå°å ¶ååãå½ç¶ï¼åºå®æéæ¬èº«å¯ä»¥éæç§»å¨ï¼ä½ç§»å¨æéå¹¶ä¸ä¼ç§»å¨å®ææåç对象ãæä»¥ï¼æ¥æä¸ä¸ªæåæªæ¥å¼çåºå®æéå°±è¯æä½ å·²ç»æ°¸ä¹ æ¾å¼äºç§»å¨è¯¥æªæ¥å¼çè½åãè¿å°±æ¯æä»¬ç¡®ä¿å¯ä»¥å®å ¨è½®è¯¢æªæ¥å¼æéè¦ç¥éçå ¨é¨å 容ã
䏿¦åºå®äºä¸ä¸ªæªæ¥å¼ï¼å¦æä½ æ³è½®è¯¢å®ï¼ææPin<pointer to T>ç±»å齿ä¸ä¸ªas_mutæ¹æ³ï¼è¯¥æ¹æ³è§£å¼ç¨æéå¹¶è¿åpollæéçPin<&mut T>ã
as_mutæ¹æ³è¿å¯ä»¥å¸®å©ä½ å¨ä¸æ¾å¼æææçæ
åµä¸è½®è¯¢æªæ¥å¼ãæä»¬çblock_onå®ç°ä¸å°±æ¯è¿æ ·ä½¿ç¨å®çï¼
pin!(future);
loop {
match future.as_mut().poll(&mut context) {
Poll::Ready(value) => return value,
Poll::Pending => parker.park(),
}
}
2
3
4
5
6
7
è¿éï¼pin!å®å°future鿰声æä¸ºPin<&mut F>ï¼æä»¥æä»¬æ¬å¯ä»¥ç´æ¥å°å
¶ä¼ éç»pollã使¯å¯åå¼ç¨æ¯ä¸å¯å¤å¶çï¼æä»¥Pin<&mut F>ä¹ä¸å¯å¤å¶ï¼è¿æå³çç´æ¥è°ç¨future.poll()ä¼è·åfutureçæææï¼å¯¼è´å¾ªç¯çä¸ä¸æ¬¡è¿ä»£ä¸åéæªåå§åã为äºé¿å
è¿ç§æ
åµï¼æä»¬å¨æ¯æ¬¡å¾ªç¯è¿ä»£æ¶è°ç¨future.as_mut()ï¼éæ°åç¨ä¸ä¸ªæ°çPin<&mut F>ã
没æåæ³è·åæååºå®æªæ¥å¼ç&mutå¼ç¨ï¼å¦æä½ è½è·åï¼å°±å¯ä»¥ä½¿ç¨std::mem::replaceæstd::mem::swapå°å
¶ç§»åºï¼å¹¶ç¨å¦ä¸ä¸ªæªæ¥å¼æ¿æ¢å®ã
卿®é弿¥ä»£ç ä¸ï¼æä»¬ä¸å¿
æ
å¿åºå®æªæ¥å¼çé®é¢ï¼æ¯å 为è·åæªæ¥å¼çæå¸¸è§æ¹å¼ï¼çå¾
å®æä¼ éç»æ§è¡å¨ï¼é½ä¼è·åæªæ¥å¼çæææï¼å¹¶å¨å
é¨ç®¡çåºå®æä½ãä¾å¦ï¼æä»¬çblock_onå®ç°è·åæªæ¥å¼çæææï¼å¹¶ä½¿ç¨pin!å®çæè½®è¯¢æéçPin<&mut F>ãawait表达å¼ä¹ä¼è·åæªæ¥å¼çæææï¼å¹¶å¨å
é¨ä½¿ç¨ç±»ä¼¼äºpin!å®çæ¹æ³ã
# Unpinç¹æ§
ç¶èï¼å¹¶éæææªæ¥å¼é½éè¦è¿ç§å°å¿çå¤çã对äºä¸ºæ®éç±»åï¼æ¯å¦æä»¬å颿å°çSpawnBlockingç±»åï¼æå¨å®ç°çFutureï¼æé å使ç¨åºå®æéçéå¶æ¯ä¸å¿
è¦çã
è¿ç§ç¨³å®çç±»åå®ç°äºUnpinæ è®°ç¹æ§ï¼
trait Unpin { }
å¨Rustä¸ï¼å 乿æç±»åé½åå©ç¼è¯å¨çç¹æ®æ¯æèªå¨å®ç°äºUnpinã弿¥å½æ°å弿¥åçæªæ¥å¼æ¯è¿ä¸ªè§åçä¾å¤ã
对äºå®ç°äºUnpinçç±»åï¼Pinä¸ä¼æ½å ä»»ä½éå¶ãä½ å¯ä»¥ä½¿ç¨Pin::new仿®éæéå建ä¸ä¸ªåºå®æéï¼è¿å¯ä»¥ä½¿ç¨Pin::into_innerå°æéååºãPinæ¬èº«ä¼æ²¿ç¨æéèªèº«çDerefåDerefMutå®ç°ã
ä¾å¦ï¼Stringå®ç°äºUnpinï¼æä»¥æä»¬å¯ä»¥è¿æ ·åï¼
let mut string = "Pinned?".to_string();
let mut pinned: Pin<&mut String> = Pin::new(&mut string);
pinned.push_str(" Not");
Pin::into_inner(pinned).push_str(" so much.");
let new_home = string;
assert_eq!(new_home, "Pinned? Not so much.");
2
3
4
5
6
å³ä½¿å建äºPin<&mut String>ï¼æä»¬ä»ç¶å¯ä»¥å®å
¨å¯åå°è®¿é®å符串ï¼å¹¶ä¸å¨into_inneræ¶èäºPinï¼å¯åå¼ç¨æ¶å¤±åï¼è¿å¯ä»¥å°å
¶ç§»å¨å°ä¸ä¸ªæ°åéä¸ãæä»¥å¯¹äºå®ç°äºUnpinçç±»åï¼å 乿¯ææç±»åï¼ï¼Pinåªæ¯å¯¹æå该类åçæéçä¸ä¸ªç®åå
è£
ã
è¿æå³çï¼å½ä½ 为èªå·±çUnpinç±»åå®ç°Futureæ¶ï¼ä½ çpollå®ç°å¯ä»¥å°selfå½ä½&mut Selfæ¥å¤çï¼è䏿¯Pin<&mut Self>ãåºå®æä½å¨å¾å¤§ç¨åº¦ä¸å¯ä»¥è¢«å¿½ç¥ã
ä½ å¯è½ä¼æè®¶å°åç°ï¼å³ä½¿F没æå®ç°Unpinï¼Pin<&mut F>åPin<Box<F>>ä¹å®ç°äºUnpinãè¿å¬èµ·æ¥æç¹å¥æª ââ ä¸ä¸ªPinæä¹ä¼æ¯Unpinçå¢ï¼ä½å¦æä½ ä»ç»æèæ¯ä¸ªæ¯è¯çå«ä¹ï¼å°±ä¼åç°è¿æ¯æéççãå³ä½¿Få¨è¢«è½®è¯¢åä¸è½å®å
¨ç§»å¨ï¼ä½æåå®çæéæ 论æ¯å¦è¢«è½®è¯¢ï¼æ»æ¯å¯ä»¥å®å
¨ç§»å¨çãç§»å¨çåªæ¯æéï¼è宿æåçæåå¯¹è±¡ä¿æä¸å¨ã
å½ä½ æ³å°å¼æ¥å½æ°æå¼æ¥åçæªæ¥å¼ä¼ éç»ä¸ä¸ªåªæ¥åUnpinæªæ¥å¼ç彿°æ¶ï¼äºè§£è¿ä¸ç¹å¾æç¨ãï¼å¨async_stdä¸ï¼è¿æ ·ç彿°å¾å°è§ï¼ä½å¨å¼æ¥çæç³»ç»çå
¶ä»å°æ¹å¹¶é妿¤ãï¼å³ä½¿F没æå®ç°Unpinï¼Pin<Box<F>>乿¯Unpinçï¼æä»¥å¯¹å¼æ¥å½æ°æå¼æ¥åçæªæ¥å¼åºç¨Box::pinï¼ä½ å°±å¯ä»¥å¾å°ä¸ä¸ªå¯ä»¥å¨ä»»ä½å°æ¹ä½¿ç¨çæªæ¥å¼ï¼ä»£ä»·æ¯ä¸æ¬¡å å
ååé
ã
æåç§ç¨äºå¤çPinçä¸å®å
¨æ¹æ³ï¼è¿äºæ¹æ³å
è®¸ä½ å¯¹æéåå
¶ç®æ 对象åä»»ä½ä½ æ³åçæä½ï¼å³ä½¿ç®æ ç±»åæ²¡æå®ç°Unpinã使£å¦ç¬¬22ç« æè§£éçï¼Rustæ æ³æ£æ¥è¿äºæ¹æ³çä½¿ç¨æ¯å¦æ£ç¡®ï¼ä½¿ç¨å®ä»¬ç代ç çå®å
¨æ§ç±ä½ èªå·±è´è´£ã
# 弿¥ä»£ç å¨ä½æ¶æç¨ï¼
ç¼å弿¥ä»£ç æ¯ç¼åå¤çº¿ç¨ä»£ç æ´å¤æãä½ å¿ é¡»ä½¿ç¨æ£ç¡®çI/Oå忥åè¯ï¼æå¨æåé¿æ¶é´è¿è¡ç计ç®ï¼æè å°å®ä»¬æ¾å°å ¶ä»çº¿ç¨ä¸æ§è¡ï¼è¿è¦å¤çååºå®æä½è¿æ ·å¨çº¿ç¨ä»£ç ä¸ä¸ä¼åºç°çå ¶ä»ç»èãé£ä¹å¼æ¥ä»£ç å ·ä½æåªäºä¼å¿å¢ï¼
æä¸¤ä¸ªå¸¸è§ç说æ³ç»ä¸èµ·ä»ç»æ¨æ²ï¼
- â弿¥ä»£ç é常éåI/Oæä½ãâ è¿ç§è¯´æ³å¹¶ä¸å®å ¨æ£ç¡®ãå¦æä½ çåºç¨ç¨åºå¨çå¾ I/Oæä½ä¸è±è´¹æ¶é´ï¼å°å ¶å¼æ¥åå¹¶ä¸ä¼ä½¿I/Oæä½è¿è¡å¾æ´å¿«ãå¦ä»å¸¸ç¨ç弿¥I/Oæ¥å£å¨æçä¸å¹¶ä¸æ¯åæ¥æ¥å£æ´é«ãæ 论åªç§æ¹å¼ï¼æä½ç³»ç»è¦åçå·¥ä½é½æ¯ä¸æ ·çãï¼å®é ä¸ï¼ä¸ä¸ªå°æªåå¤å¥½ç弿¥I/Oæä½ç¨åå¿ é¡»åæ¬¡å°è¯ï¼æä»¥å®éè¦ä¸¤æ¬¡ç³»ç»è°ç¨æè½å®æï¼è䏿¯ä¸æ¬¡ãï¼
- â弿¥ä»£ç æ¯å¤çº¿ç¨ä»£ç æ´å®¹æç¼åãâ å¨åJavaScriptåPythonè¿æ ·çè¯è¨ä¸ï¼è¿å¯è½ç¡®å®æ¯ççãå¨è¿äºè¯è¨ä¸ï¼ç¨åºåå°
async/awaitä½ä¸ºä¸ç§è¡ä¸ºè¯å¥½çå¹¶åå½¢å¼ä½¿ç¨ï¼åªæä¸ä¸ªæ§è¡çº¿ç¨ï¼å¹¶ä¸åªæå¨await表达å¼å¤æä¼åçä¸æï¼æä»¥é常ä¸éè¦äºæ¥éæ¥ä¿è¯æ°æ®ä¸è´æ§ï¼åªè¦å¨ä½¿ç¨æ°æ®æ¶ä¸è¿è¡awaitæä½å°±è¡ï¼å½ä»»å¡åæ¢åªæå¨ä½ æç¡®å è®¸çæ åµä¸æä¼åçæ¶ï¼ç解代ç è¦å®¹æå¾å¤ã
ä½è¿ä¸ªè§ç¹å¹¶ä¸éç¨äºRustï¼å¨Rustä¸çº¿ç¨å¹¶æ²¡æé£ä¹éº»ç¦ã䏿¦ä½ çç¨åºç¼è¯éè¿ï¼å°±ä¸ä¼å卿°æ®ç«äºé®é¢ãä¸ç¡®å®æ§è¡ä¸ºä» éäºåäºæ¥éãééãååæä½çåæ¥ç¹æ§ï¼èè¿äºç¹æ§å°±æ¯ä¸ºå¤çè¿ç±»é®é¢è设计çãæä»¥å¼æ¥ä»£ç å¨å¸®å©ä½ äºè§£å ¶ä»çº¿ç¨å¯è½ä½æ¶å½±åä½ çä»£ç æ¹é¢å¹¶æ²¡æç¬ç¹çä¼å¿ï¼å¨ææå®å ¨çRust代ç ä¸è¿ä¸ç¹é½å¾æ¸ æ¥ãå½ç¶ï¼Rustç弿¥æ¯æä¸çº¿ç¨ç»åä½¿ç¨æ¶ç¡®å®é常åºè²ã妿æ¾å¼è¿ç§ç»åï¼é£å°±å¤ªå¯æäºã
é£ä¹ï¼å¼æ¥ä»£ç çæ£çä¼å¿æ¯ä»ä¹å¢ï¼
- 弿¥ä»»å¡å¯ä»¥ä½¿ç¨æ´å°çå åãå¨Linuxä¸ï¼ä¸ä¸ªçº¿ç¨çå å使ç¨éï¼å æ¬ç¨æ·ç©ºé´åå æ ¸ç©ºé´ï¼èµ·å§ä¸º20 KiBãèæªæ¥å¼å¯ä»¥å°å¾å¤ï¼æä»¬è天æå¡å¨çæªæ¥å¼å¤§å°åªæå ç¾åèï¼å¹¶ä¸éçRustç¼è¯å¨ç䏿æ¹è¿ï¼è¿å¨å徿´å°ã
- 弿¥ä»»å¡çå建é度æ´å¿«ãå¨Linuxä¸ï¼å建ä¸ä¸ªçº¿ç¨å¤§çº¦éè¦15å¾®ç§ãçæä¸ä¸ªå¼æ¥ä»»å¡å¤§çº¦éè¦300纳ç§ï¼å¤§çº¦æ¯åå»ºçº¿ç¨æ¶é´çäºååä¹ä¸ã
- å¨Linuxä¸ï¼å¼æ¥ä»»å¡ä¹é´çä¸ä¸æåæ¢æ¯æä½ç³»ç»çº¿ç¨ä¹é´çä¸ä¸æåæ¢æ´å¿«ï¼å嫿¯0.2å¾®ç§å1.7å¾®ç§ãä¸è¿ï¼è¿äºé½æ¯æ¯ç§æ åµçæä½³æ°æ®ï¼å¦æåæ¢æ¯ç±äºI/Oåå¤å°±ç»ªå¼èµ·çï¼ä¸¤ç§æ åµä¸çå¼éé½ä¼å¢å å°1.7å¾®ç§ã忢åçå¨ä¸åå¤ç卿 ¸å¿ä¸ç线ç¨è¿æ¯ä»»å¡ä¹é´ï¼ä¹ä¼æå¾å¤§å·®å¼ï¼æ ¸å¿ä¹é´çéä¿¡éå¸¸æ ¢ã
è¿è®©æä»¬äºè§£å°å¼æ¥ä»£ç å¯ä»¥è§£å³åªäºç±»åçé®é¢ãä¾å¦ï¼ä¸ä¸ªå¼æ¥æå¡å¨æ¯ä¸ªä»»å¡å¯è½ä½¿ç¨æ´å°çå åï¼å æ¤è½å¤å¤çæ´å¤çå¹¶åè¿æ¥ãï¼è¿å¯è½å°±æ¯å¼æ¥ä»£ç âéåI/Oæä½â è¿ç§è¯´æ³çç±æ¥ãï¼æè ï¼å¦æä½ ç设计èªç¶å°ç±è®¸å¤ç¸äºéä¿¡çç¬ç«ä»»å¡ç»æï¼é£ä¹æ¯ä¸ªä»»å¡çä½å¼éãçå建æ¶é´åå¿«éä¸ä¸æåæ¢é½æ¯éè¦çä¼å¿ãè¿å°±æ¯ä¸ºä»ä¹è天æå¡å¨æ¯å¼æ¥ç¼ç¨çç»å ¸ç¤ºä¾ï¼èå¤äººæ¸¸æåç½ç»è·¯ç±å¨å¯è½ä¹æ¯å¼æ¥ä»£ç çè¯å¥½åºç¨åºæ¯ã
å¨å ¶ä»æ åµä¸ï¼ä½¿ç¨å¼æ¥ä»£ç çä¼å¿å°±ä¸é£ä¹ææ¾äºãå¦æä½ çç¨åºæä¸ä¸ªçº¿ç¨æ± å¨è¿è¡ç¹éç计ç®ï¼æè é²ç½®çå¾ I/O宿ï¼é£ä¹åé¢ååºçä¼å¿å¯è½å¯¹å ¶æ§è½å½±åä¸å¤§ãä½ å¿ é¡»ä¼å计ç®è¿ç¨ãå¯»æ¾æ´å¿«çç½ç»è¿æ¥ï¼æè éåå ¶ä»å®é å½±åéå¶å ç´ çæªæ½ã
å¨å®è·µä¸ï¼æä»¬æ¾å°çæ¯ä¸ç¯å ³äºå®ç°é«å¹¶åæå¡å¨çæç« é½å¼ºè°äºæµéãè°ä¼ä»¥åæç»è¯å«åæ¶é¤ä»»å¡ä¹é´ç«äºæºçéè¦æ§ã弿¥æ¶æå¹¶ä¸è½è®©ä½ è·³è¿è¿äºå·¥ä½ãäºå®ä¸ï¼è½ç¶æå¾å¤ç°æçå·¥å ·å¯ä»¥è¯ä¼°å¤çº¿ç¨ç¨åºçè¡ä¸ºï¼ä½Rust弿¥ä»»å¡å¯¹è¿äºå·¥å ·æ¯ä¸å¯è§çï¼å æ¤éè¦ä¸é¨é坹弿¥ä»»å¡çå·¥å ·ãï¼æ£å¦ä¸ä½æºè æè¯´ï¼âç°å¨ä½ åå¤äºä¸ä¸ªé®é¢ãâï¼
å³ä½¿ä½ ç°å¨ä¸ä½¿ç¨å¼æ¥ä»£ç ï¼ç¥é妿尿¥ä¸å¡éå¤§å¹ å¢å æ¶è¿æè¿ä¸ªéæ©ä¹æ¯ä¸éçã
æ¹æ³¨ï¼
- å¦æä½ ç¡®å®éè¦ä¸ä¸ªHTTP客æ·ç«¯ï¼å¯ä»¥èè使ç¨è®¸å¤ä¼ç§çåºï¼å¦
surfæreqwestï¼å®ä»¬è½æ£ç¡®ä¸å¼æ¥å°å®æå·¥ä½ãè¿ä¸ªå®¢æ·ç«¯ä¸»è¦æ¯ç¨äºå¤çHTTPSéå®åã- è¿å æ¬å æ ¸å åï¼å¹¶ä¸è®¡ç®çæ¯ä¸ºçº¿ç¨åé çç©ç页é¢ï¼è䏿¯èæçãå°æªåé ç页é¢ãå¨macOSåWindowsä¸ï¼è¿ä¸ªæ°åä¹ç±»ä¼¼ã
- å¨Linuxä¸ï¼ç±äºå¤çå¨å®å ¨æ¼æ´ï¼å æ ¸ä¸å¾ä¸ä½¿ç¨è¾æ ¢çææ¯ä¹åï¼ä¸ä¸æåæ¢æ¶é´ä¹å¨0.2å¾®ç§å·¦å³ã