rFTP - 用 Rust 实现简单的 FTP Server (2)

rFTP - 用 Rust 实现简单的 FTP Server (2)

前情提要

上一篇文章中,我讲述了我为什么选择 Rust 作为学习计算机底层知识的工具,一些 Rust 的基础知识和我当时所遇到的困难。在这篇又是属于目标回收的文章中,我将介绍最近的进展、Rust 开发的体验和下一步的计划。总的来说,用 Rust 写项目体验尚可,通过与编译器博弈而最终通过“考试”后,自己对 Rust 的理解也有了些许提升。

异步编程

在 Rust 中,异步编程是通过 Future 特征和 async/await 语法糖来实现的。Future 是 Rust 中的异步编程的基础,它代表了一个异步计算的结果或者异步任务的“承诺”,可以通过 poll 方法来获取计算的结果。async/await 语法糖则是为了让异步编程更加友好,通过 async 关键字来定义异步函数,通过 await 关键字来等待异步计算的结果。

越来越多的语言采纳异步编程机制,比如 Python 的 asyncio、JavaScript 的 Promise(或者是同样的 async/await)、Golang 的 goroutine 等等。异步编程的优势在于可以提高程序的并发性能,因为异步编程可以让程序在等待 I/O 操作的时候不阻塞,可以继续执行其他任务。Rust 的 Tokio 是一个基于 Future 的异步编程框架,它提供了很多异步编程的工具,比如 tokio::spawntokio::net::TcpStream 等等。

Future

可以看到,Future 是一个特征,它有一个关联类型 Output,代表了异步计算的结果类型。Future 只有一个 poll 方法,这个方法接受一个 Pin<&mut Self> 类型的参数,返回一个 Poll<Self::Output> 类型的结果。Poll 是一个枚举类型,它有两个成员 Ready(T)Pending,分别代表了异步计算已经完成和异步计算还在进行中。

future_poll.rs
1
2
3
4
5
6
7
8
9
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
Ready(T),
Pending,
}

通过 async 关键字修饰函数后,正常的函数将转化为一个异步函数,调用后返回一个实现了 Future 特征的类型,这个类型可以通过 await 关键字来等待异步计算的结果,而 await 关键字则会调用 poll 方法来获取计算的结果。在 poll 方法中还可以通过 cx.waker().wake_by_ref() 来唤醒任务,这样可以让任务在等待 I/O 操作时交出控制权而不阻塞主流程,在 I/O 操作完成时唤醒主流程继续执行。这样等待和唤醒的机制能够精确地控制任务的执行。

Rust 异步编程与其他语言的异同

惰性求值 (Evaluation)

Rust 的异步编程和 Python 的 asyncio 较为相似,调用 async 关键字修饰的函数后会返回一个“执行器”,直到调用 .await 后才开始执行异步函数和获取结果。Python 的 asyncio 也是这样的,调用 async 修饰的函数后会返回一个协程对象,需要调用 await 来执行协程。这种实现是惰性求值的风格,只有在需要的时候才执行任务,有助于不必要的资源消耗和任务调度,可以更灵活地控制异步任务的执行时机和顺序(例如通过 waker 控制何时唤醒)。

立即求值

JavaScript 和 Golang 的异步编程风格则属于立即求值,即调用异步函数后立即执行其中的语句,这种风格更加直观、符合直觉。而它们的具体方案也有所差异:

  • JavaScript 中的异步函数执行完成后是返回一个 Promise 对象,通过对其调用 .then() 或者 .catch() 方法可以获取其执行状态和结果,JavaScript 的实现就相当于调用异步函数后返回一个标识符,用以检查异步任务完成情况;
  • Go 的异步函数 go func() 就显得更加简单粗暴了,没有返回任何标识符,开发者不需要知道协程本身的信息,“任务能跑就行”。当要从协程中返回信息或者是要控制协程的状态时则需要通过传递 channel 进行通信的方式来完成。这可能也是 Go 简单哲学的一处体现吧。

Do not communicate by sharing memory; instead, share memory by communicating.
– from Andrew Gerrand

