1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::settings::{rewrite::Rewrite, timer::Timer};
use notify::{
    event::{ModifyKind, RenameMode},
    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use serde::Deserialize;
use std::path::PathBuf;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::error;

/// Configuration for a filesystem-notification trigger.
///
/// Deserialized with serde. The `watcher` method registers a watch on every entry of `paths`
/// and forwards the paths of matching filesystem events to a channel.
#[derive(Deserialize, Clone)]
pub struct Notify {
    /// Paths to monitor; each entry is registered with the filesystem watcher.
    pub paths: Vec<String>,
    /// Optional rewrite applied to every event path before it is sent on the channel.
    pub rewrite: Option<Rewrite>,
    /// Recursive monitoring (default: true when unset)
    pub recursive: Option<bool>,
    /// Targets to exclude; an empty list is used when the field is omitted from the input.
    // NOTE(review): not read by the methods visible here; presumably the filtering is done
    // by whoever consumes the channel — confirm.
    #[serde(default)]
    pub excludes: Vec<String>,
    /// Timer
    // NOTE(review): not read by the methods visible here; presumably consumed by the
    // caller to schedule/debounce processing — confirm.
    pub timer: Timer,
}

impl Notify {
    pub fn send_event(
        &self,
        tx: UnboundedSender<String>,
        path: Option<&PathBuf>,
    ) -> anyhow::Result<()> {
        if path.is_none() {
            return Ok(());
        }

        let mut path = path.unwrap().to_string_lossy().to_string();

        if let Some(rewrite) = &self.rewrite {
            path = rewrite.rewrite_path(path);
        }

        tx.send(path).map_err(|e| anyhow::anyhow!(e))
    }

    pub fn async_watcher(
        &self,
    ) -> notify::Result<(RecommendedWatcher, UnboundedReceiver<notify::Result<Event>>)> {
        let (tx, rx) = unbounded_channel();

        let watcher = RecommendedWatcher::new(
            move |res| {
                if let Err(e) = tx.send(res) {
                    error!("unable to process notify event: {e}")
                }
            },
            Config::default(),
        )?;

        Ok((watcher, rx))
    }

    pub async fn watcher(&self, tx: UnboundedSender<String>) -> anyhow::Result<()> {
        let (mut watcher, mut rx) = self.async_watcher()?;

        for path in &self.paths {
            watcher.watch(
                path.as_ref(),
                if self.recursive.unwrap_or(true) {
                    RecursiveMode::Recursive
                } else {
                    RecursiveMode::NonRecursive
                },
            )?;
        }

        while let Some(res) = rx.recv().await {
            match res {
                Ok(event) => match event.kind {
                    EventKind::Modify(ModifyKind::Data(_))
                    | EventKind::Modify(ModifyKind::Metadata(_))
                    | EventKind::Modify(ModifyKind::Name(RenameMode::Both))
                    | EventKind::Create(_)
                    | EventKind::Remove(_) => {
                        for path in event.paths {
                            self.send_event(tx.clone(), Some(&path))?;
                        }
                    }
                    _ => {}
                },
                Err(e) => error!("unable to process notify event: {e}"),
            }
        }

        Ok(())
    }
}