summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Alert.hs286
-rw-r--r--Assistant/Changes.hs27
-rw-r--r--Assistant/Commits.hs34
-rw-r--r--Assistant/Common.hs21
-rw-r--r--Assistant/DaemonStatus.hs199
-rw-r--r--Assistant/Pushes.hs46
-rw-r--r--Assistant/SanityChecker.hs81
-rw-r--r--Assistant/ScanRemotes.hs48
-rw-r--r--Assistant/Sync.hs105
-rw-r--r--Assistant/ThreadedMonad.hs18
-rw-r--r--Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs)84
-rw-r--r--Assistant/Threads/Merger.hs80
-rw-r--r--Assistant/Threads/MountWatcher.hs189
-rw-r--r--Assistant/Threads/NetWatcher.hs132
-rw-r--r--Assistant/Threads/Pusher.hs82
-rw-r--r--Assistant/Threads/SanityChecker.hs98
-rw-r--r--Assistant/Threads/TransferScanner.hs138
-rw-r--r--Assistant/Threads/TransferWatcher.hs80
-rw-r--r--Assistant/Threads/Transferrer.hs113
-rw-r--r--Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs)105
-rw-r--r--Assistant/Threads/WebApp.hs112
-rw-r--r--Assistant/TransferQueue.hs154
-rw-r--r--Assistant/TransferSlots.hs71
-rw-r--r--Assistant/WebApp.hs177
-rw-r--r--Assistant/WebApp/Configurators.hs363
-rw-r--r--Assistant/WebApp/DashBoard.hs219
-rw-r--r--Assistant/WebApp/Documentation.hs22
-rw-r--r--Assistant/WebApp/Notifications.hs58
-rw-r--r--Assistant/WebApp/SideBar.hs84
-rw-r--r--Assistant/WebApp/routes22
30 files changed, 3055 insertions, 193 deletions
diff --git a/Assistant/Alert.hs b/Assistant/Alert.hs
new file mode 100644
index 000000000..4c4906ef5
--- /dev/null
+++ b/Assistant/Alert.hs
@@ -0,0 +1,286 @@
+{- git-annex assistant alerts
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE RankNTypes, BangPatterns, OverloadedStrings #-}
+
+module Assistant.Alert where
+
+import Common.Annex
+import qualified Remote
+import Utility.Tense
+import Logs.Transfer
+
+import qualified Data.Text as T
+import qualified Data.Map as M
+import Data.String
+
+{- Different classes of alerts are displayed differently. -}
+data AlertClass = Success | Message | Activity | Warning | Error
+ deriving (Eq, Ord)
+
+data AlertPriority = Filler | Low | Medium | High | Pinned
+ deriving (Eq, Ord)
+
+{- An alert can have an name, which is used to combine it with other similar
+ - alerts. -}
+data AlertName = FileAlert TenseChunk | DownloadFailedAlert | SanityCheckFixAlert
+ deriving (Eq)
+
+{- The first alert is the new alert, the second is an old alert.
+ - Should return a modified version of the old alert. -}
+type AlertCombiner = Alert -> Alert -> Maybe Alert
+
+data Alert = Alert
+ { alertClass :: AlertClass
+ , alertHeader :: Maybe TenseText
+ , alertMessageRender :: [TenseChunk] -> TenseText
+ , alertData :: [TenseChunk]
+ , alertBlockDisplay :: Bool
+ , alertClosable :: Bool
+ , alertPriority :: AlertPriority
+ , alertIcon :: Maybe String
+ , alertCombiner :: Maybe AlertCombiner
+ , alertName :: Maybe AlertName
+ }
+
+type AlertPair = (AlertId, Alert)
+
+type AlertMap = M.Map AlertId Alert
+
+{- Higher AlertId indicates a more recent alert. -}
+newtype AlertId = AlertId Integer
+ deriving (Read, Show, Eq, Ord)
+
+firstAlertId :: AlertId
+firstAlertId = AlertId 0
+
+nextAlertId :: AlertId -> AlertId
+nextAlertId (AlertId i) = AlertId $ succ i
+
+{- This is as many alerts as it makes sense to display at a time.
+ - A display might be smaller, or larger, the point is to not overwhelm the
+ - user with a ton of alerts. -}
+displayAlerts :: Int
+displayAlerts = 6
+
+{- This is not a hard maximum, but there's no point in keeping a great
+ - many filler alerts in an AlertMap, so when there's more than this many,
+ - they start being pruned, down toward displayAlerts. -}
+maxAlerts :: Int
+maxAlerts = displayAlerts * 2
+
+{- The desired order is the reverse of:
+ -
+ - - Pinned alerts
+ - - High priority alerts, newest first
+ - - Medium priority Activity, newest first (mostly used for Activity)
+ - - Low priority alerts, newest first
+ - - Filler priorty alerts, newest first
+ - - Ties are broken by the AlertClass, with Errors etc coming first.
+ -}
+compareAlertPairs :: AlertPair -> AlertPair -> Ordering
+compareAlertPairs
+ (aid, Alert { alertClass = aclass, alertPriority = aprio })
+ (bid, Alert { alertClass = bclass, alertPriority = bprio })
+ = compare aprio bprio
+ `thenOrd` compare aid bid
+ `thenOrd` compare aclass bclass
+
+sortAlertPairs :: [AlertPair] -> [AlertPair]
+sortAlertPairs = sortBy compareAlertPairs
+
+{- Renders an alert's header for display, if it has one. -}
+renderAlertHeader :: Alert -> Maybe T.Text
+renderAlertHeader alert = renderTense (alertTense alert) <$> alertHeader alert
+
+{- Renders an alert's message for display. -}
+renderAlertMessage :: Alert -> T.Text
+renderAlertMessage alert = renderTense (alertTense alert) $
+ (alertMessageRender alert) (alertData alert)
+
+alertTense :: Alert -> Tense
+alertTense alert
+ | alertClass alert == Activity = Present
+ | otherwise = Past
+
+{- Checks if two alerts display the same. -}
+effectivelySameAlert :: Alert -> Alert -> Bool
+effectivelySameAlert x y = all id
+ [ alertClass x == alertClass y
+ , alertHeader x == alertHeader y
+ , alertData x == alertData y
+ , alertBlockDisplay x == alertBlockDisplay y
+ , alertClosable x == alertClosable y
+ , alertPriority x == alertPriority y
+ ]
+
+makeAlertFiller :: Bool -> Alert -> Alert
+makeAlertFiller success alert
+ | isFiller alert = alert
+ | otherwise = alert
+ { alertClass = if c == Activity then c' else c
+ , alertPriority = Filler
+ , alertClosable = True
+ , alertIcon = Just $ if success then "ok" else "exclamation-sign"
+ }
+ where
+ c = alertClass alert
+ c'
+ | success = Success
+ | otherwise = Error
+
+isFiller :: Alert -> Bool
+isFiller alert = alertPriority alert == Filler
+
+{- Updates the Alertmap, adding or updating an alert.
+ -
+ - Any old filler that looks the same as the alert is removed.
+ -
+ - Or, if the alert has an alertCombiner that combines it with
+ - an old alert, the old alert is replaced with the result, and the
+ - alert is removed.
+ -
+ - Old filler alerts are pruned once maxAlerts is reached.
+ -}
+mergeAlert :: AlertId -> Alert -> AlertMap -> AlertMap
+mergeAlert i al m = maybe updatePrune updateCombine (alertCombiner al)
+ where
+ pruneSame k al' = k == i || not (effectivelySameAlert al al')
+ pruneBloat m'
+ | bloat > 0 = M.fromList $ pruneold $ M.toList m'
+ | otherwise = m'
+ where
+ bloat = M.size m' - maxAlerts
+ pruneold l =
+ let (f, rest) = partition (\(_, a) -> isFiller a) l
+ in drop bloat f ++ rest
+ updatePrune = pruneBloat $ M.filterWithKey pruneSame $
+ M.insertWith' const i al m
+ updateCombine combiner =
+ let combined = M.mapMaybe (combiner al) m
+ in if M.null combined
+ then updatePrune
+ else M.delete i $ M.union combined m
+
+baseActivityAlert :: Alert
+baseActivityAlert = Alert
+ { alertClass = Activity
+ , alertHeader = Nothing
+ , alertMessageRender = tenseWords
+ , alertData = []
+ , alertBlockDisplay = False
+ , alertClosable = False
+ , alertPriority = Medium
+ , alertIcon = Just "refresh"
+ , alertCombiner = Nothing
+ , alertName = Nothing
+ }
+
+activityAlert :: Maybe TenseText -> [TenseChunk] -> Alert
+activityAlert header dat = baseActivityAlert
+ { alertHeader = header
+ , alertData = dat
+ }
+
+startupScanAlert :: Alert
+startupScanAlert = activityAlert Nothing $
+ [Tensed "Performing" "Performed", "startup scan"]
+
+commitAlert :: Alert
+commitAlert = activityAlert Nothing $
+ [Tensed "Committing" "Committed", "changes to git"]
+
+showRemotes :: [Remote] -> TenseChunk
+showRemotes = UnTensed . T.unwords . map (T.pack . Remote.name)
+
+pushAlert :: [Remote] -> Alert
+pushAlert rs = activityAlert Nothing $
+ [Tensed "Syncing" "Synced", "with", showRemotes rs]
+
+pushRetryAlert :: [Remote] -> Alert
+pushRetryAlert rs = activityAlert
+ (Just $ tenseWords [Tensed "Retrying" "Retried", "sync"])
+ (["with", showRemotes rs])
+
+syncAlert :: [Remote] -> Alert
+syncAlert rs = baseActivityAlert
+ { alertHeader = Just $ tenseWords
+ [Tensed "Syncing" "Synced", "with", showRemotes rs]
+ , alertData = []
+ , alertPriority = Low
+ }
+
+scanAlert :: [Remote] -> Alert
+scanAlert rs = baseActivityAlert
+ { alertHeader = Just $ tenseWords
+ [Tensed "Scanning" "Scanned", showRemotes rs]
+ , alertBlockDisplay = True
+ , alertPriority = Low
+ }
+
+sanityCheckAlert :: Alert
+sanityCheckAlert = activityAlert
+ (Just $ tenseWords [Tensed "Running" "Ran", "daily sanity check"])
+ ["to make sure everything is ok."]
+
+sanityCheckFixAlert :: String -> Alert
+sanityCheckFixAlert msg = Alert
+ { alertClass = Warning
+ , alertHeader = Just $ tenseWords ["Fixed a problem"]
+ , alertMessageRender = render
+ , alertData = [UnTensed $ T.pack msg]
+ , alertBlockDisplay = True
+ , alertPriority = High
+ , alertClosable = True
+ , alertIcon = Just "exclamation-sign"
+ , alertName = Just SanityCheckFixAlert
+ , alertCombiner = Just $ dataCombiner (++)
+ }
+ where
+ render dta = tenseWords $ alerthead : dta ++ [alertfoot]
+ alerthead = "The daily sanity check found and fixed a problem:"
+ alertfoot = "If these problems persist, consider filing a bug report."
+
+fileAlert :: TenseChunk -> FilePath -> Alert
+fileAlert msg file = (activityAlert Nothing [f])
+ { alertName = Just $ FileAlert msg
+ , alertMessageRender = render
+ , alertCombiner = Just $ dataCombiner combiner
+ }
+ where
+ f = fromString $ shortFile $ takeFileName file
+ render fs = tenseWords $ msg : fs
+ combiner new old = take 10 $ new ++ old
+
+addFileAlert :: FilePath -> Alert
+addFileAlert = fileAlert (Tensed "Adding" "Added")
+
+{- This is only used as a success alert after a transfer, not during it. -}
+transferFileAlert :: Direction -> Bool -> FilePath -> Alert
+transferFileAlert direction True
+ | direction == Upload = fileAlert "Uploaded"
+ | otherwise = fileAlert "Downloaded"
+transferFileAlert direction False
+ | direction == Upload = fileAlert "Upload failed"
+ | otherwise = fileAlert "Download failed"
+
+dataCombiner :: ([TenseChunk] -> [TenseChunk] -> [TenseChunk]) -> AlertCombiner
+dataCombiner combiner new old
+ | alertClass new /= alertClass old = Nothing
+ | alertName new == alertName old =
+ Just $! old { alertData = alertData new `combiner` alertData old }
+ | otherwise = Nothing
+
+shortFile :: FilePath -> String
+shortFile f
+ | len < maxlen = f
+ | otherwise = take half f ++ ".." ++ drop (len - half) f
+ where
+ len = length f
+ maxlen = 20
+ half = (maxlen - 2) `div` 2
+
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..eca922109 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -1,6 +1,8 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Changes where
@@ -8,14 +10,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +31,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +64,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..86fd7599f
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/Common.hs b/Assistant/Common.hs
new file mode 100644
index 000000000..c1a346e75
--- /dev/null
+++ b/Assistant/Common.hs
@@ -0,0 +1,21 @@
+{- Common infrastructure for the git-annex assistant threads.
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Common (
+ module X,
+ ThreadName,
+ debug
+) where
+
+import Common.Annex as X
+
+import System.Log.Logger
+
+type ThreadName = String
+
+debug :: ThreadName -> [String] -> IO ()
+debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index e5ba3d151..8e3b48777 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -1,20 +1,28 @@
{- git-annex assistant daemon status
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.DaemonStatus where
import Common.Annex
import Assistant.ThreadedMonad
+import Assistant.Alert
import Utility.ThreadScheduler
import Utility.TempFile
+import Utility.NotificationBroadcaster
+import Logs.Transfer
+import Logs.Trust
+import qualified Remote
-import Control.Concurrent
+import Control.Concurrent.STM
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
+import qualified Data.Map as M
data DaemonStatus = DaemonStatus
-- False when the daemon is performing its startup scan
@@ -25,47 +33,103 @@ data DaemonStatus = DaemonStatus
, sanityCheckRunning :: Bool
-- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime
+ -- Currently running file content transfers
+ , currentTransfers :: TransferMap
+ -- Messages to display to the user.
+ , alertMap :: AlertMap
+ , lastAlertId :: AlertId
+ -- Ordered list of remotes to talk to.
+ , knownRemotes :: [Remote]
+ -- Broadcasts notifications about all changes to the DaemonStatus
+ , changeNotifier :: NotificationBroadcaster
+ -- Broadcasts notifications when queued or current transfers change.
+ , transferNotifier :: NotificationBroadcaster
+ -- Broadcasts notifications when there's a change to the alerts
+ , alertNotifier :: NotificationBroadcaster
}
- deriving (Show)
-type DaemonStatusHandle = MVar DaemonStatus
+type TransferMap = M.Map Transfer TransferInfo
+
+{- This TMVar is never left empty, so accessing it will never block. -}
+type DaemonStatusHandle = TMVar DaemonStatus
-newDaemonStatus :: DaemonStatus
+newDaemonStatus :: IO DaemonStatus
newDaemonStatus = DaemonStatus
- { scanComplete = False
- , lastRunning = Nothing
- , sanityCheckRunning = False
- , lastSanityCheck = Nothing
- }
+ <$> pure False
+ <*> pure Nothing
+ <*> pure False
+ <*> pure Nothing
+ <*> pure M.empty
+ <*> pure M.empty
+ <*> pure firstAlertId
+ <*> pure []
+ <*> newNotificationBroadcaster
+ <*> newNotificationBroadcaster
+ <*> newNotificationBroadcaster
+
+getDaemonStatus :: DaemonStatusHandle -> IO DaemonStatus
+getDaemonStatus = atomically . readTMVar
+
+modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> IO ()
+modifyDaemonStatus_ dstatus a = modifyDaemonStatus dstatus $ \s -> (a s, ())
+
+modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> IO b
+modifyDaemonStatus dstatus a = do
+ (s, b) <- atomically $ do
+ r@(s, _) <- a <$> takeTMVar dstatus
+ putTMVar dstatus s
+ return r
+ sendNotification $ changeNotifier s
+ return b
-getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
-getDaemonStatus = liftIO . readMVar
+{- Remotes ordered by cost, with dead ones thrown out. -}
+calcKnownRemotes :: Annex [Remote]
+calcKnownRemotes = do
+ rs <- concat . Remote.byCost <$> Remote.enabledRemoteList
+ alive <- snd <$> trustPartition DeadTrusted (map Remote.uuid rs)
+ let good r = Remote.uuid r `elem` alive
+ return $ filter good rs
-modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
-modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
+{- Updates the cached ordered list of remotes from the list in Annex
+ - state. -}
+updateKnownRemotes :: DaemonStatusHandle -> Annex ()
+updateKnownRemotes dstatus = do
+ remotes <- calcKnownRemotes
+ liftIO $ modifyDaemonStatus_ dstatus $
+ \s -> s { knownRemotes = remotes }
-{- Load any previous daemon status file, and store it in the MVar for this
- - process to use as its DaemonStatus. -}
+{- Load any previous daemon status file, and store it in a MVar for this
+ - process to use as its DaemonStatus. Also gets current transfer status. -}
startDaemonStatus :: Annex DaemonStatusHandle
startDaemonStatus = do
file <- fromRepo gitAnnexDaemonStatusFile
status <- liftIO $
- catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
- liftIO $ newMVar status
+ catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
+ transfers <- M.fromList <$> getTransfers
+ remotes <- calcKnownRemotes
+ liftIO $ atomically $ newTMVar status
{ scanComplete = False
, sanityCheckRunning = False
+ , currentTransfers = transfers
+ , knownRemotes = remotes
}
-{- This thread wakes up periodically and writes the daemon status to disk. -}
+{- This writes the daemon status to disk, when it changes, but no more
+ - frequently than once every ten minutes.
+ -}
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
-daemonStatusThread st handle = do
+daemonStatusThread st dstatus = do
+ notifier <- newNotificationHandle
+ =<< changeNotifier <$> getDaemonStatus dstatus
checkpoint
- runEvery (Seconds tenMinutes) checkpoint
+ runEvery (Seconds tenMinutes) $ do
+ waitNotification notifier
+ checkpoint
where
- checkpoint = runThreadState st $ do
- file <- fromRepo gitAnnexDaemonStatusFile
- status <- getDaemonStatus handle
- liftIO $ writeDaemonStatusFile file status
+ checkpoint = do
+ status <- getDaemonStatus dstatus
+ file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
+ writeDaemonStatusFile file status
{- Don't just dump out the structure, because it will change over time,
- and parts of it are not relevant. -}
@@ -81,9 +145,9 @@ writeDaemonStatusFile file status =
]
readDaemonStatusFile :: FilePath -> IO DaemonStatus
-readDaemonStatusFile file = parse <$> readFile file
+readDaemonStatusFile file = parse <$> newDaemonStatus <*> readFile file
where
- parse = foldr parseline newDaemonStatus . lines
+ parse status = foldr parseline status . lines
parseline line status
| key == "lastRunning" = parseval readtime $ \v ->
status { lastRunning = Just v }
@@ -117,3 +181,86 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
+
+{- Mutates the transfer map. Runs in STM so that the transfer map can
+ - be modified in the same transaction that modifies the transfer queue.
+ - Note that this does not send a notification of the change; that's left
+ - to the caller. -}
+adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM ()
+adjustTransfersSTM dstatus a = do
+ s <- takeTMVar dstatus
+ putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
+
+{- Updates a transfer's info.
+ - Preserves the transferTid and transferPaused values,
+ - which are not written to disk. -}
+updateTransferInfo :: DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+updateTransferInfo dstatus t info =
+ notifyTransfer dstatus `after` modifyDaemonStatus_ dstatus go
+ where
+ go s = s { currentTransfers = update (currentTransfers s) }
+ update m = M.insertWith' merge t info m
+ merge new old = new
+ { transferTid = maybe (transferTid new) Just (transferTid old)
+ , transferPaused = transferPaused new || transferPaused old
+ }
+
+{- Removes a transfer from the map, and returns its info. -}
+removeTransfer :: DaemonStatusHandle -> Transfer -> IO (Maybe TransferInfo)
+removeTransfer dstatus t =
+ notifyTransfer dstatus `after` modifyDaemonStatus dstatus go
+ where
+ go s =
+ let (info, ts) = M.updateLookupWithKey
+ (\_k _v -> Nothing)
+ t (currentTransfers s)
+ in (s { currentTransfers = ts }, info)
+
+{- Send a notification when a transfer is changed. -}
+notifyTransfer :: DaemonStatusHandle -> IO ()
+notifyTransfer dstatus = sendNotification
+ =<< transferNotifier <$> atomically (readTMVar dstatus)
+
+{- Send a notification when alerts are changed. -}
+notifyAlert :: DaemonStatusHandle -> IO ()
+notifyAlert dstatus = sendNotification
+ =<< alertNotifier <$> atomically (readTMVar dstatus)
+
+{- Returns the alert's identifier, which can be used to remove it. -}
+addAlert :: DaemonStatusHandle -> Alert -> IO AlertId
+addAlert dstatus alert = notifyAlert dstatus `after` modifyDaemonStatus dstatus go
+ where
+ go s = (s { lastAlertId = i, alertMap = m }, i)
+ where
+ i = nextAlertId $ lastAlertId s
+ m = mergeAlert i alert (alertMap s)
+
+removeAlert :: DaemonStatusHandle -> AlertId -> IO ()
+removeAlert dstatus i = updateAlert dstatus i (const Nothing)
+
+updateAlert :: DaemonStatusHandle -> AlertId -> (Alert -> Maybe Alert) -> IO ()
+updateAlert dstatus i a = updateAlertMap dstatus $ \m -> M.update a i m
+
+updateAlertMap :: DaemonStatusHandle -> (AlertMap -> AlertMap) -> IO ()
+updateAlertMap dstatus a = notifyAlert dstatus `after` modifyDaemonStatus_ dstatus go
+ where
+ go s = s { alertMap = a (alertMap s) }
+
+{- Displays an alert while performing an activity that returns True on
+ - success.
+ -
+ - The alert is left visible afterwards, as filler.
+ - Old filler is pruned, to prevent the map growing too large. -}
+alertWhile :: DaemonStatusHandle -> Alert -> IO Bool -> IO Bool
+alertWhile dstatus alert a = alertWhile' dstatus alert $ do
+ r <- a
+ return (r, r)
+
+{- Like alertWhile, but allows the activity to return a value too. -}
+alertWhile' :: DaemonStatusHandle -> Alert -> IO (Bool, a) -> IO a
+alertWhile' dstatus alert a = do
+ let alert' = alert { alertClass = Activity }
+ i <- addAlert dstatus alert'
+ (ok, r) <- a
+ updateAlertMap dstatus $ mergeAlert i $ makeAlertFiller ok alert'
+ return r
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f411dda07
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,46 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+
+import Control.Concurrent.STM
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Track the most recent push failure for each remote. -}
+type PushMap = M.Map Remote UTCTime
+type FailedPushMap = TMVar PushMap
+
+{- The TMVar starts empty, and is left empty when there are no
+ - failed pushes. This way we can block until there are some failed pushes.
+ -}
+newFailedPushMap :: IO FailedPushMap
+newFailedPushMap = atomically newEmptyTMVar
+
+{- Blocks until there are failed pushes.
+ - Returns Remotes whose pushes failed a given time duration or more ago.
+ - (This may be an empty list.) -}
+getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
+getFailedPushesBefore v duration = do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
+ where
+ toorecent now time = now `diffUTCTime` time < duration
+
+{- Modifies the map. -}
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
+changeFailedPushMap v a = atomically $
+ store . a . fromMaybe M.empty =<< tryTakeTMVar v
+ where
+ {- tryTakeTMVar empties the TMVar; refill it only if
+ - the modified map is not itself empty -}
+ store m
+ | m == M.empty = noop
+ | otherwise = putTMVar v $! m
diff --git a/Assistant/SanityChecker.hs b/Assistant/SanityChecker.hs
deleted file mode 100644
index e2ca9da74..000000000
--- a/Assistant/SanityChecker.hs
+++ /dev/null
@@ -1,81 +0,0 @@
-{- git-annex assistant sanity checker
- -
- - Copyright 2012 Joey Hess <joey@kitenet.net>
- -}
-
-module Assistant.SanityChecker (
- sanityCheckerThread
-) where
-
-import Common.Annex
-import qualified Git.LsFiles
-import Assistant.DaemonStatus
-import Assistant.ThreadedMonad
-import Assistant.Changes
-import Utility.ThreadScheduler
-import qualified Assistant.Watcher
-
-import Data.Time.Clock.POSIX
-
-{- This thread wakes up occasionally to make sure the tree is in good shape. -}
-sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-sanityCheckerThread st status changechan = forever $ do
- waitForNextCheck st status
-
- runThreadState st $
- modifyDaemonStatus status $ \s -> s
- { sanityCheckRunning = True }
-
- now <- getPOSIXTime -- before check started
- catchIO (check st status changechan)
- (runThreadState st . warning . show)
-
- runThreadState st $ do
- modifyDaemonStatus status $ \s -> s
- { sanityCheckRunning = False
- , lastSanityCheck = Just now
- }
-
-{- Only run one check per day, from the time of the last check. -}
-waitForNextCheck :: ThreadState -> DaemonStatusHandle -> IO ()
-waitForNextCheck st status = do
- v <- runThreadState st $
- lastSanityCheck <$> getDaemonStatus status
- now <- getPOSIXTime
- threadDelaySeconds $ Seconds $ calcdelay now v
- where
- calcdelay _ Nothing = oneDay
- calcdelay now (Just lastcheck)
- | lastcheck < now = max oneDay $
- oneDay - truncate (now - lastcheck)
- | otherwise = oneDay
-
-oneDay :: Int
-oneDay = 24 * 60 * 60
-
-{- It's important to stay out of the Annex monad as much as possible while
- - running potentially expensive parts of this check, since remaining in it
- - will block the watcher. -}
-check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-check st status changechan = do
- g <- runThreadState st $ do
- showSideAction "Running daily check"
- fromRepo id
- -- Find old unstaged symlinks, and add them to git.
- unstaged <- Git.LsFiles.notInRepo False ["."] g
- now <- getPOSIXTime
- forM_ unstaged $ \file -> do
- ms <- catchMaybeIO $ getSymbolicLinkStatus file
- case ms of
- Just s | toonew (statusChangeTime s) now -> noop
- | isSymbolicLink s ->
- addsymlink file ms
- _ -> noop
- where
- toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
- slop = fromIntegral tenMinutes
- insanity m = runThreadState st $ warning m
- addsymlink file s = do
- insanity $ "found unstaged symlink: " ++ file
- Assistant.Watcher.runHandler st status changechan
- Assistant.Watcher.onAddSymlink file s
diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs
new file mode 100644
index 000000000..661c98095
--- /dev/null
+++ b/Assistant/ScanRemotes.hs
@@ -0,0 +1,48 @@
+{- git-annex assistant remotes needing scanning
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.ScanRemotes where
+
+import Common.Annex
+import qualified Types.Remote as Remote
+
+import Data.Function
+import Control.Concurrent.STM
+import qualified Data.Map as M
+
+data ScanInfo = ScanInfo
+ { scanPriority :: Int
+ , fullScan :: Bool
+ }
+
+type ScanRemoteMap = TMVar (M.Map Remote ScanInfo)
+
+{- The TMVar starts empty, and is left empty when there are no remotes
+ - to scan. -}
+newScanRemoteMap :: IO ScanRemoteMap
+newScanRemoteMap = atomically newEmptyTMVar
+
+{- Blocks until there is a remote or remotes that need to be scanned.
+ -
+ - The list has higher priority remotes listed first. -}
+getScanRemote :: ScanRemoteMap -> IO [(Remote, ScanInfo)]
+getScanRemote v = atomically $
+ reverse . sortBy (compare `on` scanPriority . snd) . M.toList
+ <$> takeTMVar v
+
+{- Adds new remotes that need scanning. -}
+addScanRemotes :: ScanRemoteMap -> Bool -> [Remote] -> IO ()
+addScanRemotes _ _ [] = noop
+addScanRemotes v full rs = atomically $ do
+ m <- fromMaybe M.empty <$> tryTakeTMVar v
+ putTMVar v $ M.unionWith merge (M.fromList $ zip rs (map info rs)) m
+ where
+ info r = ScanInfo (-1 * Remote.cost r) full
+ merge x y = ScanInfo
+ { scanPriority = max (scanPriority x) (scanPriority y)
+ , fullScan = fullScan x || fullScan y
+ }
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
new file mode 100644
index 000000000..499fc960c
--- /dev/null
+++ b/Assistant/Sync.hs
@@ -0,0 +1,105 @@
+{- git-annex assistant repo syncing
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Sync where
+
+import Assistant.Common
+import Assistant.Pushes
+import Assistant.Alert
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import qualified Command.Sync
+import Utility.Parallel
+import qualified Git
+import qualified Git.Branch
+import qualified Git.Command
+import qualified Remote
+import qualified Annex.Branch
+
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Syncs with remotes that may have been disconnected for a while.
+ -
+ - First gets git in sync, and then prepares any necessary file transfers.
+ -
+ - An expensive full scan is queued when the git-annex branches of some of
+ - the remotes have diverged from the local git-annex branch. Otherwise,
+ - it's sufficient to requeue failed transfers.
+ -}
+reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO ()
+reconnectRemotes _ _ _ _ [] = noop
+reconnectRemotes threadname st dstatus scanremotes rs = void $
+ alertWhile dstatus (syncAlert rs) $ do
+ sync =<< runThreadState st (inRepo Git.Branch.current)
+ where
+ sync (Just branch) = do
+ diverged <- manualPull st (Just branch) rs
+ addScanRemotes scanremotes diverged rs
+ now <- getCurrentTime
+ pushToRemotes threadname now st Nothing rs
+ {- No local branch exists yet, but we can try pulling. -}
+ sync Nothing = do
+ diverged <- manualPull st Nothing rs
+ addScanRemotes scanremotes diverged rs
+ return True
+
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO Bool
+pushToRemotes threadname now st mpushmap remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> inRepo Git.Branch.current
+ go True branch g remotes
+ where
+ go _ Nothing _ _ = return True -- no branch, so nothing to do
+ go shouldretry (Just branch) g rs = do
+ debug threadname
+ [ "pushing to"
+ , show rs
+ ]
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ (succeeded, failed) <- inParallel (push g branch) rs
+ let ok = null failed
+ case mpushmap of
+ Nothing -> noop
+ Just pushmap ->
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
+ unless (ok) $
+ debug threadname
+ [ "failed to push to"
+ , show failed
+ ]
+ if (ok || not shouldretry)
+ then return ok
+ else retry branch g failed
+
+ makemap l = M.fromList $ zip l (repeat now)
+
+ push g branch remote = Command.Sync.pushBranch remote branch g
+
+ retry branch g rs = do
+ debug threadname [ "trying manual pull to resolve failed pushes" ]
+ void $ manualPull st (Just branch) rs
+ go False (Just branch) g rs
+
+{- Manually pull from remotes and merge their branches. -}
+manualPull :: ThreadState -> (Maybe Git.Ref) -> [Remote] -> IO Bool
+manualPull st currentbranch remotes = do
+ g <- runThreadState st $ fromRepo id
+ forM_ remotes $ \r ->
+ Git.Command.runBool "fetch" [Param $ Remote.name r] g
+ haddiverged <- runThreadState st $ Annex.Branch.forceUpdate
+ forM_ remotes $ \r ->
+ runThreadState st $ Command.Sync.mergeRemote r currentbranch
+ return haddiverged
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..7b915e12c 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -1,17 +1,17 @@
{- making the Annex monad available across threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE BangPatterns #-}
-
module Assistant.ThreadedMonad where
import Common.Annex
import qualified Annex
import Control.Concurrent
-import Control.Exception (throw)
+import Data.Tuple
{- The Annex state is stored in a MVar, so that threaded actions can access
- it. -}
@@ -32,13 +32,7 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
-runThreadState mvar a = do
- startstate <- takeMVar mvar
- -- catch IO errors and rethrow after restoring the MVar
- !(r, newstate) <- catchIO (Annex.run startstate a) $ \e -> do
- putMVar mvar startstate
- throw e
- putMVar mvar newstate
- return r
+runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs
index 63df8cafc..5aadcc02a 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -1,14 +1,21 @@
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.Committer where
+module Assistant.Threads.Committer where
-import Common.Annex
+import Assistant.Common
import Assistant.Changes
+import Assistant.Commits
+import Assistant.Alert
import Assistant.ThreadedMonad
-import Assistant.Watcher
+import Assistant.Threads.Watcher
+import Assistant.TransferQueue
+import Assistant.DaemonStatus
+import Logs.Transfer
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -25,9 +32,12 @@ import Data.Tuple.Utils
import qualified Data.Set as S
import Data.Either
+thisThread :: ThreadName
+thisThread = "Committer"
+
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
+commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -36,12 +46,30 @@ commitThread st changechan = runEvery (Seconds 1) $ do
time <- getCurrentTime
if shouldCommit time changes
then do
- readychanges <- handleAdds st changechan changes
+ readychanges <- handleAdds st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
- void $ tryIO $ runThreadState st commitStaged
- else refillChanges changechan readychanges
- else refillChanges changechan changes
+ debug thisThread
+ [ "committing"
+ , show (length readychanges)
+ , "changes"
+ ]
+ void $ alertWhile dstatus commitAlert $
+ tryIO (runThreadState st commitStaged)
+ >> return True
+ recordCommit commitchan (Commit time)
+ else refill readychanges
+ else refill changes
+ where
+ refill [] = noop
+ refill cs = do
+ debug thisThread
+ [ "delaying commit of"
+ , show (length cs)
+ , "changes"
+ ]
+ refillChanges changechan cs
+
commitStaged :: Annex ()
commitStaged = do
@@ -93,8 +121,8 @@ shouldCommit now changes
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
-handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
-handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+handleAdds :: ThreadState -> ChangeChan -> TransferQueue -> DaemonStatusHandle -> [Change] -> IO [Change]
+handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
@@ -106,7 +134,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
- r <- handleAdds st changechan
+ r <- handleAdds st changechan transferqueue dstatus
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
@@ -117,17 +145,21 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
| otherwise = a
add :: Change -> IO (Maybe Change)
- add change@(PendingAddChange { keySource = ks }) = do
- r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
- showStart "add" $ keyFilename ks
- handle (finishedChange change) (keyFilename ks)
- =<< Command.Add.ingest ks
- return $ maybeMaybe r
+ add change@(PendingAddChange { keySource = ks }) =
+ alertWhile' dstatus (addFileAlert $ keyFilename ks) $
+ liftM ret $ catchMaybeIO $
+ sanitycheck ks $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ key <- Command.Add.ingest ks
+ handle (finishedChange change) (keyFilename ks) key
+ where
+ {- Add errors tend to be transient and will
+ - be automatically dealt with, so don't
+ - pass to the alert code. -}
+ ret (Just j@(Just _)) = (True, j)
+ ret _ = (True, Nothing)
add _ = return Nothing
- maybeMaybe (Just j@(Just _)) = j
- maybeMaybe _ = Nothing
-
handle _ _ Nothing = do
showEndFail
return Nothing
@@ -137,6 +169,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
+ queueTransfers Next transferqueue dstatus key (Just file) Upload
showEndOk
return $ Just change
@@ -164,6 +197,15 @@ safeToAdd st changes = runThreadState st $
tmpdir <- fromRepo gitAnnexTmpDir
openfiles <- S.fromList . map fst3 . filter openwrite <$>
liftIO (Lsof.queryDir tmpdir)
+
+ -- TODO this is here for debugging a problem on
+ -- OSX, and is pretty expensive, so remove later
+ liftIO $ debug thisThread
+ [ "checking changes:"
+ , show changes
+ , "vs open files:"
+ , show openfiles
+ ]
let checked = map (check openfiles) changes
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..0f33b68ed
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,80 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Git
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+
+thisThread :: ThreadName
+thisThread = "Merger"
+
+{- This thread watches for changes to .git/refs/heads/synced/,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+ debug thisThread ["watching", dir]
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let changedbranch = Git.Ref $
+ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just changedbranch == current) $ do
+ liftIO $ debug thisThread
+ [ "merging changes into"
+ , show current
+ ]
+ void $ mergeBranch changedbranch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
new file mode 100644
index 000000000..cc7495602
--- /dev/null
+++ b/Assistant/Threads/MountWatcher.hs
@@ -0,0 +1,189 @@
+{- git-annex assistant mount watcher, using either dbus or mtab polling
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Assistant.Threads.MountWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Sync
+import qualified Annex
+import qualified Git
+import Utility.ThreadScheduler
+import Utility.Mounts
+import Remote.List
+import qualified Types.Remote as Remote
+
+import Control.Concurrent
+import qualified Control.Exception as E
+import qualified Data.Set as S
+
+#if WITH_DBUS
+import Utility.DBus
+import DBus.Client
+import DBus
+import Data.Word (Word32)
+#else
+#warning Building without dbus support; will use mtab polling
+#endif
+
+thisThread :: ThreadName
+thisThread = "MountWatcher"
+
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+mountWatcherThread st handle scanremotes =
+#if WITH_DBUS
+ dbusThread st handle scanremotes
+#else
+ pollingThread st handle scanremotes
+#endif
+
+#if WITH_DBUS
+
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
+ where
+ go client = ifM (checkMountMonitor client)
+ ( do
+ {- Store the current mount points in an mvar,
+ - to be compared later. We could in theory
+ - work out the mount point from the dbus
+ - message, but this is easier. -}
+ mvar <- newMVar =<< currentMountPoints
+ forM_ mountChanged $ \matcher ->
+ listen client matcher $ \_event -> do
+ nowmounted <- currentMountPoints
+ wasmounted <- swapMVar mvar nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
+ , do
+ runThreadState st $
+ warning "No known volume monitor available through dbus; falling back to mtab polling"
+ pollinstead
+ )
+ onerr :: E.SomeException -> IO ()
+ onerr e = do
+ runThreadState st $
+ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
+ pollinstead
+ pollinstead = pollingThread st dstatus scanremotes
+
+{- Examine the list of services connected to dbus, to see if there
+ - are any we can use to monitor mounts. If not, will attempt to start one. -}
+checkMountMonitor :: Client -> IO Bool
+checkMountMonitor client = do
+ running <- filter (`elem` usableservices)
+ <$> listServiceNames client
+ case running of
+ [] -> startOneService client startableservices
+ (service:_) -> do
+ debug thisThread [ "Using running DBUS service"
+ , service
+ , "to monitor mount events."
+ ]
+ return True
+ where
+ startableservices = [gvfs]
+ usableservices = startableservices ++ [kde]
+ gvfs = "org.gtk.Private.GduVolumeMonitor"
+ kde = "org.kde.DeviceNotifications"
+
+startOneService :: Client -> [ServiceName] -> IO Bool
+startOneService _ [] = return False
+startOneService client (x:xs) = do
+ _ <- callDBus client "StartServiceByName"
+ [toVariant x, toVariant (0 :: Word32)]
+ ifM (elem x <$> listServiceNames client)
+ ( do
+ debug thisThread [ "Started DBUS service"
+ , x
+ , "to monitor mount events."
+ ]
+ return True
+ , startOneService client xs
+ )
+
+{- Filter matching events recieved when drives are mounted and unmounted. -}
+mountChanged :: [MatchRule]
+mountChanged = [gvfs True, gvfs False, kde, kdefallback]
+ where
+ {- gvfs reliably generates this event whenever a drive is mounted/unmounted,
+ - whether automatically, or manually -}
+ gvfs mount = matchAny
+ { matchInterface = Just "org.gtk.Private.RemoteVolumeMonitor"
+ , matchMember = Just $ if mount then "MountAdded" else "MountRemoved"
+ }
+ {- This event fires when KDE prompts the user what to do with a drive,
+ - but maybe not at other times. And it's not received -}
+ kde = matchAny
+ { matchInterface = Just "org.kde.Solid.Device"
+ , matchMember = Just "setupDone"
+ }
+ {- This event may not be closely related to mounting a drive, but it's
+ - observed reliably when a drive gets mounted or unmounted. -}
+ kdefallback = matchAny
+ { matchInterface = Just "org.kde.KDirNotify"
+ , matchMember = Just "enteredDirectory"
+ }
+
+#endif
+
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+pollingThread st dstatus scanremotes = go =<< currentMountPoints
+ where
+ go wasmounted = do
+ threadDelaySeconds (Seconds 10)
+ nowmounted <- currentMountPoints
+ handleMounts st dstatus scanremotes wasmounted nowmounted
+ go nowmounted
+
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes wasmounted nowmounted =
+ mapM_ (handleMount st dstatus scanremotes . mnt_dir) $
+ S.toList $ newMountPoints wasmounted nowmounted
+
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
+handleMount st dstatus scanremotes dir = do
+ debug thisThread ["detected mount of", dir]
+ reconnectRemotes thisThread st dstatus scanremotes
+ =<< filter (Git.repoIsLocal . Remote.repo)
+ <$> remotesUnder st dstatus dir
+
+{- Finds remotes located underneath the mount point.
+ -
+ - Updates state to include the remotes.
+ -
+ - The config of git remotes is re-read, as it may not have been available
+ - at startup time, or may have changed (it could even be a different
+ - repository at the same remote location..)
+ -}
+remotesUnder :: ThreadState -> DaemonStatusHandle -> FilePath -> IO [Remote]
+remotesUnder st dstatus dir = runThreadState st $ do
+ repotop <- fromRepo Git.repoPath
+ rs <- remoteList
+ pairs <- mapM (checkremote repotop) rs
+ let (waschanged, rs') = unzip pairs
+ when (any id waschanged) $ do
+ Annex.changeState $ \s -> s { Annex.remotes = rs' }
+ updateKnownRemotes dstatus
+ return $ map snd $ filter fst pairs
+ where
+ checkremote repotop r = case Remote.localpath r of
+ Just p | dirContains dir (absPathFrom repotop p) ->
+ (,) <$> pure True <*> updateRemote r
+ _ -> return (False, r)
+
+type MountPoints = S.Set Mntent
+
+currentMountPoints :: IO MountPoints
+currentMountPoints = S.fromList <$> getMounts
+
+newMountPoints :: MountPoints -> MountPoints -> MountPoints
+newMountPoints old new = S.difference new old
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
new file mode 100644
index 000000000..117ab7a20
--- /dev/null
+++ b/Assistant/Threads/NetWatcher.hs
@@ -0,0 +1,132 @@
+{- git-annex assistant network connection watcher, using dbus
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Assistant.Threads.NetWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Sync
+import qualified Git
+import Utility.ThreadScheduler
+import Remote.List
+import qualified Types.Remote as Remote
+
+import qualified Control.Exception as E
+import Control.Concurrent
+
+#if WITH_DBUS
+import Utility.DBus
+import DBus.Client
+import DBus
+import Data.Word (Word32)
+#else
+#warning Building without dbus support; will poll for network connection changes
+#endif
+
+thisThread :: ThreadName
+thisThread = "NetWatcher"
+
+netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+netWatcherThread st dstatus scanremotes = do
+#if WITH_DBUS
+ void $ forkIO $ dbusThread st dstatus scanremotes
+#endif
+ {- This is a fallback for when dbus cannot be used to detect
+ - network connection changes, but it also ensures that
+ - any networked remotes that may have not been routable for a
+ - while (despite the local network staying up), are synced with
+ - periodically. -}
+ runEvery (Seconds 3600) $
+ handleConnection st dstatus scanremotes
+
+#if WITH_DBUS
+
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
+ where
+ go client = ifM (checkNetMonitor client)
+ ( do
+ listenNMConnections client handle
+ listenWicdConnections client handle
+ , do
+ runThreadState st $
+ warning "No known network monitor available through dbus; falling back to polling"
+ )
+ onerr :: E.SomeException -> IO ()
+ onerr e = runThreadState st $
+ warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")"
+ handle = do
+ debug thisThread ["detected network connection"]
+ handleConnection st dstatus scanremotes
+
+{- Examine the list of services connected to dbus, to see if there
+ - are any we can use to monitor network connections. -}
+checkNetMonitor :: Client -> IO Bool
+checkNetMonitor client = do
+ running <- filter (`elem` [networkmanager, wicd])
+ <$> listServiceNames client
+ case running of
+ [] -> return False
+ (service:_) -> do
+ debug thisThread [ "Using running DBUS service"
+ , service
+ , "to monitor network connection events."
+ ]
+ return True
+ where
+ networkmanager = "org.freedesktop.NetworkManager"
+ wicd = "org.wicd.daemon"
+
+{- Listens for new NetworkManager connections. -}
+listenNMConnections :: Client -> IO () -> IO ()
+listenNMConnections client callback =
+ listen client matcher $ \event ->
+ when (Just True == anyM activeconnection (signalBody event)) $
+ callback
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
+ , matchMember = Just "PropertiesChanged"
+ }
+ nm_connection_activated = toVariant (2 :: Word32)
+ nm_state_key = toVariant ("State" :: String)
+ activeconnection v = do
+ m <- fromVariant v
+ vstate <- lookup nm_state_key $ dictionaryItems m
+ state <- fromVariant vstate
+ return $ state == nm_connection_activated
+
+{- Listens for new Wicd connections. -}
+listenWicdConnections :: Client -> IO () -> IO ()
+listenWicdConnections client callback =
+ listen client matcher $ \event ->
+ when (any (== wicd_success) (signalBody event)) $
+ callback
+ where
+ matcher = matchAny
+ { matchInterface = Just "org.wicd.daemon"
+ , matchMember = Just "ConnectResultsSent"
+ }
+ wicd_success = toVariant ("success" :: String)
+
+#endif
+
+handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+handleConnection st dstatus scanremotes = do
+ reconnectRemotes thisThread st dstatus scanremotes =<<
+ filter (Git.repoIsUrl . Remote.repo)
+ <$> networkRemotes st
+
+{- Finds network remotes. -}
+networkRemotes :: ThreadState -> IO [Remote]
+networkRemotes st = runThreadState st $
+ filter (isNothing . Remote.localpath) <$> remoteList
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..6bf8de2df
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,82 @@
+{- git-annex assistant git pushing thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Assistant.Common
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.Alert
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Sync
+import Utility.ThreadScheduler
+import qualified Remote
+import qualified Types.Remote as Remote
+
+import Data.Time.Clock
+
+thisThread :: ThreadName
+thisThread = "Pusher"
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> IO ()
+pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ debug thisThread
+ [ "retrying"
+ , show (length topush)
+ , "failed pushes"
+ ]
+ now <- getCurrentTime
+ void $ alertWhile dstatus (pushRetryAlert topush) $
+ pushToRemotes thisThread now st (Just pushmap) topush
+ where
+ halfhour = 1800
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
+pushThread st dstatus commitchan pushmap = do
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then do
+ remotes <- filter pushable . knownRemotes
+ <$> getDaemonStatus dstatus
+ unless (null remotes) $
+ void $ alertWhile dstatus (pushAlert remotes) $
+ pushToRemotes thisThread now st (Just pushmap) remotes
+ else do
+ debug thisThread
+ [ "delaying push of"
+ , show (length commits)
+ , "commits"
+ ]
+ refillCommits commitchan commits
+ where
+ pushable r
+ | Remote.specialRemote r = False
+ | Remote.readonly r = False
+ | otherwise = True
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
new file mode 100644
index 000000000..a7c2189d8
--- /dev/null
+++ b/Assistant/Threads/SanityChecker.hs
@@ -0,0 +1,98 @@
+{- git-annex assistant sanity checker
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.SanityChecker (
+ sanityCheckerThread
+) where
+
+import Assistant.Common
+import Assistant.DaemonStatus
+import Assistant.ThreadedMonad
+import Assistant.Changes
+import Assistant.Alert
+import Assistant.TransferQueue
+import qualified Git.LsFiles
+import Utility.ThreadScheduler
+import qualified Assistant.Threads.Watcher as Watcher
+
+import Data.Time.Clock.POSIX
+
+thisThread :: ThreadName
+thisThread = "SanityChecker"
+
+{- This thread wakes up occasionally to make sure the tree is in good shape. -}
+sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+sanityCheckerThread st dstatus transferqueue changechan = forever $ do
+ waitForNextCheck dstatus
+
+ debug thisThread ["starting sanity check"]
+
+ void $ alertWhile dstatus sanityCheckAlert go
+
+ debug thisThread ["sanity check complete"]
+ where
+ go = do
+ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = True }
+
+ now <- getPOSIXTime -- before check started
+ r <- catchIO (check st dstatus transferqueue changechan)
+ $ \e -> do
+ runThreadState st $ warning $ show e
+ return False
+
+ modifyDaemonStatus_ dstatus $ \s -> s
+ { sanityCheckRunning = False
+ , lastSanityCheck = Just now
+ }
+
+ return r
+
+{- Only run one check per day, from the time of the last check. -}
+waitForNextCheck :: DaemonStatusHandle -> IO ()
+waitForNextCheck dstatus = do
+ v <- lastSanityCheck <$> getDaemonStatus dstatus
+ now <- getPOSIXTime
+ threadDelaySeconds $ Seconds $ calcdelay now v
+ where
+ calcdelay _ Nothing = oneDay
+ calcdelay now (Just lastcheck)
+ | lastcheck < now = max oneDay $
+ oneDay - truncate (now - lastcheck)
+ | otherwise = oneDay
+
+oneDay :: Int
+oneDay = 24 * 60 * 60
+
+{- It's important to stay out of the Annex monad as much as possible while
+ - running potentially expensive parts of this check, since remaining in it
+ - will block the watcher. -}
+check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool
+check st dstatus transferqueue changechan = do
+ g <- runThreadState st $ fromRepo id
+ -- Find old unstaged symlinks, and add them to git.
+ unstaged <- Git.LsFiles.notInRepo False ["."] g
+ now <- getPOSIXTime
+ forM_ unstaged $ \file -> do
+ ms <- catchMaybeIO $ getSymbolicLinkStatus file
+ case ms of
+ Just s | toonew (statusChangeTime s) now -> noop
+ | isSymbolicLink s ->
+ addsymlink file ms
+ _ -> noop
+ return True
+ where
+ toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
+ slop = fromIntegral tenMinutes
+ insanity msg = do
+ runThreadState st $ warning msg
+ void $ addAlert dstatus $ sanityCheckFixAlert msg
+ addsymlink file s = do
+ Watcher.runHandler thisThread st dstatus
+ transferqueue changechan
+ Watcher.onAddSymlink file s
+ insanity $ "found unstaged symlink: " ++ file
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
new file mode 100644
index 000000000..87af3567e
--- /dev/null
+++ b/Assistant/Threads/TransferScanner.hs
@@ -0,0 +1,138 @@
+{- git-annex assistant thread to scan remotes to find needed transfers
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferScanner where
+
+import Assistant.Common
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Alert
+import Logs.Transfer
+import Logs.Location
+import Logs.Web (webUUID)
+import qualified Remote
+import qualified Types.Remote as Remote
+import Utility.ThreadScheduler
+import qualified Git.LsFiles as LsFiles
+import Command
+import Annex.Content
+
+import qualified Data.Set as S
+
+thisThread :: ThreadName
+thisThread = "TransferScanner"
+
+{- This thread waits until a remote needs to be scanned, to find transfers
+ - that need to be made, to keep data in sync.
+ -}
+transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st dstatus scanremotes transferqueue = do
+ startupScan
+ go S.empty
+ where
+ go scanned = do
+ threadDelaySeconds (Seconds 2)
+ (rs, infos) <- unzip <$> getScanRemote scanremotes
+ if any fullScan infos || any (`S.notMember` scanned) rs
+ then do
+ expensiveScan st dstatus transferqueue rs
+ go (S.union scanned (S.fromList rs))
+ else do
+ mapM_ (failedTransferScan st dstatus transferqueue) rs
+ go scanned
+ {- All available remotes are scanned in full on startup,
+ - for multiple reasons, including:
+ -
+ - * This may be the first run, and there may be remotes
+ - already in place, that need to be synced.
+ - * We may have run before, and scanned a remote, but
+ - only been in a subdirectory of the git remote, and so
+ - not synced it all.
+ - * We may have run before, and had transfers queued,
+ - and then the system (or us) crashed, and that info was
+ - lost.
+ -}
+ startupScan = addScanRemotes scanremotes True
+ =<< knownRemotes <$> getDaemonStatus dstatus
+
+{- This is a cheap scan for failed transfers involving a remote. -}
+failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
+failedTransferScan st dstatus transferqueue r = do
+ ts <- runThreadState st $
+ getFailedTransfers $ Remote.uuid r
+ go ts
+ where
+ go [] = noop
+ go ((t, info):ts)
+ | transferDirection t == Download = do
+ {- Check if the remote still has the key.
+ - If not, relies on the expensiveScan to
+ - get it queued from some other remote. -}
+ ifM (runThreadState st $ remoteHas r $ transferKey t)
+ ( requeue t info
+ , dequeue t
+ )
+ go ts
+ | otherwise = do
+ {- The Transferrer checks when uploading
+ - that the remote doesn't already have the
+ - key, so it's not redundantly checked
+ - here. -}
+ requeue t info
+ go ts
+
+ requeue t info = do
+ queueTransferWhenSmall
+ transferqueue dstatus (associatedFile info) t r
+ dequeue t
+ dequeue t = void $ runThreadState st $ inRepo $
+ liftIO . tryIO . removeFile . failedTransferFile t
+
+{- This is a expensive scan through the full git work tree, finding
+ - files to download from or upload to any of the remotes.
+ -
+ - The scan is blocked when the transfer queue gets too large. -}
+expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
+expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
+ liftIO $ debug thisThread ["starting scan of", show visiblers]
+ void $ alertWhile dstatus (scanAlert visiblers) $ do
+ g <- runThreadState st $ fromRepo id
+ files <- LsFiles.inRepo [] g
+ go files
+ return True
+ liftIO $ debug thisThread ["finished scan of", show visiblers]
+ where
+ onlyweb = all (== webUUID) $ map Remote.uuid rs
+ visiblers = let rs' = filter (not . Remote.readonly) rs
+ in if null rs' then rs else rs'
+ go [] = noop
+ go (f:fs) = do
+ mapM_ (enqueue f) =<< catMaybes <$> runThreadState st
+ (ifAnnexed f findtransfers $ return [])
+ go fs
+ enqueue f (r, t) = do
+ debug thisThread ["queuing", show t]
+ queueTransferWhenSmall transferqueue dstatus (Just f) t r
+ findtransfers (key, _) = do
+ locs <- loggedLocations key
+ let use a = return $ map (a key locs) rs
+ ifM (inAnnex key)
+ ( use $ check Upload False
+ , use $ check Download True
+ )
+ check direction want key locs r
+ | direction == Upload && Remote.readonly r = Nothing
+ | (Remote.uuid r `elem` locs) == want = Just $
+ (r, Transfer direction (Remote.uuid r) key)
+ | otherwise = Nothing
+
+remoteHas :: Remote -> Key -> Annex Bool
+remoteHas r key = elem
+ <$> pure (Remote.uuid r)
+ <*> loggedLocations key
diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs
new file mode 100644
index 000000000..fe8af9aad
--- /dev/null
+++ b/Assistant/Threads/TransferWatcher.hs
@@ -0,0 +1,80 @@
+{- git-annex assistant transfer watching thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferWatcher where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Remote
+
+thisThread :: ThreadName
+thisThread = "TransferWatcher"
+
+{- This thread watches for changes to the gitAnnexTransferDir,
+ - and updates the DaemonStatus's map of ongoing transfers. -}
+transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
+transferWatcherThread st dstatus = do
+ g <- runThreadState st $ fromRepo id
+ let dir = gitAnnexTransferDir g
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler st dstatus a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , delHook = hook onDel
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+ debug thisThread ["watching for transfers"]
+
+type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler st dstatus handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler st dstatus file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ _ msg _ = error msg
+
+{- Called when a new transfer information file is written. -}
+onAdd :: Handler
+onAdd st dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> go t =<< runThreadState st (checkTransfer t)
+ where
+ go _ Nothing = noop -- transfer already finished
+ go t (Just info) = do
+ debug thisThread
+ [ "transfer starting:"
+ , show t
+ ]
+ r <- headMaybe . filter (sameuuid t) . knownRemotes
+ <$> getDaemonStatus dstatus
+ updateTransferInfo dstatus t info
+ { transferRemote = r }
+ sameuuid t r = Remote.uuid r == transferUUID t
+
+{- Called when a transfer information file is removed. -}
+onDel :: Handler
+onDel _ dstatus file _ = case parseTransferFile file of
+ Nothing -> noop
+ Just t -> do
+ debug thisThread
+ [ "transfer finishing:"
+ , show t
+ ]
+ void $ removeTransfer dstatus t
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
new file mode 100644
index 000000000..9a772d628
--- /dev/null
+++ b/Assistant/Threads/Transferrer.hs
@@ -0,0 +1,113 @@
+{- git-annex assistant data transferrer thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Transferrer where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Alert
+import Logs.Transfer
+import Logs.Location
+import Annex.Content
+import qualified Remote
+import Types.Key
+import Locations.UserConfig
+
+import System.Process (create_group)
+
+thisThread :: ThreadName
+thisThread = "Transferrer"
+
+{- For now only one transfer is run at a time. -}
+maxTransfers :: Int
+maxTransfers = 1
+
+{- Dispatches transfers from the queue. -}
+transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
+transfererThread st dstatus transferqueue slots = go =<< readProgramFile
+ where
+ go program = getNextTransfer transferqueue dstatus notrunning >>= handle program
+ handle program Nothing = go program
+ handle program (Just (t, info)) = do
+ ifM (runThreadState st $ shouldTransfer t info)
+ ( do
+ debug thisThread [ "Transferring:" , show t ]
+ notifyTransfer dstatus
+ transferThread dstatus slots t info inTransferSlot program
+ , do
+ debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ -- getNextTransfer added t to the
+ -- daemonstatus's transfer map.
+ void $ removeTransfer dstatus t
+ )
+ go program
+ {- Skip transfers that are already running. -}
+ notrunning i = startedTime i == Nothing
+
+{- Checks if the file to download is already present, or the remote
+ - being uploaded to isn't known to have the file. -}
+shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
+shouldTransfer t info
+ | transferDirection t == Download =
+ not <$> inAnnex key
+ | transferDirection t == Upload =
+ {- Trust the location log to check if the
+ - remote already has the key. This avoids
+ - a roundtrip to the remote. -}
+ case transferRemote info of
+ Nothing -> return False
+ Just remote ->
+ notElem (Remote.uuid remote)
+ <$> loggedLocations key
+ | otherwise = return False
+ where
+ key = transferKey t
+
+{- A sepeate git-annex process is forked off to run a transfer,
+ - running in its own process group. This allows killing it and all its
+ - children if the user decides to cancel the transfer.
+ -
+ - A thread is forked off to run the process, and the thread
+ - occupies one of the transfer slots. If all slots are in use, this will
+ - block until one becomes available. The thread's id is also recorded in
+ - the transfer info; the thread will also be killed when a transfer is
+ - stopped, to avoid it displaying any alert about the transfer having
+ - failed. -}
+transferThread :: DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> TransferSlotRunner -> FilePath -> IO ()
+transferThread dstatus slots t info runner program = case (transferRemote info, associatedFile info) of
+ (Nothing, _) -> noop
+ (_, Nothing) -> noop
+ (Just remote, Just file) -> do
+ tid <- runner slots $
+ transferprocess remote file
+ updateTransferInfo dstatus t $ info { transferTid = Just tid }
+ where
+ direction = transferDirection t
+ isdownload = direction == Download
+
+ transferprocess remote file = void $ do
+ (_, _, _, pid)
+ <- createProcess (proc program $ toCommand params)
+ { create_group = True }
+ ok <- (==) ExitSuccess <$> waitForProcess pid
+ addAlert dstatus $
+ makeAlertFiller ok $
+ transferFileAlert direction ok file
+ where
+ params =
+ [ Param "transferkey"
+ , Param $ key2file $ transferKey t
+ , Param $ if isdownload
+ then "--from"
+ else "--to"
+ , Param $ Remote.name remote
+ , Param "--file"
+ , File file
+ ]
diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs
index db58f01e8..8ba015b19 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -5,14 +5,22 @@
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE CPP #-}
-
-module Assistant.Watcher where
-
-import Common.Annex
+module Assistant.Threads.Watcher (
+ watchThread,
+ checkCanWatch,
+ needLsof,
+ stageSymlink,
+ onAddSymlink,
+ runHandler,
+) where
+
+import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
+import Assistant.TransferQueue
+import Assistant.Alert
+import Logs.Transfer
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Annex
@@ -27,10 +35,12 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
+thisThread :: ThreadName
+thisThread = "Watcher"
+
checkCanWatch :: Annex ()
checkCanWatch
| canWatch =
@@ -46,11 +56,13 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain."
]
-watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-watchThread st dstatus changechan = watchDir "." ignored hooks startup
+watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
+watchThread st dstatus transferqueue changechan = do
+ void $ watchDir "." ignored hooks startup
+ debug thisThread [ "watching", "."]
where
- startup = statupScan st dstatus
- hook a = Just $ runHandler st dstatus changechan a
+ startup = startupScan st dstatus
+ hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
hooks = WatchHooks
{ addHook = hook onAdd
, delHook = hook onDel
@@ -60,18 +72,21 @@ watchThread st dstatus changechan = watchDir "." ignored hooks startup
}
{- Initial scartup scan. The action should return once the scan is complete. -}
-statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
-statupScan st dstatus scanner = do
- runThreadState st $
- showAction "scanning"
- r <- scanner
- runThreadState st $
- modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
-
- -- Notice any files that were deleted before watching was started.
- runThreadState st $ do
- inRepo $ Git.Command.run "add" [Param "--update"]
- showAction "started"
+startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
+startupScan st dstatus scanner = do
+ runThreadState st $ showAction "scanning"
+ r <- alertWhile' dstatus startupScanAlert $ do
+ r <- scanner
+
+ -- Notice any files that were deleted before
+ -- watching was started.
+ runThreadState st $ do
+ inRepo $ Git.Command.run "add" [Param "--update"]
+ showAction "started"
+
+ modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
+
+ return (True, r)
return r
@@ -83,23 +98,22 @@ ignored = ig . takeFileName
ig ".gitattributes" = True
ig _ = False
-type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
+type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a
- change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
-runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler st dstatus changechan handler file filestatus = void $ do
+runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do
r <- tryIO go
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
- go = runThreadState st $ handler file filestatus dstatus
+ go = runThreadState st $ handler threadname file filestatus dstatus transferqueue
{- During initial directory scan, this will be run for any regular files
- that are already checked into git. We don't want to turn those into
@@ -120,9 +134,9 @@ runHandler st dstatus changechan handler file filestatus = void $ do
- the add.
-}
onAdd :: Handler
-onAdd file filestatus dstatus
+onAdd threadname file filestatus dstatus _
| maybe False isRegularFile filestatus = do
- ifM (scanComplete <$> getDaemonStatus dstatus)
+ ifM (scanComplete <$> liftIO (getDaemonStatus dstatus))
( go
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
( noChange
@@ -131,27 +145,33 @@ onAdd file filestatus dstatus
)
| otherwise = noChange
where
- go = pendingAddChange =<< Command.Add.lockDown file
+ go = do
+ liftIO $ debug threadname ["file added", file]
+ pendingAddChange =<< Command.Add.lockDown file
{- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content
- before adding it.
-}
onAddSymlink :: Handler
-onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
+onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
where
go (Just (key, _)) = do
link <- calcGitLink file key
ifM ((==) link <$> liftIO (readSymbolicLink file))
- ( ensurestaged link =<< getDaemonStatus dstatus
+ ( do
+ s <- liftIO $ getDaemonStatus dstatus
+ checkcontent key s
+ ensurestaged link s
, do
+ liftIO $ debug threadname ["fix symlink", file]
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
addlink link
)
go Nothing = do -- other symlink
link <- liftIO (readSymbolicLink file)
- ensurestaged link =<< getDaemonStatus dstatus
+ ensurestaged link =<< liftIO (getDaemonStatus dstatus)
{- This is often called on symlinks that are already
- staged correctly. A symlink may have been deleted
@@ -174,6 +194,7 @@ onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
{- For speed, tries to reuse the existing blob for
- the symlink target. -}
addlink link = do
+ liftIO $ debug threadname ["add symlink", file]
v <- catObjectDetails $ Ref $ ':':file
case v of
Just (currlink, sha)
@@ -185,8 +206,17 @@ onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
stageSymlink file sha
madeChange file LinkChange
+ {- When a new link appears, after the startup scan,
+ - try to get the key's content. -}
+ checkcontent key daemonstatus
+ | scanComplete daemonstatus = unlessM (inAnnex key) $
+ queueTransfers Next transferqueue dstatus
+ key (Just file) Download
+ | otherwise = noop
+
onDel :: Handler
-onDel file _ _dstatus = do
+onDel threadname file _ _dstatus _ = do
+ liftIO $ debug threadname ["file deleted", file]
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
madeChange file RmChange
@@ -199,14 +229,15 @@ onDel file _ _dstatus = do
- command to get the recursive list of files in the directory, so rm is
- just as good. -}
onDelDir :: Handler
-onDelDir dir _ _dstatus = do
+onDelDir threadname dir _ _dstatus _ = do
+ liftIO $ debug threadname ["directory deleted", dir]
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange
{- Called when there's an error with inotify. -}
onErr :: Handler
-onErr msg _ _dstatus = do
+onErr _ msg _ _dstatus _ = do
warning msg
return Nothing
diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs
new file mode 100644
index 000000000..e203d50ba
--- /dev/null
+++ b/Assistant/Threads/WebApp.hs
@@ -0,0 +1,112 @@
+{- git-annex assistant webapp thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+module Assistant.Threads.WebApp where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.WebApp.DashBoard
+import Assistant.WebApp.SideBar
+import Assistant.WebApp.Notifications
+import Assistant.WebApp.Configurators
+import Assistant.WebApp.Documentation
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Utility.WebApp
+import Utility.FileMode
+import Utility.TempFile
+import Git
+
+import Yesod
+import Yesod.Static
+import Network.Socket (PortNumber)
+import Data.Text (pack, unpack)
+
+thisThread :: String
+thisThread = "WebApp"
+
+mkYesodDispatch "WebApp" $(parseRoutesFile "Assistant/WebApp/routes")
+
+type Url = String
+
+webAppThread
+ :: (Maybe ThreadState)
+ -> DaemonStatusHandle
+ -> ScanRemoteMap
+ -> TransferQueue
+ -> TransferSlots
+ -> Maybe (IO String)
+ -> Maybe (Url -> FilePath -> IO ())
+ -> IO ()
+webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = do
+ webapp <- WebApp
+ <$> pure mst
+ <*> pure dstatus
+ <*> pure scanremotes
+ <*> pure transferqueue
+ <*> pure transferslots
+ <*> (pack <$> genRandomToken)
+ <*> getreldir mst
+ <*> pure $(embed "static")
+ <*> newWebAppState
+ <*> pure postfirstrun
+ app <- toWaiAppPlain webapp
+ app' <- ifM debugEnabled
+ ( return $ httpDebugLogger app
+ , return app
+ )
+ runWebApp app' $ \port -> case mst of
+ Nothing -> withTempFile "webapp.html" $ \tmpfile _ -> go port webapp tmpfile
+ Just st -> go port webapp =<< runThreadState st (fromRepo gitAnnexHtmlShim)
+ where
+ getreldir Nothing = return Nothing
+ getreldir (Just st) = Just <$>
+ (relHome =<< absPath
+ =<< runThreadState st (fromRepo repoPath))
+ go port webapp htmlshim = do
+ writeHtmlShim webapp port htmlshim
+ maybe noop (\a -> a (myUrl webapp port "/") htmlshim) onstartup
+
+{- Creates a html shim file that's used to redirect into the webapp,
+ - to avoid exposing the secretToken when launching the web browser. -}
+writeHtmlShim :: WebApp -> PortNumber -> FilePath -> IO ()
+writeHtmlShim webapp port file = do
+ debug thisThread ["running on port", show port]
+ viaTmp go file $ genHtmlShim webapp port
+ where
+ go tmpfile content = do
+ h <- openFile tmpfile WriteMode
+ modifyFileMode tmpfile $ removeModes [groupReadMode, otherReadMode]
+ hPutStr h content
+ hClose h
+
+{- TODO: generate this static file using Yesod. -}
+genHtmlShim :: WebApp -> PortNumber -> String
+genHtmlShim webapp port = unlines
+ [ "<html>"
+ , "<head>"
+ , "<title>Starting webapp...</title>"
+ , "<meta http-equiv=\"refresh\" content=\"0; URL="++url++"\">"
+ , "<body>"
+ , "<p>"
+ , "<a href=\"" ++ url ++ "\">Starting webapp...</a>"
+ , "</p>"
+ , "</body>"
+ , "</html>"
+ ]
+ where
+ url = myUrl webapp port "/"
+
+myUrl :: WebApp -> PortNumber -> FilePath -> Url
+myUrl webapp port page = "http://localhost:" ++ show port ++ page ++
+ "?auth=" ++ unpack (secretToken webapp)
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
new file mode 100644
index 000000000..fe2c667f9
--- /dev/null
+++ b/Assistant/TransferQueue.hs
@@ -0,0 +1,154 @@
+{- git-annex assistant pending transfer queue
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferQueue (
+ TransferQueue,
+ Schedule(..),
+ newTransferQueue,
+ getTransferQueue,
+ queueTransfers,
+ queueTransfer,
+ queueTransferAt,
+ queueTransferWhenSmall,
+ getNextTransfer,
+ dequeueTransfer,
+) where
+
+import Common.Annex
+import Assistant.DaemonStatus
+import Logs.Transfer
+import Types.Remote
+import qualified Remote
+import qualified Types.Remote as Remote
+
+import Control.Concurrent.STM
+import qualified Data.Map as M
+
+{- The transfer queue consists of a channel listing the transfers to make;
+ - the size of the queue is also tracked, and a list is maintained
+ - in parallel to allow for reading. -}
+data TransferQueue = TransferQueue
+ { queue :: TChan (Transfer, TransferInfo)
+ , queuesize :: TVar Int
+ , queuelist :: TVar [(Transfer, TransferInfo)]
+ }
+
+data Schedule = Next | Later
+ deriving (Eq)
+
+newTransferQueue :: IO TransferQueue
+newTransferQueue = atomically $ TransferQueue
+ <$> newTChan
+ <*> newTVar 0
+ <*> newTVar []
+
+{- Reads the queue's content without blocking or changing it. -}
+getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
+getTransferQueue q = atomically $ readTVar $ queuelist q
+
+stubInfo :: AssociatedFile -> Remote -> TransferInfo
+stubInfo f r = TransferInfo
+ { startedTime = Nothing
+ , transferPid = Nothing
+ , transferTid = Nothing
+ , transferRemote = Just r
+ , bytesComplete = Nothing
+ , associatedFile = f
+ , transferPaused = False
+ }
+
+{- Adds transfers to queue for some of the known remotes. -}
+queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
+queueTransfers schedule q dstatus k f direction = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
+ mapM_ go =<< sufficientremotes rs
+ where
+ sufficientremotes rs
+ -- Queue downloads from all remotes that
+ -- have the key, with the cheapest ones first.
+ -- More expensive ones will only be tried if
+ -- downloading from a cheap one fails.
+ | direction == Download = do
+ uuids <- Remote.keyLocations k
+ return $ filter (\r -> uuid r `elem` uuids) rs
+ -- TODO: Determine a smaller set of remotes that
+ -- can be uploaded to, in order to ensure all
+ -- remotes can access the content. Currently,
+ -- send to every remote we can.
+ | otherwise = return $ filter (not . Remote.readonly) rs
+ gentransfer r = Transfer
+ { transferDirection = direction
+ , transferKey = k
+ , transferUUID = Remote.uuid r
+ }
+ go r = liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
+
+enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+enqueue schedule q dstatus t info
+ | schedule == Next = go unGetTChan (new:)
+ | otherwise = go writeTChan (\l -> l++[new])
+ where
+ new = (t, info)
+ go modqueue modlist = do
+ atomically $ do
+ void $ modqueue (queue q) new
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ void $ notifyTransfer dstatus
+
+{- Adds a transfer to the queue. -}
+queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransfer schedule q dstatus f t remote =
+ enqueue schedule q dstatus t (stubInfo f remote)
+
+{- Blocks until the queue is no larger than a given size, and then adds a
+ - transfer to the queue. -}
+queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferAt wantsz schedule q dstatus f t remote = do
+ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then return ()
+ else retry -- blocks until queuesize changes
+ enqueue schedule q dstatus t (stubInfo f remote)
+
+queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferWhenSmall = queueTransferAt 10 Later
+
+{- Blocks until a pending transfer is available from the queue,
+ - and removes it.
+ -
+ - Checks that it's acceptable, before adding it to the
+ - the currentTransfers map. If it's not acceptable, it's discarded.
+ -
+ - This is done in a single STM transaction, so there is no window
+ - where an observer sees an inconsistent status. -}
+getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo))
+getNextTransfer q dstatus acceptable = atomically $ do
+ void $ modifyTVar' (queuesize q) pred
+ void $ modifyTVar' (queuelist q) (drop 1)
+ r@(t, info) <- readTChan (queue q)
+ if acceptable info
+ then do
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return $ Just r
+ else return Nothing
+
+{- Removes a transfer from the queue, if present, and returns True if it
+ - was present. -}
+dequeueTransfer :: TransferQueue -> DaemonStatusHandle -> Transfer -> IO Bool
+dequeueTransfer q dstatus t = do
+ ok <- atomically $ do
+ (l, removed) <- partition (\i -> fst i /= t) <$> readTVar (queuelist q)
+ void $ writeTVar (queuesize q) (length l)
+ void $ writeTVar (queuelist q) l
+ return $ not $ null removed
+ when ok $
+ notifyTransfer dstatus
+ return ok
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
new file mode 100644
index 000000000..27b869f1d
--- /dev/null
+++ b/Assistant/TransferSlots.hs
@@ -0,0 +1,71 @@
+{- git-annex assistant transfer slots
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE DeriveDataTypeable #-}
+
+module Assistant.TransferSlots where
+
+import qualified Control.Exception as E
+import Control.Concurrent
+import Data.Typeable
+
+import Common.Annex
+import Utility.ThreadScheduler
+
+type TransferSlots = QSemN
+
+{- A special exception that can be thrown to pause or resume a transfer, while
+ - keeping its slot in use. -}
+data TransferException = PauseTransfer | ResumeTransfer
+ deriving (Show, Eq, Typeable)
+
+instance E.Exception TransferException
+
+type TransferSlotRunner = TransferSlots -> IO () -> IO ThreadId
+
+{- Number of concurrent transfers allowed to be run from the assistant.
+ -
+ - Transfers launched by other means, including by remote assistants,
+ - do not currently take up slots.
+ -}
+numSlots :: Int
+numSlots = 1
+
+newTransferSlots :: IO TransferSlots
+newTransferSlots = newQSemN numSlots
+
+{- Waits until a transfer slot becomes available, and runs a transfer
+ - action in the slot, in its own thread.
+ -}
+inTransferSlot :: TransferSlotRunner
+inTransferSlot = runTransferSlot (\s -> waitQSemN s 1)
+
+{- Runs a transfer action, without waiting for a slot to become available. -}
+inImmediateTransferSlot :: TransferSlotRunner
+inImmediateTransferSlot = runTransferSlot (\s -> signalQSemN s (-1))
+
+{- Note that the action is subject to being killed when the transfer
+ - is canceled or paused.
+ -
+ - A PauseTransfer exception is handled by letting the action be killed,
+ - then pausing the thread until a ResumeTransfer exception is raised,
+ - then rerunning the action.
+ -}
+runTransferSlot :: (QSemN -> IO ()) -> TransferSlotRunner
+runTransferSlot allocator s transfer = do
+ allocator s
+ forkIO $ E.bracket_ noop (signalQSemN s 1) go
+ where
+ go = catchPauseResume transfer
+ pause = catchPauseResume $ runEvery (Seconds 86400) noop
+ catchPauseResume a = E.catch a handlePauseResume
+ handlePauseResume PauseTransfer = do
+ putStrLn "pause"
+ pause
+ handlePauseResume ResumeTransfer = do
+ putStrLn "resume"
+ go
diff --git a/Assistant/WebApp.hs b/Assistant/WebApp.hs
new file mode 100644
index 000000000..721257294
--- /dev/null
+++ b/Assistant/WebApp.hs
@@ -0,0 +1,177 @@
+{- git-annex assistant webapp data types
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+
+module Assistant.WebApp where
+
+import Assistant.Common
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import Assistant.Alert
+import Utility.NotificationBroadcaster
+import Utility.WebApp
+import Utility.Yesod
+import Logs.Transfer
+
+import Yesod
+import Yesod.Static
+import Text.Hamlet
+import Data.Text (Text, pack, unpack)
+import Control.Concurrent.STM
+
+staticFiles "static"
+
+mkYesodData "WebApp" $(parseRoutesFile "Assistant/WebApp/routes")
+
+data WebApp = WebApp
+ { threadState :: Maybe ThreadState
+ , daemonStatus :: DaemonStatusHandle
+ , scanRemotes :: ScanRemoteMap
+ , transferQueue :: TransferQueue
+ , transferSlots :: TransferSlots
+ , secretToken :: Text
+ , relDir :: Maybe FilePath
+ , getStatic :: Static
+ , webAppState :: TMVar WebAppState
+ , postFirstRun :: Maybe (IO String)
+ }
+
+data NavBarItem = DashBoard | Config | About
+ deriving (Eq)
+
+navBarName :: NavBarItem -> Text
+navBarName DashBoard = "Dashboard"
+navBarName Config = "Configuration"
+navBarName About = "About"
+
+navBarRoute :: NavBarItem -> Route WebApp
+navBarRoute DashBoard = HomeR
+navBarRoute Config = ConfigR
+navBarRoute About = AboutR
+
+defaultNavBar :: [NavBarItem]
+defaultNavBar = [DashBoard, Config, About]
+
+firstRunNavBar :: [NavBarItem]
+firstRunNavBar = [Config, About]
+
+selectNavBar :: Handler [NavBarItem]
+selectNavBar = ifM (inFirstRun) (return firstRunNavBar, return defaultNavBar)
+
+inFirstRun :: Handler Bool
+inFirstRun = isNothing . relDir <$> getYesod
+
+{- Used instead of defaultContent; highlights the current page if it's
+ - on the navbar. -}
+bootstrap :: Maybe NavBarItem -> Widget -> Handler RepHtml
+bootstrap navbaritem content = do
+ webapp <- getYesod
+ navbar <- map navdetails <$> selectNavBar
+ page <- widgetToPageContent $ do
+ addStylesheet $ StaticR css_bootstrap_css
+ addStylesheet $ StaticR css_bootstrap_responsive_css
+ addScript $ StaticR jquery_full_js
+ addScript $ StaticR js_bootstrap_dropdown_js
+ addScript $ StaticR js_bootstrap_modal_js
+ $(widgetFile "page")
+ hamletToRepHtml $(hamletFile $ hamletTemplate "bootstrap")
+ where
+ navdetails i = (navBarName i, navBarRoute i, Just i == navbaritem)
+
+instance Yesod WebApp where
+ {- Require an auth token be set when accessing any (non-static route) -}
+ isAuthorized _ _ = checkAuthToken secretToken
+
+ {- Add the auth token to every url generated, except static subsite
+ - urls (which can show up in Permission Denied pages). -}
+ joinPath = insertAuthToken secretToken excludeStatic
+ where
+ excludeStatic [] = True
+ excludeStatic (p:_) = p /= "static"
+
+ makeSessionBackend = webAppSessionBackend
+ jsLoader _ = BottomOfHeadBlocking
+
+instance RenderMessage WebApp FormMessage where
+ renderMessage _ _ = defaultFormMessage
+
+type Form x = Html -> MForm WebApp WebApp (FormResult x, Widget)
+
+data WebAppState = WebAppState
+ { showIntro :: Bool
+ }
+
+newWebAppState :: IO (TMVar WebAppState)
+newWebAppState = liftIO $ atomically $
+ newTMVar $ WebAppState { showIntro = True }
+
+getWebAppState :: forall sub. GHandler sub WebApp WebAppState
+getWebAppState = liftIO . atomically . readTMVar =<< webAppState <$> getYesod
+
+modifyWebAppState :: forall sub. (WebAppState -> WebAppState) -> GHandler sub WebApp ()
+modifyWebAppState a = go =<< webAppState <$> getYesod
+ where
+ go s = liftIO $ atomically $ do
+ v <- takeTMVar s
+ putTMVar s $ a v
+
+{- Runs an Annex action from the webapp.
+ -
+ - When the webapp is run outside a git-annex repository, the fallback
+ - value is returned.
+ -}
+runAnnex :: forall sub a. a -> Annex a -> GHandler sub WebApp a
+runAnnex fallback a = maybe (return fallback) go =<< threadState <$> getYesod
+ where
+ go st = liftIO $ runThreadState st a
+
+waitNotifier :: forall sub. (DaemonStatus -> NotificationBroadcaster) -> NotificationId -> GHandler sub WebApp ()
+waitNotifier selector nid = do
+ notifier <- getNotifier selector
+ liftIO $ waitNotification $ notificationHandleFromId notifier nid
+
+newNotifier :: forall sub. (DaemonStatus -> NotificationBroadcaster) -> GHandler sub WebApp NotificationId
+newNotifier selector = do
+ notifier <- getNotifier selector
+ liftIO $ notificationHandleToId <$> newNotificationHandle notifier
+
+getNotifier :: forall sub. (DaemonStatus -> NotificationBroadcaster) -> GHandler sub WebApp NotificationBroadcaster
+getNotifier selector = do
+ webapp <- getYesod
+ liftIO $ selector <$> getDaemonStatus (daemonStatus webapp)
+
+instance PathPiece NotificationId where
+ toPathPiece = pack . show
+ fromPathPiece = readish . unpack
+
+instance PathPiece AlertId where
+ toPathPiece = pack . show
+ fromPathPiece = readish . unpack
+
+instance PathPiece Transfer where
+ toPathPiece = pack . show
+ fromPathPiece = readish . unpack
+
+{- Adds the auth parameter as a hidden field on a form. Must be put into
+ - every form. -}
+webAppFormAuthToken :: Widget
+webAppFormAuthToken = do
+ webapp <- lift getYesod
+ [whamlet|<input type="hidden" name="auth" value="#{secretToken webapp}">|]
+
+{- A button with an icon, and maybe label, that can be clicked to perform
+ - some action.
+ - With javascript, clicking it POSTs the Route, and remains on the same
+ - page.
+ - With noscript, clicking it GETs the Route. -}
+actionButton :: Route WebApp -> (Maybe String) -> String -> String -> Widget
+actionButton route label buttonclass iconclass = $(widgetFile "actionbutton")
diff --git a/Assistant/WebApp/Configurators.hs b/Assistant/WebApp/Configurators.hs
new file mode 100644
index 000000000..ad29459a9
--- /dev/null
+++ b/Assistant/WebApp/Configurators.hs
@@ -0,0 +1,363 @@
+{- git-annex assistant webapp configurators
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+
+module Assistant.WebApp.Configurators where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.WebApp.SideBar
+import Assistant.DaemonStatus
+import Assistant.Threads.MountWatcher (handleMount)
+import Utility.Yesod
+import qualified Remote
+import qualified Types.Remote as Remote
+import Remote.List
+import Annex.UUID (getUUID)
+import Init
+import qualified Git
+import qualified Git.Construct
+import qualified Git.Config
+import qualified Git.Command
+import qualified Annex
+import Locations.UserConfig
+import Utility.FreeDesktop
+import Utility.Mounts
+import Utility.DiskFree
+import Utility.DataUnits
+import Utility.Network
+
+import Yesod
+import Data.Text (Text)
+import qualified Data.Text as T
+import Data.Char
+import System.Posix.Directory
+import qualified Control.Exception as E
+
+{- The main configuration screen. -}
+getConfigR :: Handler RepHtml
+getConfigR = ifM (inFirstRun)
+ ( getFirstRepositoryR
+ , bootstrap (Just Config) $ do
+ sideBarDisplay
+ setTitle "Configuration"
+ $(widgetFile "configurators/main")
+ )
+
+{- Lists known repositories, followed by options to add more. -}
+getRepositoriesR :: Handler RepHtml
+getRepositoriesR = bootstrap (Just Config) $ do
+ sideBarDisplay
+ setTitle "Repositories"
+ repolist <- lift repoList
+ $(widgetFile "configurators/repositories")
+
+{- A numbered list of known repositories, including the current one. -}
+repoList :: Handler [(String, String)]
+repoList = do
+ rs <- filter (not . Remote.readonly) . knownRemotes <$>
+ (liftIO . getDaemonStatus =<< daemonStatus <$> getYesod)
+ l <- runAnnex [] $ do
+ u <- getUUID
+ Remote.prettyListUUIDs $ nub $ u:(map Remote.uuid rs)
+ return $ zip counter l
+ where
+ counter = map show ([1..] :: [Int])
+
+{- An intro message, list of repositories, and nudge to make more. -}
+introDisplay :: Text -> Widget
+introDisplay ident = do
+ webapp <- lift getYesod
+ repolist <- lift repoList
+ let n = length repolist
+ let numrepos = show n
+ let notenough = n < enough
+ let barelyenough = n == enough
+ let morethanenough = n > enough
+ $(widgetFile "configurators/intro")
+ lift $ modifyWebAppState $ \s -> s { showIntro = False }
+ where
+ enough = 2
+
+data RepositoryPath = RepositoryPath Text
+ deriving Show
+
+{- Custom field display for a RepositoryPath, with an icon etc.
+ -
+ - Validates that the path entered is not empty, and is a safe value
+ - to use as a repository. -}
+repositoryPathField :: forall sub. Bool -> Field sub WebApp Text
+repositoryPathField autofocus = Field { fieldParse = parse, fieldView = view }
+ where
+ view idAttr nameAttr attrs val isReq =
+ [whamlet|<input type="text" *{attrs} id="#{idAttr}" name="#{nameAttr}" :isReq:required :autofocus:autofocus value="#{either id id val}">|]
+
+ parse [path]
+ | T.null path = nopath
+ | otherwise = liftIO $ checkRepositoryPath path
+ parse [] = return $ Right Nothing
+ parse _ = nopath
+
+ nopath = return $ Left "Enter a location for the repository"
+
+{- As well as checking the path for a lot of silly things, tilde is
+ - expanded in the returned path. -}
+checkRepositoryPath :: Text -> IO (Either (SomeMessage WebApp) (Maybe Text))
+checkRepositoryPath p = do
+ home <- myHomeDir
+ let basepath = expandTilde home $ T.unpack p
+ path <- absPath basepath
+ let parent = parentDir path
+ problems <- catMaybes <$> mapM runcheck
+ [ (return $ path == "/", "Enter the full path to use for the repository.")
+ , (return $ all isSpace basepath, "A blank path? Seems unlikely.")
+ , (doesFileExist path, "A file already exists with that name.")
+ , (return $ path == home, "Sorry, using git-annex for your whole home directory is not currently supported.")
+ , (not <$> doesDirectoryExist parent, "Parent directory does not exist.")
+ , (not <$> canWrite path, "Cannot write a repository there.")
+ , (not <$> canMakeSymlink path, "That directory is on a filesystem that does not support symlinks. Try a different location.")
+ ]
+ return $
+ case headMaybe problems of
+ Nothing -> Right $ Just $ T.pack basepath
+ Just prob -> Left prob
+ where
+ runcheck (chk, msg) = ifM (chk)
+ ( return $ Just msg
+ , return Nothing
+ )
+ expandTilde home ('~':'/':path) = home </> path
+ expandTilde _ path = path
+
+
+{- On first run, if run in the home directory, default to putting it in
+ - ~/Desktop/annex, when a Desktop directory exists, and ~/annex otherwise.
+ -
+ - If run in another directory, the user probably wants to put it there. -}
+defaultRepositoryPath :: Bool -> IO FilePath
+defaultRepositoryPath firstrun = do
+ cwd <- liftIO $ getCurrentDirectory
+ home <- myHomeDir
+ if home == cwd && firstrun
+ then do
+ desktop <- userDesktopDir
+ ifM (doesDirectoryExist desktop)
+ (relHome (desktop </> "annex"), return "~/annex")
+ else return cwd
+
+localRepositoryForm :: Form RepositoryPath
+localRepositoryForm msg = do
+ path <- T.pack . addTrailingPathSeparator
+ <$> (liftIO . defaultRepositoryPath =<< lift inFirstRun)
+ (pathRes, pathView) <- mreq (repositoryPathField True) "" (Just path)
+ let (err, errmsg) = case pathRes of
+ FormMissing -> (False, "")
+ FormFailure l -> (True, concat $ map T.unpack l)
+ FormSuccess _ -> (False, "")
+ let form = do
+ webAppFormAuthToken
+ $(widgetFile "configurators/localrepositoryform")
+ return (RepositoryPath <$> pathRes, form)
+
+{- Making the first repository, when starting the webapp for the first time. -}
+getFirstRepositoryR :: Handler RepHtml
+getFirstRepositoryR = bootstrap (Just Config) $ do
+ sideBarDisplay
+ setTitle "Getting started"
+ ((res, form), enctype) <- lift $ runFormGet localRepositoryForm
+ case res of
+ FormSuccess (RepositoryPath p) -> lift $
+ startFullAssistant $ T.unpack p
+ _ -> $(widgetFile "configurators/firstrepository")
+
+data RemovableDrive = RemovableDrive
+ { diskFree :: Maybe Integer
+ , mountPoint :: Text
+ }
+ deriving (Show, Eq, Ord)
+
+selectDriveForm :: [RemovableDrive] -> Maybe RemovableDrive -> Form RemovableDrive
+selectDriveForm drives def = renderBootstrap $ RemovableDrive
+ <$> pure Nothing
+ <*> areq (selectFieldList pairs) "Select drive:" (mountPoint <$> def)
+ where
+ pairs = zip (map describe drives) (map mountPoint drives)
+ describe drive = case diskFree drive of
+ Nothing -> mountPoint drive
+ Just free ->
+ let sz = roughSize storageUnits True free
+ in T.unwords
+ [ mountPoint drive
+ , T.concat ["(", T.pack sz]
+ , "free)"
+ ]
+
+{- Adding a removable drive. -}
+getAddDriveR :: Handler RepHtml
+getAddDriveR = bootstrap (Just Config) $ do
+ sideBarDisplay
+ setTitle "Add a removable drive"
+ removabledrives <- liftIO $ driveList
+ writabledrives <- liftIO $
+ filterM (canWrite . T.unpack . mountPoint) removabledrives
+ ((res, form), enctype) <- lift $ runFormGet $
+ selectDriveForm (sort writabledrives) Nothing
+ case res of
+ FormSuccess (RemovableDrive { mountPoint = d }) -> lift $ do
+ go $ T.unpack d
+ setMessage $ toHtml $ T.unwords ["Added", d]
+ redirect RepositoriesR
+ _ -> do
+ let authtoken = webAppFormAuthToken
+ $(widgetFile "configurators/adddrive")
+ where
+ go mountpoint = do
+ liftIO $ makerepo dir
+ liftIO $ initRepo dir $ Just remotename
+ addremotes dir remotename
+ webapp <- getYesod
+ liftIO $ syncrepo dir webapp
+ where
+ dir = mountpoint </> "annex"
+ remotename = takeFileName mountpoint
+ {- The repo may already exist, when adding removable media
+ - that has already been used elsewhere. -}
+ makerepo dir = liftIO $ do
+ r <- E.try (inDir dir $ return True) :: IO (Either E.SomeException Bool)
+ case r of
+ Right _ -> noop
+ Left _e -> do
+ createDirectoryIfMissing True dir
+ bare <- not <$> canMakeSymlink dir
+ makeRepo dir bare
+ {- Synthesize a mount event of the new git repository.
+ - This will sync it, and queue file transfers. -}
+ syncrepo dir webapp =
+ handleMount
+ (fromJust $ threadState webapp)
+ (daemonStatus webapp)
+ (scanRemotes webapp)
+ dir
+ {- Each repository is made a remote of the other. -}
+ addremotes dir name = runAnnex () $ do
+ hostname <- maybe "host" id <$> liftIO getHostname
+ hostlocation <- fromRepo Git.repoLocation
+ void $ liftIO $ inDir dir $
+ addremote hostname hostlocation
+ whenM (addremote name dir) $
+ void $ remoteListRefresh
+ {- Adds a remote only if there is not already one with
+ - the location. -}
+ addremote name location = inRepo $ \r ->
+ if (null $ filter samelocation $ Git.remotes r)
+ then do
+ let name' = uniqueremotename r name (0 :: Int)
+ Git.Command.runBool "remote"
+ [Param "add", Param name', Param location] r
+ else return False
+ where
+ samelocation x = Git.repoLocation x == location
+ {- Generate an unused name for a remote, adding a number if
+ - necessary. -}
+ uniqueremotename r basename n
+ | null namecollision = name
+ | otherwise = uniqueremotename r basename (succ n)
+ where
+ namecollision = filter samename (Git.remotes r)
+ samename x = Git.remoteName x == Just name
+ name
+ | n == 0 = basename
+ | otherwise = basename ++ show n
+
+{- List of removable drives. -}
+driveList :: IO [RemovableDrive]
+driveList = mapM (gen . mnt_dir) =<< filter sane <$> getMounts
+ where
+ gen dir = RemovableDrive
+ <$> getDiskFree dir
+ <*> pure (T.pack dir)
+ -- filter out some things that are surely not removable drives
+ sane Mntent { mnt_dir = dir, mnt_fsname = dev }
+ {- We want real disks like /dev/foo, not
+ - dummy mount points like proc or tmpfs or
+ - gvfs-fuse-daemon. -}
+ | not ('/' `elem` dev) = False
+ {- Just in case: These mount points are surely not
+ - removable disks. -}
+ | dir == "/" = False
+ | dir == "/tmp" = False
+ | dir == "/run/shm" = False
+ | dir == "/run/lock" = False
+ | otherwise = True
+
+{- Bootstraps from first run mode to a fully running assistant in a
+ - repository, by running the postFirstRun callback, which returns the
+ - url to the new webapp. -}
+startFullAssistant :: FilePath -> Handler ()
+startFullAssistant path = do
+ webapp <- getYesod
+ url <- liftIO $ do
+ makeRepo path False
+ initRepo path Nothing
+ addAutoStart path
+ changeWorkingDirectory path
+ fromJust $ postFirstRun webapp
+ redirect $ T.pack url
+
+{- Makes a new git-annex repository. -}
+makeRepo :: FilePath -> Bool -> IO ()
+makeRepo path bare = do
+ unlessM (boolSystem "git" params) $
+ error "git init failed!"
+ where
+ baseparams = [Param "init", Param "--quiet"]
+ params
+ | bare = baseparams ++ [Param "--bare", File path]
+ | otherwise = baseparams ++ [File path]
+
+{- Runs an action in the git-annex repository in the specified directory. -}
+inDir :: FilePath -> Annex a -> IO a
+inDir dir a = do
+ state <- Annex.new =<< Git.Config.read =<< Git.Construct.fromPath dir
+ Annex.eval state a
+
+{- Initializes a git-annex repository in a directory with a description. -}
+initRepo :: FilePath -> Maybe String -> IO ()
+initRepo dir desc = inDir dir $
+ unlessM isInitialized $
+ initialize desc
+
+{- Adds a directory to the autostart file. -}
+addAutoStart :: FilePath -> IO ()
+addAutoStart path = do
+ autostart <- autoStartFile
+ createDirectoryIfMissing True (parentDir autostart)
+ appendFile autostart $ path ++ "\n"
+
+{- Checks if the user can write to a directory.
+ -
+ - The directory may be in the process of being created; if so
+ - the parent directory is checked instead. -}
+canWrite :: FilePath -> IO Bool
+canWrite dir = do
+ tocheck <- ifM (doesDirectoryExist dir)
+ (return dir, return $ parentDir dir)
+ catchBoolIO $ fileAccess tocheck False True False
+
+{- Checks if a directory is on a filesystem that supports symlinks. -}
+canMakeSymlink :: FilePath -> IO Bool
+canMakeSymlink dir = ifM (doesDirectoryExist dir)
+ ( catchBoolIO $ test dir
+ , canMakeSymlink (parentDir dir)
+ )
+ where
+ test d = do
+ let link = d </> "delete.me"
+ createSymbolicLink link link
+ removeLink link
+ return True
diff --git a/Assistant/WebApp/DashBoard.hs b/Assistant/WebApp/DashBoard.hs
new file mode 100644
index 000000000..949793121
--- /dev/null
+++ b/Assistant/WebApp/DashBoard.hs
@@ -0,0 +1,219 @@
+{- git-annex assistant webapp dashboard
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE CPP, TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+
+module Assistant.WebApp.DashBoard where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.WebApp.SideBar
+import Assistant.WebApp.Notifications
+import Assistant.WebApp.Configurators
+import Assistant.DaemonStatus
+import Assistant.TransferQueue
+import Assistant.TransferSlots
+import qualified Assistant.Threads.Transferrer as Transferrer
+import Utility.NotificationBroadcaster
+import Utility.Yesod
+import Logs.Transfer
+import Utility.Percentage
+import Utility.DataUnits
+import Types.Key
+import qualified Remote
+import qualified Git
+import Locations.UserConfig
+
+import Yesod
+import Text.Hamlet
+import qualified Data.Map as M
+import Control.Concurrent
+import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL)
+import System.Posix.Process (getProcessGroupIDOf)
+
+{- A display of currently running and queued transfers.
+ -
+ - Or, if there have never been any this run, an intro display. -}
+transfersDisplay :: Bool -> Widget
+transfersDisplay warnNoScript = do
+ webapp <- lift getYesod
+ current <- lift $ M.toList <$> getCurrentTransfers
+ queued <- liftIO $ getTransferQueue $ transferQueue webapp
+ let ident = "transfers"
+ autoUpdate ident NotifierTransfersR (10 :: Int) (10 :: Int)
+ let transfers = current ++ queued
+ if null transfers
+ then ifM (lift $ showIntro <$> getWebAppState)
+ ( introDisplay ident
+ , $(widgetFile "dashboard/transfers")
+ )
+ else $(widgetFile "dashboard/transfers")
+ where
+ isrunning info = not $
+ transferPaused info || isNothing (startedTime info)
+
+{- Called by client to get a display of currently in process transfers.
+ -
+ - Returns a div, which will be inserted into the calling page.
+ -
+ - Note that the head of the widget is not included, only its
+ - body is. To get the widget head content, the widget is also
+ - inserted onto the getHomeR page.
+ -}
+getTransfersR :: NotificationId -> Handler RepHtml
+getTransfersR nid = do
+ waitNotifier transferNotifier nid
+
+ page <- widgetToPageContent $ transfersDisplay False
+ hamletToRepHtml $ [hamlet|^{pageBody page}|]
+
+{- The main dashboard. -}
+dashboard :: Bool -> Widget
+dashboard warnNoScript = do
+ sideBarDisplay
+ let content = transfersDisplay warnNoScript
+ $(widgetFile "dashboard/main")
+
+getHomeR :: Handler RepHtml
+getHomeR = ifM (inFirstRun)
+ ( redirect ConfigR
+ , bootstrap (Just DashBoard) $ dashboard True
+ )
+
+{- Same as HomeR, except no autorefresh at all (and no noscript warning). -}
+getNoScriptR :: Handler RepHtml
+getNoScriptR = bootstrap (Just DashBoard) $ dashboard False
+
+{- Same as HomeR, except with autorefreshing via meta refresh. -}
+getNoScriptAutoR :: Handler RepHtml
+getNoScriptAutoR = bootstrap (Just DashBoard) $ do
+ let ident = NoScriptR
+ let delayseconds = 3 :: Int
+ let this = NoScriptAutoR
+ toWidgetHead $(hamletFile $ hamletTemplate "dashboard/metarefresh")
+ dashboard False
+
+{- The javascript code does a post. -}
+postFileBrowserR :: Handler ()
+postFileBrowserR = void openFileBrowser
+
+{- Used by non-javascript browsers, where clicking on the link actually
+ - opens this page, so we redirect back to the referrer. -}
+getFileBrowserR :: Handler ()
+getFileBrowserR = whenM openFileBrowser $ redirectBack
+
+redirectBack :: Handler ()
+redirectBack = do
+ clearUltDest
+ setUltDestReferer
+ redirectUltDest HomeR
+
+{- Opens the system file browser on the repo, or, as a fallback,
+ - goes to a file:// url. Returns True if it's ok to redirect away
+ - from the page (ie, the system file browser was opened).
+ -
+ - Note that the command is opened using a different thread, to avoid
+ - blocking the response to the browser on it. -}
+openFileBrowser :: Handler Bool
+openFileBrowser = do
+ path <- runAnnex (error "no configured repository") $
+ fromRepo Git.repoPath
+ ifM (liftIO $ inPath cmd <&&> inPath cmd)
+ ( do
+ void $ liftIO $ forkIO $ void $
+ boolSystem cmd [Param path]
+ return True
+ , do
+ clearUltDest
+ setUltDest $ "file://" ++ path
+ void $ redirectUltDest HomeR
+ return False
+ )
+ where
+#if OSX
+ cmd = "open"
+#else
+ cmd = "xdg-open"
+#endif
+
+{- Transfer controls. The GET is done in noscript mode and redirects back
+ - to the referring page. The POST is called by javascript. -}
+getPauseTransferR :: Transfer -> Handler ()
+getPauseTransferR t = pauseTransfer t >> redirectBack
+postPauseTransferR :: Transfer -> Handler ()
+postPauseTransferR t = pauseTransfer t
+getStartTransferR :: Transfer -> Handler ()
+getStartTransferR t = startTransfer t >> redirectBack
+postStartTransferR :: Transfer -> Handler ()
+postStartTransferR t = startTransfer t
+getCancelTransferR :: Transfer -> Handler ()
+getCancelTransferR t = cancelTransfer False t >> redirectBack
+postCancelTransferR :: Transfer -> Handler ()
+postCancelTransferR t = cancelTransfer False t
+
+pauseTransfer :: Transfer -> Handler ()
+pauseTransfer = cancelTransfer True
+
+cancelTransfer :: Bool -> Transfer-> Handler ()
+cancelTransfer pause t = do
+ webapp <- getYesod
+ let dstatus = daemonStatus webapp
+ m <- getCurrentTransfers
+ liftIO $ do
+ {- remove queued transfer -}
+ void $ dequeueTransfer (transferQueue webapp) dstatus t
+ {- stop running transfer -}
+ maybe noop (stop dstatus) (M.lookup t m)
+ where
+ stop dstatus info = do
+ {- When there's a thread associated with the
+ - transfer, it's killed first, to avoid it
+ - displaying any alert about the transfer having
+ - failed when the transfer process is killed. -}
+ maybe noop signalthread $ transferTid info
+ maybe noop killproc $ transferPid info
+ if pause
+ then void $
+ updateTransferInfo dstatus t $ info
+ { transferPaused = True }
+ else void $
+ removeTransfer dstatus t
+ signalthread tid
+ | pause = throwTo tid PauseTransfer
+ | otherwise = killThread tid
+ {- In order to stop helper processes like rsync,
+ - kill the whole process group of the process running the
+ - transfer. -}
+ killproc pid = do
+ g <- getProcessGroupIDOf pid
+ void $ tryIO $ signalProcessGroup sigTERM g
+ threadDelay 100000 -- 0.1 second grace period
+ void $ tryIO $ signalProcessGroup sigKILL g
+
+startTransfer :: Transfer -> Handler ()
+startTransfer t = do
+ m <- getCurrentTransfers
+ maybe noop resume (M.lookup t m)
+ -- TODO: handle starting a queued transfer
+ where
+ resume info = maybe (start info) signalthread $ transferTid info
+ signalthread tid = liftIO $ throwTo tid ResumeTransfer
+ start info = do
+ webapp <- getYesod
+ let dstatus = daemonStatus webapp
+ let slots = transferSlots webapp
+ {- This transfer was being run by another process,
+ - forget that old pid, and start a new one. -}
+ liftIO $ updateTransferInfo dstatus t $ info
+ { transferPid = Nothing }
+ liftIO $ Transferrer.transferThread
+ dstatus slots t info inImmediateTransferSlot
+ =<< readProgramFile
+
+getCurrentTransfers :: Handler TransferMap
+getCurrentTransfers = currentTransfers
+ <$> (liftIO . getDaemonStatus =<< daemonStatus <$> getYesod)
diff --git a/Assistant/WebApp/Documentation.hs b/Assistant/WebApp/Documentation.hs
new file mode 100644
index 000000000..b0a9e4d98
--- /dev/null
+++ b/Assistant/WebApp/Documentation.hs
@@ -0,0 +1,22 @@
+{- git-annex assistant webapp documentation
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+
+module Assistant.WebApp.Documentation where
+
+import Assistant.WebApp
+import Assistant.WebApp.SideBar
+import Utility.Yesod
+
+import Yesod
+
+getAboutR :: Handler RepHtml
+getAboutR = bootstrap (Just About) $ do
+ sideBarDisplay
+ setTitle "About git-annex"
+ $(widgetFile "documentation/about")
diff --git a/Assistant/WebApp/Notifications.hs b/Assistant/WebApp/Notifications.hs
new file mode 100644
index 000000000..3aa56424a
--- /dev/null
+++ b/Assistant/WebApp/Notifications.hs
@@ -0,0 +1,58 @@
+{- git-annex assistant webapp notifications
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+
+module Assistant.WebApp.Notifications where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.DaemonStatus
+import Utility.NotificationBroadcaster
+import Utility.Yesod
+
+import Yesod
+import Data.Text (Text)
+import qualified Data.Text as T
+
+{- Add to any widget to make it auto-update using long polling.
+ -
+ - The widget should have a html element with an id=ident, which will be
+ - replaced when it's updated.
+ -
+ - The geturl route should return the notifier url to use for polling.
+ -
+ - ms_delay is how long to delay between AJAX updates
+ - ms_startdelay is how long to delay before updating with AJAX at the start
+ -}
+autoUpdate :: Text -> Route WebApp -> Int -> Int -> Widget
+autoUpdate ident geturl ms_delay ms_startdelay = do
+ let delay = show ms_delay
+ let startdelay = show ms_startdelay
+ addScript $ StaticR longpolling_js
+ $(widgetFile "notifications/longpolling")
+
+{- Notifier urls are requested by the javascript, to avoid allocation
+ - of NotificationIds when noscript pages are loaded. This constructs a
+ - notifier url for a given Route and NotificationBroadcaster.
+ -}
+notifierUrl :: (NotificationId -> Route WebApp) -> (DaemonStatus -> NotificationBroadcaster) -> Handler RepPlain
+notifierUrl route selector = do
+ (urlbits, _params) <- renderRoute . route <$> newNotifier selector
+ webapp <- getYesod
+ return $ RepPlain $ toContent $ T.concat
+ [ "/"
+ , T.intercalate "/" urlbits
+ , "?auth="
+ , secretToken webapp
+ ]
+
+getNotifierTransfersR :: Handler RepPlain
+getNotifierTransfersR = notifierUrl TransfersR transferNotifier
+
+getNotifierSideBarR :: Handler RepPlain
+getNotifierSideBarR = notifierUrl SideBarR alertNotifier
diff --git a/Assistant/WebApp/SideBar.hs b/Assistant/WebApp/SideBar.hs
new file mode 100644
index 000000000..d44c75d43
--- /dev/null
+++ b/Assistant/WebApp/SideBar.hs
@@ -0,0 +1,84 @@
+{- git-annex assistant webapp sidebar
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE TypeFamilies, QuasiQuotes, MultiParamTypeClasses, TemplateHaskell, OverloadedStrings, RankNTypes #-}
+
+module Assistant.WebApp.SideBar where
+
+import Assistant.Common
+import Assistant.WebApp
+import Assistant.WebApp.Notifications
+import Assistant.DaemonStatus
+import Assistant.Alert
+import Utility.NotificationBroadcaster
+import Utility.Yesod
+
+import Yesod
+import Data.Text (Text)
+import qualified Data.Map as M
+import Control.Concurrent
+
+sideBarDisplay :: Widget
+sideBarDisplay = do
+ let content = do
+ {- Add newest alerts to the sidebar. -}
+ webapp <- lift getYesod
+ alertpairs <- M.toList . alertMap
+ <$> liftIO (getDaemonStatus $ daemonStatus webapp)
+ mapM_ renderalert $
+ take displayAlerts $ reverse $ sortAlertPairs alertpairs
+ let ident = "sidebar"
+ $(widgetFile "sidebar/main")
+ autoUpdate ident NotifierSideBarR (10 :: Int) (10 :: Int)
+ where
+ bootstrapclass Activity = "alert-info"
+ bootstrapclass Warning = "alert"
+ bootstrapclass Error = "alert-error"
+ bootstrapclass Success = "alert-success"
+ bootstrapclass Message = "alert-info"
+
+ renderalert (alertid, alert) = addalert
+ alertid
+ (alertClosable alert)
+ (alertBlockDisplay alert)
+ (bootstrapclass $ alertClass alert)
+ (renderAlertHeader alert)
+ (renderAlertMessage alert)
+ (alertIcon alert)
+
+ addalert :: AlertId -> Bool -> Bool -> Text -> Maybe Text -> Text -> Maybe String -> Widget
+ addalert i closable block divclass heading message icon = do
+ let alertid = show i
+ $(widgetFile "sidebar/alert")
+
+{- Called by client to get a sidebar display.
+ -
+ - Returns a div, which will be inserted into the calling page.
+ -
+ - Note that the head of the widget is not included, only its
+ - body is. To get the widget head content, the widget is also
+ - inserted onto all pages.
+ -}
+getSideBarR :: NotificationId -> Handler RepHtml
+getSideBarR nid = do
+ waitNotifier alertNotifier nid
+
+ {- This 0.1 second delay avoids very transient notifications from
+ - being displayed and churning the sidebar unnecesarily.
+ -
+ - This needs to be below the level perceptable by the user,
+ - to avoid slowing down user actions like closing alerts. -}
+ liftIO $ threadDelay 100000
+
+ page <- widgetToPageContent sideBarDisplay
+ hamletToRepHtml $ [hamlet|^{pageBody page}|]
+
+{- Called by the client to close an alert. -}
+getCloseAlert :: AlertId -> Handler ()
+getCloseAlert i = do
+ webapp <- getYesod
+ void $ liftIO $ removeAlert (daemonStatus webapp) i
diff --git a/Assistant/WebApp/routes b/Assistant/WebApp/routes
new file mode 100644
index 000000000..e3e7daf87
--- /dev/null
+++ b/Assistant/WebApp/routes
@@ -0,0 +1,22 @@
+/ HomeR GET
+/noscript NoScriptR GET
+/noscript/auto NoScriptAutoR GET
+/about AboutR GET
+
+/config ConfigR GET
+/config/repository RepositoriesR GET
+/config/repository/add/drive AddDriveR GET
+/config/repository/first FirstRepositoryR GET
+
+/transfers/#NotificationId TransfersR GET
+/sidebar/#NotificationId SideBarR GET
+/notifier/transfers NotifierTransfersR GET
+/notifier/sidebar NotifierSideBarR GET
+/closealert/#AlertId CloseAlert GET
+/filebrowser FileBrowserR GET POST
+
+/transfer/pause/#Transfer PauseTransferR GET POST
+/transfer/start/#Transfer StartTransferR GET POST
+/transfer/cancel/#Transfer CancelTransferR GET POST
+
+/static StaticR Static getStatic