summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/Committer.hs237
-rw-r--r--Assistant/Threads/Merger.hs80
-rw-r--r--Assistant/Threads/MountWatcher.hs189
-rw-r--r--Assistant/Threads/NetWatcher.hs132
-rw-r--r--Assistant/Threads/Pusher.hs82
-rw-r--r--Assistant/Threads/SanityChecker.hs98
-rw-r--r--Assistant/Threads/TransferScanner.hs138
-rw-r--r--Assistant/Threads/TransferWatcher.hs80
-rw-r--r--Assistant/Threads/Transferrer.hs113
-rw-r--r--Assistant/Threads/Watcher.hs251
-rw-r--r--Assistant/Threads/WebApp.hs112
11 files changed, 1512 insertions, 0 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs
new file mode 100644
index 000000000..5aadcc02a
--- /dev/null
+++ b/Assistant/Threads/Committer.hs
@@ -0,0 +1,237 @@
+{- git-annex assistant commit thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Committer where
+
+import Assistant.Common
+import Assistant.Changes
+import Assistant.Commits
+import Assistant.Alert
+import Assistant.ThreadedMonad
+import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
+import qualified Annex
+import qualified Annex.Queue
+import qualified Git.Command
+import qualified Git.HashObject
+import Git.Types
+import qualified Command.Add
+import Utility.ThreadScheduler
+import qualified Utility.Lsof as Lsof
+import qualified Utility.DirWatcher as DirWatcher
+import Types.KeySource
+
+import Data.Time.Clock
+import Data.Tuple.Utils
+import qualified Data.Set as S
+import Data.Either
+
+thisThread :: ThreadName
+thisThread = "Committer"
+
+{- This thread makes git commits at appropriate times. -}
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
+ -- We already waited one second as a simple rate limiter.
+ -- Next, wait until at least one change is available for
+ -- processing.
+ changes <- getChanges changechan
+ -- Now see if now's a good time to commit.
+ time <- getCurrentTime
+ if shouldCommit time changes
+ then do
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
+ if shouldCommit time readychanges
+ then do
+ debug thisThread
+ [ "committing"
+ , show (length readychanges)
+ , "changes"
+ ]
+ void $ alertWhile dstatus commitAlert $
+ tryIO (runThreadState st commitStaged)
+ >> return True
+ recordCommit commitchan (Commit time)
+ else refill readychanges
+ else refill changes
+ where
+ refill [] = noop
+ refill cs = do
+ debug thisThread
+ [ "delaying commit of"
+ , show (length cs)
+ , "changes"
+ ]
+ refillChanges changechan cs
+
+
+commitStaged :: Annex ()
+commitStaged = do
+ Annex.Queue.flush
+ inRepo $ Git.Command.run "commit"
+ [ Param "--allow-empty-message"
+ , Param "-m", Param ""
+ -- Empty commits may be made if tree changes cancel
+ -- each other out, etc
+ , Param "--allow-empty"
+ -- Avoid running the usual git-annex pre-commit hook;
+ -- watch does the same symlink fixing, and we don't want
+ -- to deal with unlocked files in these commits.
+ , Param "--quiet"
+ ]
+
+{- Decide if now is a good time to make a commit.
+ - Note that the list of change times has an undefined order.
+ -
+ - Current strategy: If there have been 10 changes within the past second,
+ - a batch activity is taking place, so wait for later.
+ -}
+shouldCommit :: UTCTime -> [Change] -> Bool
+shouldCommit now changes
+ | len == 0 = False
+ | len > 10000 = True -- avoid bloating queue too much
+ | length (filter thisSecond changes) < 10 = True
+ | otherwise = False -- batch activity
+ where
+ len = length changes
+ thisSecond c = now `diffUTCTime` changeTime c <= 1
+
+{- If there are PendingAddChanges, the files have not yet actually been
+ - added to the annex (probably), and that has to be done now, before
+ - committing.
+ -
+ - Deferring the adds to this point causes batches to be bundled together,
+ - which allows faster checking with lsof that the files are not still open
+ - for write by some other process.
+ -
+ - When a file is added, Inotify will notice the new symlink. So this waits
+ - for additional Changes to arrive, so that the symlink has hopefully been
+ - staged before returning, and will be committed immediately.
+ -
+ - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
+ - created and staged.
+ -
+ - Returns a list of all changes that are ready to be committed.
+ - Any pending adds that are not ready yet are put back into the ChangeChan,
+ - where they will be retried later.
+ -}
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
+ (postponed, toadd) <- partitionEithers <$>
+ safeToAdd st pendingadds
+
+ unless (null postponed) $
+ refillChanges changechan postponed
+
+ returnWhen (null toadd) $ do
+ added <- catMaybes <$> forM toadd add
+ if (DirWatcher.eventsCoalesce || null added)
+ then return $ added ++ otherchanges
+ else do
+ r <- handleAdds st changechan transferqueue dstatus
+ =<< getChanges changechan
+ return $ r ++ added ++ otherchanges
+ where
+ (pendingadds, otherchanges) = partition isPendingAddChange cs
+
+ returnWhen c a
+ | c = return otherchanges
+ | otherwise = a
+
+ add :: Change -> IO (Maybe Change)
+ add change@(PendingAddChange { keySource = ks }) =
+ alertWhile' dstatus (addFileAlert $ keyFilename ks) $
+ liftM ret $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
+ where
+ {- Add errors tend to be transient and will
+ - be automatically dealt with, so don't
+ - pass to the alert code. -}
+ ret (Just j@(Just _)) = (True, j)
+ ret _ = (True, Nothing)
+ add _ = return Nothing
+
+ handle _ _ Nothing = do
+ showEndFail
+ return Nothing
+ handle change file (Just key) = do
+ link <- Command.Add.link file key True
+ when DirWatcher.eventsCoalesce $ do
+ sha <- inRepo $
+ Git.HashObject.hashObject BlobObject link
+ stageSymlink file sha
+ queueTransfers Next transferqueue dstatus key (Just file) Upload
+ showEndOk
+ return $ Just change
+
+ {- Check that the keysource's keyFilename still exists,
+ - and is still a hard link to its contentLocation,
+ - before ingesting it. -}
+ sanitycheck keysource a = do
+ fs <- getSymbolicLinkStatus $ keyFilename keysource
+ ks <- getSymbolicLinkStatus $ contentLocation keysource
+ if deviceID ks == deviceID fs && fileID ks == fileID fs
+ then a
+ else return Nothing
+
+{- PendingAddChanges can Either be Right to be added now,
+ - or are unsafe, and must be Left for later.
+ -
+ - Check by running lsof on the temp directory, which
+ - the KeySources are locked down in.
+ -}
+safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
+safeToAdd st changes = runThreadState st $
+ ifM (Annex.getState Annex.force)
+ ( allRight changes -- force bypasses lsof check
+ , do
+ tmpdir <- fromRepo gitAnnexTmpDir
+ openfiles <- S.fromList . map fst3 . filter openwrite <$>
+ liftIO (Lsof.queryDir tmpdir)
+
+ -- TODO this is here for debugging a problem on
+ -- OSX, and is pretty expensive, so remove later
+ liftIO $ debug thisThread
+ [ "checking changes:"
+ , show changes
+ , "vs open files:"
+ , show openfiles
+ ]
+
+ let checked = map (check openfiles) changes
+
+ {- If new events are received when files are closed,
+ - there's no need to retry any changes that cannot
+ - be done now. -}
+ if DirWatcher.closingTracked
+ then do
+ mapM_ canceladd $ lefts checked
+ allRight $ rights checked
+ else return checked
+ )
+ where
+ check openfiles change@(PendingAddChange { keySource = ks })
+ | S.member (contentLocation ks) openfiles = Left change
+ check _ change = Right change
+
+ canceladd (PendingAddChange { keySource = ks }) = do
+ warning $ keyFilename ks
+ ++ " still has writers, not adding"
+ -- remove the hard link
+ void $ liftIO $ tryIO $
+ removeFile $ contentLocation ks
+ canceladd _ = noop
+
+ openwrite (_file, mode, _pid) =
+ mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
+
+ allRight = return . map Right
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..0f33b68ed
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,80 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Git
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+
+thisThread :: ThreadName
+thisThread = "Merger"
+
+{- This thread watches for changes to .git/refs/heads/synced/,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+ debug thisThread ["watching", dir]
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let changedbranch = Git.Ref $
+ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just changedbranch == current) $ do
+ liftIO $ debug thisThread
+ [ "merging changes into"
+ , show current
+ ]
+ void $ mergeBranch changedbranch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
new file mode 100644
index 000000000..cc7495602
--- /dev/null
+++ b/Assistant/Threads/MountWatcher.hs
@@ -0,0 +1,189 @@
+{- git-annex assistant mount watcher, using either dbus or mtab polling
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Assistant.Threads.MountWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Sync
+import qualified Annex
+import qualified Git
+import Utility.ThreadScheduler
+import Utility.Mounts
+import Remote.List
+import qualified Types.Remote as Remote
+
+import Control.Concurrent
+import qualified Control.Exception as E
+import qualified Data.Set as S
+
+#if WITH_DBUS
+import Utility.DBus
+import DBus.Client
+import DBus
+import Data.Word (Word32)
+#else
+#warning Building without dbus support; will use mtab polling
+#endif
+
+thisThread :: ThreadName
+thisThread = "MountWatcher"
+
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+mountWatcherThread st handle scanremotes =
+#if WITH_DBUS
+ dbusThread st handle scanremotes
+#else
+ pollingThread st handle scanremotes
+#endif
+
+#if WITH_DBUS
+
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
+ where
+ go client = ifM (checkMountMonitor client)
+ ( do
+ {- Store the current mount points in an mvar,
+ - to be compared later. We could in theory
+ - work out the mount point from the dbus
+ - message, but this is easier. -}
+ mvar <- newMVar =<< currentMountPoints
+ forM_ mountChanged $ \matcher ->
+ listen client matcher $ \_event -> do
+ nowmounted <- currentMountPoints
+ wasmounted <- swapMVar mvar nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
+ , do
+ runThreadState st $
+ warning "No known volume monitor available through dbus; falling back to mtab polling"
+ pollinstead
+ )
+ onerr :: E.SomeException -> IO ()
+ onerr e = do
+ runThreadState st $
+ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
+ pollinstead
+ pollinstead = pollingThread st dstatus scanremotes
+
+{- Examine the list of services connected to dbus, to see if there
+ - are any we can use to monitor mounts. If not, will attempt to start one. -}
+checkMountMonitor :: Client -> IO Bool
+checkMountMonitor client = do
+ running <- filter (`elem` usableservices)
+ <$> listServiceNames client
+ case running of
+ [] -> startOneService client startableservices
+ (service:_) -> do
+ debug thisThread [ "Using running DBUS service"
+ , service
+ , "to monitor mount events."
+ ]
+ return True
+ where
+ startableservices = [gvfs]
+ usableservices = startableservices ++ [kde]
+ gvfs = "org.gtk.Private.GduVolumeMonitor"
+ kde = "org.kde.DeviceNotifications"
+
+startOneService :: Client -> [ServiceName] -> IO Bool
+startOneService _ [] = return False
+startOneService client (x:xs) = do
+ _ <- callDBus client "StartServiceByName"
+ [toVariant x, toVariant (0 :: Word32)]
+ ifM (elem x <$> listServiceNames client)
+ ( do
+ debug thisThread [ "Started DBUS service"
+ , x
+ , "to monitor mount events."
+ ]
+ return True
+ , startOneService client xs
+ )
+
+{- Filter matching events recieved when drives are mounted and unmounted. -}
+mountChanged :: [MatchRule]
+mountChanged = [gvfs True, gvfs False, kde, kdefallback]
+ where
+ {- gvfs reliably generates this event whenever a drive is mounted/unmounted,
+ - whether automatically, or manually -}
+ gvfs mount = matchAny
+ { matchInterface = Just "org.gtk.Private.RemoteVolumeMonitor"
+ , matchMember = Just $ if mount then "MountAdded" else "MountRemoved"
+ }
+ {- This event fires when KDE prompts the user what to do with a drive,
+ - but maybe not at other times. And it's not received -}
+ kde = matchAny
+ { matchInterface = Just "org.kde.Solid.Device"
+ , matchMember = Just "setupDone"
+ }
+ {- This event may not be closely related to mounting a drive, but it's
+ - observed reliably when a drive gets mounted or unmounted. -}
+ kdefallback = matchAny
+ { matchInterface = Just "org.kde.KDirNotify"
+ , matchMember = Just "enteredDirectory"
+ }
+
+#endif
+
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+pollingThread st dstatus scanremotes = go =<< currentMountPoints
+ where
+ go wasmounted = do
+ threadDelaySeconds (Seconds 10)
+ nowmounted <- currentMountPoints
+ handleMounts st dstatus scanremotes wasmounted nowmounted
+ go nowmounted
+
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes wasmounted nowmounted =
+ mapM_ (handleMount st dstatus scanremotes . mnt_dir) $
+ S.toList $ newMountPoints wasmounted nowmounted
+
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
+handleMount st dstatus scanremotes dir = do
+ debug thisThread ["detected mount of", dir]
+ reconnectRemotes thisThread st dstatus scanremotes
+ =<< filter (Git.repoIsLocal . Remote.repo)
+ <$> remotesUnder st dstatus dir
+
+{- Finds remotes located underneath the mount point.
+ -
+ - Updates state to include the remotes.
+ -
+ - The config of git remotes is re-read, as it may not have been available
+ - at startup time, or may have changed (it could even be a different
+ - repository at the same remote location..)
+ -}
+remotesUnder :: ThreadState -> DaemonStatusHandle -> FilePath -> IO [Remote]
+remotesUnder st dstatus dir = runThreadState st $ do
+ repotop <- fromRepo Git.repoPath
+ rs <- remoteList
+ pairs <- mapM (checkremote repotop) rs
+ let (waschanged, rs') = unzip pairs
+ when (any id waschanged) $ do
+ Annex.changeState $ \s -> s { Annex.remotes = rs' }
+ updateKnownRemotes dstatus
+ return $ map snd $ filter fst pairs
+ where
+ checkremote repotop r = case Remote.localpath r of
+ Just p | dirContains dir (absPathFrom repotop p) ->
+ (,) <$> pure True <*> updateRemote r
+ _ -> return (False, r)
+
+type MountPoints = S.Set Mntent
+
+currentMountPoints :: IO MountPoints
+currentMountPoints = S.fromList <$> getMounts
+
+newMountPoints :: MountPoints -> MountPoints -> MountPoints
+newMountPoints old new = S.difference new old
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
new file mode 100644
index 000000000..117ab7a20
--- /dev/null
+++ b/Assistant/Threads/NetWatcher.hs
@@ -0,0 +1,132 @@
+{- git-annex assistant network connection watcher, using dbus
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Assistant.Threads.NetWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Sync
+import qualified Git
+import Utility.ThreadScheduler
+import Remote.List
+import qualified Types.Remote as Remote
+
+import qualified Control.Exception as E
+import Control.Concurrent
+
+#if WITH_DBUS
+import Utility.DBus
+import DBus.Client
+import DBus
+import Data.Word (Word32)
+#else
+#warning Building without dbus support; will poll for network connection changes
+#endif
+
+thisThread :: ThreadName
+thisThread = "NetWatcher"
+
+netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+netWatcherThread st dstatus scanremotes = do
+#if WITH_DBUS
+ void $ forkIO $ dbusThread st dstatus scanremotes
+#endif
+ {- This is a fallback for when dbus cannot be used to detect
+ - network connection changes, but it also ensures that
+ - any networked remotes that may have not been routable for a
+ - while (despite the local network staying up), are synced with
+ - periodically. -}
+ runEvery (Seconds 3600) $
+ handleConnection st dstatus scanremotes
+
+#if WITH_DBUS
+
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
+ where
+ go client = ifM (checkNetMonitor client)
+ ( do
+ listenNMConnections client handle
+ listenWicdConnections client handle
+ , do
+ runThreadState st $
+ warning "No known network monitor available through dbus; falling back to polling"
+ )
+ onerr :: E.SomeException -> IO ()
+ onerr e = runThreadState st $
+ warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")"
+ handle = do
+ debug thisThread ["detected network connection"]
+ handleConnection st dstatus scanremotes
+
+{- Examine the list of services connected to dbus, to see if there
+ - are any we can use to monitor network connections. -}
+checkNetMonitor :: Client -> IO Bool
+checkNetMonitor client = do
+ running <- filter (`elem` [networkmanager, wicd])
+ <$> listServiceNames client
+ case running of
+ [] -> return False
+ (service:_) -> do
+ debug thisThread [ "Using running DBUS service"
+ , service
+ , "to monitor network connection events."
+ ]
+ return True
+ where
+ networkmanager = "org.freedesktop.NetworkManager"
+ wicd = "org.wicd.daemon"
+
+{- Listens for new NetworkManager connections. -}
+listenNMConnections :: Client -> IO () -> IO ()
+listenNMConnections client callback =
+ listen client matcher $ \event ->
+ when (Just True == anyM activeconnection (signalBody event)) $
+ callback
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
+ , matchMember = Just "PropertiesChanged"
+ }
+ nm_connection_activated = toVariant (2 :: Word32)
+ nm_state_key = toVariant ("State" :: String)
+ activeconnection v = do
+ m <- fromVariant v
+ vstate <- lookup nm_state_key $ dictionaryItems m
+ state <- fromVariant vstate
+ return $ state == nm_connection_activated
+
+{- Listens for new Wicd connections. -}
+listenWicdConnections :: Client -> IO () -> IO ()
+listenWicdConnections client callback =
+ listen client matcher $ \event ->
+ when (any (== wicd_success) (signalBody event)) $
+ callback
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.wicd.daemon"
+ , matchMember = Just "ConnectResultsSent"
+ }
+ wicd_success = toVariant ("success" :: String)
+
+#endif
+
+handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+handleConnection st dstatus scanremotes = do
+ reconnectRemotes thisThread st dstatus scanremotes =<<
+ filter (Git.repoIsUrl . Remote.repo)
+ <$> networkRemotes st
+
+{- Finds network remotes. -}
+networkRemotes :: ThreadState -> IO [Remote]
+networkRemotes st = runThreadState st $
+ filter (isNothing . Remote.localpath) <$> remoteList
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..6bf8de2df
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,82 @@
+{- git-annex assistant git pushing thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Assistant.Common
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.Alert
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Sync
+import Utility.ThreadScheduler
+import qualified Remote
+import qualified Types.Remote as Remote
+
+import Data.Time.Clock
+
+thisThread :: ThreadName
+thisThread = "Pusher"
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> IO ()
+pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ debug thisThread
+ [ "retrying"
+ , show (length topush)
+ , "failed pushes"
+ ]
+ now <- getCurrentTime
+ void $ alertWhile dstatus (pushRetryAlert topush) $
+ pushToRemotes thisThread now st (Just pushmap) topush
+ where
+ halfhour = 1800
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st dstatus commitchan pushmap = do
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then do
+ remotes <- filter pushable . knownRemotes
+ <$> getDaemonStatus dstatus
+ unless (null remotes) $
+ void $ alertWhile dstatus (pushAlert remotes) $
+ pushToRemotes thisThread now st (Just pushmap) remotes
+ else do
+ debug thisThread
+ [ "delaying push of"
+ , show (length commits)
+ , "commits"
+ ]
+ refillCommits commitchan commits
+ where
+ pushable r
+ | Remote.specialRemote r = False
+ | Remote.readonly r = False
+ | otherwise = True
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
new file mode 100644
index 000000000..a7c2189d8
--- /dev/null
+++ b/Assistant/Threads/SanityChecker.hs
@@ -0,0 +1,98 @@
+{- git-annex assistant sanity checker
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.SanityChecker (
+ sanityCheckerThread
+) where
+
+import Assistant.Common
+import Assistant.DaemonStatus
+import Assistant.ThreadedMonad
+import Assistant.Changes
+import Assistant.Alert
+import Assistant.TransferQueue
+import qualified Git.LsFiles
+import Utility.ThreadScheduler
+import qualified Assistant.Threads.Watcher as Watcher
+
+import Data.Time.Clock.POSIX
+
+thisThread :: ThreadName
+thisThread = "SanityChecker"
+
+{- This thread wakes up occasionally to make sure the tree is in good shape. -}
+sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+sanityCheckerThread st dstatus transferqueue changechan = forever $ do
+ waitForNextCheck dstatus
+
+ debug thisThread ["starting sanity check"]
+
+ void $ alertWhile dstatus sanityCheckAlert go
+
+ debug thisThread ["sanity check complete"]
+ where
+ go = do
+ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = True }
+
+ now <- getPOSIXTime -- before check started
+ r <- catchIO (check st dstatus transferqueue changechan)
+ $ \e -> do
+ runThreadState st $ warning $ show e
+ return False
+
+ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = False
+ , lastSanityCheck = Just now
+ }
+
+ return r
+
+{- Only run one check per day, from the time of the last check. -}
+waitForNextCheck :: DaemonStatusHandle -> IO ()
+waitForNextCheck dstatus = do
+ v <- lastSanityCheck <$> getDaemonStatus dstatus
+ now <- getPOSIXTime
+ threadDelaySeconds $ Seconds $ calcdelay now v
+ where
+ calcdelay _ Nothing = oneDay
+ calcdelay now (Just lastcheck)
+ | lastcheck < now = max oneDay $
+ oneDay - truncate (now - lastcheck)
+ | otherwise = oneDay
+
+oneDay :: Int
+oneDay = 24 * 60 * 60
+
+{- It's important to stay out of the Annex monad as much as possible while
+ - running potentially expensive parts of this check, since remaining in it
+ - will block the watcher. -}
+check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool
+check st dstatus transferqueue changechan = do
+ g <- runThreadState st $ fromRepo id
+ -- Find old unstaged symlinks, and add them to git.
+ unstaged <- Git.LsFiles.notInRepo False ["."] g
+ now <- getPOSIXTime
+ forM_ unstaged $ \file -> do
+ ms <- catchMaybeIO $ getSymbolicLinkStatus file
+ case ms of
+ Just s | toonew (statusChangeTime s) now -> noop
+ | isSymbolicLink s ->
+ addsymlink file ms
+ _ -> noop
+ return True
+ where
+ toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
+ slop = fromIntegral tenMinutes
+ insanity msg = do
+ runThreadState st $ warning msg
+ void $ addAlert dstatus $ sanityCheckFixAlert msg
+ addsymlink file s = do
+ Watcher.runHandler thisThread st dstatus
+ transferqueue changechan
+ Watcher.onAddSymlink file s
+ insanity $ "found unstaged symlink: " ++ file
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
new file mode 100644
index 000000000..87af3567e
--- /dev/null
+++ b/Assistant/Threads/TransferScanner.hs
@@ -0,0 +1,138 @@
+{- git-annex assistant thread to scan remotes to find needed transfers
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferScanner where
+
+import Assistant.Common
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Alert
+import Logs.Transfer
+import Logs.Location
+import Logs.Web (webUUID)
+import qualified Remote
+import qualified Types.Remote as Remote
+import Utility.ThreadScheduler
+import qualified Git.LsFiles as LsFiles
+import Command
+import Annex.Content
+
+import qualified Data.Set as S
+
+thisThread :: ThreadName
+thisThread = "TransferScanner"
+
+{- This thread waits until a remote needs to be scanned, to find transfers
+ - that need to be made, to keep data in sync.
+ -}
+transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st dstatus scanremotes transferqueue = do
+ startupScan
+ go S.empty
+ where
+ go scanned = do
+ threadDelaySeconds (Seconds 2)
+ (rs, infos) <- unzip <$> getScanRemote scanremotes
+ if any fullScan infos || any (`S.notMember` scanned) rs
+ then do
+ expensiveScan st dstatus transferqueue rs
+ go (S.union scanned (S.fromList rs))
+ else do
+ mapM_ (failedTransferScan st dstatus transferqueue) rs
+ go scanned
+ {- All available remotes are scanned in full on startup,
+ - for multiple reasons, including:
+ -
+ - * This may be the first run, and there may be remotes
+ - already in place, that need to be synced.
+ - * We may have run before, and scanned a remote, but
+ - only been in a subdirectory of the git remote, and so
+ - not synced it all.
+ - * We may have run before, and had transfers queued,
+ - and then the system (or us) crashed, and that info was
+ - lost.
+ -}
+ startupScan = addScanRemotes scanremotes True
+ =<< knownRemotes <$> getDaemonStatus dstatus
+
+{- This is a cheap scan for failed transfers involving a remote. -}
+failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
+failedTransferScan st dstatus transferqueue r = do
+ ts <- runThreadState st $
+ getFailedTransfers $ Remote.uuid r
+ go ts
+ where
+ go [] = noop
+ go ((t, info):ts)
+ | transferDirection t == Download = do
+ {- Check if the remote still has the key.
+ - If not, relies on the expensiveScan to
+ - get it queued from some other remote. -}
+ ifM (runThreadState st $ remoteHas r $ transferKey t)
+ ( requeue t info
+ , dequeue t
+ )
+ go ts
+ | otherwise = do
+ {- The Transferrer checks when uploading
+ - that the remote doesn't already have the
+ - key, so it's not redundantly checked
+ - here. -}
+ requeue t info
+ go ts
+
+ requeue t info = do
+ queueTransferWhenSmall
+ transferqueue dstatus (associatedFile info) t r
+ dequeue t
+ dequeue t = void $ runThreadState st $ inRepo $
+ liftIO . tryIO . removeFile . failedTransferFile t
+
+{- This is a expensive scan through the full git work tree, finding
+ - files to download from or upload to any of the remotes.
+ -
+ - The scan is blocked when the transfer queue gets too large. -}
+expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
+expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
+ liftIO $ debug thisThread ["starting scan of", show visiblers]
+ void $ alertWhile dstatus (scanAlert visiblers) $ do
+ g <- runThreadState st $ fromRepo id
+ files <- LsFiles.inRepo [] g
+ go files
+ return True
+ liftIO $ debug thisThread ["finished scan of", show visiblers]
+ where
+ onlyweb = all (== webUUID) $ map Remote.uuid rs
+ visiblers = let rs' = filter (not . Remote.readonly) rs
+ in if null rs' then rs else rs'
+ go [] = noop
+ go (f:fs) = do
+ mapM_ (enqueue f) =<< catMaybes <$> runThreadState st
+ (ifAnnexed f findtransfers $ return [])
+ go fs
+ enqueue f (r, t) = do
+ debug thisThread ["queuing", show t]
+ queueTransferWhenSmall transferqueue dstatus (Just f) t r
+ findtransfers (key, _) = do
+ locs <- loggedLocations key
+ let use a = return $ map (a key locs) rs
+ ifM (inAnnex key)
+ ( use $ check Upload False
+ , use $ check Download True
+ )
+ check direction want key locs r
+ | direction == Upload && Remote.readonly r = Nothing
+ | (Remote.uuid r `elem` locs) == want = Just $
+ (r, Transfer direction (Remote.uuid r) key)
+ | otherwise = Nothing
+
+remoteHas :: Remote -> Key -> Annex Bool
+remoteHas r key = elem
+ <$> pure (Remote.uuid r)
+ <*> loggedLocations key
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
new file mode 100644
index 000000000..fe8af9aad
--- /dev/null
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -0,0 +1,80 @@
+{- git-annex assistant transfer watching thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Remote
+
+thisThread :: ThreadName
+thisThread = "TransferWatcher"
+
+{- This thread watches for changes to the gitAnnexTransferDir,
+ - and updates the DaemonStatus's map of ongoing transfers. -}
+transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
+transferWatcherThread st dstatus = do
+ g <- runThreadState st $ fromRepo id
+ let dir = gitAnnexTransferDir g
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler st dstatus a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+ debug thisThread ["watching for transfers"]
+
+type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler st dstatus file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ _ msg _ = error msg
+
+{- Called when a new transfer information file is written. -}
+onAdd :: Handler
+onAdd st dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> go t =<< runThreadState st (checkTransfer t)
+ where
+ go _ Nothing = noop -- transfer already finished
+ go t (Just info) = do
+ debug thisThread
+ [ "transfer starting:"
+ , show t
+ ]
+ r <- headMaybe . filter (sameuuid t) . knownRemotes
+ <$> getDaemonStatus dstatus
+ updateTransferInfo dstatus t info
+ { transferRemote = r }
+ sameuuid t r = Remote.uuid r == transferUUID t
+
+{- Called when a transfer information file is removed. -}
+onDel :: Handler
+onDel _ dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> do
+ debug thisThread
+ [ "transfer finishing:"
+ , show t
+ ]
+ void $ removeTransfer dstatus t
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
new file mode 100644
index 000000000..9a772d628
--- /dev/null
+++ b/Assistant/Threads/Transferrer.hs
@@ -0,0 +1,113 @@
+{- git-annex assistant data transferrer thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Transferrer where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Alert
+import Logs.Transfer
+import Logs.Location
+import Annex.Content
+import qualified Remote
+import Types.Key
+import Locations.UserConfig
+
+import System.Process (create_group)
+
+thisThread :: ThreadName
+thisThread = "Transferrer"
+
+{- For now only one transfer is run at a time. -}
+maxTransfers :: Int
+maxTransfers = 1
+
+{- Dispatches transfers from the queue. -}
+transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
+transfererThread st dstatus transferqueue slots = go =<< readProgramFile
+ where
+ go program = getNextTransfer transferqueue dstatus notrunning >>= handle program
+ handle program Nothing = go program
+ handle program (Just (t, info)) = do
+ ifM (runThreadState st $ shouldTransfer t info)
+ ( do
+ debug thisThread [ "Transferring:" , show t ]
+ notifyTransfer dstatus
+ transferThread dstatus slots t info inTransferSlot program
+ , do
+ debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ -- getNextTransfer added t to the
+ -- daemonstatus's transfer map.
+ void $ removeTransfer dstatus t
+ )
+ go program
+ {- Skip transfers that are already running. -}
+ notrunning i = startedTime i == Nothing
+
+{- Checks if the file to download is already present, or the remote
+ - being uploaded to isn't known to have the file. -}
+shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
+shouldTransfer t info
+ | transferDirection t == Download =
+ not <$> inAnnex key
+ | transferDirection t == Upload =
+ {- Trust the location log to check if the
+ - remote already has the key. This avoids
+ - a roundtrip to the remote. -}
+ case transferRemote info of
+ Nothing -> return False
+ Just remote ->
+ notElem (Remote.uuid remote)
+ <$> loggedLocations key
+ | otherwise = return False
+ where
+ key = transferKey t
+
+{- A sepeate git-annex process is forked off to run a transfer,
+ - running in its own process group. This allows killing it and all its
+ - children if the user decides to cancel the transfer.
+ -
+ - A thread is forked off to run the process, and the thread
+ - occupies one of the transfer slots. If all slots are in use, this will
+ - block until one becomes available. The thread's id is also recorded in
+ - the transfer info; the thread will also be killed when a transfer is
+ - stopped, to avoid it displaying any alert about the transfer having
+ - failed. -}
+transferThread :: DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> TransferSlotRunner -> FilePath -> IO ()
+transferThread dstatus slots t info runner program = case (transferRemote info, associatedFile info) of
+ (Nothing, _) -> noop
+ (_, Nothing) -> noop
+ (Just remote, Just file) -> do
+ tid <- runner slots $
+ transferprocess remote file
+ updateTransferInfo dstatus t $ info { transferTid = Just tid }
+ where
+ direction = transferDirection t
+ isdownload = direction == Download
+
+ transferprocess remote file = void $ do
+ (_, _, _, pid)
+ <- createProcess (proc program $ toCommand params)
+ { create_group = True }
+ ok <- (==) ExitSuccess <$> waitForProcess pid
+ addAlert dstatus $
+ makeAlertFiller ok $
+ transferFileAlert direction ok file
+ where
+ params =
+ [ Param "transferkey"
+ , Param $ key2file $ transferKey t
+ , Param $ if isdownload
+ then "--from"
+ else "--to"
+ , Param $ Remote.name remote
+ , Param "--file"
+ , File file
+ ]
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
new file mode 100644
index 000000000..8ba015b19
--- /dev/null
+++ b/Assistant/Threads/Watcher.hs
@@ -0,0 +1,251 @@
+{- git-annex assistant tree watcher
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Watcher (
+ watchThread,
+ checkCanWatch,
+ needLsof,
+ stageSymlink,
+ onAddSymlink,
+ runHandler,
+) where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Changes
+import Assistant.TransferQueue
+import Assistant.Alert
+import Logs.Transfer
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Annex
+import qualified Annex.Queue
+import qualified Git.Command
+import qualified Git.UpdateIndex
+import qualified Git.HashObject
+import qualified Git.LsFiles
+import qualified Backend
+import qualified Command.Add
+import Annex.Content
+import Annex.CatFile
+import Git.Types
+
+import Data.Bits.Utils
+import qualified Data.ByteString.Lazy as L
+
+thisThread :: ThreadName
+thisThread = "Watcher"
+
+checkCanWatch :: Annex ()
+checkCanWatch
+ | canWatch =
+ unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $
+ needLsof
+ | otherwise = error "watch mode is not available on this system"
+
+needLsof :: Annex ()
+needLsof = error $ unlines
+ [ "The lsof command is needed for watch mode to be safe, and is not in PATH."
+ , "To override lsof checks to ensure that files are not open for writing"
+ , "when added to the annex, you can use --force"
+ , "Be warned: This can corrupt data in the annex, and make fsck complain."
+ ]
+
+watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+watchThread st dstatus transferqueue changechan = do
+ void $ watchDir "." ignored hooks startup
+ debug thisThread [ "watching", "."]
+ where
+ startup = startupScan st dstatus
+ hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
+ hooks = WatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , addSymlinkHook = hook onAddSymlink
+ , delDirHook = hook onDelDir
+ , errHook = hook onErr
+ }
+
+{- Initial scartup scan. The action should return once the scan is complete. -}
+startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
+startupScan st dstatus scanner = do
+ runThreadState st $ showAction "scanning"
+ r <- alertWhile' dstatus startupScanAlert $ do
+ r <- scanner
+
+ -- Notice any files that were deleted before
+ -- watching was started.
+ runThreadState st $ do
+ inRepo $ Git.Command.run "add" [Param "--update"]
+ showAction "started"
+
+ modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
+
+ return (True, r)
+
+ return r
+
+ignored :: FilePath -> Bool
+ignored = ig . takeFileName
+ where
+ ig ".git" = True
+ ig ".gitignore" = True
+ ig ".gitattributes" = True
+ ig _ = False
+
+type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
+
+{- Runs an action handler, inside the Annex monad, and if there was a
+ - change, adds it to the ChangeChan.
+ -
+ - Exceptions are ignored, otherwise a whole watcher thread could be crashed.
+ -}
+runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do
+ r <- tryIO go
+ case r of
+ Left e -> print e
+ Right Nothing -> noop
+ Right (Just change) -> recordChange changechan change
+ where
+ go = runThreadState st $ handler threadname file filestatus dstatus transferqueue
+
+{- During initial directory scan, this will be run for any regular files
+ - that are already checked into git. We don't want to turn those into
+ - symlinks, so do a check. This is rather expensive, but only happens
+ - during startup.
+ -
+ - It's possible for the file to still be open for write by some process.
+ - This can happen in a few ways; one is if two processes had the file open
+ - and only one has just closed it. We want to avoid adding a file to the
+ - annex that is open for write, to avoid anything being able to change it.
+ -
+ - We could run lsof on the file here to check for other writers.
+ - But, that's slow, and even if there is currently a writer, we will want
+ - to add the file *eventually*. Instead, the file is locked down as a hard
+ - link in a temp directory, with its write bits disabled, for later
+ - checking with lsof, and a Change is returned containing a KeySource
+ - using that hard link. The committer handles running lsof and finishing
+ - the add.
+ -}
+onAdd :: Handler
+onAdd threadname file filestatus dstatus _
+ | maybe False isRegularFile filestatus = do
+ ifM (scanComplete <$> liftIO (getDaemonStatus dstatus))
+ ( go
+ , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
+ ( noChange
+ , go
+ )
+ )
+ | otherwise = noChange
+ where
+ go = do
+ liftIO $ debug threadname ["file added", file]
+ pendingAddChange =<< Command.Add.lockDown file
+
+{- A symlink might be an arbitrary symlink, which is just added.
+ - Or, if it is a git-annex symlink, ensure it points to the content
+ - before adding it.
+ -}
+onAddSymlink :: Handler
+onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
+ where
+ go (Just (key, _)) = do
+ link <- calcGitLink file key
+ ifM ((==) link <$> liftIO (readSymbolicLink file))
+ ( do
+ s <- liftIO $ getDaemonStatus dstatus
+ checkcontent key s
+ ensurestaged link s
+ , do
+ liftIO $ debug threadname ["fix symlink", file]
+ liftIO $ removeFile file
+ liftIO $ createSymbolicLink link file
+ addlink link
+ )
+ go Nothing = do -- other symlink
+ link <- liftIO (readSymbolicLink file)
+ ensurestaged link =<< liftIO (getDaemonStatus dstatus)
+
+ {- This is often called on symlinks that are already
+ - staged correctly. A symlink may have been deleted
+ - and being re-added, or added when the watcher was
+ - not running. So they're normally restaged to make sure.
+ -
+ - As an optimisation, during the status scan, avoid
+ - restaging everything. Only links that were created since
+ - the last time the daemon was running are staged.
+ - (If the daemon has never ran before, avoid staging
+ - links too.)
+ -}
+ ensurestaged link daemonstatus
+ | scanComplete daemonstatus = addlink link
+ | otherwise = case filestatus of
+ Just s
+ | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
+ _ -> addlink link
+
+ {- For speed, tries to reuse the existing blob for
+ - the symlink target. -}
+ addlink link = do
+ liftIO $ debug threadname ["add symlink", file]
+ v <- catObjectDetails $ Ref $ ':':file
+ case v of
+ Just (currlink, sha)
+ | s2w8 link == L.unpack currlink ->
+ stageSymlink file sha
+ _ -> do
+ sha <- inRepo $
+ Git.HashObject.hashObject BlobObject link
+ stageSymlink file sha
+ madeChange file LinkChange
+
+ {- When a new link appears, after the startup scan,
+ - try to get the key's content. -}
+ checkcontent key daemonstatus
+ | scanComplete daemonstatus = unlessM (inAnnex key) $
+ queueTransfers Next transferqueue dstatus
+ key (Just file) Download
+ | otherwise = noop
+
+onDel :: Handler
+onDel threadname file _ _dstatus _ = do
+ liftIO $ debug threadname ["file deleted", file]
+ Annex.Queue.addUpdateIndex =<<
+ inRepo (Git.UpdateIndex.unstageFile file)
+ madeChange file RmChange
+
+{- A directory has been deleted, or moved, so tell git to remove anything
+ - that was inside it from its cache. Since it could reappear at any time,
+ - use --cached to only delete it from the index.
+ -
+ - Note: This could use unstageFile, but would need to run another git
+ - command to get the recursive list of files in the directory, so rm is
+ - just as good. -}
+onDelDir :: Handler
+onDelDir threadname dir _ _dstatus _ = do
+ liftIO $ debug threadname ["directory deleted", dir]
+ Annex.Queue.addCommand "rm"
+ [Params "--quiet -r --cached --ignore-unmatch --"] [dir]
+ madeChange dir RmDirChange
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ _dstatus _ = do
+ warning msg
+ return Nothing
+
+{- Adds a symlink to the index, without ever accessing the actual symlink
+ - on disk. This avoids a race if git add is used, where the symlink is
+ - changed to something else immediately after creation.
+ -}
+stageSymlink :: FilePath -> Sha -> Annex ()
+stageSymlink file sha =
+ Annex.Queue.addUpdateIndex =<<
+ inRepo (Git.UpdateIndex.stageSymlink file sha)
diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs
new file mode 100644
index 000000000..e203d50ba
--- /dev/null
+++ b/Assistant/Threads/WebApp.hs
@@ -0,0 +1,112 @@
+{- git-annex assistant webapp thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+module Assistant.Threads.WebApp where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.WebApp.DashBoard
+import Assistant.WebApp.SideBar
+import Assistant.WebApp.Notifications
+import Assistant.WebApp.Configurators
+import Assistant.WebApp.Documentation
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Utility.WebApp
+import Utility.FileMode
+import Utility.TempFile
+import Git
+
+import Yesod
+import Yesod.Static
+import Network.Socket (PortNumber)
+import Data.Text (pack, unpack)
+
+thisThread :: String
+thisThread = "WebApp"
+
+mkYesodDispatch "WebApp" $(parseRoutesFile "Assistant/WebApp/routes")
+
+type Url = String
+
+webAppThread
+ :: (Maybe ThreadState)
+ -> DaemonStatusHandle
+ -> ScanRemoteMap
+ -> TransferQueue
+ -> TransferSlots
+ -> Maybe (IO String)
+ -> Maybe (Url -> FilePath -> IO ())
+ -> IO ()
+webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = do
+ webapp <- WebApp
+ <$> pure mst
+ <*> pure dstatus
+ <*> pure scanremotes
+ <*> pure transferqueue
+ <*> pure transferslots
+ <*> (pack <$> genRandomToken)
+ <*> getreldir mst
+ <*> pure $(embed "static")
+ <*> newWebAppState
+ <*> pure postfirstrun
+ app <- toWaiAppPlain webapp
+ app' <- ifM debugEnabled
+ ( return $ httpDebugLogger app
+ , return app
+ )
+ runWebApp app' $ \port -> case mst of
+ Nothing -> withTempFile "webapp.html" $ \tmpfile _ -> go port webapp tmpfile
+ Just st -> go port webapp =<< runThreadState st (fromRepo gitAnnexHtmlShim)
+ where
+ getreldir Nothing = return Nothing
+ getreldir (Just st) = Just <$>
+ (relHome =<< absPath
+ =<< runThreadState st (fromRepo repoPath))
+ go port webapp htmlshim = do
+ writeHtmlShim webapp port htmlshim
+ maybe noop (\a -> a (myUrl webapp port "/") htmlshim) onstartup
+
+{- Creates a html shim file that's used to redirect into the webapp,
+ - to avoid exposing the secretToken when launching the web browser. -}
+writeHtmlShim :: WebApp -> PortNumber -> FilePath -> IO ()
+writeHtmlShim webapp port file = do
+ debug thisThread ["running on port", show port]
+ viaTmp go file $ genHtmlShim webapp port
+ where
+ go tmpfile content = do
+ h <- openFile tmpfile WriteMode
+ modifyFileMode tmpfile $ removeModes [groupReadMode, otherReadMode]
+ hPutStr h content
+ hClose h
+
+{- TODO: generate this static file using Yesod. -}
+genHtmlShim :: WebApp -> PortNumber -> String
+genHtmlShim webapp port = unlines
+ [ "<html>"
+ , "<head>"
+ , "<title>Starting webapp...</title>"
+ , "<meta http-equiv=\"refresh\" content=\"0; URL="++url++"\">"
+ , "<body>"
+ , "<p>"
+ , "<a href=\"" ++ url ++ "\">Starting webapp...</a>"
+ , "</p>"
+ , "</body>"
+ , "</html>"
+ ]
+ where
+ url = myUrl webapp port "/"
+
+myUrl :: WebApp -> PortNumber -> FilePath -> Url
+myUrl webapp port page = "http://localhost:" ++ show port ++ page ++
+ "?auth=" ++ unpack (secretToken webapp)