proms.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
(async () => {
const proms = new Promise((resolve, reject) => {
console.log("Hi, I'm out of setTimeout!");
setTimeout(() => {
console.log("Hi, I'm in setTimeout!");
resolve();
}, 1500);
});

console.log("Hi, I'm on the root.");

await proms;

console.log("Hi, everything is done!");
})();

// Run this script by node proms.js, you will get:
// Hi, I'm out of setTimeout!
// Hi, I'm on the root.
// Hi, I'm in setTimeout!
// Hi, everything is done!
goroutines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
go func() {
// Simple task
}()
complexTask := func(done chan struct{}, results chan Result) {
for {
select {
case <-done:
// Clean and exit
default:
// Do something
}
}
}
// go func() has no return value.
go complexTask(done, results)
}

错误处理

Rust 的错误处理机制是通过 ResultOption 类型来实现的,Result 代表了可能出现错误的结果,Option 代表了可能为空的结果。ResultOption 都是枚举类型,Result 有两个成员 Ok(T)Err(E)Option 有两个成员 Some(T)None。虽然和 Go 一样是通过返回值而非 try-catch 来传递错误信息,但是 Rust 的错误处理更加严谨细致,配合模式匹配和 ? 操作符可以更方便地处理错误。

result_option.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
enum Result<T, E> {
Ok(T),
Err(E),
}

enum Option<T> {
Some(T),
None,
}

fn helper_option() -> Option<i32> {
// Do something
}

fn helper_result() -> Result<i32, String> {
// Do something
}

fn process() -> Result<i32, String> {
// Option.ok_or() will return the value if it is Some(T), otherwise return the error message.
let value = helper_option().ok_or("No value found")?;
// Result? will return the value if it is Ok(T), otherwise return the error message.
let result = helper_result()?;
Ok(value + result)
}

当 Option 或者 Result 的值为 None 或者 Err 时,可以通过 ? 操作符来提前返回错误,这样可以减少代码的嵌套和提高代码的可读性。而且不必像 C++、Java 一样在 catch 语句块中单独处理错误信息,打乱原本代码的逻辑结构。

实话说,因为之前写过一段时间的 TypeScript,对其中的 ? 操作符比较有好感的,在 Rust 中使用这样的操作符也是一种愉悦的体验。这是 Rust 错误处理相对 Go 的一个重大优势。在 Go 里面,错误通过返回值来传递,而且没有类似 ? 操作符这样的语法糖,所以在处理错误时需要显式地检查错误并返回,这样会导致代码的嵌套和可读性下降。而且嵌套盘空也是影响代码可读性的一个重要问题。

nested_nil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type A struct {
// ...
}
type B struct {
A *A
}
type C struct {
B *B
}

func (c *C) DoSomething() error {
if c.B == nil {
return errors.New("B is nil")
}
if c.B.A == nil {
return errors.New("A is nil")
}
// Do something with c.B.A
return nil
}

FTP 服务器

在 Rust 中实现一个简单的 FTP 服务器是一个不错的练习,可以通过实现 FTP 协议来学习网络编程和异步编程。FTP 协议是一个比较古老的协议,它是基于文本的协议,通过控制连接和数据连接来实现文件的上传和下载。FTP 服务器的实现可以分为两个部分,一个是控制连接的处理,另一个是数据连接的处理。控制连接用于接收客户端的命令和发送响应,数据连接用于传输文件数据。

主流程

前面也提到过,我使用了 Rust 中的异步编程框架 Tokio 来实现 FTP 服务器。通过 Tokio::net::TcpListener 绑定通过参数指定的主机和端口后,通过 listener.accept() 方法来接受客户端的连接请求,返回一个 tokio::net::TcpStream 类型的流。随后通过 tokio::spawn 方法来创建一个异步任务 handle() 来专门处理来自该客户端的连接请求,这样可以让主线程继续接受其他客户端的连接请求。

