ftlog阅读报告 | PHM's world

LOADING

歡迎來到烏托邦的世界

ftlog阅读报告

ftlog异步日志阅读报告(1)

ftlog是GitHub上的一个由非凸科技开源的异步日志项目。

项目地址:https://github.com/nonconvextech/ftlog

我对其阅读加深理解。

异步的实现:ftlog 库通过使用一个专用的日志记录线程和异步消息通道来实现异步日志记录。这种方式可以将日志记录操作从主线程或工作线程中分离出来,从而减少主线程的阻塞,提高日志记录的性能。

异步实现

  1. 日志记录线程

    • ftlog 启动了一个专用的日志记录线程,该线程负责接收和处理来自主线程或工作线程的日志消息。
  2. 消息通道

    • ftlog 使用了 crossbeam_channel 库提供的有界或无界通道(bounded or unbounded channel)来在主线程和日志记录线程之间传递日志消息。

    • 注:crossbeam_channel 是 Rust 语言中的一个高性能、跨线程的消息传递库。

      分为有界通道和无界通道:

      • 有界通道(bounded channel):创建一个固定大小的缓冲区,当缓冲区满时发送操作将阻塞。

      • 无界通道(unbounded channel):缓冲区大小动态增长,不会阻塞发送操作,但可能导致内存消耗较大。

        使用示例:

        use crossbeam_channel::bounded;
        use std::thread;
        
        fn main() {
            // 创建一个有界通道,容量为5
            let (sender, receiver) = bounded(5);
        
            // 创建一个发送线程
            let sender_thread = thread::spawn(move || {
                for i in 0..10 {
                    sender.send(i).unwrap();
                    println!("Sent: {}", i);
                }
            });
        
            // 创建一个接收线程
            let receiver_thread = thread::spawn(move || {
                for i in receiver {
                    println!("Received: {}", i);
                }
            });
        
            sender_thread.join().unwrap();
            receiver_thread.join().unwrap();
        }
        
        
        fn main() {
            // 创建一个无界通道
            let (sender, receiver) = unbounded();
        
            // 创建一个发送线程
            let sender_thread = thread::spawn(move || {
                for i in 0..10 {
                    sender.send(i).unwrap();
                    println!("Sent: {}", i);
                }
            });
        
            // 创建一个接收线程
            let receiver_thread = thread::spawn(move || {
                for i in receiver {
                    println!("Received: {}", i);
                }
            });
        
            sender_thread.join().unwrap();
            receiver_thread.join().unwrap();
        }
        
        //使用选择操作
        fn main() {
            let (s1, r1) = bounded(1);
            let (s2, r2) = bounded(1);
        
            thread::spawn(move || s1.send("message from s1").unwrap());
            thread::spawn(move || s2.send("message from s2").unwrap());
        
            select! {
                recv(r1) -> msg => println!("Received: {}", msg.unwrap()),
                recv(r2) -> msg => println!("Received: {}", msg.unwrap()),
            }
        }
        
  3. 异步消息传递

    • 主线程通过发送日志消息到通道来将日志消息传递给日志记录线程。
    • 日志记录线程从通道中接收消息并进行处理(如格式化、写入文件等)。

具体代码

以下是 ftlog 异步实现的关键部分:

  1. 消息通道的创建

    let (sync_sender, receiver) = match &self.bounded_channel_option {
        None => unbounded(),
        Some(option) => bounded(option.size),
    };
    let (notification_sender, notification_receiver) = bounded(1);
    

    根据配置,创建有界或无界的消息通道,用于在主线程和日志记录线程之间传递日志消息。

  2. 日志记录线程的启动

    std::thread::Builder::new()
        .name("logger".to_string())
        .spawn(move || {
            // 日志处理逻辑
            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(LoggerInput::LogMsg(log_msg)) => {
                        // 处理日志消息
                    }
                    Ok(LoggerInput::Flush) => {
                        // 刷新日志
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // 处理超时
                    }
                    Err(e) => {
                        eprintln!("sender closed without sending a Quit first, this is a bug, {}", e);
                    }
                }
            }
        })?;
    

    使用 std::thread::Builder 创建并启动一个日志记录线程,该线程在循环中从通道中接收日志消息并进行处理。

  3. 日志消息的发送

    impl Log for Logger {
        fn log(&self, record: &Record) {
            let msg = self.format.msg(record);
            let log_msg = LoggerInput::LogMsg(LogMsg {
                time: now(),
                msg: msg,
                target: record.target().to_owned(),
                level: record.level(),
                limit: record.key_values().get(Key::from_str("limit")).and_then(|x| x.to_u64()).unwrap_or(0) as u32,
                limit_key,
            });
            if self.block {
                if let Err(_) = self.queue.send(log_msg) {
                    eprintln!("logger queue closed when logging, this is a bug");
                }
            } else {
                match self.queue.try_send(log_msg) {
                    Err(TrySendError::Full(_)) => {
                        if let Some(s) = &self.discard_state {
                            let count = s.count.fetch_add(1, Ordering::SeqCst);
                            if s.last.load().elapsed().as_secs() >= 5 {
                                eprintln!("Excessive log messages. Log omitted: {}", count);
                                s.last.store(Arc::new(Instant::now()));
                            }
                        }
                    }
                    Err(TrySendError::Disconnected(_)) => {
                        eprintln!("logger queue closed when logging, this is a bug");
                    }
                    _ => (),
                }
            }
        }
    
        fn flush(&self) {
            self.queue.send(LoggerInput::Flush).expect("logger queue closed when flushing, this is a bug");
            if let LoggerOutput::FlushError(err) = self.notification.recv().expect("logger notification closed, this is a bug") {
                eprintln!("Fail to flush: {}", err);
            }
        }
    }
    

    主线程在调用日志记录宏(如 info!debug! 等)时,会将日志消息打包为 LoggerInput::LogMsg 并发送到消息通道。日志记录线程会从通道中接收这些消息并进行处理。

  4. 消息处理

    日志记录线程在循环中从通道接收消息并处理:

    loop {
        match receiver.recv_timeout(timeout) {
            Ok(LoggerInput::LogMsg(log_msg)) => {
                log_msg.write(&filters, &mut appenders, &mut root, root_level, &mut missed_log, &mut last_log, offset, &time_format);
            }
            Ok(LoggerInput::Flush) => {
                // 刷新日志
            }
            Err(RecvTimeoutError::Timeout) => {
                // 处理超时
            }
            Err(e) => {
                eprintln!("sender closed without sending a Quit first, this is a bug, {}", e);
            }
        }
    }
    

    处理日志消息包括格式化日志、写入到目标输出(如文件)、处理超时和刷新操作等。

ftlog 通过使用一个专用的日志记录线程和异步消息通道实现了异步日志记录。这种方式将日志记录操作从主线程中分离出来,提高了日志记录的性能,并减少了主线程的阻塞。开发者可以通过配置 ftlog 来定制日志记录的行为和输出目标,以满足不同的性能和功能需求。