summaryrefslogtreecommitdiff
path: root/Assistant.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant.hs')
-rw-r--r--Assistant.hs167
1 files changed, 133 insertions, 34 deletions
diff --git a/Assistant.hs b/Assistant.hs
index e924d9477..413e5e90e 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -10,7 +10,7 @@
- The initial thread run, double forks to background, starts other
- threads, and then stops, waiting for them to terminate,
- or for a ctrl-c.
- - Thread 2: watcher
+ - Thread 2: Watcher
- Notices new files, and calls handlers for events, queuing changes.
- Thread 3: inotify internal
- Used by haskell inotify library to ensure inotify event buffer is
@@ -19,67 +19,166 @@
- Scans the tree and registers inotify watches for each directory.
- A MVar lock is used to prevent other inotify handlers from running
- until this is complete.
- - Thread 5: committer
+ - Thread 5: Committer
- Waits for changes to occur, and runs the git queue to update its
- - index, then commits.
- - Thread 6: status logger
+ - index, then commits. Also queues Transfer events to send added
+ - files to other remotes.
+ - Thread 6: Pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: PushRetryer
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: Merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch.
+ - (This uses inotify on .git/refs/heads, so there are additional
+ - inotify threads associated with it, too.)
+ - Thread 9: TransferWatcher
+ - Watches for transfer information files being created and removed,
+ - and maintains the DaemonStatus currentTransfers map.
+ - (This uses inotify on .git/annex/transfer/, so there are
+ - additional inotify threads associated with it, too.)
+ - Thread 10: Transferrer
+ - Waits for Transfers to be queued and does them.
+ - Thread 11: StatusLogger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 12: SanityChecker
- Wakes up periodically (rarely) and does sanity checks.
+ - Thread 13: MountWatcher
+ - Either uses dbus to watch for drive mount events, or, when
+ - there's no dbus, polls to find newly mounted filesystems.
+ - Once a filesystem that contains a remote is mounted, updates
+ - state about that remote, pulls from it, and queues a push to it,
+ - as well as an update, and queues it onto the
+ - ConnectedRemoteChan
+ - Thread 13: NetWatcher
+ - Deals with network connection interruptions, which would cause
+ - transfers to fail, and can be recovered from by waiting for a
+ - network connection, and syncing with all network remotes.
+ - Uses dbus to watch for network connections, or when dbus
+ - cannot be used, assumes there's been one every 30 minutes.
+ - Thread 15: TransferScanner
+ - Does potentially expensive checks to find data that needs to be
+ - transferred from or to remotes, and queues Transfers.
+ - Uses the ScanRemotes map.
+ - Thread 16: WebApp
+ - Spawns more threads as necessary to handle clients.
+ - Displays the DaemonStatus.
-
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
- - Annex monad in IO actions run by the inotify and committer
+ - Annex monad in IO actions run by the watcher and committer
- threads. Thus, a single state is shared amoung the threads, and
- only one at a time can access it.
- - DaemonStatusHandle: (MVar)
- - The daemon's current status. This MVar should only be manipulated
- - from inside the Annex monad, which ensures it's accessed only
- - after the ThreadState MVar.
+ - DaemonStatusHandle: (STM TMVar)
+ - The daemon's current status.
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
+ - FailedPushMap (STM TMVar)
+ - Failed pushes are indicated by writing to this TMVar. The push
+ - retrier blocks until they're available.
+ - TransferQueue (STM TChan)
+ - Transfers to make are indicated by writing to this channel.
+ - TransferSlots (QSemN)
+ - Count of the number of currently available transfer slots.
+ - Updated by the transfer watcher, this allows other threads
+ - to block until a slot is available.
+ - This MVar should only be manipulated from inside the Annex monad,
+ - which ensures it's accessed only after the ThreadState MVar.
+ - ScanRemotes (STM TMVar)
+ - Remotes that have been disconnected, and should be scanned
+ - are indicated by writing to this TMVar.
-}
+{-# LANGUAGE CPP #-}
+
module Assistant where
-import Common.Annex
+import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
-import Assistant.Watcher
-import Assistant.Committer
-import Assistant.SanityChecker
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Threads.Watcher
+import Assistant.Threads.Committer
+import Assistant.Threads.Pusher
+import Assistant.Threads.Merger
+import Assistant.Threads.TransferWatcher
+import Assistant.Threads.Transferrer
+import Assistant.Threads.SanityChecker
+import Assistant.Threads.MountWatcher
+import Assistant.Threads.NetWatcher
+import Assistant.Threads.TransferScanner
+#ifdef WITH_WEBAPP
+import Assistant.Threads.WebApp
+#else
+#warning Building without the webapp. You probably need to install Yesod..
+#endif
import qualified Utility.Daemon
import Utility.LogFile
+import Utility.ThreadScheduler
import Control.Concurrent
-startDaemon :: Bool -> Annex ()
-startDaemon foreground
+stopDaemon :: Annex ()
+stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
+
+startDaemon :: Bool -> Bool -> Maybe (Url -> FilePath -> IO ()) -> Annex ()
+startDaemon assistant foreground webappwaiter
| foreground = do
- showStart "watch" "."
+ showStart (if assistant then "assistant" else "watch") "."
+ liftIO . Utility.Daemon.lockPidFile =<< fromRepo gitAnnexPidFile
go id
| otherwise = do
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
pidfile <- fromRepo gitAnnexPidFile
go $ Utility.Daemon.daemonize logfd (Just pidfile) False
where
- go a = withThreadState $ \st -> do
- checkCanWatch
- dstatus <- startDaemonStatus
- liftIO $ a $ do
- changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
- _ <- forkIO $ daemonStatusThread st dstatus
- _ <- forkIO $ sanityCheckerThread st dstatus changechan
- -- Does not return.
- watchThread st dstatus changechan
+ go d = startAssistant assistant d webappwaiter
-stopDaemon :: Annex ()
-stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
+startAssistant :: Bool -> (IO () -> IO ()) -> Maybe (Url -> FilePath -> IO ()) -> Annex ()
+startAssistant assistant daemonize webappwaiter = do
+ withThreadState $ \st -> do
+ checkCanWatch
+ dstatus <- startDaemonStatus
+ liftIO $ daemonize $ run dstatus st
+ where
+ run dstatus st = do
+ changechan <- newChangeChan
+ commitchan <- newCommitChan
+ pushmap <- newFailedPushMap
+ transferqueue <- newTransferQueue
+ transferslots <- newTransferSlots
+ scanremotes <- newScanRemoteMap
+ mapM_ startthread
+ [ watch $ commitThread st changechan commitchan transferqueue dstatus
+#ifdef WITH_WEBAPP
+ , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots Nothing webappwaiter
+#endif
+ , assist $ pushThread st dstatus commitchan pushmap
+ , assist $ pushRetryThread st dstatus pushmap
+ , assist $ mergeThread st
+ , assist $ transferWatcherThread st dstatus
+ , assist $ transfererThread st dstatus transferqueue transferslots
+ , assist $ daemonStatusThread st dstatus
+ , assist $ sanityCheckerThread st dstatus transferqueue changechan
+ , assist $ mountWatcherThread st dstatus scanremotes
+ , assist $ netWatcherThread st dstatus scanremotes
+ , assist $ transferScannerThread st dstatus scanremotes transferqueue
+ , watch $ watchThread st dstatus transferqueue changechan
+ ]
+ waitForTermination
+ watch a = (True, a)
+ assist a = (False, a)
+ startthread (watcher, a)
+ | watcher || assistant = void $ forkIO a
+ | otherwise = noop