handle() 中,首先会将 TcpStream 通过 into_split() 方法分割为读取流和写入流,然后通过读取流来接收客户端的命令,通过写入流来发送响应。在处理命令的过程中,会根据命令的类型来分发调用不同的处理函数,比如 USER 命令会调用 user() 函数,LIST 命令会调用 list() 函数等等。在我最初的实现中,在处理完完成一条命令后,服务器才会通过 writer.write_all() 方法来发送响应,然后继续等待下一个命令。但这种实现方式会在传输数据时阻塞主线程,导致中断传输命令 ABOR 无法被及时处理。于是再后来的优化里,我将发送响应的操作也放到了异步任务中,这样可以让主线程继续接受其他客户端的连接请求,而不会被阻塞。

下面的代码片段就是这个 FTP 处理客户端请求的主要流程。

ftp_server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
impl Server {
pub async fn listen(&self) {
loop {
if let Ok((socket, addr)) = self.listener.lock().await.accept().await {
// 收到客户端连接请求后,创建一个异步任务来处理
let shared_self = self.clone();
tokio::spawn(async move {
shared_self.handle(socket, addr).await;
});
} else {
continue;
}
}
}
pub async fn handle(&self, socket: TcpStream, addr: SocketAddr) {
let user_map = self.user_map.clone();
// 将 TcpStream 分割为读取流和写入流
let (mut reader, mut writer) = socket.into_split();
{
// 检查用户是否已经登录,如果是新用户,则发送欢迎信息
let mut user_map_locked = user_map.lock().await;
if !user_map_locked.contains_key(&addr) {
if let Err(e) = writer
.write_all(b"220 xxx.xxx.xxx.xxx FTP server ready.\r\n")
.await {
// ...
}
}
}
// 创建一个 Arc<Mutex> 来保护写入流,避免多个异步任务同时写入
let writer_guard = Arc::new(Mutex::new(writer));
loop {
let mut buf = vec![0; 2048];
// 读取客户端发送的命令
let req = {
let n = match reader.read(&mut buf).await {
// ...
};
String::from_utf8_lossy(&buf[..n]).to_string()
};
// ...
// 分发命令并进行错误处理
tokio::spawn(async move {
let cloned = cloned_writer.clone();
cloned_self.dispatch(cloned_writer.clone(), cmd, user).await;
});
}
}
}

主要涉及的结构体

服务器 Server

服务器结构体 Server 很简单,只包含了服务器的地址、端口、根目录、监听器、用户映射等信息。其中监听器是一个 Arc<TcpListener> 类型的字段,给 TcpListener 加上了 Clone 特征以满足在 Future 间传递的要求。用户映射是一个 Arc<Mutex<HashMap<SocketAddr, Arc<Mutex<User>>>>> 类型的字段,用于保存用户的信息。实现了 new() listen() handle() dispatch() 等方法和 FtpServer 特征。

server.rs
1
2
3
4
5
6
7
pub struct Server {
pub host: String,
pub port: u16,
pub root: String,
pub listener: Arc<TcpListener>,
pub user_map: Arc<Mutex<HashMap<SocketAddr, Arc<Mutex<User>>>>>,
}

用户 User

用户结构体 User 用于保存用户的信息,包括用户名、密码、当前目录、数据连接等信息。用户结构体还包括了一个 PathGuard 类型的字段,用于保护用户的当前目录,避免用户越权访问文件系统。用户结构体还包括了一个 TransferSession 类型的字段,用于保存数据连接的信息,包括数据连接的类型(主动或被动)、数据连接的地址等。

user.rs
1
2
3
4
5
6
7
8
9
pub struct User {
pub username: String,
pub status: UserStatus,
pub addr: SocketAddr,
pub session: Option<Arc<Mutex<TransferSession>>>,
pub trans_type: TransferType, // ASCII or Binary

path: PathGuard, // 保护 PWD 的工具类型
}

传输会话 TransferSession

传输会话结构体 TransferSession 用于保存数据连接的信息,包括数据连接的类型(主动或被动)、数据连接的地址、文件的总大小、已传输的大小、文件名等信息。传输会话结构体还包括了一个 offset 字段,用于指定文件的传输偏移量,以便在传输中断后继续传输(实现了 REST 命令)。

