..

dingir-exchange - fork-and-save 过程

dingir-exchange的 Technical Details 写道 Persistence 的2个特性1

(a)Append operation log and
(b)Redis-like fork-and-save persistence

Redis 的文档中关于 fork-and-save 的描述如下:

How it works 2
Whenever Redis needs to dump the dataset to disk, this is what happens:

  • Redis forks. We now have a child and a parent process.

  • The child starts to write the dataset to a temporary RDB file.

  • When the child is done writing the new RDB file, it replaces the old one.

This method allows Redis to benefit from copy-on-write semantics.


dingir-exchange

server.rs

在 GrpcHandler 的 new 方法中,调用

unsafe {
    crate::persist::fork_and_make_slice(&*stub_rd);
}

fork-and-save 的入口。
通过使用 tokio 的定时器 interval 3,配合 select4 周期性的调用,默认的 persist_interval = 3600s,代码如下:

let mut persist_interval = tokio::time::interval(std::time::Duration::from_secs(stub.settings.persist_interval as u64));
tokio::spawn(async move {
    persist_interval.tick().await; //skip first tick
    loop {
        tokio::select! {
            //此处代码略 ...

            _ = persist_interval.tick() => {
                let stub_rd = stub_for_dispatch.read().await;
                log::info!("Start a persisting task");
                unsafe {
                    crate::persist::fork_and_make_slice(&*stub_rd);
                }
            }

            //此处代码略 ...
        }
    }

    //此处代码略...
});

这里使用了 unsafe ,关于该特性,参考文档5.


简化后的 fork_and_make_slice 方法如下:

pub unsafe fn fork_and_make_slice(controller: *const Controller) /*-> SimpleResult*/
{    
    if !do_forking() {
        return;
    }

    // 略

    let controller = controller.as_ref().unwrap();
    make_slice(controller)
}

先看 do_forking 方法,看方法名就知道,这里应该是进行真正的 fork 操作。

#[cfg(not(target_family = "windows"))]
fn do_forking() -> bool {
    log::debug!("execute do_forking");

    unsafe {
        // clean last child process by waitpid
        let wait_res = nix::sys::wait::waitpid(nix::unistd::Pid::from_raw(-1), Some(nix::sys::wait::WaitPidFlag::WNOHANG));
        log::info!("waitpid result: {:#?}", wait_res);
        match nix::unistd::fork() {
            Ok(nix::unistd::ForkResult::Parent { child, .. }) => {
                log::info!("Continuing execution in parent process, new child has pid: {}", child);
                false
            }
            Ok(nix::unistd::ForkResult::Child) => {
                log::info!("fork success");
                true
            }
            //if fork fail? should we panic? this will make the main process exit
            //purpose to do that?
            Err(e) => panic!("Fork failed {}", e),
        }
    }
}

这里主要做了2件事儿:

1、通过 cfg 配置,该方法不支持 Windows 平台。
2、调用 fork() 后,父进程返回 false,子进程返回 true。

因此,在 fork_and_make_slice 方法中,判断 do_forking 的返回值,如果是父进程,直接返回,如果是子进程,则调用下面的 make_slice() 方法。
在 make_slice() 中是具体的写db操作。

在这个流程有几个知识点:
1、tokio的interval 和 tokio的select!
2、nix::unistd::fork() 的使用
3、tokio runtime处理fork后的操作
4、std::thread 的使用等

这些细节类的内容,后期会逐渐分析优缺点。


这就是整体的一个流程,不复杂,好理解,关键是在于细节上的一些处理。