session.rs
1
2
3
4
5
6
7
8
9
pub struct TransferSession {
pub mode: TransferMode, // PORT/PASV
pub total_size: u64, // 文件总大小
pub finished_size: u64, // 已传输大小
pub file_name: String, // 文件名
pub finished: bool, // 是否传输完成
pub aborted: bool, // 是否传输中断
pub offset: u64, // 传输偏移量
}

开发经验

使用标准工具对拍

在开发过程中,我发现了一个很好的测试方法,那就是使用标准的 FTP 客户端来对拍自己的 FTP 服务器。通过使用 Python 的 ftplib.FTP 和 macOS 上的 Transmit 这个标准的 FTP 图形化客户端,我可以看到客户端和服务器之间的交互过程,可以更好地发现自己的错误。这种对拍的方法可以让开发工作更有目标,因为我知道自己的 FTP 是符合标准的。

使用 {} 来缩小作用域

在 Rust 中,可以通过 {} 来缩小变量的作用域,这样可以避免变量的生命周期过长,提高代码的安全性。在开发过程中,因为通过 Mutex.lock() 方法拿到的是一个 MutexGuard 类型,它的生命周期和得到的锁的生命周期是一样的,所以可以通过 {} 来缩小锁的作用域,避免锁的生命周期过长而导致可能的死锁。

多看编译器的提示

Rust 是以编译时检查为主的语言,编译器会给出很多有用的提示,比如未使用的变量、未处理的错误、不安全的代码等等。在开发过程中,我发现多看编译器的提示可以帮助我更好地理解 Rust 的语法和规则,提高代码的质量。通过解决编译器提出的问题,有种准备考试时专项训练的感觉,完成这些测试就能拿到不错的成绩,这个过程是一种挣扎也是一种解谜,我的直观感受是节省了不少查阅文档的时间。

当前进展和下一步计划

已实现功能

已经实现的功能包括了 FTP 服务器的基本功能,包括用户登录、文件传输、目录操作、文件操作等。下面是在开发过程中定义的命令枚举类型,包括了目前 FTP 服务器支持的所有命令。

commands.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pub enum FtpCommand {
USER(String),
PASS(String),
PORT(SocketAddr),
PASV,
RETR(String),
STOR(String),
ABOR,
QUIT,
SYST,
TYPE(String),
RNFR(String),
RNTO(String),
PWD,
CWD(String),
MKD(String),
RMD(String),
LIST(Option<String>),

REST(u64), // To be tested
DELE(String),
STAT(Option<String>),
STOU,
APPE(String),
ALLO(u64),
NOOP,
NLST(Option<String>),
CDUP,

FEAT,
MDTM(String),
}

下一步计划

从 2023 年写毕业论文思绪万千时想用 Rust 写 FTP 服务器来放松心情,到 2024 年的今天终于算是完成了这个玩具工具的开发。从 Git 提交记录来看其间搁置了接近 17 个月,6 月终于是有两三周的时间和动力来完成这个小项目。下一步的计划是继续完善该项目的功能,包括但不限于下面的几项。最终要是能让这个工具变得真正有用就再好不过了。

  • FTP 客户端: 实现一个简单的 FTP 客户端,配套测试服务器的功能
  • 优化代码: 优化代码结构和性能,提高代码的可读性和可维护性
  • 补全测试: 增加更多的测试代码,提升代码质量;进行压力测试,检查性能效果
  • 制品发布: 使用 Docker 和 Github Action 构建和发布制品,便于部署和使用

这篇文章里实现的 FTP 服务器 “rFTP” 的代码可以在我的 Github 仓库 powerfooi/rftp 中查看。

参考资料

rFTP - 用 Rust 实现简单的 FTP Server (2)

https://powerfooi.github.io/2024/06/22/RFTP-2/

作者

PowerfooI

发布于

2024-06-22

更新于

2024-08-04

许可协议

评论