diff options
author | Joey Hess <joeyh@debian.org> | 2013-11-27 18:41:44 -0400 |
---|---|---|
committer | Joey Hess <joeyh@debian.org> | 2013-11-27 18:41:44 -0400 |
commit | 2e6d39d426f6b08f236d6071e671a9dcfc799d91 (patch) | |
tree | 1618fd9e34a30409ee0937cb4b3861ec3b5e7bba /Assistant |
git-annex (5.20131127) unstable; urgency=low
* webapp: Detect when upgrades are available, and upgrade if the user
desires.
(Only when git-annex is installed using the prebuilt binaries
from git-annex upstream, not from eg Debian.)
* assistant: Detect when the git-annex binary is modified or replaced,
and either prompt the user to restart the program, or automatically
restart it.
* annex.autoupgrade configures both the above upgrade behaviors.
* Added support for quvi 0.9. Slightly suboptimal due to limitations in its
interface compared with the old version.
* Bug fix: annex.version did not get set on automatic upgrade to v5 direct
mode repo, so the upgrade was performed repeatedly, slowing commands down.
* webapp: Fix bug that broke switching between local repositories
that use the new guarded direct mode.
* Android: Fix stripping of the git-annex binary.
* Android: Make terminal app show git-annex version number.
* Android: Re-enable XMPP support.
* reinject: Allow to be used in direct mode.
* Futher improvements to git repo repair. Has now been tested in tens
of thousands of intentionally damaged repos, and successfully
repaired them all.
* Allow use of --unused in bare repository.
# imported from the archive
Diffstat (limited to 'Assistant')
70 files changed, 8469 insertions, 0 deletions
diff --git a/Assistant/Alert.hs b/Assistant/Alert.hs new file mode 100644 index 000000000..c767d429d --- /dev/null +++ b/Assistant/Alert.hs @@ -0,0 +1,433 @@ +{- git-annex assistant alerts + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE OverloadedStrings, CPP #-} + +module Assistant.Alert where + +import Common.Annex +import Assistant.Types.Alert +import Assistant.Alert.Utility +import qualified Remote +import Utility.Tense +import Logs.Transfer +import Types.Distribution + +import Data.String +import qualified Data.Text as T +import qualified Control.Exception as E + +#ifdef WITH_WEBAPP +import Assistant.DaemonStatus +import Assistant.WebApp.Types +import Assistant.WebApp (renderUrl) +import Yesod +#endif +import Assistant.Monad +import Assistant.Types.UrlRenderer + +{- Makes a button for an alert that opens a Route. + - + - If autoclose is set, the button will close the alert it's + - attached to when clicked. -} +#ifdef WITH_WEBAPP +mkAlertButton :: Bool -> T.Text -> UrlRenderer -> Route WebApp -> Assistant AlertButton +mkAlertButton autoclose label urlrenderer route = do + close <- asIO1 removeAlert + url <- liftIO $ renderUrl urlrenderer route [] + return $ AlertButton + { buttonLabel = label + , buttonUrl = url + , buttonAction = if autoclose then Just close else Nothing + , buttonPrimary = True + } +#endif + +renderData :: Alert -> TenseText +renderData = tenseWords . alertData + +baseActivityAlert :: Alert +baseActivityAlert = Alert + { alertClass = Activity + , alertHeader = Nothing + , alertMessageRender = renderData + , alertData = [] + , alertCounter = 0 + , alertBlockDisplay = False + , alertClosable = False + , alertPriority = Medium + , alertIcon = Just ActivityIcon + , alertCombiner = Nothing + , alertName = Nothing + , alertButtons = [] + } + +warningAlert :: String -> String -> Alert +warningAlert name msg = Alert + { alertClass = Warning + , alertHeader = Just $ tenseWords ["warning"] + , alertMessageRender = renderData + , alertData = [UnTensed $ T.pack msg] + , alertCounter = 0 + , alertBlockDisplay = True + , alertClosable = True + , alertPriority = High + , alertIcon = Just ErrorIcon + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertName = Just $ WarningAlert name + , alertButtons = [] + } + +errorAlert :: String -> [AlertButton] -> Alert +errorAlert msg buttons = Alert + { alertClass = Error + , alertHeader = Nothing + , alertMessageRender = renderData + , alertData = [UnTensed $ T.pack msg] + , alertCounter = 0 + , alertBlockDisplay = True + , alertClosable = True + , alertPriority = Pinned + , alertIcon = Just ErrorIcon + , alertCombiner = Nothing + , alertName = Nothing + , alertButtons = buttons + } + +activityAlert :: Maybe TenseText -> [TenseChunk] -> Alert +activityAlert header dat = baseActivityAlert + { alertHeader = header + , alertData = dat + } + +startupScanAlert :: Alert +startupScanAlert = activityAlert Nothing + [Tensed "Performing" "Performed", "startup scan"] + +{- Displayed when a shutdown is occurring, so will be seen after shutdown + - has happened. -} +shutdownAlert :: Alert +shutdownAlert = warningAlert "shutdown" "git-annex has been shut down" + +commitAlert :: Alert +commitAlert = activityAlert Nothing + [Tensed "Committing" "Committed", "changes to git"] + +showRemotes :: [Remote] -> TenseChunk +showRemotes = UnTensed . T.intercalate ", " . map (T.pack . Remote.name) + +syncAlert :: [Remote] -> Alert +syncAlert rs = baseActivityAlert + { alertName = Just SyncAlert + , alertHeader = Just $ tenseWords + [Tensed "Syncing" "Synced", "with", showRemotes rs] + , alertPriority = Low + , alertIcon = Just SyncIcon + } + +syncResultAlert :: [Remote] -> [Remote] -> Alert +syncResultAlert succeeded failed = makeAlertFiller (not $ null succeeded) $ + baseActivityAlert + { alertName = Just SyncAlert + , alertHeader = Just $ tenseWords msg + } + where + msg + | null succeeded = ["Failed to sync with", showRemotes failed] + | null failed = ["Synced with", showRemotes succeeded] + | otherwise = + [ "Synced with", showRemotes succeeded + , "but not with", showRemotes failed + ] + +sanityCheckAlert :: Alert +sanityCheckAlert = activityAlert + (Just $ tenseWords [Tensed "Running" "Ran", "daily sanity check"]) + ["to make sure everything is ok."] + +sanityCheckFixAlert :: String -> Alert +sanityCheckFixAlert msg = Alert + { alertClass = Warning + , alertHeader = Just $ tenseWords ["Fixed a problem"] + , alertMessageRender = render + , alertData = [UnTensed $ T.pack msg] + , alertCounter = 0 + , alertBlockDisplay = True + , alertPriority = High + , alertClosable = True + , alertIcon = Just ErrorIcon + , alertName = Just SanityCheckFixAlert + , alertCombiner = Just $ dataCombiner (++) + , alertButtons = [] + } + where + render alert = tenseWords $ alerthead : alertData alert ++ [alertfoot] + alerthead = "The daily sanity check found and fixed a problem:" + alertfoot = "If these problems persist, consider filing a bug report." + +fsckingAlert :: AlertButton -> Maybe Remote -> Alert +fsckingAlert button mr = baseActivityAlert + { alertData = case mr of + Nothing -> [ UnTensed $ T.pack $ "Consistency check in progress" ] + Just r -> [ UnTensed $ T.pack $ "Consistency check of " ++ Remote.name r ++ " in progress"] + , alertButtons = [button] + } + +showFscking :: UrlRenderer -> Maybe Remote -> IO (Either E.SomeException a) -> Assistant a +showFscking urlrenderer mr a = do +#ifdef WITH_WEBAPP + button <- mkAlertButton False (T.pack "Configure") urlrenderer ConfigFsckR + r <- alertDuring (fsckingAlert button mr) $ + liftIO a +#else + r <- liftIO a +#endif + either (liftIO . E.throwIO) return r + +notFsckedNudge :: UrlRenderer -> Maybe Remote -> Assistant () +#ifdef WITH_WEBAPP +notFsckedNudge urlrenderer mr = do + button <- mkAlertButton True (T.pack "Configure") urlrenderer ConfigFsckR + void $ addAlert (notFsckedAlert mr button) +#else +notFsckedNudge _ _ = noop +#endif + +notFsckedAlert :: Maybe Remote -> AlertButton -> Alert +notFsckedAlert mr button = Alert + { alertHeader = Just $ fromString $ concat + [ "You should enable consistency checking to protect your data" + , maybe "" (\r -> " in " ++ Remote.name r) mr + , "." + ] + , alertIcon = Just InfoIcon + , alertPriority = High + , alertButtons = [button] + , alertClosable = True + , alertClass = Message + , alertMessageRender = renderData + , alertCounter = 0 + , alertBlockDisplay = True + , alertName = Just NotFsckedAlert + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertData = [] + } + +baseUpgradeAlert :: [AlertButton] -> TenseText -> Alert +baseUpgradeAlert buttons message = Alert + { alertHeader = Just message + , alertIcon = Just UpgradeIcon + , alertPriority = High + , alertButtons = buttons + , alertClosable = True + , alertClass = Message + , alertMessageRender = renderData + , alertCounter = 0 + , alertBlockDisplay = True + , alertName = Just UpgradeAlert + , alertCombiner = Just $ fullCombiner $ \new _old -> new + , alertData = [] + } + +canUpgradeAlert :: AlertPriority -> GitAnnexVersion -> AlertButton -> Alert +canUpgradeAlert priority version button = + (baseUpgradeAlert [button] $ fromString msg) + { alertPriority = priority + , alertData = [fromString $ " (version " ++ version ++ ")"] + } + where + msg = if priority >= High + then "An important upgrade of git-annex is available!" + else "An upgrade of git-annex is available." + +upgradeReadyAlert :: AlertButton -> Alert +upgradeReadyAlert button = baseUpgradeAlert [button] $ + fromString "A new version of git-annex has been installed." + +upgradingAlert :: Alert +upgradingAlert = activityAlert Nothing [ fromString "Upgrading git-annex" ] + +upgradeFinishedAlert :: Maybe AlertButton -> GitAnnexVersion -> Alert +upgradeFinishedAlert button version = + baseUpgradeAlert (maybe [] (:[]) button) $ fromString $ + "Finished upgrading git-annex to version " ++ version + +upgradeFailedAlert :: String -> Alert +upgradeFailedAlert msg = (errorAlert msg []) + { alertHeader = Just $ fromString "Upgrade failed." } + +brokenRepositoryAlert :: [AlertButton] -> Alert +brokenRepositoryAlert = errorAlert "Serious problems have been detected with your repository. This needs your immediate attention!" + +repairingAlert :: String -> Alert +repairingAlert repodesc = activityAlert Nothing + [ Tensed "Attempting to repair" "Repaired" + , UnTensed $ T.pack repodesc + ] + +pairingAlert :: AlertButton -> Alert +pairingAlert button = baseActivityAlert + { alertData = [ UnTensed "Pairing in progress" ] + , alertPriority = High + , alertButtons = [button] + } + +pairRequestReceivedAlert :: String -> AlertButton -> Alert +pairRequestReceivedAlert who button = Alert + { alertClass = Message + , alertHeader = Nothing + , alertMessageRender = renderData + , alertData = [UnTensed $ T.pack $ who ++ " is sending a pair request."] + , alertCounter = 0 + , alertBlockDisplay = False + , alertPriority = High + , alertClosable = True + , alertIcon = Just InfoIcon + , alertName = Just $ PairAlert who + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertButtons = [button] + } + +pairRequestAcknowledgedAlert :: String -> Maybe AlertButton -> Alert +pairRequestAcknowledgedAlert who button = baseActivityAlert + { alertData = ["Pairing with", UnTensed (T.pack who), Tensed "in progress" "complete"] + , alertPriority = High + , alertName = Just $ PairAlert who + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertButtons = maybe [] (:[]) button + } + +xmppNeededAlert :: AlertButton -> Alert +xmppNeededAlert button = Alert + { alertHeader = Just "Share with friends, and keep your devices in sync across the cloud." + , alertIcon = Just TheCloud + , alertPriority = High + , alertButtons = [button] + , alertClosable = True + , alertClass = Message + , alertMessageRender = renderData + , alertCounter = 0 + , alertBlockDisplay = True + , alertName = Just $ XMPPNeededAlert + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertData = [] + } + +cloudRepoNeededAlert :: Maybe String -> AlertButton -> Alert +cloudRepoNeededAlert friendname button = Alert + { alertHeader = Just $ fromString $ unwords + [ "Unable to download files from" + , (fromMaybe "your other devices" friendname) ++ "." + ] + , alertIcon = Just ErrorIcon + , alertPriority = High + , alertButtons = [button] + , alertClosable = True + , alertClass = Message + , alertMessageRender = renderData + , alertCounter = 0 + , alertBlockDisplay = True + , alertName = Just $ CloudRepoNeededAlert + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertData = [] + } + +remoteRemovalAlert :: String -> AlertButton -> Alert +remoteRemovalAlert desc button = Alert + { alertHeader = Just $ fromString $ + "The repository \"" ++ desc ++ + "\" has been emptied, and can now be removed." + , alertIcon = Just InfoIcon + , alertPriority = High + , alertButtons = [button] + , alertClosable = True + , alertClass = Message + , alertMessageRender = renderData + , alertCounter = 0 + , alertBlockDisplay = True + , alertName = Just $ RemoteRemovalAlert desc + , alertCombiner = Just $ dataCombiner $ \_old new -> new + , alertData = [] + } + +{- Show a message that relates to a list of files. + - + - The most recent several files are shown, and a count of any others. -} +fileAlert :: TenseChunk -> [FilePath] -> Alert +fileAlert msg files = (activityAlert Nothing shortfiles) + { alertName = Just $ FileAlert msg + , alertMessageRender = renderer + , alertCounter = counter + , alertCombiner = Just $ fullCombiner combiner + } + where + maxfilesshown = 10 + + (somefiles, counter) = splitcounter (dedupadjacent files) + shortfiles = map (fromString . shortFile . takeFileName) somefiles + + renderer alert = tenseWords $ msg : alertData alert ++ showcounter + where + showcounter = case alertCounter alert of + 0 -> [] + _ -> [fromString $ "and " ++ show (alertCounter alert) ++ " other files"] + + dedupadjacent (x:y:rest) + | x == y = dedupadjacent (y:rest) + | otherwise = x : dedupadjacent (y:rest) + dedupadjacent (x:[]) = [x] + dedupadjacent [] = [] + + {- Note that this ensures the counter is never 1; no need to say + - "1 file" when the filename could be shown. -} + splitcounter l + | length l <= maxfilesshown = (l, 0) + | otherwise = + let (keep, rest) = splitAt (maxfilesshown - 1) l + in (keep, length rest) + + combiner new old = + let (fs, n) = splitcounter $ + dedupadjacent $ alertData new ++ alertData old + cnt = n + alertCounter new + alertCounter old + in old + { alertData = fs + , alertCounter = cnt + } + +addFileAlert :: [FilePath] -> Alert +addFileAlert = fileAlert (Tensed "Adding" "Added") + +{- This is only used as a success alert after a transfer, not during it. -} +transferFileAlert :: Direction -> Bool -> FilePath -> Alert +transferFileAlert direction True file + | direction == Upload = fileAlert "Uploaded" [file] + | otherwise = fileAlert "Downloaded" [file] +transferFileAlert direction False file + | direction == Upload = fileAlert "Upload failed" [file] + | otherwise = fileAlert "Download failed" [file] + +dataCombiner :: ([TenseChunk] -> [TenseChunk] -> [TenseChunk]) -> AlertCombiner +dataCombiner combiner = fullCombiner $ + \new old -> old { alertData = alertData new `combiner` alertData old } + +fullCombiner :: (Alert -> Alert -> Alert) -> AlertCombiner +fullCombiner combiner new old + | alertClass new /= alertClass old = Nothing + | alertName new == alertName old = + Just $! new `combiner` old + | otherwise = Nothing + +shortFile :: FilePath -> String +shortFile f + | len < maxlen = f + | otherwise = take half f ++ ".." ++ drop (len - half) f + where + len = length f + maxlen = 20 + half = (maxlen - 2) `div` 2 + diff --git a/Assistant/Alert/Utility.hs b/Assistant/Alert/Utility.hs new file mode 100644 index 000000000..db2ea1925 --- /dev/null +++ b/Assistant/Alert/Utility.hs @@ -0,0 +1,130 @@ +{- git-annex assistant alert utilities + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Alert.Utility where + +import Common.Annex +import Assistant.Types.Alert +import Utility.Tense + +import qualified Data.Text as T +import Data.Text (Text) +import qualified Data.Map as M + +{- This is as many alerts as it makes sense to display at a time. + - A display might be smaller, or larger, the point is to not overwhelm the + - user with a ton of alerts. -} +displayAlerts :: Int +displayAlerts = 6 + +{- This is not a hard maximum, but there's no point in keeping a great + - many filler alerts in an AlertMap, so when there's more than this many, + - they start being pruned, down toward displayAlerts. -} +maxAlerts :: Int +maxAlerts = displayAlerts * 2 + +type AlertPair = (AlertId, Alert) + +{- The desired order is the reverse of: + - + - - Pinned alerts + - - High priority alerts, newest first + - - Medium priority Activity, newest first (mostly used for Activity) + - - Low priority alerts, newest first + - - Filler priorty alerts, newest first + - - Ties are broken by the AlertClass, with Errors etc coming first. + -} +compareAlertPairs :: AlertPair -> AlertPair -> Ordering +compareAlertPairs + (aid, Alert { alertClass = aclass, alertPriority = aprio }) + (bid, Alert { alertClass = bclass, alertPriority = bprio }) + = compare aprio bprio + `thenOrd` compare aid bid + `thenOrd` compare aclass bclass + +sortAlertPairs :: [AlertPair] -> [AlertPair] +sortAlertPairs = sortBy compareAlertPairs + +{- Renders an alert's header for display, if it has one. -} +renderAlertHeader :: Alert -> Maybe Text +renderAlertHeader alert = renderTense (alertTense alert) <$> alertHeader alert + +{- Renders an alert's message for display. -} +renderAlertMessage :: Alert -> Text +renderAlertMessage alert = renderTense (alertTense alert) $ + (alertMessageRender alert) alert + +showAlert :: Alert -> String +showAlert alert = T.unpack $ T.unwords $ catMaybes + [ renderAlertHeader alert + , Just $ renderAlertMessage alert + ] + +alertTense :: Alert -> Tense +alertTense alert + | alertClass alert == Activity = Present + | otherwise = Past + +{- Checks if two alerts display the same. -} +effectivelySameAlert :: Alert -> Alert -> Bool +effectivelySameAlert x y = all id + [ alertClass x == alertClass y + , alertHeader x == alertHeader y + , alertData x == alertData y + , alertBlockDisplay x == alertBlockDisplay y + , alertClosable x == alertClosable y + , alertPriority x == alertPriority y + ] + +makeAlertFiller :: Bool -> Alert -> Alert +makeAlertFiller success alert + | isFiller alert = alert + | otherwise = alert + { alertClass = if c == Activity then c' else c + , alertPriority = Filler + , alertClosable = True + , alertButtons = [] + , alertIcon = Just $ if success then SuccessIcon else ErrorIcon + } + where + c = alertClass alert + c' + | success = Success + | otherwise = Error + +isFiller :: Alert -> Bool +isFiller alert = alertPriority alert == Filler + +{- Updates the Alertmap, adding or updating an alert. + - + - Any old filler that looks the same as the alert is removed. + - + - Or, if the alert has an alertCombiner that combines it with + - an old alert, the old alert is replaced with the result, and the + - alert is removed. + - + - Old filler alerts are pruned once maxAlerts is reached. + -} +mergeAlert :: AlertId -> Alert -> AlertMap -> AlertMap +mergeAlert i al m = maybe updatePrune updateCombine (alertCombiner al) + where + pruneSame k al' = k == i || not (effectivelySameAlert al al') + pruneBloat m' + | bloat > 0 = M.fromList $ pruneold $ M.toList m' + | otherwise = m' + where + bloat = M.size m' - maxAlerts + pruneold l = + let (f, rest) = partition (\(_, a) -> isFiller a) l + in drop bloat f ++ rest + updatePrune = pruneBloat $ M.filterWithKey pruneSame $ + M.insertWith' const i al m + updateCombine combiner = + let combined = M.mapMaybe (combiner al) m + in if M.null combined + then updatePrune + else M.delete i $ M.union combined m diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs new file mode 100644 index 000000000..c9354544a --- /dev/null +++ b/Assistant/BranchChange.hs @@ -0,0 +1,19 @@ +{- git-annex assistant git-annex branch change tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.BranchChange where + +import Assistant.Common +import Assistant.Types.BranchChange + +import Control.Concurrent.MSampleVar + +branchChanged :: Assistant () +branchChanged = flip writeSV () <<~ (fromBranchChangeHandle . branchChangeHandle) + +waitBranchChange :: Assistant () +waitBranchChange = readSV <<~ (fromBranchChangeHandle . branchChangeHandle) diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs new file mode 100644 index 000000000..2ecd2036c --- /dev/null +++ b/Assistant/Changes.hs @@ -0,0 +1,47 @@ +{- git-annex assistant change tracking + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Changes where + +import Assistant.Common +import Assistant.Types.Changes +import Utility.TList + +import Data.Time.Clock +import Control.Concurrent.STM + +{- Handlers call this when they made a change that needs to get committed. -} +madeChange :: FilePath -> ChangeInfo -> Assistant (Maybe Change) +madeChange f t = Just <$> (Change <$> liftIO getCurrentTime <*> pure f <*> pure t) + +noChange :: Assistant (Maybe Change) +noChange = return Nothing + +{- Indicates an add needs to be done, but has not started yet. -} +pendingAddChange :: FilePath -> Assistant (Maybe Change) +pendingAddChange f = Just <$> (PendingAddChange <$> liftIO getCurrentTime <*> pure f) + +{- Gets all unhandled changes. + - Blocks until at least one change is made. -} +getChanges :: Assistant [Change] +getChanges = (atomically . getTList) <<~ changePool + +{- Gets all unhandled changes, without blocking. -} +getAnyChanges :: Assistant [Change] +getAnyChanges = (atomically . takeTList) <<~ changePool + +{- Puts unhandled changes back into the pool. + - Note: Original order is not preserved. -} +refillChanges :: [Change] -> Assistant () +refillChanges cs = (atomically . flip appendTList cs) <<~ changePool + +{- Records a change to the pool. -} +recordChange :: Change -> Assistant () +recordChange c = (atomically . flip snocTList c) <<~ changePool + +recordChanges :: [Change] -> Assistant () +recordChanges = refillChanges diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs new file mode 100644 index 000000000..7d1d3780f --- /dev/null +++ b/Assistant/Commits.hs @@ -0,0 +1,23 @@ +{- git-annex assistant commit tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Commits where + +import Assistant.Common +import Assistant.Types.Commits +import Utility.TList + +import Control.Concurrent.STM + +{- Gets all unhandled commits. + - Blocks until at least one commit is made. -} +getCommits :: Assistant [Commit] +getCommits = (atomically . getTList) <<~ commitChan + +{- Records a commit in the channel. -} +recordCommit :: Assistant () +recordCommit = (atomically . flip consTList Commit) <<~ commitChan diff --git a/Assistant/Common.hs b/Assistant/Common.hs new file mode 100644 index 000000000..f9719422d --- /dev/null +++ b/Assistant/Common.hs @@ -0,0 +1,14 @@ +{- Common infrastructure for the git-annex assistant. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Common (module X) where + +import Common.Annex as X +import Assistant.Monad as X +import Assistant.Types.DaemonStatus as X +import Assistant.Types.NamedThread as X +import Assistant.Types.Alert as X diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs new file mode 100644 index 000000000..7268bbbfb --- /dev/null +++ b/Assistant/DaemonStatus.hs @@ -0,0 +1,262 @@ +{- git-annex assistant daemon status + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.DaemonStatus where + +import Assistant.Common +import Assistant.Alert.Utility +import Utility.Tmp +import Assistant.Types.NetMessager +import Utility.NotificationBroadcaster +import Logs.Transfer +import Logs.Trust +import qualified Remote +import qualified Types.Remote as Remote +import qualified Git + +import Control.Concurrent.STM +import System.Posix.Types +import Data.Time.Clock.POSIX +import Data.Time +import System.Locale +import qualified Data.Map as M +import qualified Data.Text as T + +getDaemonStatus :: Assistant DaemonStatus +getDaemonStatus = (atomically . readTMVar) <<~ daemonStatusHandle + +modifyDaemonStatus_ :: (DaemonStatus -> DaemonStatus) -> Assistant () +modifyDaemonStatus_ a = modifyDaemonStatus $ \s -> (a s, ()) + +modifyDaemonStatus :: (DaemonStatus -> (DaemonStatus, b)) -> Assistant b +modifyDaemonStatus a = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ do + (s, b) <- atomically $ do + r@(s, _) <- a <$> takeTMVar dstatus + putTMVar dstatus s + return r + sendNotification $ changeNotifier s + return b + +{- Returns a function that updates the lists of syncable remotes + - and other associated information. -} +calcSyncRemotes :: Annex (DaemonStatus -> DaemonStatus) +calcSyncRemotes = do + rs <- filter (remoteAnnexSync . Remote.gitconfig) . + concat . Remote.byCost <$> Remote.remoteList + alive <- trustExclude DeadTrusted (map Remote.uuid rs) + let good r = Remote.uuid r `elem` alive + let syncable = filter good rs + let syncdata = filter (not . remoteAnnexIgnore . Remote.gitconfig) $ + filter (not . isXMPPRemote) syncable + + return $ \dstatus -> dstatus + { syncRemotes = syncable + , syncGitRemotes = filter Remote.syncableRemote syncable + , syncDataRemotes = syncdata + , syncingToCloudRemote = any iscloud syncdata + } + where + iscloud r = not (Remote.readonly r) && Remote.globallyAvailable r + +{- Updates the syncRemotes list from the list of all remotes in Annex state. -} +updateSyncRemotes :: Assistant () +updateSyncRemotes = do + modifyDaemonStatus_ =<< liftAnnex calcSyncRemotes + status <- getDaemonStatus + liftIO $ sendNotification $ syncRemotesNotifier status + + when (syncingToCloudRemote status) $ + updateAlertMap $ + M.filter $ \alert -> + alertName alert /= Just CloudRepoNeededAlert + +updateScheduleLog :: Assistant () +updateScheduleLog = + liftIO . sendNotification =<< scheduleLogNotifier <$> getDaemonStatus + +{- Load any previous daemon status file, and store it in a MVar for this + - process to use as its DaemonStatus. Also gets current transfer status. -} +startDaemonStatus :: Annex DaemonStatusHandle +startDaemonStatus = do + file <- fromRepo gitAnnexDaemonStatusFile + status <- liftIO $ + flip catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus + transfers <- M.fromList <$> getTransfers + addsync <- calcSyncRemotes + liftIO $ atomically $ newTMVar $ addsync $ status + { scanComplete = False + , sanityCheckRunning = False + , currentTransfers = transfers + } + +{- Don't just dump out the structure, because it will change over time, + - and parts of it are not relevant. -} +writeDaemonStatusFile :: FilePath -> DaemonStatus -> IO () +writeDaemonStatusFile file status = + viaTmp writeFile file =<< serialized <$> getPOSIXTime + where + serialized now = unlines + [ "lastRunning:" ++ show now + , "scanComplete:" ++ show (scanComplete status) + , "sanityCheckRunning:" ++ show (sanityCheckRunning status) + , "lastSanityCheck:" ++ maybe "" show (lastSanityCheck status) + ] + +readDaemonStatusFile :: FilePath -> IO DaemonStatus +readDaemonStatusFile file = parse <$> newDaemonStatus <*> readFile file + where + parse status = foldr parseline status . lines + parseline line status + | key == "lastRunning" = parseval readtime $ \v -> + status { lastRunning = Just v } + | key == "scanComplete" = parseval readish $ \v -> + status { scanComplete = v } + | key == "sanityCheckRunning" = parseval readish $ \v -> + status { sanityCheckRunning = v } + | key == "lastSanityCheck" = parseval readtime $ \v -> + status { lastSanityCheck = Just v } + | otherwise = status -- unparsable line + where + (key, value) = separate (== ':') line + parseval parser a = maybe status a (parser value) + readtime s = do + d <- parseTime defaultTimeLocale "%s%Qs" s + Just $ utcTimeToPOSIXSeconds d + +{- Checks if a time stamp was made after the daemon was lastRunning. + - + - Some slop is built in; this really checks if the time stamp was made + - at least ten minutes after the daemon was lastRunning. This is to + - ensure the daemon shut down cleanly, and deal with minor clock skew. + - + - If the daemon has never ran before, this always returns False. + -} +afterLastDaemonRun :: EpochTime -> DaemonStatus -> Bool +afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) + where + t = realToFrac (timestamp + slop) :: POSIXTime + slop = fromIntegral tenMinutes + +tenMinutes :: Int +tenMinutes = 10 * 60 + +{- Mutates the transfer map. Runs in STM so that the transfer map can + - be modified in the same transaction that modifies the transfer queue. + - Note that this does not send a notification of the change; that's left + - to the caller. -} +adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM () +adjustTransfersSTM dstatus a = do + s <- takeTMVar dstatus + putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) } + +{- Checks if a transfer is currently running. -} +checkRunningTransferSTM :: DaemonStatusHandle -> Transfer -> STM Bool +checkRunningTransferSTM dstatus t = M.member t . currentTransfers + <$> readTMVar dstatus + +{- Alters a transfer's info, if the transfer is in the map. -} +alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> Assistant () +alterTransferInfo t a = updateTransferInfo' $ M.adjust a t + +{- Updates a transfer's info. Adds the transfer to the map if necessary, + - or if already present, updates it while preserving the old transferTid, + - transferPaused, and bytesComplete values, which are not written to disk. -} +updateTransferInfo :: Transfer -> TransferInfo -> Assistant () +updateTransferInfo t info = updateTransferInfo' $ M.insertWith' merge t info + where + merge new old = new + { transferTid = maybe (transferTid new) Just (transferTid old) + , transferPaused = transferPaused new || transferPaused old + , bytesComplete = maybe (bytesComplete new) Just (bytesComplete old) + } + +updateTransferInfo' :: (TransferMap -> TransferMap) -> Assistant () +updateTransferInfo' a = notifyTransfer `after` modifyDaemonStatus_ update + where + update s = s { currentTransfers = a (currentTransfers s) } + +{- Removes a transfer from the map, and returns its info. -} +removeTransfer :: Transfer -> Assistant (Maybe TransferInfo) +removeTransfer t = notifyTransfer `after` modifyDaemonStatus remove + where + remove s = + let (info, ts) = M.updateLookupWithKey + (\_k _v -> Nothing) + t (currentTransfers s) + in (s { currentTransfers = ts }, info) + +{- Send a notification when a transfer is changed. -} +notifyTransfer :: Assistant () +notifyTransfer = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ sendNotification + =<< transferNotifier <$> atomically (readTMVar dstatus) + +{- Send a notification when alerts are changed. -} +notifyAlert :: Assistant () +notifyAlert = do + dstatus <- getAssistant daemonStatusHandle + liftIO $ sendNotification + =<< alertNotifier <$> atomically (readTMVar dstatus) + +{- Returns the alert's identifier, which can be used to remove it. -} +addAlert :: Alert -> Assistant AlertId +addAlert alert = do + notice [showAlert alert] + notifyAlert `after` modifyDaemonStatus add + where + add s = (s { lastAlertId = i, alertMap = m }, i) + where + i = nextAlertId $ lastAlertId s + m = mergeAlert i alert (alertMap s) + +removeAlert :: AlertId -> Assistant () +removeAlert i = updateAlert i (const Nothing) + +updateAlert :: AlertId -> (Alert -> Maybe Alert) -> Assistant () +updateAlert i a = updateAlertMap $ \m -> M.update a i m + +updateAlertMap :: (AlertMap -> AlertMap) -> Assistant () +updateAlertMap a = notifyAlert `after` modifyDaemonStatus_ update + where + update s = s { alertMap = a (alertMap s) } + +{- Displays an alert while performing an activity that returns True on + - success. + - + - The alert is left visible afterwards, as filler. + - Old filler is pruned, to prevent the map growing too large. -} +alertWhile :: Alert -> Assistant Bool -> Assistant Bool +alertWhile alert a = alertWhile' alert $ do + r <- a + return (r, r) + +{- Like alertWhile, but allows the activity to return a value too. -} +alertWhile' :: Alert -> Assistant (Bool, a) -> Assistant a +alertWhile' alert a = do + let alert' = alert { alertClass = Activity } + i <- addAlert alert' + (ok, r) <- a + updateAlertMap $ mergeAlert i $ makeAlertFiller ok alert' + return r + +{- Displays an alert while performing an activity, then removes it. -} +alertDuring :: Alert -> Assistant a -> Assistant a +alertDuring alert a = do + i <- addAlert $ alert { alertClass = Activity } + removeAlert i `after` a + +{- Remotes using the XMPP transport have urls like xmpp::user@host -} +isXMPPRemote :: Remote -> Bool +isXMPPRemote remote = Git.repoIsUrl r && "xmpp::" `isPrefixOf` Git.repoLocation r + where + r = Remote.repo remote + +getXMPPClientID :: Remote -> ClientID +getXMPPClientID r = T.pack $ drop (length "xmpp::") (Git.repoLocation (Remote.repo r)) diff --git a/Assistant/DeleteRemote.hs b/Assistant/DeleteRemote.hs new file mode 100644 index 000000000..cc05786e4 --- /dev/null +++ b/Assistant/DeleteRemote.hs @@ -0,0 +1,89 @@ +{- git-annex assistant remote deletion utilities + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.DeleteRemote where + +import Assistant.Common +import Assistant.Types.UrlRenderer +import Assistant.TransferQueue +import Logs.Transfer +import Logs.Location +import Assistant.DaemonStatus +import qualified Remote +import Remote.List +import qualified Git.Remote +import Logs.Trust +import qualified Annex + +#ifdef WITH_WEBAPP +import Assistant.WebApp.Types +import Assistant.Alert +import qualified Data.Text as T +#endif + +{- Removes a remote (but leave the repository as-is), and returns the old + - Remote data. -} +disableRemote :: UUID -> Assistant Remote +disableRemote uuid = do + remote <- fromMaybe (error "unknown remote") + <$> liftAnnex (Remote.remoteFromUUID uuid) + liftAnnex $ do + inRepo $ Git.Remote.remove (Remote.name remote) + void $ remoteListRefresh + updateSyncRemotes + return remote + +{- Removes a remote, marking it dead .-} +removeRemote :: UUID -> Assistant Remote +removeRemote uuid = do + liftAnnex $ trustSet uuid DeadTrusted + disableRemote uuid + +{- Called when a Remote is probably empty, to remove it. + - + - This does one last check for any objects remaining in the Remote, + - and if there are any, queues Downloads of them, and defers removing + - the remote for later. This is to catch any objects not referred to + - in keys in the current branch. + -} +removableRemote :: UrlRenderer -> UUID -> Assistant () +removableRemote urlrenderer uuid = do + keys <- getkeys + if null keys + then finishRemovingRemote urlrenderer uuid + else do + r <- fromMaybe (error "unknown remote") + <$> liftAnnex (Remote.remoteFromUUID uuid) + mapM_ (queueremaining r) keys + where + queueremaining r k = + queueTransferWhenSmall "remaining object in unwanted remote" + Nothing (Transfer Download uuid k) r + {- Scanning for keys can take a long time; do not tie up + - the Annex monad while doing it, so other threads continue to + - run. -} + getkeys = do + a <- liftAnnex $ Annex.withCurrentState $ loggedKeysFor uuid + liftIO a + +{- With the webapp, this asks the user to click on a button to finish + - removing the remote. + - + - Without the webapp, just do the removal now. + -} +finishRemovingRemote :: UrlRenderer -> UUID -> Assistant () +#ifdef WITH_WEBAPP +finishRemovingRemote urlrenderer uuid = do + desc <- liftAnnex $ Remote.prettyUUID uuid + button <- mkAlertButton True (T.pack "Finish deletion process") urlrenderer $ + FinishDeleteRepositoryR uuid + void $ addAlert $ remoteRemovalAlert desc button +#else +finishRemovingRemote _ uuid = void $ removeRemote uuid +#endif diff --git a/Assistant/Drop.hs b/Assistant/Drop.hs new file mode 100644 index 000000000..d677a69c8 --- /dev/null +++ b/Assistant/Drop.hs @@ -0,0 +1,112 @@ +{- git-annex assistant dropping of unwanted content + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Drop where + +import Assistant.Common +import Assistant.DaemonStatus +import Logs.Location +import Logs.Trust +import Types.Remote (uuid) +import qualified Remote +import qualified Command.Drop +import Command +import Annex.Wanted +import Annex.Exception +import Config +import Annex.Content.Direct + +import qualified Data.Set as S + +type Reason = String + +{- Drop from local and/or remote when allowed by the preferred content and + - numcopies settings. -} +handleDrops :: Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> Assistant () +handleDrops _ _ _ Nothing _ = noop +handleDrops reason fromhere key f knownpresentremote = do + syncrs <- syncDataRemotes <$> getDaemonStatus + locs <- liftAnnex $ loggedLocations key + handleDropsFrom locs syncrs reason fromhere key f knownpresentremote + +{- The UUIDs are ones where the content is believed to be present. + - The Remote list can include other remotes that do not have the content; + - only ones that match the UUIDs will be dropped from. + - If allowed to drop fromhere, that drop will be tried first. + - + - In direct mode, all associated files are checked, and only if all + - of them are unwanted are they dropped. + -} +handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> Assistant () +handleDropsFrom _ _ _ _ _ Nothing _ = noop +handleDropsFrom locs rs reason fromhere key (Just afile) knownpresentremote = do + fs <- liftAnnex $ ifM isDirect + ( do + l <- associatedFilesRelative key + if null l + then return [afile] + else return l + , return [afile] + ) + n <- getcopies fs + if fromhere && checkcopies n Nothing + then go fs rs =<< dropl fs n + else go fs rs n + where + getcopies fs = liftAnnex $ do + (untrusted, have) <- trustPartition UnTrusted locs + numcopies <- maximum <$> mapM (getNumCopies <=< numCopies) fs + return (length have, numcopies, S.fromList untrusted) + + {- Check that we have enough copies still to drop the content. + - When the remote being dropped from is untrusted, it was not + - counted as a copy, so having only numcopies suffices. Otherwise, + - we need more than numcopies to safely drop. -} + checkcopies (have, numcopies, _untrusted) Nothing = have > numcopies + checkcopies (have, numcopies, untrusted) (Just u) + | S.member u untrusted = have >= numcopies + | otherwise = have > numcopies + + decrcopies (have, numcopies, untrusted) Nothing = + (have - 1, numcopies, untrusted) + decrcopies v@(_have, _numcopies, untrusted) (Just u) + | S.member u untrusted = v + | otherwise = decrcopies v Nothing + + go _ [] _ = noop + go fs (r:rest) n + | uuid r `S.notMember` slocs = go fs rest n + | checkcopies n (Just $ Remote.uuid r) = + dropr fs r n >>= go fs rest + | otherwise = noop + + checkdrop fs n@(have, numcopies, _untrusted) u a = + ifM (liftAnnex $ allM (wantDrop True u . Just) fs) + ( ifM (liftAnnex $ safely $ doCommand $ a (Just numcopies)) + ( do + debug + [ "dropped" + , afile + , "(from " ++ maybe "here" show u ++ ")" + , "(copies now " ++ show (have - 1) ++ ")" + , ": " ++ reason + ] + return $ decrcopies n u + , return n + ) + , return n + ) + + dropl fs n = checkdrop fs n Nothing $ \numcopies -> + Command.Drop.startLocal afile numcopies key knownpresentremote + + dropr fs r n = checkdrop fs n (Just $ Remote.uuid r) $ \numcopies -> + Command.Drop.startRemote afile numcopies key r + + safely a = either (const False) id <$> tryAnnex a + + slocs = S.fromList locs diff --git a/Assistant/Install.hs b/Assistant/Install.hs new file mode 100644 index 000000000..dee1b5be3 --- /dev/null +++ b/Assistant/Install.hs @@ -0,0 +1,101 @@ +{- Assistant installation + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Install where + +import Assistant.Common +import Assistant.Install.AutoStart +import Assistant.Install.Menu +import Assistant.Ssh +import Config.Files +import Utility.FileMode +import Utility.Shell +import Utility.Tmp +import Utility.Env + +#ifdef darwin_HOST_OS +import Utility.OSX +#else +import Utility.FreeDesktop +#endif + +standaloneAppBase :: IO (Maybe FilePath) +standaloneAppBase = getEnv "GIT_ANNEX_APP_BASE" + +{- The standalone app does not have an installation process. + - So when it's run, it needs to set up autostarting of the assistant + - daemon, as well as writing the programFile, and putting a + - git-annex-shell wrapper into ~/.ssh + - + - Note that this is done every time it's started, so if the user moves + - it around, the paths this sets up won't break. + -} +ensureInstalled :: IO () +ensureInstalled = go =<< standaloneAppBase + where + go Nothing = noop + go (Just base) = do + let program = base </> "git-annex" + programfile <- programFile + createDirectoryIfMissing True (parentDir programfile) + writeFile programfile program + +#ifdef darwin_HOST_OS + autostartfile <- userAutoStart osxAutoStartLabel +#else + menufile <- desktopMenuFilePath "git-annex" <$> userDataDir + icondir <- iconDir <$> userDataDir + installMenu program menufile base icondir + autostartfile <- autoStartPath "git-annex" <$> userConfigDir +#endif + installAutoStart program autostartfile + + {- This shim is only updated if it doesn't + - already exist with the right content. -} + sshdir <- sshDir + let shim = sshdir </> "git-annex-shell" + let runshell var = "exec " ++ base </> "runshell" ++ + " git-annex-shell -c \"" ++ var ++ "\"" + let content = unlines + [ shebang_local + , "set -e" + , "if [ \"x$SSH_ORIGINAL_COMMAND\" != \"x\" ]; then" + , runshell "$SSH_ORIGINAL_COMMAND" + , "else" + , runshell "$@" + , "fi" + ] + + curr <- catchDefaultIO "" $ readFileStrict shim + when (curr /= content) $ do + createDirectoryIfMissing True (parentDir shim) + viaTmp writeFile shim content + modifyFileMode shim $ addModes [ownerExecuteMode] + +{- Returns a cleaned up environment that lacks settings used to make the + - standalone builds use their bundled libraries and programs. + - Useful when calling programs not included in the standalone builds. + - + - For a non-standalone build, returns Nothing. + -} +cleanEnvironment :: IO (Maybe [(String, String)]) +cleanEnvironment = clean <$> getEnvironment + where + clean env + | null vars = Nothing + | otherwise = Just $ catMaybes $ map (restoreorig env) env + | otherwise = Nothing + where + vars = words $ fromMaybe "" $ + lookup "GIT_ANNEX_STANDLONE_ENV" env + restoreorig oldenv p@(k, _v) + | k `elem` vars = case lookup ("ORIG_" ++ k) oldenv of + Nothing -> Nothing + (Just v') -> Just (k, v') + | otherwise = Just p diff --git a/Assistant/Install/AutoStart.hs b/Assistant/Install/AutoStart.hs new file mode 100644 index 000000000..b03d20224 --- /dev/null +++ b/Assistant/Install/AutoStart.hs @@ -0,0 +1,39 @@ +{- Assistant autostart file installation + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Install.AutoStart where + +import Utility.FreeDesktop +#ifdef darwin_HOST_OS +import Utility.OSX +import Utility.Path +import System.Directory +#endif + +installAutoStart :: FilePath -> FilePath -> IO () +installAutoStart command file = do +#ifdef darwin_HOST_OS + createDirectoryIfMissing True (parentDir file) + writeFile file $ genOSXAutoStartFile osxAutoStartLabel command + ["assistant", "--autostart"] +#else + writeDesktopMenuFile (fdoAutostart command) file +#endif + +osxAutoStartLabel :: String +osxAutoStartLabel = "com.branchable.git-annex.assistant" + +fdoAutostart :: FilePath -> DesktopEntry +fdoAutostart command = genDesktopEntry + "Git Annex Assistant" + "Autostart" + False + (command ++ " assistant --autostart") + Nothing + [] diff --git a/Assistant/Install/Menu.hs b/Assistant/Install/Menu.hs new file mode 100644 index 000000000..41ec855b6 --- /dev/null +++ b/Assistant/Install/Menu.hs @@ -0,0 +1,47 @@ +{- Assistant menu installation. + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Install.Menu where + +import Common + +import Utility.FreeDesktop + +installMenu :: FilePath -> FilePath -> FilePath -> FilePath -> IO () +installMenu command menufile iconsrcdir icondir = do +#ifdef darwin_HOST_OS + return () +#else + writeDesktopMenuFile (fdoDesktopMenu command) menufile + installIcon (iconsrcdir </> "logo.svg") $ + iconFilePath (iconBaseName ++ ".svg") "scalable" icondir + installIcon (iconsrcdir </> "favicon.png") $ + iconFilePath (iconBaseName ++ ".png") "16x16" icondir +#endif + +{- The command can be either just "git-annex", or the full path to use + - to run it. -} +fdoDesktopMenu :: FilePath -> DesktopEntry +fdoDesktopMenu command = genDesktopEntry + "Git Annex" + "Track and sync the files in your Git Annex" + False + (command ++ " webapp") + (Just iconBaseName) + ["Network", "FileTransfer"] + +installIcon :: FilePath -> FilePath -> IO () +installIcon src dest = do + createDirectoryIfMissing True (parentDir dest) + withBinaryFile src ReadMode $ \hin -> + withBinaryFile dest WriteMode $ \hout -> + hGetContents hin >>= hPutStr hout + +iconBaseName :: String +iconBaseName = "git-annex" diff --git a/Assistant/MakeRemote.hs b/Assistant/MakeRemote.hs new file mode 100644 index 000000000..bf316e49d --- /dev/null +++ b/Assistant/MakeRemote.hs @@ -0,0 +1,165 @@ +{- git-annex assistant remote creation utilities + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.MakeRemote where + +import Assistant.Common +import Assistant.Ssh +import qualified Types.Remote as R +import qualified Remote +import Remote.List +import qualified Remote.Rsync as Rsync +import qualified Remote.GCrypt as GCrypt +import qualified Git +import qualified Git.Command +import qualified Command.InitRemote +import Logs.UUID +import Logs.Remote +import Git.Remote +import Git.Types (RemoteName) +import Creds +import Assistant.Gpg +import Utility.Gpg (KeyId) + +import qualified Data.Map as M + +{- Sets up a new git or rsync remote, accessed over ssh. -} +makeSshRemote :: SshData -> Annex RemoteName +makeSshRemote sshdata = maker (sshRepoName sshdata) (genSshUrl sshdata) + where + maker + | onlyCapability sshdata RsyncCapable = makeRsyncRemote + | otherwise = makeGitRemote + +{- Runs an action that returns a name of the remote, and finishes adding it. -} +addRemote :: Annex RemoteName -> Annex Remote +addRemote a = do + name <- a + void remoteListRefresh + maybe (error "failed to add remote") return + =<< Remote.byName (Just name) + +{- Inits a rsync special remote, and returns its name. -} +makeRsyncRemote :: RemoteName -> String -> Annex String +makeRsyncRemote name location = makeRemote name location $ const $ void $ + go =<< Command.InitRemote.findExisting name + where + go Nothing = setupSpecialRemote name Rsync.remote config + (Nothing, Command.InitRemote.newConfig name) + go (Just (u, c)) = setupSpecialRemote name Rsync.remote config (Just u, c) + config = M.fromList + [ ("encryption", "shared") + , ("rsyncurl", location) + , ("type", "rsync") + ] + +{- Inits a gcrypt special remote, and returns its name. -} +makeGCryptRemote :: RemoteName -> String -> KeyId -> Annex RemoteName +makeGCryptRemote remotename location keyid = + initSpecialRemote remotename GCrypt.remote $ M.fromList + [ ("type", "gcrypt") + , ("gitrepo", location) + , configureEncryption HybridEncryption + , ("keyid", keyid) + ] + +type SpecialRemoteMaker = RemoteName -> RemoteType -> R.RemoteConfig -> Annex RemoteName + +{- Inits a new special remote. The name is used as a suggestion, but + - will be changed if there is already a special remote with that name. -} +initSpecialRemote :: SpecialRemoteMaker +initSpecialRemote name remotetype config = go 0 + where + go :: Int -> Annex RemoteName + go n = do + let fullname = if n == 0 then name else name ++ show n + r <- Command.InitRemote.findExisting fullname + case r of + Nothing -> setupSpecialRemote fullname remotetype config + (Nothing, Command.InitRemote.newConfig fullname) + Just _ -> go (n + 1) + +{- Enables an existing special remote. -} +enableSpecialRemote :: SpecialRemoteMaker +enableSpecialRemote name remotetype config = do + r <- Command.InitRemote.findExisting name + case r of + Nothing -> error $ "Cannot find a special remote named " ++ name + Just (u, c) -> setupSpecialRemote name remotetype config (Just u, c) + +setupSpecialRemote :: RemoteName -> RemoteType -> R.RemoteConfig -> (Maybe UUID, R.RemoteConfig) -> Annex RemoteName +setupSpecialRemote name remotetype config (mu, c) = do + {- Currently, only 'weak' ciphers can be generated from the + - assistant, because otherwise GnuPG may block once the entropy + - pool is drained, and as of now there's no way to tell the user + - to perform IO actions to refill the pool. -} + (c', u) <- R.setup remotetype mu $ + M.insert "highRandomQuality" "false" $ M.union config c + describeUUID u name + configSet u c' + return name + +{- Returns the name of the git remote it created. If there's already a + - remote at the location, returns its name. -} +makeGitRemote :: String -> String -> Annex RemoteName +makeGitRemote basename location = makeRemote basename location $ \name -> + void $ inRepo $ Git.Command.runBool + [Param "remote", Param "add", Param name, Param location] + +{- If there's not already a remote at the location, adds it using the + - action, which is passed the name of the remote to make. + - + - Returns the name of the remote. -} +makeRemote :: String -> String -> (RemoteName -> Annex ()) -> Annex RemoteName +makeRemote basename location a = do + g <- gitRepo + if not (any samelocation $ Git.remotes g) + then do + let name = uniqueRemoteName basename 0 g + a name + return name + else return basename + where + samelocation x = Git.repoLocation x == location + +{- Generate an unused name for a remote, adding a number if + - necessary. + - + - Ensures that the returned name is a legal git remote name. -} +uniqueRemoteName :: String -> Int -> Git.Repo -> RemoteName +uniqueRemoteName basename n r + | null namecollision = name + | otherwise = uniqueRemoteName legalbasename (succ n) r + where + namecollision = filter samename (Git.remotes r) + samename x = Git.remoteName x == Just name + name + | n == 0 = legalbasename + | otherwise = legalbasename ++ show n + legalbasename = makeLegalName basename + +{- Finds a CredPair belonging to any Remote that is of a given type + - and matches some other criteria. + - + - This can be used as a default when another repository is being set up + - using the same service. + - + - A function must be provided that returns the CredPairStorage + - to use for a particular Remote's uuid. + -} +previouslyUsedCredPair + :: (UUID -> CredPairStorage) + -> RemoteType + -> (Remote -> Bool) + -> Annex (Maybe CredPair) +previouslyUsedCredPair getstorage remotetype criteria = + getM fromstorage =<< filter criteria . filter sametype <$> remoteList + where + sametype r = R.typename (R.remotetype r) == R.typename remotetype + fromstorage r = do + let storage = getstorage (R.uuid r) + getRemoteCredPair (R.config r) storage diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs new file mode 100644 index 000000000..6b843ea88 --- /dev/null +++ b/Assistant/Monad.hs @@ -0,0 +1,144 @@ +{- git-annex assistant monad + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE GeneralizedNewtypeDeriving, MultiParamTypeClasses #-} + +module Assistant.Monad ( + Assistant, + AssistantData(..), + newAssistantData, + runAssistant, + getAssistant, + LiftAnnex, + liftAnnex, + (<~>), + (<<~), + asIO, + asIO1, + asIO2, + ThreadName, + debug, + notice +) where + +import "mtl" Control.Monad.Reader +import System.Log.Logger + +import Common.Annex +import Assistant.Types.ThreadedMonad +import Assistant.Types.DaemonStatus +import Assistant.Types.ScanRemotes +import Assistant.Types.TransferQueue +import Assistant.Types.TransferSlots +import Assistant.Types.TransferrerPool +import Assistant.Types.Pushes +import Assistant.Types.BranchChange +import Assistant.Types.Commits +import Assistant.Types.Changes +import Assistant.Types.RepoProblem +import Assistant.Types.Buddies +import Assistant.Types.NetMessager +import Assistant.Types.ThreadName + +newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a } + deriving ( + Monad, + MonadIO, + MonadReader AssistantData, + Functor, + Applicative + ) + +data AssistantData = AssistantData + { threadName :: ThreadName + , threadState :: ThreadState + , daemonStatusHandle :: DaemonStatusHandle + , scanRemoteMap :: ScanRemoteMap + , transferQueue :: TransferQueue + , transferSlots :: TransferSlots + , transferrerPool :: TransferrerPool + , failedPushMap :: FailedPushMap + , commitChan :: CommitChan + , changePool :: ChangePool + , repoProblemChan :: RepoProblemChan + , branchChangeHandle :: BranchChangeHandle + , buddyList :: BuddyList + , netMessager :: NetMessager + } + +newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData +newAssistantData st dstatus = AssistantData + <$> pure (ThreadName "main") + <*> pure st + <*> pure dstatus + <*> newScanRemoteMap + <*> newTransferQueue + <*> newTransferSlots + <*> newTransferrerPool + <*> newFailedPushMap + <*> newCommitChan + <*> newChangePool + <*> newRepoProblemChan + <*> newBranchChangeHandle + <*> newBuddyList + <*> newNetMessager + +runAssistant :: AssistantData -> Assistant a -> IO a +runAssistant d a = runReaderT (mkAssistant a) d + +getAssistant :: (AssistantData -> a) -> Assistant a +getAssistant = reader + +{- Using a type class for lifting into the annex monad allows + - easily lifting to it from multiple different monads. -} +class LiftAnnex m where + liftAnnex :: Annex a -> m a + +{- Runs an action in the git-annex monad. Note that the same monad state + - is shared amoung all assistant threads, so only one of these can run at + - a time. Therefore, long-duration actions should be avoided. -} +instance LiftAnnex Assistant where + liftAnnex a = do + st <- reader threadState + liftIO $ runThreadState st a + +{- Runs an IO action, passing it an IO action that runs an Assistant action. -} +(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b +io <~> a = do + d <- reader id + liftIO $ io $ runAssistant d a + +{- Creates an IO action that will run an Assistant action when run. -} +asIO :: Assistant a -> Assistant (IO a) +asIO a = do + d <- reader id + return $ runAssistant d a + +asIO1 :: (a -> Assistant b) -> Assistant (a -> IO b) +asIO1 a = do + d <- reader id + return $ \v -> runAssistant d $ a v + +asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c) +asIO2 a = do + d <- reader id + return $ \v1 v2 -> runAssistant d (a v1 v2) + +{- Runs an IO action on a selected field of the AssistantData. -} +(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b +io <<~ v = reader v >>= liftIO . io + +debug :: [String] -> Assistant () +debug = logaction debugM + +notice :: [String] -> Assistant () +notice = logaction noticeM + +logaction :: (String -> String -> IO ()) -> [String] -> Assistant () +logaction a ws = do + ThreadName name <- getAssistant threadName + liftIO $ a name $ unwords $ (name ++ ":") : ws diff --git a/Assistant/NamedThread.hs b/Assistant/NamedThread.hs new file mode 100644 index 000000000..e1b3983f7 --- /dev/null +++ b/Assistant/NamedThread.hs @@ -0,0 +1,102 @@ +{- git-annex assistant named threads. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.NamedThread where + +import Common.Annex +import Assistant.Types.NamedThread +import Assistant.Types.ThreadName +import Assistant.Types.DaemonStatus +import Assistant.Types.UrlRenderer +import Assistant.DaemonStatus +import Assistant.Monad +import Utility.NotificationBroadcaster + +import Control.Concurrent +import Control.Concurrent.Async +import qualified Data.Map as M +import qualified Control.Exception as E + +#ifdef WITH_WEBAPP +import Assistant.WebApp.Types +import Assistant.Types.Alert +import Assistant.Alert +import qualified Data.Text as T +#endif + +{- Starts a named thread, if it's not already running. + - + - Named threads are run by a management thread, so if they crash + - an alert is displayed, allowing the thread to be restarted. -} +startNamedThread :: UrlRenderer -> NamedThread -> Assistant () +startNamedThread urlrenderer (NamedThread afterstartupsanitycheck name a) = do + m <- startedThreads <$> getDaemonStatus + case M.lookup name m of + Nothing -> start + Just (aid, _) -> do + r <- liftIO (E.try (poll aid) :: IO (Either E.SomeException (Maybe (Either E.SomeException ())))) + case r of + Right Nothing -> noop + _ -> start + where + start + | afterstartupsanitycheck = do + status <- getDaemonStatus + h <- liftIO $ newNotificationHandle False $ + startupSanityCheckNotifier status + startwith $ runmanaged $ + liftIO $ waitNotification h + | otherwise = startwith $ runmanaged noop + startwith runner = do + d <- getAssistant id + aid <- liftIO $ runner $ d { threadName = name } + restart <- asIO $ startNamedThread urlrenderer (NamedThread False name a) + modifyDaemonStatus_ $ \s -> s + { startedThreads = M.insertWith' const name (aid, restart) (startedThreads s) } + runmanaged first d = do + aid <- async $ runAssistant d $ do + void first + a + void $ forkIO $ manager d aid + return aid + manager d aid = do + r <- E.try (wait aid) :: IO (Either E.SomeException ()) + case r of + Right _ -> noop + Left e -> do + let msg = unwords + [ fromThreadName $ threadName d + , "crashed:", show e + ] + hPutStrLn stderr msg +#ifdef WITH_WEBAPP + button <- runAssistant d $ mkAlertButton True + (T.pack "Restart Thread") + urlrenderer + (RestartThreadR name) + runAssistant d $ void $ addAlert $ + (warningAlert (fromThreadName name) msg) + { alertButtons = [button] } +#endif + +namedThreadId :: NamedThread -> Assistant (Maybe ThreadId) +namedThreadId (NamedThread _ name _) = do + m <- startedThreads <$> getDaemonStatus + return $ asyncThreadId . fst <$> M.lookup name m + +{- Waits for all named threads that have been started to finish. + - + - Note that if a named thread crashes, it will probably + - cause this to crash as well. Also, named threads that are started + - after this is called will not be waited on. -} +waitNamedThreads :: Assistant () +waitNamedThreads = do + m <- startedThreads <$> getDaemonStatus + liftIO $ mapM_ (wait . fst) $ M.elems m + diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs new file mode 100644 index 000000000..acb18b648 --- /dev/null +++ b/Assistant/NetMessager.hs @@ -0,0 +1,180 @@ +{- git-annex assistant out of band network messager interface + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE BangPatterns #-} + +module Assistant.NetMessager where + +import Assistant.Common +import Assistant.Types.NetMessager + +import Control.Concurrent.STM +import Control.Concurrent.MSampleVar +import qualified Data.Set as S +import qualified Data.Map as M +import qualified Data.DList as D + +sendNetMessage :: NetMessage -> Assistant () +sendNetMessage m = + (atomically . flip writeTChan m) <<~ (netMessages . netMessager) + +waitNetMessage :: Assistant (NetMessage) +waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager) + +notifyNetMessagerRestart :: Assistant () +notifyNetMessagerRestart = + flip writeSV () <<~ (netMessagerRestart . netMessager) + +{- This can be used to get an early indication if the network has + - changed, to immediately restart a connection. However, that is not + - available on all systems, so clients also need to deal with + - restarting dropped connections in the usual way. -} +waitNetMessagerRestart :: Assistant () +waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +{- Store a new important NetMessage for a client, and if an equivilant + - older message is already stored, remove it from both importantNetMessages + - and sentImportantNetMessages. -} +storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant () +storeImportantNetMessage m client matchingclient = go <<~ netMessager + where + go nm = atomically $ do + q <- takeTMVar $ importantNetMessages nm + sent <- takeTMVar $ sentImportantNetMessages nm + putTMVar (importantNetMessages nm) $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client $ + M.mapWithKey removematching q + putTMVar (sentImportantNetMessages nm) $ + M.mapWithKey removematching sent + removematching someclient s + | matchingclient someclient = S.filter (not . equivilantImportantNetMessages m) s + | otherwise = s + +{- Indicates that an important NetMessage has been sent to a client. -} +sentImportantNetMessage :: NetMessage -> ClientID -> Assistant () +sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager) + where + go v = atomically $ do + sent <- takeTMVar v + putTMVar v $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent + +{- Checks for important NetMessages that have been stored for a client, and + - sent to a client. Typically the same client for both, although + - a modified or more specific client may need to be used. -} +checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage) +checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager + where + go nm = atomically $ do + stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm) + sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) + return (fromMaybe S.empty stored, fromMaybe S.empty sent) + +{- Queues a push initiation message in the queue for the appropriate + - side of the push but only if there is not already an initiation message + - from the same client in the queue. -} +queuePushInitiation :: NetMessage -> Assistant () +queuePushInitiation msg@(Pushing clientid stage) = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + r <- tryTakeTMVar tv + case r of + Nothing -> putTMVar tv [msg] + Just l -> do + let !l' = msg : filter differentclient l + putTMVar tv l' + where + side = pushDestinationSide stage + differentclient (Pushing cid _) = cid /= clientid + differentclient _ = True +queuePushInitiation _ = noop + +{- Waits for a push inititation message to be received, and runs + - function to select a message from the queue. -} +waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage +waitPushInitiation side selector = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + q <- takeTMVar tv + if null q + then retry + else do + let (msg, !q') = selector q + unless (null q') $ + putTMVar tv q' + return msg + +{- Stores messages for a push into the appropriate inbox. + - + - To avoid overflow, only 1000 messages max are stored in any + - inbox, which should be far more than necessary. + - + - TODO: If we have more than 100 inboxes for different clients, + - discard old ones that are not currently being used by any push. + -} +storeInbox :: NetMessage -> Assistant () +storeInbox msg@(Pushing clientid stage) = do + inboxes <- getInboxes side + stored <- liftIO $ atomically $ do + m <- readTVar inboxes + let update = \v -> do + writeTVar inboxes $ + M.insertWith' const clientid v m + return True + case M.lookup clientid m of + Nothing -> update (1, tostore) + Just (sz, l) + | sz > 1000 -> return False + | otherwise -> + let !sz' = sz + 1 + !l' = D.append l tostore + in update (sz', l') + if stored + then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"] + else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"] + where + side = pushDestinationSide stage + tostore = D.singleton msg +storeInbox _ = noop + +{- Gets the new message for a push from its inbox. + - Blocks until a message has been received. -} +waitInbox :: ClientID -> PushSide -> Assistant (NetMessage) +waitInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ do + m <- readTVar inboxes + case M.lookup clientid m of + Nothing -> retry + Just (sz, dl) + | sz < 1 -> retry + | otherwise -> do + let msg = D.head dl + let dl' = D.tail dl + let !sz' = sz - 1 + writeTVar inboxes $ + M.insertWith' const clientid (sz', dl') m + return msg + +emptyInbox :: ClientID -> PushSide -> Assistant () +emptyInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ + modifyTVar' inboxes $ + M.delete clientid + +getInboxes :: PushSide -> Assistant Inboxes +getInboxes side = + getSide side . netMessagerInboxes <$> getAssistant netMessager + +getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage]) +getPushInitiationQueue side = + getSide side . netMessagerPushInitiations <$> getAssistant netMessager + +netMessagerDebug :: ClientID -> [String] -> Assistant () +netMessagerDebug clientid l = debug $ + "NetMessager" : l ++ [show $ logClientID clientid] diff --git a/Assistant/Pairing.hs b/Assistant/Pairing.hs new file mode 100644 index 000000000..bb1384a15 --- /dev/null +++ b/Assistant/Pairing.hs @@ -0,0 +1,92 @@ +{- git-annex assistant repo pairing, core data types + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Pairing where + +import Common.Annex +import Utility.Verifiable +import Assistant.Ssh + +import Control.Concurrent +import Network.Socket +import Data.Char +import qualified Data.Text as T + +data PairStage + {- "I'll pair with anybody who shares the secret that can be used + - to verify this request." -} + = PairReq + {- "I've verified your request, and you can verify this to see + - that I know the secret. I set up your ssh key already. + - Here's mine for you to set up." -} + | PairAck + {- "I saw your PairAck; you can stop sending them." -} + | PairDone + deriving (Eq, Read, Show, Ord, Enum) + +newtype PairMsg = PairMsg (Verifiable (PairStage, PairData, SomeAddr)) + deriving (Eq, Read, Show) + +verifiedPairMsg :: PairMsg -> PairingInProgress -> Bool +verifiedPairMsg (PairMsg m) pip = verify m $ inProgressSecret pip + +fromPairMsg :: PairMsg -> Verifiable (PairStage, PairData, SomeAddr) +fromPairMsg (PairMsg m) = m + +pairMsgStage :: PairMsg -> PairStage +pairMsgStage (PairMsg (Verifiable (s, _, _) _)) = s + +pairMsgData :: PairMsg -> PairData +pairMsgData (PairMsg (Verifiable (_, d, _) _)) = d + +pairMsgAddr :: PairMsg -> SomeAddr +pairMsgAddr (PairMsg (Verifiable (_, _, a) _)) = a + +data PairData = PairData + -- uname -n output, not a full domain name + { remoteHostName :: Maybe HostName + , remoteUserName :: UserName + , remoteDirectory :: FilePath + , remoteSshPubKey :: SshPubKey + , pairUUID :: UUID + } + deriving (Eq, Read, Show) + +type UserName = String + +{- A pairing that is in progress has a secret, a thread that is + - broadcasting pairing messages, and a SshKeyPair that has not yet been + - set up on disk. -} +data PairingInProgress = PairingInProgress + { inProgressSecret :: Secret + , inProgressThreadId :: Maybe ThreadId + , inProgressSshKeyPair :: SshKeyPair + , inProgressPairData :: PairData + , inProgressPairStage :: PairStage + } + deriving (Show) + +data SomeAddr = IPv4Addr HostAddress +{- My Android build of the Network library does not currently have IPV6 + - support. -} +#ifndef __ANDROID__ + | IPv6Addr HostAddress6 +#endif + deriving (Ord, Eq, Read, Show) + +{- This contains the whole secret, just lightly obfuscated to make it not + - too obvious. It's only displayed in the user's web browser. -} +newtype SecretReminder = SecretReminder [Int] + deriving (Show, Eq, Ord, Read) + +toSecretReminder :: T.Text -> SecretReminder +toSecretReminder = SecretReminder . map ord . T.unpack + +fromSecretReminder :: SecretReminder -> T.Text +fromSecretReminder (SecretReminder s) = T.pack $ map chr s diff --git a/Assistant/Pairing/MakeRemote.hs b/Assistant/Pairing/MakeRemote.hs new file mode 100644 index 000000000..144b236a4 --- /dev/null +++ b/Assistant/Pairing/MakeRemote.hs @@ -0,0 +1,95 @@ +{- git-annex assistant pairing remote creation + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pairing.MakeRemote where + +import Assistant.Common +import Assistant.Ssh +import Assistant.Pairing +import Assistant.Pairing.Network +import Assistant.MakeRemote +import Assistant.Sync +import Config.Cost +import Config + +import Network.Socket +import qualified Data.Text as T + +{- Authorized keys are set up before pairing is complete, so that the other + - side can immediately begin syncing. -} +setupAuthorizedKeys :: PairMsg -> FilePath -> IO () +setupAuthorizedKeys msg repodir = do + validateSshPubKey pubkey + unlessM (liftIO $ addAuthorizedKeys True repodir pubkey) $ + error "failed setting up ssh authorized keys" + where + pubkey = remoteSshPubKey $ pairMsgData msg + +{- When local pairing is complete, this is used to set up the remote for + - the host we paired with. -} +finishedLocalPairing :: PairMsg -> SshKeyPair -> Assistant () +finishedLocalPairing msg keypair = do + sshdata <- liftIO $ setupSshKeyPair keypair =<< pairMsgToSshData msg + {- Ensure that we know the ssh host key for the host we paired with. + - If we don't, ssh over to get it. -} + liftIO $ unlessM (knownHost $ sshHostName sshdata) $ + void $ sshTranscript + [ sshOpt "StrictHostKeyChecking" "no" + , sshOpt "NumberOfPasswordPrompts" "0" + , "-n" + , genSshHost (sshHostName sshdata) (sshUserName sshdata) + , "git-annex-shell -c configlist " ++ T.unpack (sshDirectory sshdata) + ] + Nothing + r <- liftAnnex $ addRemote $ makeSshRemote sshdata + liftAnnex $ setRemoteCost r semiExpensiveRemoteCost + syncRemote r + +{- Mostly a straightforward conversion. Except: + - * Determine the best hostname to use to contact the host. + - * Strip leading ~/ from the directory name. + -} +pairMsgToSshData :: PairMsg -> IO SshData +pairMsgToSshData msg = do + let d = pairMsgData msg + hostname <- liftIO $ bestHostName msg + let dir = case remoteDirectory d of + ('~':'/':v) -> v + v -> v + return SshData + { sshHostName = T.pack hostname + , sshUserName = Just (T.pack $ remoteUserName d) + , sshDirectory = T.pack dir + , sshRepoName = genSshRepoName hostname dir + , sshPort = 22 + , needsPubKey = True + , sshCapabilities = [GitAnnexShellCapable, GitCapable, RsyncCapable] + } + +{- Finds the best hostname to use for the host that sent the PairMsg. + - + - If remoteHostName is set, tries to use a .local address based on it. + - That's the most robust, if this system supports .local. + - Otherwise, looks up the hostname in the DNS for the remoteAddress, + - if any. May fall back to remoteAddress if there's no DNS. Ugh. -} +bestHostName :: PairMsg -> IO HostName +bestHostName msg = case remoteHostName $ pairMsgData msg of + Just h -> do + let localname = h ++ ".local" + addrs <- catchDefaultIO [] $ + getAddrInfo Nothing (Just localname) Nothing + maybe fallback (const $ return localname) (headMaybe addrs) + Nothing -> fallback + where + fallback = do + let a = pairMsgAddr msg + let sockaddr = case a of + IPv4Addr addr -> SockAddrInet (PortNum 0) addr + IPv6Addr addr -> SockAddrInet6 (PortNum 0) 0 addr 0 + fromMaybe (showAddr a) + <$> catchDefaultIO Nothing + (fst <$> getNameInfo [] True False sockaddr) diff --git a/Assistant/Pairing/Network.hs b/Assistant/Pairing/Network.hs new file mode 100644 index 000000000..6c625f881 --- /dev/null +++ b/Assistant/Pairing/Network.hs @@ -0,0 +1,130 @@ +{- git-annex assistant pairing network code + - + - All network traffic is sent over multicast UDP. For reliability, + - each message is repeated until acknowledged. This is done using a + - thread, that gets stopped before the next message is sent. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pairing.Network where + +import Assistant.Common +import Assistant.Pairing +import Assistant.DaemonStatus +import Utility.ThreadScheduler +import Utility.Verifiable + +import Network.Multicast +import Network.Info +import Network.Socket +import Control.Exception (bracket) +import qualified Data.Map as M +import Control.Concurrent + +{- This is an arbitrary port in the dynamic port range, that could + - conceivably be used for some other broadcast messages. + - If so, hope they ignore the garbage from us; we'll certianly + - ignore garbage from them. Wild wild west. -} +pairingPort :: PortNumber +pairingPort = 55556 + +{- Goal: Reach all hosts on the same network segment. + - Method: Use same address that avahi uses. Other broadcast addresses seem + - to not be let through some routers. -} +multicastAddress :: SomeAddr -> HostName +multicastAddress (IPv4Addr _) = "224.0.0.251" +multicastAddress (IPv6Addr _) = "ff02::fb" + +{- Multicasts a message repeatedly on all interfaces, with a 2 second + - delay between each transmission. The message is repeated forever + - unless a number of repeats is specified. + - + - The remoteHostAddress is set to the interface's IP address. + - + - Note that new sockets are opened each time. This is hardly efficient, + - but it allows new network interfaces to be used as they come up. + - On the other hand, the expensive DNS lookups are cached. + -} +multicastPairMsg :: Maybe Int -> Secret -> PairData -> PairStage -> IO () +multicastPairMsg repeats secret pairdata stage = go M.empty repeats + where + go _ (Just 0) = noop + go cache n = do + addrs <- activeNetworkAddresses + let cache' = updatecache cache addrs + mapM_ (sendinterface cache') addrs + threadDelaySeconds (Seconds 2) + go cache' $ pred <$> n + {- The multicast library currently chokes on ipv6 addresses. -} + sendinterface _ (IPv6Addr _) = noop + sendinterface cache i = void $ tryIO $ + withSocketsDo $ bracket setup cleanup use + where + setup = multicastSender (multicastAddress i) pairingPort + cleanup (sock, _) = sClose sock -- FIXME does not work + use (sock, addr) = do + setInterface sock (showAddr i) + maybe noop (\s -> void $ sendTo sock s addr) + (M.lookup i cache) + updatecache cache [] = cache + updatecache cache (i:is) + | M.member i cache = updatecache cache is + | otherwise = updatecache (M.insert i (show $ mkmsg i) cache) is + mkmsg addr = PairMsg $ + mkVerifiable (stage, pairdata, addr) secret + +startSending :: PairingInProgress -> PairStage -> (PairStage -> IO ()) -> Assistant () +startSending pip stage sender = do + a <- asIO start + void $ liftIO $ forkIO a + where + start = do + tid <- liftIO myThreadId + let pip' = pip { inProgressPairStage = stage, inProgressThreadId = Just tid } + oldpip <- modifyDaemonStatus $ + \s -> (s { pairingInProgress = Just pip' }, pairingInProgress s) + maybe noop stopold oldpip + liftIO $ sender stage + stopold = maybe noop (liftIO . killThread) . inProgressThreadId + +stopSending :: PairingInProgress -> Assistant () +stopSending pip = do + maybe noop (liftIO . killThread) $ inProgressThreadId pip + modifyDaemonStatus_ $ \s -> s { pairingInProgress = Nothing } + +class ToSomeAddr a where + toSomeAddr :: a -> SomeAddr + +instance ToSomeAddr IPv4 where + toSomeAddr (IPv4 a) = IPv4Addr a + +instance ToSomeAddr IPv6 where + toSomeAddr (IPv6 o1 o2 o3 o4) = IPv6Addr (o1, o2, o3, o4) + +showAddr :: SomeAddr -> HostName +showAddr (IPv4Addr a) = show $ IPv4 a +showAddr (IPv6Addr (o1, o2, o3, o4)) = show $ IPv6 o1 o2 o3 o4 + +activeNetworkAddresses :: IO [SomeAddr] +activeNetworkAddresses = filter (not . all (`elem` "0.:") . showAddr) + . concatMap (\ni -> [toSomeAddr $ ipv4 ni, toSomeAddr $ ipv6 ni]) + <$> getNetworkInterfaces + +{- A human-visible description of the repository being paired with. + - Note that the repository's description is not shown to the user, because + - it could be something like "my repo", which is confusing when pairing + - with someone else's repo. However, this has the same format as the + - default decription of a repo. -} +pairRepo :: PairMsg -> String +pairRepo msg = concat + [ remoteUserName d + , "@" + , fromMaybe (showAddr $ pairMsgAddr msg) (remoteHostName d) + , ":" + , remoteDirectory d + ] + where + d = pairMsgData msg diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs new file mode 100644 index 000000000..54f31a84b --- /dev/null +++ b/Assistant/Pushes.hs @@ -0,0 +1,40 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pushes where + +import Assistant.Common +import Assistant.Types.Pushes + +import Control.Concurrent.STM +import Data.Time.Clock +import qualified Data.Map as M + +{- Blocks until there are failed pushes. + - Returns Remotes whose pushes failed a given time duration or more ago. + - (This may be an empty list.) -} +getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote] +getFailedPushesBefore duration = do + v <- getAssistant failedPushMap + liftIO $ do + m <- atomically $ readTMVar v + now <- getCurrentTime + return $ M.keys $ M.filter (not . toorecent now) m + where + toorecent now time = now `diffUTCTime` time < duration + +{- Modifies the map. -} +changeFailedPushMap :: (PushMap -> PushMap) -> Assistant () +changeFailedPushMap a = do + v <- getAssistant failedPushMap + liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v + where + {- tryTakeTMVar empties the TMVar; refill it only if + - the modified map is not itself empty -} + store v m + | m == M.empty = noop + | otherwise = putTMVar v $! m diff --git a/Assistant/RepoProblem.hs b/Assistant/RepoProblem.hs new file mode 100644 index 000000000..6913fefc6 --- /dev/null +++ b/Assistant/RepoProblem.hs @@ -0,0 +1,34 @@ +{- git-annex assistant remote problem handling + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.RepoProblem where + +import Assistant.Common +import Assistant.Types.RepoProblem +import Utility.TList + +import Control.Concurrent.STM + +{- Gets all repositories that have problems. Blocks until there is at + - least one. -} +getRepoProblems :: Assistant [RepoProblem] +getRepoProblems = nubBy sameRepoProblem + <$> (atomically . getTList) <<~ repoProblemChan + +{- Indicates that there was a problem with a repository, and the problem + - appears to not be a transient (eg network connection) problem. + - + - If the problem is able to be repaired, the passed action will be run. + - (However, if multiple problems are reported with a single repository, + - only a single action will be run.) + -} +repoHasProblem :: UUID -> Assistant () -> Assistant () +repoHasProblem u afterrepair = do + rp <- RepoProblem + <$> pure u + <*> asIO afterrepair + (atomically . flip consTList rp) <<~ repoProblemChan diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs new file mode 100644 index 000000000..2743c0f36 --- /dev/null +++ b/Assistant/ScanRemotes.hs @@ -0,0 +1,41 @@ +{- git-annex assistant remotes needing scanning + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.ScanRemotes where + +import Assistant.Common +import Assistant.Types.ScanRemotes +import qualified Types.Remote as Remote + +import Data.Function +import Control.Concurrent.STM +import qualified Data.Map as M + +{- Blocks until there is a remote or remotes that need to be scanned. + - + - The list has higher priority remotes listed first. -} +getScanRemote :: Assistant [(Remote, ScanInfo)] +getScanRemote = do + v <- getAssistant scanRemoteMap + liftIO $ atomically $ + reverse . sortBy (compare `on` scanPriority . snd) . M.toList + <$> takeTMVar v + +{- Adds new remotes that need scanning. -} +addScanRemotes :: Bool -> [Remote] -> Assistant () +addScanRemotes _ [] = noop +addScanRemotes full rs = do + v <- getAssistant scanRemoteMap + liftIO $ atomically $ do + m <- fromMaybe M.empty <$> tryTakeTMVar v + putTMVar v $ M.unionWith merge (M.fromList $ zip rs (map info rs)) m + where + info r = ScanInfo (-1 * Remote.cost r) full + merge x y = ScanInfo + { scanPriority = max (scanPriority x) (scanPriority y) + , fullScan = fullScan x || fullScan y + } diff --git a/Assistant/Ssh.hs b/Assistant/Ssh.hs new file mode 100644 index 000000000..1dc982ba6 --- /dev/null +++ b/Assistant/Ssh.hs @@ -0,0 +1,342 @@ +{- git-annex assistant ssh utilities + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Ssh where + +import Common.Annex +import Utility.Tmp +import Utility.UserInfo +import Utility.Shell +import Utility.Rsync +import Utility.FileMode +import Git.Remote + +import Data.Text (Text) +import qualified Data.Text as T +import Data.Char +import Network.URI + +data SshData = SshData + { sshHostName :: Text + , sshUserName :: Maybe Text + , sshDirectory :: Text + , sshRepoName :: String + , sshPort :: Int + , needsPubKey :: Bool + , sshCapabilities :: [SshServerCapability] + } + deriving (Read, Show, Eq) + +data SshServerCapability = GitAnnexShellCapable | GitCapable | RsyncCapable + deriving (Read, Show, Eq) + +hasCapability :: SshData -> SshServerCapability -> Bool +hasCapability d c = c `elem` sshCapabilities d + +onlyCapability :: SshData -> SshServerCapability -> Bool +onlyCapability d c = all (== c) (sshCapabilities d) + +data SshKeyPair = SshKeyPair + { sshPubKey :: String + , sshPrivKey :: String + } + +instance Show SshKeyPair where + show = sshPubKey + +type SshPubKey = String + +{- ssh -ofoo=bar command-line option -} +sshOpt :: String -> String -> String +sshOpt k v = concat ["-o", k, "=", v] + +sshDir :: IO FilePath +sshDir = do + home <- myHomeDir + return $ home </> ".ssh" + +{- user@host or host -} +genSshHost :: Text -> Maybe Text -> String +genSshHost host user = maybe "" (\v -> T.unpack v ++ "@") user ++ T.unpack host + +{- Generates a ssh or rsync url from a SshData. -} +genSshUrl :: SshData -> String +genSshUrl sshdata = addtrailingslash $ T.unpack $ T.concat $ + if (onlyCapability sshdata RsyncCapable) + then [u, h, T.pack ":", sshDirectory sshdata] + else [T.pack "ssh://", u, h, d] + where + u = maybe (T.pack "") (\v -> T.concat [v, T.pack "@"]) $ sshUserName sshdata + h = sshHostName sshdata + d + | T.pack "/" `T.isPrefixOf` sshDirectory sshdata = sshDirectory sshdata + | T.pack "~/" `T.isPrefixOf` sshDirectory sshdata = T.concat [T.pack "/", sshDirectory sshdata] + | otherwise = T.concat [T.pack "/~/", sshDirectory sshdata] + addtrailingslash s + | "/" `isSuffixOf` s = s + | otherwise = s ++ "/" + +{- Reverses genSshUrl -} +parseSshUrl :: String -> Maybe SshData +parseSshUrl u + | "ssh://" `isPrefixOf` u = fromssh (drop (length "ssh://") u) + | otherwise = fromrsync u + where + mkdata (userhost, dir) = Just $ SshData + { sshHostName = T.pack host + , sshUserName = if null user then Nothing else Just $ T.pack user + , sshDirectory = T.pack dir + , sshRepoName = genSshRepoName host dir + -- dummy values, cannot determine from url + , sshPort = 22 + , needsPubKey = True + , sshCapabilities = [] + } + where + (user, host) = if '@' `elem` userhost + then separate (== '@') userhost + else ("", userhost) + fromrsync s + | not (rsyncUrlIsShell u) = Nothing + | otherwise = mkdata $ separate (== ':') s + fromssh = mkdata . break (== '/') + +{- Generates a git remote name, like host_dir or host -} +genSshRepoName :: String -> FilePath -> String +genSshRepoName host dir + | null dir = makeLegalName host + | otherwise = makeLegalName $ host ++ "_" ++ dir + +{- The output of ssh, including both stdout and stderr. -} +sshTranscript :: [String] -> (Maybe String) -> IO (String, Bool) +sshTranscript opts input = processTranscript "ssh" opts input + +{- Ensure that the ssh public key doesn't include any ssh options, like + - command=foo, or other weirdness -} +validateSshPubKey :: SshPubKey -> IO () +validateSshPubKey pubkey + | length (lines pubkey) == 1 = + either error return $ check $ words pubkey + | otherwise = error "too many lines in ssh public key" + where + check [prefix, _key, comment] = do + checkprefix prefix + checkcomment comment + check [prefix, _key] = + checkprefix prefix + check _ = err "wrong number of words in ssh public key" + + ok = Right () + err msg = Left $ unwords [msg, pubkey] + + checkprefix prefix + | ssh == "ssh" && all isAlphaNum keytype = ok + | otherwise = err "bad ssh public key prefix" + where + (ssh, keytype) = separate (== '-') prefix + + checkcomment comment = case filter (not . safeincomment) comment of + [] -> ok + badstuff -> err $ "bad comment in ssh public key (contains: \"" ++ badstuff ++ "\")" + safeincomment c = isAlphaNum c || c == '@' || c == '-' || c == '_' || c == '.' + +addAuthorizedKeys :: Bool -> FilePath -> SshPubKey -> IO Bool +addAuthorizedKeys gitannexshellonly dir pubkey = boolSystem "sh" + [ Param "-c" , Param $ addAuthorizedKeysCommand gitannexshellonly dir pubkey ] + +removeAuthorizedKeys :: Bool -> FilePath -> SshPubKey -> IO () +removeAuthorizedKeys gitannexshellonly dir pubkey = do + let keyline = authorizedKeysLine gitannexshellonly dir pubkey + sshdir <- sshDir + let keyfile = sshdir </> "authorized_keys" + ls <- lines <$> readFileStrict keyfile + writeFile keyfile $ unlines $ filter (/= keyline) ls + +{- Implemented as a shell command, so it can be run on remote servers over + - ssh. + - + - The ~/.ssh/git-annex-shell wrapper script is created if not already + - present. + -} +addAuthorizedKeysCommand :: Bool -> FilePath -> SshPubKey -> String +addAuthorizedKeysCommand gitannexshellonly dir pubkey = intercalate "&&" + [ "mkdir -p ~/.ssh" + , intercalate "; " + [ "if [ ! -e " ++ wrapper ++ " ]" + , "then (" ++ intercalate ";" (map echoval script) ++ ") > " ++ wrapper + , "fi" + ] + , "chmod 700 " ++ wrapper + , "touch ~/.ssh/authorized_keys" + , "chmod 600 ~/.ssh/authorized_keys" + , unwords + [ "echo" + , shellEscape $ authorizedKeysLine gitannexshellonly dir pubkey + , ">>~/.ssh/authorized_keys" + ] + ] + where + echoval v = "echo " ++ shellEscape v + wrapper = "~/.ssh/git-annex-shell" + script = + [ shebang_portable + , "set -e" + , "if [ \"x$SSH_ORIGINAL_COMMAND\" != \"x\" ]; then" + , runshell "$SSH_ORIGINAL_COMMAND" + , "else" + , runshell "$@" + , "fi" + ] + runshell var = "exec git-annex-shell -c \"" ++ var ++ "\"" + +authorizedKeysLine :: Bool -> FilePath -> SshPubKey -> String +authorizedKeysLine gitannexshellonly dir pubkey + | gitannexshellonly = limitcommand ++ pubkey + {- TODO: Locking down rsync is difficult, requiring a rather + - long perl script. -} + | otherwise = pubkey + where + limitcommand = "command=\"GIT_ANNEX_SHELL_DIRECTORY="++shellEscape dir++" ~/.ssh/git-annex-shell\",no-agent-forwarding,no-port-forwarding,no-X11-forwarding " + +{- Generates a ssh key pair. -} +genSshKeyPair :: IO SshKeyPair +genSshKeyPair = withTmpDir "git-annex-keygen" $ \dir -> do + ok <- boolSystem "ssh-keygen" + [ Param "-P", Param "" -- no password + , Param "-f", File $ dir </> "key" + ] + unless ok $ + error "ssh-keygen failed" + SshKeyPair + <$> readFile (dir </> "key.pub") + <*> readFile (dir </> "key") + +{- Installs a ssh key pair, and sets up ssh config with a mangled hostname + - that will enable use of the key. This way we avoid changing the user's + - regular ssh experience at all. Returns a modified SshData containing the + - mangled hostname. + - + - Note that the key files are put in ~/.ssh/git-annex/, rather than directly + - in ssh because of an **INSANE** behavior of gnome-keyring: It loads + - ~/.ssh/ANYTHING.pub, and uses them indiscriminately. But using this key + - for a normal login to the server will force git-annex-shell to run, + - and locks the user out. Luckily, it does not recurse into subdirectories. + - + - Similarly, IdentitiesOnly is set in the ssh config to prevent the + - ssh-agent from forcing use of a different key. + -} +setupSshKeyPair :: SshKeyPair -> SshData -> IO SshData +setupSshKeyPair sshkeypair sshdata = do + sshdir <- sshDir + createDirectoryIfMissing True $ parentDir $ sshdir </> sshprivkeyfile + + unlessM (doesFileExist $ sshdir </> sshprivkeyfile) $ + writeFileProtected (sshdir </> sshprivkeyfile) (sshPrivKey sshkeypair) + unlessM (doesFileExist $ sshdir </> sshpubkeyfile) $ + writeFile (sshdir </> sshpubkeyfile) (sshPubKey sshkeypair) + + setSshConfig sshdata + [ ("IdentityFile", "~/.ssh/" ++ sshprivkeyfile) + , ("IdentitiesOnly", "yes") + ] + where + sshprivkeyfile = "git-annex" </> "key." ++ mangleSshHostName sshdata + sshpubkeyfile = sshprivkeyfile ++ ".pub" + +{- Fixes git-annex ssh key pairs configured in .ssh/config + - by old versions to set IdentitiesOnly. -} +fixSshKeyPair :: IO () +fixSshKeyPair = do + sshdir <- sshDir + let configfile = sshdir </> "config" + whenM (doesFileExist configfile) $ do + ls <- lines <$> readFileStrict configfile + let ls' = fixSshKeyPair' ls + when (ls /= ls') $ + viaTmp writeFile configfile $ unlines ls' + +{- Strategy: Search for IdentityFile lines in for files with key.git-annex + - in their names. These are for git-annex ssh key pairs. + - Add the IdentitiesOnly line immediately after them, if not already + - present. -} +fixSshKeyPair' :: [String] -> [String] +fixSshKeyPair' = go [] + where + go c [] = reverse c + go c (l:[]) + | all (`isInfixOf` l) indicators = go (fixedline l:l:c) [] + | otherwise = go (l:c) [] + go c (l:next:rest) + | all (`isInfixOf` l) indicators && not ("IdentitiesOnly" `isInfixOf` next) = + go (fixedline l:l:c) (next:rest) + | otherwise = go (l:c) (next:rest) + indicators = ["IdentityFile", "key.git-annex"] + fixedline tmpl = takeWhile isSpace tmpl ++ "IdentitiesOnly yes" + +{- Setups up a ssh config with a mangled hostname. + - Returns a modified SshData containing the mangled hostname. -} +setSshConfig :: SshData -> [(String, String)] -> IO SshData +setSshConfig sshdata config = do + sshdir <- sshDir + createDirectoryIfMissing True sshdir + let configfile = sshdir </> "config" + unlessM (catchBoolIO $ isInfixOf mangledhost <$> readFile configfile) $ + appendFile configfile $ unlines $ + [ "" + , "# Added automatically by git-annex" + , "Host " ++ mangledhost + ] ++ map (\(k, v) -> "\t" ++ k ++ " " ++ v) + (settings ++ config) + return $ sshdata { sshHostName = T.pack mangledhost } + where + mangledhost = mangleSshHostName sshdata + settings = + [ ("Hostname", T.unpack $ sshHostName sshdata) + , ("Port", show $ sshPort sshdata) + ] + +{- This hostname is specific to a given repository on the ssh host, + - so it is based on the real hostname, the username, and the directory. + - + - The mangled hostname has the form "git-annex-realhostname-username_dir". + - The only use of "-" is to separate the parts shown; this is necessary + - to allow unMangleSshHostName to work. Any unusual characters in the + - username or directory are url encoded, except using "." rather than "%" + - (the latter has special meaning to ssh). + -} +mangleSshHostName :: SshData -> String +mangleSshHostName sshdata = "git-annex-" ++ T.unpack (sshHostName sshdata) + ++ "-" ++ escape extra + where + extra = intercalate "_" $ map T.unpack $ catMaybes + [ sshUserName sshdata + , Just $ sshDirectory sshdata + ] + safe c + | isAlphaNum c = True + | c == '_' = True + | otherwise = False + escape s = replace "%" "." $ escapeURIString safe s + +{- Extracts the real hostname from a mangled ssh hostname. -} +unMangleSshHostName :: String -> String +unMangleSshHostName h = case split "-" h of + ("git":"annex":rest) -> intercalate "-" (beginning rest) + _ -> h + +{- Does ssh have known_hosts data for a hostname? -} +knownHost :: Text -> IO Bool +knownHost hostname = do + sshdir <- sshDir + ifM (doesFileExist $ sshdir </> "known_hosts") + ( not . null <$> checkhost + , return False + ) + where + {- ssh-keygen -F can crash on some old known_hosts file -} + checkhost = catchDefaultIO "" $ + readProcess "ssh-keygen" ["-F", T.unpack hostname] diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs new file mode 100644 index 000000000..f7656f52d --- /dev/null +++ b/Assistant/Sync.hs @@ -0,0 +1,276 @@ +{- git-annex assistant repo syncing + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Sync where + +import Assistant.Common +import Assistant.Pushes +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.Alert +import Assistant.Alert.Utility +import Assistant.DaemonStatus +import Assistant.ScanRemotes +import qualified Command.Sync +import Utility.Parallel +import qualified Git +import qualified Git.Branch +import qualified Git.Command +import qualified Git.Ref +import qualified Remote +import qualified Types.Remote as Remote +import qualified Remote.List as Remote +import qualified Annex.Branch +import Annex.UUID +import Annex.TaggedPush +import qualified Config +import Git.Config +import Assistant.NamedThread +import Assistant.Threads.Watcher (watchThread, WatcherControl(..)) +import Assistant.TransferSlots +import Assistant.TransferQueue +import Assistant.RepoProblem +import Logs.Transfer + +import Data.Time.Clock +import qualified Data.Map as M +import qualified Data.Set as S +import Control.Concurrent + +{- Syncs with remotes that may have been disconnected for a while. + - + - First gets git in sync, and then prepares any necessary file transfers. + - + - An expensive full scan is queued when the git-annex branches of some of + - the remotes have diverged from the local git-annex branch. Otherwise, + - it's sufficient to requeue failed transfers. + - + - XMPP remotes are also signaled that we can push to them, and we request + - they push to us. Since XMPP pushes run ansynchronously, any scan of the + - XMPP remotes has to be deferred until they're done pushing to us, so + - all XMPP remotes are marked as possibly desynced. + - + - Also handles signaling any connectRemoteNotifiers, after the syncing is + - done. + -} +reconnectRemotes :: Bool -> [Remote] -> Assistant () +reconnectRemotes _ [] = noop +reconnectRemotes notifypushes rs = void $ do + rs' <- liftIO $ filterM (Remote.checkAvailable True) rs + unless (null rs') $ do + modifyDaemonStatus_ $ \s -> s + { desynced = S.union (S.fromList $ map Remote.uuid xmppremotes) (desynced s) } + failedrs <- syncAction rs' (const go) + forM_ failedrs $ \r -> + whenM (liftIO $ Remote.checkAvailable False r) $ + repoHasProblem (Remote.uuid r) (syncRemote r) + mapM_ signal $ filter (`notElem` failedrs) rs' + where + gitremotes = filter (notspecialremote . Remote.repo) rs + (xmppremotes, nonxmppremotes) = partition isXMPPRemote rs + notspecialremote r + | Git.repoIsUrl r = True + | Git.repoIsLocal r = True + | Git.repoIsLocalUnknown r = True + | otherwise = False + sync (Just branch) = do + (failedpull, diverged) <- manualPull (Just branch) gitremotes + now <- liftIO getCurrentTime + failedpush <- pushToRemotes' now notifypushes gitremotes + return (nub $ failedpull ++ failedpush, diverged) + {- No local branch exists yet, but we can try pulling. -} + sync Nothing = manualPull Nothing gitremotes + go = do + (failed, diverged) <- sync + =<< liftAnnex (inRepo Git.Branch.current) + addScanRemotes diverged $ + filter (not . remoteAnnexIgnore . Remote.gitconfig) + nonxmppremotes + return failed + signal r = liftIO . mapM_ (flip tryPutMVar ()) + =<< fromMaybe [] . M.lookup (Remote.uuid r) . connectRemoteNotifiers + <$> getDaemonStatus + +{- Updates the local sync branch, then pushes it to all remotes, in + - parallel, along with the git-annex branch. This is the same + - as "git annex sync", except in parallel, and will co-exist with use of + - "git annex sync". + - + - After the pushes to normal git remotes, also signals XMPP clients that + - they can request an XMPP push. + - + - Avoids running possibly long-duration commands in the Annex monad, so + - as not to block other threads. + - + - This can fail, when the remote's sync branch (or git-annex branch) has + - been updated by some other remote pushing into it, or by the remote + - itself. To handle failure, a manual pull and merge is done, and the push + - is retried. + - + - When there's a lot of activity, we may fail more than once. + - On the other hand, we may fail because the remote is not available. + - Rather than retrying indefinitely, after the first retry we enter a + - fallback mode, where our push is guarenteed to succeed if the remote is + - reachable. If the fallback fails, the push is queued to be retried + - later. + - + - Returns any remotes that it failed to push to. + -} +pushToRemotes :: Bool -> [Remote] -> Assistant [Remote] +pushToRemotes notifypushes remotes = do + now <- liftIO getCurrentTime + syncAction remotes (pushToRemotes' now notifypushes) +pushToRemotes' :: UTCTime -> Bool -> [Remote] -> Assistant [Remote] +pushToRemotes' now notifypushes remotes = do + (g, branch, u) <- liftAnnex $ do + Annex.Branch.commit "update" + (,,) + <$> gitRepo + <*> inRepo Git.Branch.current + <*> getUUID + let (xmppremotes, normalremotes) = partition isXMPPRemote remotes + ret <- go True branch g u normalremotes + unless (null xmppremotes) $ do + shas <- liftAnnex $ map fst <$> + inRepo (Git.Ref.matchingWithHEAD + [Annex.Branch.fullname, Git.Ref.headRef]) + forM_ xmppremotes $ \r -> sendNetMessage $ + Pushing (getXMPPClientID r) (CanPush u shas) + return ret + where + go _ Nothing _ _ _ = return [] -- no branch, so nothing to do + go _ _ _ _ [] = return [] -- no remotes, so nothing to do + go shouldretry (Just branch) g u rs = do + debug ["pushing to", show rs] + liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + (succeeded, failed) <- liftIO $ inParallel (push g branch) rs + updatemap succeeded [] + if null failed + then do + when notifypushes $ + sendNetMessage $ NotifyPush $ + map Remote.uuid succeeded + return failed + else if shouldretry + then retry branch g u failed + else fallback branch g u failed + + updatemap succeeded failed = changeFailedPushMap $ \m -> + M.union (makemap failed) $ + M.difference m (makemap succeeded) + makemap l = M.fromList $ zip l (repeat now) + + retry branch g u rs = do + debug ["trying manual pull to resolve failed pushes"] + void $ manualPull (Just branch) rs + go False (Just branch) g u rs + + fallback branch g u rs = do + debug ["fallback pushing to", show rs] + (succeeded, failed) <- liftIO $ + inParallel (\r -> taggedPush u Nothing branch r g) rs + updatemap succeeded failed + when (notifypushes && (not $ null succeeded)) $ + sendNetMessage $ NotifyPush $ + map Remote.uuid succeeded + return failed + + push g branch remote = Command.Sync.pushBranch remote branch g + +{- Displays an alert while running an action that syncs with some remotes, + - and returns any remotes that it failed to sync with. + - + - XMPP remotes are handled specially; since the action can only start + - an async process for them, they are not included in the alert, but are + - still passed to the action. + - + - Readonly remotes are also hidden (to hide the web special remote). + -} +syncAction :: [Remote] -> ([Remote] -> Assistant [Remote]) -> Assistant [Remote] +syncAction rs a + | null visibleremotes = a rs + | otherwise = do + i <- addAlert $ syncAlert visibleremotes + failed <- a rs + let failed' = filter (not . Git.repoIsLocalUnknown . Remote.repo) failed + let succeeded = filter (`notElem` failed) visibleremotes + if null succeeded && null failed' + then removeAlert i + else updateAlertMap $ mergeAlert i $ + syncResultAlert succeeded failed' + return failed + where + visibleremotes = filter (not . Remote.readonly) $ + filter (not . isXMPPRemote) rs + +{- Manually pull from remotes and merge their branches. Returns any + - remotes that it failed to pull from, and a Bool indicating + - whether the git-annex branches of the remotes and local had + - diverged before the pull. + - + - After pulling from the normal git remotes, requests pushes from any + - XMPP remotes. However, those pushes will run asynchronously, so their + - results are not included in the return data. + -} +manualPull :: Maybe Git.Ref -> [Remote] -> Assistant ([Remote], Bool) +manualPull currentbranch remotes = do + g <- liftAnnex gitRepo + let (xmppremotes, normalremotes) = partition isXMPPRemote remotes + failed <- liftIO $ forM normalremotes $ \r -> + ifM (Git.Command.runBool [Param "fetch", Param $ Remote.name r] g) + ( return Nothing + , return $ Just r + ) + haddiverged <- liftAnnex Annex.Branch.forceUpdate + forM_ normalremotes $ \r -> + liftAnnex $ Command.Sync.mergeRemote r currentbranch + u <- liftAnnex getUUID + forM_ xmppremotes $ \r -> + sendNetMessage $ Pushing (getXMPPClientID r) (PushRequest u) + return (catMaybes failed, haddiverged) + +{- Start syncing a remote, using a background thread. -} +syncRemote :: Remote -> Assistant () +syncRemote remote = do + updateSyncRemotes + thread <- asIO $ do + reconnectRemotes False [remote] + addScanRemotes True [remote] + void $ liftIO $ forkIO $ thread + +{- Use Nothing to change autocommit setting; or a remote to change + - its sync setting. -} +changeSyncable :: Maybe Remote -> Bool -> Assistant () +changeSyncable Nothing enable = do + liftAnnex $ Config.setConfig key (boolConfig enable) + liftIO . maybe noop (`throwTo` signal) + =<< namedThreadId watchThread + where + key = Config.annexConfig "autocommit" + signal + | enable = ResumeWatcher + | otherwise = PauseWatcher +changeSyncable (Just r) True = do + liftAnnex $ changeSyncFlag r True + syncRemote r +changeSyncable (Just r) False = do + liftAnnex $ changeSyncFlag r False + updateSyncRemotes + {- Stop all transfers to or from this remote. + - XXX Can't stop any ongoing scan, or git syncs. -} + void $ dequeueTransfers tofrom + mapM_ (cancelTransfer False) =<< + filter tofrom . M.keys . currentTransfers <$> getDaemonStatus + where + tofrom t = transferUUID t == Remote.uuid r + +changeSyncFlag :: Remote -> Bool -> Annex () +changeSyncFlag r enabled = do + Config.setConfig key (boolConfig enabled) + void Remote.remoteListRefresh + where + key = Config.remoteConfig (Remote.repo r) "sync" diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs new file mode 100644 index 000000000..695703e22 --- /dev/null +++ b/Assistant/Threads/Committer.hs @@ -0,0 +1,493 @@ +{- git-annex assistant commit thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Threads.Committer where + +import Assistant.Common +import Assistant.Changes +import Assistant.Types.Changes +import Assistant.Commits +import Assistant.Alert +import Assistant.DaemonStatus +import Assistant.TransferQueue +import Assistant.Drop +import Logs.Transfer +import Logs.Location +import qualified Annex.Queue +import qualified Git.Command +import qualified Git.LsFiles +import qualified Git.BuildVersion +import qualified Command.Add +import Utility.ThreadScheduler +import qualified Utility.Lsof as Lsof +import qualified Utility.DirWatcher as DirWatcher +import Types.KeySource +import Config +import Annex.Exception +import Annex.Content +import Annex.Link +import Annex.CatFile +import qualified Annex +import Utility.InodeCache +import Annex.Content.Direct + +import Data.Time.Clock +import Data.Tuple.Utils +import qualified Data.Set as S +import qualified Data.Map as M +import Data.Either +import Control.Concurrent + +{- This thread makes git commits at appropriate times. -} +commitThread :: NamedThread +commitThread = namedThread "Committer" $ do + delayadd <- liftAnnex $ + maybe delayaddDefault (return . Just . Seconds) + =<< annexDelayAdd <$> Annex.getGitConfig + waitChangeTime $ \(changes, time) -> do + readychanges <- handleAdds delayadd changes + if shouldCommit time (length readychanges) readychanges + then do + debug + [ "committing" + , show (length readychanges) + , "changes" + ] + void $ alertWhile commitAlert $ + liftAnnex commitStaged + recordCommit + let numchanges = length readychanges + mapM_ checkChangeContent readychanges + return numchanges + else do + refill readychanges + return 0 + +refill :: [Change] -> Assistant () +refill [] = noop +refill cs = do + debug ["delaying commit of", show (length cs), "changes"] + refillChanges cs + +{- Wait for one or more changes to arrive to be committed, and then + - runs an action to commit them. If more changes arrive while this is + - going on, they're handled intelligently, batching up changes into + - large commits where possible, doing rename detection, and + - commiting immediately otherwise. -} +waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant () +waitChangeTime a = waitchanges 0 + where + waitchanges lastcommitsize = do + -- Wait one one second as a simple rate limiter. + liftIO $ threadDelaySeconds (Seconds 1) + -- Now, wait until at least one change is available for + -- processing. + cs <- getChanges + handlechanges cs lastcommitsize + handlechanges changes lastcommitsize = do + let len = length changes + -- See if now's a good time to commit. + now <- liftIO getCurrentTime + case (lastcommitsize >= maxCommitSize, shouldCommit now len changes, possiblyrename changes) of + (True, True, _) + | len > maxCommitSize -> + waitchanges =<< a (changes, now) + | otherwise -> aftermaxcommit changes + (_, True, False) -> + waitchanges =<< a (changes, now) + (_, True, True) -> do + morechanges <- getrelatedchanges changes + waitchanges =<< a (changes ++ morechanges, now) + _ -> do + refill changes + waitchanges lastcommitsize + + {- Did we perhaps only get one of the AddChange and RmChange pair + - that make up a file rename? Or some of the pairs that make up + - a directory rename? + -} + possiblyrename = all renamepart + + renamepart (PendingAddChange _ _) = True + renamepart c = isRmChange c + + {- Gets changes related to the passed changes, without blocking + - very long. + - + - If there are multiple RmChanges, this is probably a directory + - rename, in which case it may be necessary to wait longer to get + - all the Changes involved. + -} + getrelatedchanges oldchanges + | length (filter isRmChange oldchanges) > 1 = + concat <$> getbatchchanges [] + | otherwise = do + liftIO humanImperceptibleDelay + getAnyChanges + getbatchchanges cs = do + liftIO $ threadDelay $ fromIntegral $ oneSecond `div` 10 + cs' <- getAnyChanges + if null cs' + then return cs + else getbatchchanges (cs':cs) + + {- The last commit was maximum size, so it's very likely there + - are more changes and we'd like to ensure we make another commit + - of maximum size if possible. + - + - But, it can take a while for the Watcher to wake back up + - after a commit. It can get blocked by another thread + - that is using the Annex state, such as a git-annex branch + - commit. Especially after such a large commit, this can + - take several seconds. When this happens, it defeats the + - normal commit batching, which sees some old changes the + - Watcher found while the commit was being prepared, and sees + - no recent ones, and wants to commit immediately. + - + - All that we need to do, then, is wait for the Watcher to + - wake up, and queue up one more change. + - + - However, it's also possible that we're at the end of changes for + - now. So to avoid waiting a really long time before committing + - those changes we have, poll for up to 30 seconds, and then + - commit them. + - + - Also, try to run something in Annex, to ensure we block + - longer if the Annex state is indeed blocked. + -} + aftermaxcommit oldchanges = loop (30 :: Int) + where + loop 0 = continue oldchanges + loop n = do + liftAnnex noop -- ensure Annex state is free + liftIO $ threadDelaySeconds (Seconds 1) + changes <- getAnyChanges + if null changes + then loop (n - 1) + else continue (oldchanges ++ changes) + continue cs + | null cs = waitchanges 0 + | otherwise = handlechanges cs 0 + +isRmChange :: Change -> Bool +isRmChange (Change { changeInfo = i }) | i == RmChange = True +isRmChange _ = False + +{- An amount of time that is hopefully imperceptably short for humans, + - while long enough for a computer to get some work done. + - Note that 0.001 is a little too short for rename change batching to + - work. -} +humanImperceptibleInterval :: NominalDiffTime +humanImperceptibleInterval = 0.01 + +humanImperceptibleDelay :: IO () +humanImperceptibleDelay = threadDelay $ + truncate $ humanImperceptibleInterval * fromIntegral oneSecond + +maxCommitSize :: Int +maxCommitSize = 5000 + +{- Decide if now is a good time to make a commit. + - Note that the list of changes has an undefined order. + - + - Current strategy: If there have been 10 changes within the past second, + - a batch activity is taking place, so wait for later. + -} +shouldCommit :: UTCTime -> Int -> [Change] -> Bool +shouldCommit now len changes + | len == 0 = False + | len >= maxCommitSize = True + | length recentchanges < 10 = True + | otherwise = False -- batch activity + where + thissecond c = timeDelta c <= 1 + recentchanges = filter thissecond changes + timeDelta c = now `diffUTCTime` changeTime c + +commitStaged :: Annex Bool +commitStaged = do + {- This could fail if there's another commit being made by + - something else. -} + v <- tryAnnex Annex.Queue.flush + case v of + Left _ -> return False + Right _ -> do + {- Empty commits may be made if tree changes cancel + - each other out, etc. Git returns nonzero on those, + - so don't propigate out commit failures. -} + void $ inRepo $ catchMaybeIO . + Git.Command.runQuiet + (Param "commit" : nomessage params) + return True + where + params = + [ Param "--quiet" + {- Avoid running the usual pre-commit hook; + - the Watcher does the same symlink fixing, + - and direct mode bookkeeping updating. -} + , Param "--no-verify" + ] + nomessage ps + | Git.BuildVersion.older "1.7.2" = + Param "-m" : Param "autocommit" : ps + | Git.BuildVersion.older "1.7.8" = + Param "--allow-empty-message" : + Param "-m" : Param "" : ps + | otherwise = + Param "--allow-empty-message" : + Param "--no-edit" : Param "-m" : Param "" : ps + +{- OSX needs a short delay after a file is added before locking it down, + - when using a non-direct mode repository, as pasting a file seems to + - try to set file permissions or otherwise access the file after closing + - it. -} +delayaddDefault :: Annex (Maybe Seconds) +#ifdef darwin_HOST_OS +delayaddDefault = ifM isDirect + ( return Nothing + , return $ Just $ Seconds 1 + ) +#else +delayaddDefault = return Nothing +#endif + +{- If there are PendingAddChanges, or InProcessAddChanges, the files + - have not yet actually been added to the annex, and that has to be done + - now, before committing. + - + - Deferring the adds to this point causes batches to be bundled together, + - which allows faster checking with lsof that the files are not still open + - for write by some other process, and faster checking with git-ls-files + - that the files are not already checked into git. + - + - When a file is added, Inotify will notice the new symlink. So this waits + - for additional Changes to arrive, so that the symlink has hopefully been + - staged before returning, and will be committed immediately. + - + - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly + - created and staged. + - + - Returns a list of all changes that are ready to be committed. + - Any pending adds that are not ready yet are put back into the ChangeChan, + - where they will be retried later. + -} +handleAdds :: Maybe Seconds -> [Change] -> Assistant [Change] +handleAdds delayadd cs = returnWhen (null incomplete) $ do + let (pending, inprocess) = partition isPendingAddChange incomplete + direct <- liftAnnex isDirect + (pending', cleanup) <- if direct + then return (pending, noop) + else findnew pending + (postponed, toadd) <- partitionEithers <$> safeToAdd delayadd pending' inprocess + cleanup + + unless (null postponed) $ + refillChanges postponed + + returnWhen (null toadd) $ do + added <- addaction toadd $ + catMaybes <$> if direct + then adddirect toadd + else forM toadd add + if DirWatcher.eventsCoalesce || null added || direct + then return $ added ++ otherchanges + else do + r <- handleAdds delayadd =<< getChanges + return $ r ++ added ++ otherchanges + where + (incomplete, otherchanges) = partition (\c -> isPendingAddChange c || isInProcessAddChange c) cs + + findnew [] = return ([], noop) + findnew pending@(exemplar:_) = do + (newfiles, cleanup) <- liftAnnex $ + inRepo (Git.LsFiles.notInRepo False $ map changeFile pending) + -- note: timestamp info is lost here + let ts = changeTime exemplar + return (map (PendingAddChange ts) newfiles, void $ liftIO cleanup) + + returnWhen c a + | c = return otherchanges + | otherwise = a + + add :: Change -> Assistant (Maybe Change) + add change@(InProcessAddChange { keySource = ks }) = + catchDefaultIO Nothing <~> doadd + where + doadd = sanitycheck ks $ do + (mkey, mcache) <- liftAnnex $ do + showStart "add" $ keyFilename ks + Command.Add.ingest $ Just ks + maybe (failedingest change) (done change mcache $ keyFilename ks) mkey + add _ = return Nothing + + {- In direct mode, avoid overhead of re-injesting a renamed + - file, by examining the other Changes to see if a removed + - file has the same InodeCache as the new file. If so, + - we can just update bookkeeping, and stage the file in git. + -} + adddirect :: [Change] -> Assistant [Maybe Change] + adddirect toadd = do + ct <- liftAnnex compareInodeCachesWith + m <- liftAnnex $ removedKeysMap ct cs + if M.null m + then forM toadd add + else forM toadd $ \c -> do + mcache <- liftIO $ genInodeCache $ changeFile c + case mcache of + Nothing -> add c + Just cache -> + case M.lookup (inodeCacheToKey ct cache) m of + Nothing -> add c + Just k -> fastadd c k + + fastadd :: Change -> Key -> Assistant (Maybe Change) + fastadd change key = do + let source = keySource change + liftAnnex $ Command.Add.finishIngestDirect key source + done change Nothing (keyFilename source) key + + removedKeysMap :: InodeComparisonType -> [Change] -> Annex (M.Map InodeCacheKey Key) + removedKeysMap ct l = do + mks <- forM (filter isRmChange l) $ \c -> + catKeyFile $ changeFile c + M.fromList . concat <$> mapM mkpairs (catMaybes mks) + where + mkpairs k = map (\c -> (inodeCacheToKey ct c, k)) <$> + recordedInodeCache k + + failedingest change = do + refill [retryChange change] + liftAnnex showEndFail + return Nothing + + done change mcache file key = liftAnnex $ do + logStatus key InfoPresent + link <- ifM isDirect + ( inRepo $ gitAnnexLink file key + , Command.Add.link file key mcache + ) + whenM (pure DirWatcher.eventsCoalesce <||> isDirect) $ + stageSymlink file =<< hashSymlink link + showEndOk + return $ Just $ finishedChange change key + + {- Check that the keysource's keyFilename still exists, + - and is still a hard link to its contentLocation, + - before ingesting it. -} + sanitycheck keysource a = do + fs <- liftIO $ getSymbolicLinkStatus $ keyFilename keysource + ks <- liftIO $ getSymbolicLinkStatus $ contentLocation keysource + if deviceID ks == deviceID fs && fileID ks == fileID fs + then a + else do + -- remove the hard link + when (contentLocation keysource /= keyFilename keysource) $ + void $ liftIO $ tryIO $ removeFile $ contentLocation keysource + return Nothing + + {- Shown an alert while performing an action to add a file or + - files. When only a few files are added, their names are shown + - in the alert. When it's a batch add, the number of files added + - is shown. + - + - Add errors tend to be transient and will be + - automatically dealt with, so the alert is always told + - the add succeeded. + -} + addaction [] a = a + addaction toadd a = alertWhile' (addFileAlert $ map changeFile toadd) $ + (,) + <$> pure True + <*> a + +{- Files can Either be Right to be added now, + - or are unsafe, and must be Left for later. + - + - Check by running lsof on the repository. + -} +safeToAdd :: Maybe Seconds -> [Change] -> [Change] -> Assistant [Either Change Change] +safeToAdd _ [] [] = return [] +safeToAdd delayadd pending inprocess = do + maybe noop (liftIO . threadDelaySeconds) delayadd + liftAnnex $ do + keysources <- forM pending $ Command.Add.lockDown . changeFile + let inprocess' = inprocess ++ mapMaybe mkinprocess (zip pending keysources) + openfiles <- S.fromList . map fst3 . filter openwrite <$> + findopenfiles (map keySource inprocess') + let checked = map (check openfiles) inprocess' + + {- If new events are received when files are closed, + - there's no need to retry any changes that cannot + - be done now. -} + if DirWatcher.closingTracked + then do + mapM_ canceladd $ lefts checked + allRight $ rights checked + else return checked + where + check openfiles change@(InProcessAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change + + mkinprocess (c, Just ks) = Just InProcessAddChange + { changeTime = changeTime c + , keySource = ks + } + mkinprocess (_, Nothing) = Nothing + + canceladd (InProcessAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" + -- remove the hard link + when (contentLocation ks /= keyFilename ks) $ + void $ liftIO $ tryIO $ removeFile $ contentLocation ks + canceladd _ = noop + + openwrite (_file, mode, _pid) + | mode == Lsof.OpenWriteOnly = True + | mode == Lsof.OpenReadWrite = True + | mode == Lsof.OpenUnknown = True + | otherwise = False + + allRight = return . map Right + + {- Normally the KeySources are locked down inside the temp directory, + - so can just lsof that, which is quite efficient. + - + - In crippled filesystem mode, there is no lock down, so must run lsof + - on each individual file. + -} + findopenfiles keysources = ifM crippledFileSystem + ( liftIO $ do + let segments = segmentXargs $ map keyFilename keysources + concat <$> forM segments (\fs -> Lsof.query $ "--" : fs) + , do + tmpdir <- fromRepo gitAnnexTmpDir + liftIO $ Lsof.queryDir tmpdir + ) + +{- After a Change is committed, queue any necessary transfers or drops + - of the content of the key. + - + - This is not done during the startup scan, because the expensive + - transfer scan does the same thing then. + -} +checkChangeContent :: Change -> Assistant () +checkChangeContent change@(Change { changeInfo = i }) = + case changeInfoKey i of + Nothing -> noop + Just k -> whenM (scanComplete <$> getDaemonStatus) $ do + present <- liftAnnex $ inAnnex k + if present + then queueTransfers "new file created" Next k (Just f) Upload + else queueTransfers "new or renamed file wanted" Next k (Just f) Download + handleDrops "file renamed" present k (Just f) Nothing + where + f = changeFile change +checkChangeContent _ = noop diff --git a/Assistant/Threads/ConfigMonitor.hs b/Assistant/Threads/ConfigMonitor.hs new file mode 100644 index 000000000..c180c4da9 --- /dev/null +++ b/Assistant/Threads/ConfigMonitor.hs @@ -0,0 +1,87 @@ +{- git-annex assistant config monitor thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.ConfigMonitor where + +import Assistant.Common +import Assistant.BranchChange +import Assistant.DaemonStatus +import Assistant.Commits +import Utility.ThreadScheduler +import Logs +import Logs.UUID +import Logs.Trust +import Logs.PreferredContent +import Logs.Group +import Remote.List (remoteListRefresh) +import qualified Git.LsTree as LsTree +import Git.FilePath +import qualified Annex.Branch + +import qualified Data.Set as S + +{- This thread detects when configuration changes have been made to the + - git-annex branch and reloads cached configuration. + - + - If the branch is frequently changing, it's checked for configuration + - changes no more often than once every 60 seconds. On the other hand, + - if the branch has not changed in a while, configuration changes will + - be detected immediately. + -} +configMonitorThread :: NamedThread +configMonitorThread = namedThread "ConfigMonitor" $ loop =<< getConfigs + where + loop old = do + waitBranchChange + new <- getConfigs + when (old /= new) $ do + let changedconfigs = new `S.difference` old + debug $ "reloading config" : + map fst (S.toList changedconfigs) + reloadConfigs new + {- Record a commit to get this config + - change pushed out to remotes. -} + recordCommit + liftIO $ threadDelaySeconds (Seconds 60) + loop new + +{- Config files, and their checksums. -} +type Configs = S.Set (FilePath, String) + +{- All git-annex's config files, and actions to run when they change. -} +configFilesActions :: [(FilePath, Assistant ())] +configFilesActions = + [ (uuidLog, void $ liftAnnex uuidMapLoad) + , (remoteLog, void $ liftAnnex remoteListRefresh) + , (trustLog, void $ liftAnnex trustMapLoad) + , (groupLog, void $ liftAnnex groupMapLoad) + , (scheduleLog, void updateScheduleLog) + -- Preferred content settings depend on most of the other configs, + -- so will be reloaded whenever any configs change. + , (preferredContentLog, noop) + ] + +reloadConfigs :: Configs -> Assistant () +reloadConfigs changedconfigs = do + sequence_ as + void $ liftAnnex preferredContentMapLoad + {- Changes to the remote log, or the trust log, can affect the + - syncRemotes list. Changes to the uuid log may affect its + - display so are also included. -} + when (any (`elem` fs) [remoteLog, trustLog, uuidLog]) + updateSyncRemotes + where + (fs, as) = unzip $ filter (flip S.member changedfiles . fst) + configFilesActions + changedfiles = S.map fst changedconfigs + +getConfigs :: Assistant Configs +getConfigs = S.fromList . map extract + <$> liftAnnex (inRepo $ LsTree.lsTreeFiles Annex.Branch.fullname files) + where + files = map fst configFilesActions + extract treeitem = (getTopFilePath $ LsTree.file treeitem, LsTree.sha treeitem) diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs new file mode 100644 index 000000000..55b3ca2f1 --- /dev/null +++ b/Assistant/Threads/Cronner.hs @@ -0,0 +1,225 @@ +{- git-annex assistant sceduled jobs runner + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE DeriveDataTypeable #-} + +module Assistant.Threads.Cronner ( + cronnerThread +) where + +import Assistant.Common +import Assistant.DaemonStatus +import Utility.NotificationBroadcaster +import Annex.UUID +import Config.Files +import Logs.Schedule +import Utility.Scheduled +import Types.ScheduledActivity +import Utility.ThreadScheduler +import Utility.HumanTime +import Utility.Batch +import Assistant.TransferQueue +import Annex.Content +import Logs.Transfer +import Assistant.Types.UrlRenderer +import Assistant.Alert +import Remote +import qualified Types.Remote as Remote +import qualified Git +import qualified Git.Fsck +import Assistant.Fsck +import Assistant.Repair + +import Control.Concurrent.Async +import Control.Concurrent.MVar +import Data.Time.LocalTime +import Data.Time.Clock +import qualified Data.Map as M +import qualified Data.Set as S + +{- Loads schedules for this repository, and fires off one thread for each + - scheduled event that runs on this repository. Each thread sleeps until + - its event is scheduled to run. + - + - To handle events that run on remotes, which need to only run when + - their remote gets connected, threads are also started, and are passed + - a MVar to wait on, which is stored in the DaemonStatus's + - connectRemoteNotifiers. + - + - In the meantime the main thread waits for any changes to the + - schedules. When there's a change, compare the old and new list of + - schedules to find deleted and added ones. Start new threads for added + - ones, and kill the threads for deleted ones. -} +cronnerThread :: UrlRenderer -> NamedThread +cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do + fsckNudge urlrenderer Nothing + dstatus <- getDaemonStatus + h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus) + go h M.empty M.empty + where + go h amap nmap = do + activities <- liftAnnex $ scheduleGet =<< getUUID + + let addedactivities = activities `S.difference` M.keysSet amap + let removedactivities = M.keysSet amap `S.difference` activities + + forM_ (S.toList removedactivities) $ \activity -> + case M.lookup activity amap of + Just a -> do + debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)] + liftIO $ cancel a + Nothing -> noop + + lastruntimes <- liftAnnex getLastRunTimes + started <- startactivities (S.toList addedactivities) lastruntimes + let addedamap = M.fromList $ map fst started + let addednmap = M.fromList $ catMaybes $ map snd started + + let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities) + let amap' = M.difference (M.union addedamap amap) (removefiltered amap) + let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap) + modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') } + + liftIO $ waitNotification h + debug ["reloading changed activities"] + go h amap' nmap' + startactivities as lastruntimes = forM as $ \activity -> + case connectActivityUUID activity of + Nothing -> do + runner <- asIO2 (sleepingActivityThread urlrenderer) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Nothing) + Just u -> do + mvar <- liftIO newEmptyMVar + runner <- asIO2 (remoteActivityThread urlrenderer mvar) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Just (activity, (u, [mvar]))) + +{- Calculate the next time the activity is scheduled to run, then + - sleep until that time, and run it. Then call setLastRunTime, and + - loop. + -} +sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () +sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime + where + getnexttime = liftIO . nextTime schedule + go _ Nothing = debug ["no scheduled events left for", desc] + go l (Just (NextTimeExactly t)) = waitrun l t Nothing + go l (Just (NextTimeWindow windowstart windowend)) = + waitrun l windowstart (Just windowend) + desc = fromScheduledActivity activity + schedule = getSchedule activity + waitrun l t mmaxt = do + seconds <- liftIO $ secondsUntilLocalTime t + when (seconds > Seconds 0) $ do + debug ["waiting", show seconds, "for next scheduled", desc] + liftIO $ threadDelaySeconds seconds + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + let nowt = utcToLocalTime tz now + if tolate nowt tz + then do + debug ["too late to run scheduled", desc] + go l =<< getnexttime l + else run nowt + where + tolate nowt tz = case mmaxt of + Just maxt -> nowt > maxt + -- allow the job to start 10 minutes late + Nothing ->diffUTCTime + (localTimeToUTC tz nowt) + (localTimeToUTC tz t) > 600 + run nowt = do + runActivity urlrenderer activity nowt + go (Just nowt) =<< getnexttime (Just nowt) + +{- Wait for the remote to become available by waiting on the MVar. + - Then check if the time is within a time window when activity + - is scheduled to run, and if so run it. + - Otherwise, just wait again on the MVar. + -} +remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant () +remoteActivityThread urlrenderer mvar activity lasttime = do + liftIO $ takeMVar mvar + go =<< liftIO (nextTime (getSchedule activity) lasttime) + where + go (Just (NextTimeWindow windowstart windowend)) = do + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend + then do + let nowt = utcToLocalTime tz now + runActivity urlrenderer activity nowt + loop (Just nowt) + else loop lasttime + go _ = noop -- running at exact time not handled here + loop = remoteActivityThread urlrenderer mvar activity + +secondsUntilLocalTime :: LocalTime -> IO Seconds +secondsUntilLocalTime t = do + now <- getCurrentTime + tz <- getTimeZone now + let secs = truncate $ diffUTCTime (localTimeToUTC tz t) now + return $ if secs > 0 + then Seconds secs + else Seconds 0 + +runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant () +runActivity urlrenderer activity nowt = do + debug ["starting", desc] + runActivity' urlrenderer activity + debug ["finished", desc] + liftAnnex $ setLastRunTime activity nowt + where + desc = fromScheduledActivity activity + +runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant () +runActivity' urlrenderer (ScheduledSelfFsck _ d) = do + program <- liftIO $ readProgramFile + g <- liftAnnex gitRepo + fsckresults <- showFscking urlrenderer Nothing $ tryNonAsync $ do + void $ batchCommand program (Param "fsck" : annexFsckParams d) + Git.Fsck.findBroken True g + u <- liftAnnex getUUID + void $ repairWhenNecessary urlrenderer u Nothing fsckresults + mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir) + where + reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download +runActivity' urlrenderer (ScheduledRemoteFsck u s d) = handle =<< liftAnnex (remoteFromUUID u) + where + handle Nothing = debug ["skipping remote fsck of uuid without a configured remote", fromUUID u, fromSchedule s] + handle (Just rmt) = void $ case Remote.remoteFsck rmt of + Nothing -> go rmt $ do + program <- readProgramFile + void $ batchCommand program $ + [ Param "fsck" + -- avoid downloading files + , Param "--fast" + , Param "--from" + , Param $ Remote.name rmt + ] ++ annexFsckParams d + Just mkfscker -> do + {- Note that having mkfsker return an IO action + - avoids running a long duration fsck in the + - Annex monad. -} + go rmt =<< liftAnnex (mkfscker (annexFsckParams d)) + go rmt annexfscker = do + fsckresults <- showFscking urlrenderer (Just rmt) $ tryNonAsync $ do + void annexfscker + let r = Remote.repo rmt + if Git.repoIsLocal r && not (Git.repoIsLocalUnknown r) + then Just <$> Git.Fsck.findBroken True r + else pure Nothing + maybe noop (void . repairWhenNecessary urlrenderer u (Just rmt)) fsckresults + +annexFsckParams :: Duration -> [CommandParam] +annexFsckParams d = + [ Param "--incremental-schedule=1d" + , Param $ "--time-limit=" ++ fromDuration d + ] diff --git a/Assistant/Threads/DaemonStatus.hs b/Assistant/Threads/DaemonStatus.hs new file mode 100644 index 000000000..5bbb15acb --- /dev/null +++ b/Assistant/Threads/DaemonStatus.hs @@ -0,0 +1,29 @@ +{- git-annex assistant daemon status thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.DaemonStatus where + +import Assistant.Common +import Assistant.DaemonStatus +import Utility.ThreadScheduler +import Utility.NotificationBroadcaster + +{- This writes the daemon status to disk, when it changes, but no more + - frequently than once every ten minutes. + -} +daemonStatusThread :: NamedThread +daemonStatusThread = namedThread "DaemonStatus" $ do + notifier <- liftIO . newNotificationHandle False + =<< changeNotifier <$> getDaemonStatus + checkpoint + runEvery (Seconds tenMinutes) <~> do + liftIO $ waitNotification notifier + checkpoint + where + checkpoint = do + file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile + liftIO . writeDaemonStatusFile file =<< getDaemonStatus diff --git a/Assistant/Threads/Glacier.hs b/Assistant/Threads/Glacier.hs new file mode 100644 index 000000000..4c4012a67 --- /dev/null +++ b/Assistant/Threads/Glacier.hs @@ -0,0 +1,43 @@ +{- git-annex assistant Amazon Glacier retrieval + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} + +module Assistant.Threads.Glacier where + +import Assistant.Common +import Utility.ThreadScheduler +import qualified Types.Remote as Remote +import qualified Remote.Glacier as Glacier +import Logs.Transfer +import Assistant.DaemonStatus +import Assistant.TransferQueue + +import qualified Data.Set as S + +{- Wakes up every half hour and checks if any glacier remotes have failed + - downloads. If so, runs glacier-cli to check if the files are now + - available, and queues the downloads. -} +glacierThread :: NamedThread +glacierThread = namedThread "Glacier" $ runEvery (Seconds 3600) <~> go + where + isglacier r = Remote.remotetype r == Glacier.remote + go = do + rs <- filter isglacier . syncDataRemotes <$> getDaemonStatus + forM_ rs $ \r -> + check r =<< liftAnnex (getFailedTransfers $ Remote.uuid r) + check _ [] = noop + check r l = do + let keys = map getkey l + (availkeys, failedkeys) <- liftAnnex $ Glacier.jobList r keys + let s = S.fromList (failedkeys ++ availkeys) + let l' = filter (\p -> S.member (getkey p) s) l + forM_ l' $ \(t, info) -> do + liftAnnex $ removeFailedTransfer t + queueTransferWhenSmall "object available from glacier" (associatedFile info) t r + getkey = transferKey . fst diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs new file mode 100644 index 000000000..3f4fcb0cc --- /dev/null +++ b/Assistant/Threads/Merger.hs @@ -0,0 +1,118 @@ +{- git-annex assistant git merge thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Merger where + +import Assistant.Common +import Assistant.TransferQueue +import Assistant.BranchChange +import Assistant.DaemonStatus +import Assistant.ScanRemotes +import Utility.DirWatcher +import Utility.DirWatcher.Types +import qualified Annex.Branch +import qualified Git +import qualified Git.Branch +import qualified Command.Sync +import Annex.TaggedPush +import Remote (remoteFromUUID) + +import qualified Data.Set as S +import qualified Data.Text as T + +{- This thread watches for changes to .git/refs/, and handles incoming + - pushes. -} +mergeThread :: NamedThread +mergeThread = namedThread "Merger" $ do + g <- liftAnnex gitRepo + let dir = Git.localGitDir g </> "refs" + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + changehook <- hook onChange + errhook <- hook onErr + let hooks = mkWatchHooks + { addHook = changehook + , modifyHook = changehook + , errHook = errhook + } + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching", dir] + +type Handler = FilePath -> Assistant () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr = error + +{- Called when a new branch ref is written, or a branch ref is modified. + - + - At startup, synthetic add events fire, causing this to run, but that's + - ok; it ensures that any changes pushed since the last time the assistant + - ran are merged in. + -} +onChange :: Handler +onChange file + | ".lock" `isSuffixOf` file = noop + | isAnnexBranch file = do + branchChanged + diverged <- liftAnnex Annex.Branch.forceUpdate + when diverged $ + unlessM handleDesynced $ + queueDeferredDownloads "retrying deferred download" Later + | "/synced/" `isInfixOf` file = + mergecurrent =<< liftAnnex (inRepo Git.Branch.current) + | otherwise = noop + where + changedbranch = fileToBranch file + + mergecurrent (Just current) + | equivBranches changedbranch current = do + debug + [ "merging", show changedbranch + , "into", show current + ] + void $ liftAnnex $ Command.Sync.mergeFrom changedbranch + mergecurrent _ = noop + + handleDesynced = case fromTaggedBranch changedbranch of + Nothing -> return False + Just (u, info) -> do + mr <- liftAnnex $ remoteFromUUID u + case mr of + Nothing -> return False + Just r -> do + s <- desynced <$> getDaemonStatus + if S.member u s || Just (T.unpack $ getXMPPClientID r) == info + then do + modifyDaemonStatus_ $ \st -> st + { desynced = S.delete u s } + addScanRemotes True [r] + return True + else return False + +equivBranches :: Git.Ref -> Git.Ref -> Bool +equivBranches x y = base x == base y + where + base = takeFileName . show + +isAnnexBranch :: FilePath -> Bool +isAnnexBranch f = n `isSuffixOf` f + where + n = '/' : show Annex.Branch.name + +fileToBranch :: FilePath -> Git.Ref +fileToBranch f = Git.Ref $ "refs" </> base + where + base = Prelude.last $ split "/refs/" f diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs new file mode 100644 index 000000000..39ae67537 --- /dev/null +++ b/Assistant/Threads/MountWatcher.hs @@ -0,0 +1,195 @@ +{- git-annex assistant mount watcher, using either dbus or mtab polling + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} + +module Assistant.Threads.MountWatcher where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.Sync +import qualified Annex +import qualified Git +import Utility.ThreadScheduler +import Utility.Mounts +import Remote.List +import qualified Types.Remote as Remote +import Assistant.Types.UrlRenderer +import Assistant.Fsck + +import qualified Data.Set as S + +#if WITH_DBUS +import Utility.DBus +import DBus.Client +import DBus +import Data.Word (Word32) +import Control.Concurrent +import qualified Control.Exception as E +#else +#warning Building without dbus support; will use mtab polling +#endif + +mountWatcherThread :: UrlRenderer -> NamedThread +mountWatcherThread urlrenderer = namedThread "MountWatcher" $ +#if WITH_DBUS + dbusThread urlrenderer +#else + pollingThread urlrenderer +#endif + +#if WITH_DBUS + +dbusThread :: UrlRenderer -> Assistant () +dbusThread urlrenderer = do + runclient <- asIO1 go + r <- liftIO $ E.try $ runClient getSessionAddress runclient + either onerr (const noop) r + where + go client = ifM (checkMountMonitor client) + ( do + {- Store the current mount points in an MVar, to be + - compared later. We could in theory work out the + - mount point from the dbus message, but this is + - easier. -} + mvar <- liftIO $ newMVar =<< currentMountPoints + handleevent <- asIO1 $ \_event -> do + nowmounted <- liftIO $ currentMountPoints + wasmounted <- liftIO $ swapMVar mvar nowmounted + handleMounts urlrenderer wasmounted nowmounted + liftIO $ forM_ mountChanged $ \matcher -> + listen client matcher handleevent + , do + liftAnnex $ + warning "No known volume monitor available through dbus; falling back to mtab polling" + pollingThread urlrenderer + ) + onerr :: E.SomeException -> Assistant () + onerr e = do + {- If the session dbus fails, the user probably + - logged out of their desktop. Even if they log + - back in, we won't have access to the dbus + - session key, so polling is the best that can be + - done in this situation. -} + liftAnnex $ + warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" + pollingThread urlrenderer + +{- Examine the list of services connected to dbus, to see if there + - are any we can use to monitor mounts. If not, will attempt to start one. -} +checkMountMonitor :: Client -> Assistant Bool +checkMountMonitor client = do + running <- filter (`elem` usableservices) + <$> liftIO (listServiceNames client) + case running of + [] -> startOneService client startableservices + (service:_) -> do + debug [ "Using running DBUS service" + , service + , "to monitor mount events." + ] + return True + where + startableservices = [gvfs, gvfsgdu] + usableservices = startableservices ++ [kde] + gvfs = "org.gtk.Private.UDisks2VolumeMonitor" + gvfsgdu = "org.gtk.Private.GduVolumeMonitor" + kde = "org.kde.DeviceNotifications" + +startOneService :: Client -> [ServiceName] -> Assistant Bool +startOneService _ [] = return False +startOneService client (x:xs) = do + _ <- liftIO $ tryNonAsync $ callDBus client "StartServiceByName" + [toVariant x, toVariant (0 :: Word32)] + ifM (liftIO $ elem x <$> listServiceNames client) + ( do + debug + [ "Started DBUS service", x + , "to monitor mount events." + ] + return True + , startOneService client xs + ) + +{- Filter matching events recieved when drives are mounted and unmounted. -} +mountChanged :: [MatchRule] +mountChanged = [gvfs True, gvfs False, kde, kdefallback] + where + {- gvfs reliably generates this event whenever a + - drive is mounted/unmounted, whether automatically, or manually -} + gvfs mount = matchAny + { matchInterface = Just "org.gtk.Private.RemoteVolumeMonitor" + , matchMember = Just $ if mount then "MountAdded" else "MountRemoved" + } + {- This event fires when KDE prompts the user what to do with a drive, + - but maybe not at other times. And it's not received -} + kde = matchAny + { matchInterface = Just "org.kde.Solid.Device" + , matchMember = Just "setupDone" + } + {- This event may not be closely related to mounting a drive, but it's + - observed reliably when a drive gets mounted or unmounted. -} + kdefallback = matchAny + { matchInterface = Just "org.kde.KDirNotify" + , matchMember = Just "enteredDirectory" + } + +#endif + +pollingThread :: UrlRenderer -> Assistant () +pollingThread urlrenderer = go =<< liftIO currentMountPoints + where + go wasmounted = do + liftIO $ threadDelaySeconds (Seconds 10) + nowmounted <- liftIO currentMountPoints + handleMounts urlrenderer wasmounted nowmounted + go nowmounted + +handleMounts :: UrlRenderer -> MountPoints -> MountPoints -> Assistant () +handleMounts urlrenderer wasmounted nowmounted = + mapM_ (handleMount urlrenderer . mnt_dir) $ + S.toList $ newMountPoints wasmounted nowmounted + +handleMount :: UrlRenderer -> FilePath -> Assistant () +handleMount urlrenderer dir = do + debug ["detected mount of", dir] + rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir + mapM_ (fsckNudge urlrenderer . Just) rs + reconnectRemotes True rs + +{- Finds remotes located underneath the mount point. + - + - Updates state to include the remotes. + - + - The config of git remotes is re-read, as it may not have been available + - at startup time, or may have changed (it could even be a different + - repository at the same remote location..) + -} +remotesUnder :: FilePath -> Assistant [Remote] +remotesUnder dir = do + repotop <- liftAnnex $ fromRepo Git.repoPath + rs <- liftAnnex remoteList + pairs <- liftAnnex $ mapM (checkremote repotop) rs + let (waschanged, rs') = unzip pairs + when (or waschanged) $ do + liftAnnex $ Annex.changeState $ \s -> s { Annex.remotes = catMaybes rs' } + updateSyncRemotes + return $ mapMaybe snd $ filter fst pairs + where + checkremote repotop r = case Remote.localpath r of + Just p | dirContains dir (absPathFrom repotop p) -> + (,) <$> pure True <*> updateRemote r + _ -> return (False, Just r) + +type MountPoints = S.Set Mntent + +currentMountPoints :: IO MountPoints +currentMountPoints = S.fromList <$> getMounts + +newMountPoints :: MountPoints -> MountPoints -> MountPoints +newMountPoints old new = S.difference new old diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs new file mode 100644 index 000000000..a7124fa01 --- /dev/null +++ b/Assistant/Threads/NetWatcher.hs @@ -0,0 +1,138 @@ +{- git-annex assistant network connection watcher, using dbus + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} + +module Assistant.Threads.NetWatcher where + +import Assistant.Common +import Assistant.Sync +import Utility.ThreadScheduler +import qualified Types.Remote as Remote +import Assistant.DaemonStatus +import Utility.NotificationBroadcaster + +#if WITH_DBUS +import Utility.DBus +import DBus.Client +import DBus +import Data.Word (Word32) +import Assistant.NetMessager +#else +#warning Building without dbus support; will poll for network connection changes +#endif + +netWatcherThread :: NamedThread +#if WITH_DBUS +netWatcherThread = thread dbusThread +#else +netWatcherThread = thread noop +#endif + where + thread = namedThread "NetWatcher" + +{- This is a fallback for when dbus cannot be used to detect + - network connection changes, but it also ensures that + - any networked remotes that may have not been routable for a + - while (despite the local network staying up), are synced with + - periodically. + - + - Note that it does not call notifyNetMessagerRestart, because + - it doesn't know that the network has changed. + -} +netWatcherFallbackThread :: NamedThread +netWatcherFallbackThread = namedThread "NetWatcherFallback" $ + runEvery (Seconds 3600) <~> handleConnection + +#if WITH_DBUS + +dbusThread :: Assistant () +dbusThread = do + handleerr <- asIO2 onerr + runclient <- asIO1 go + liftIO $ persistentClient getSystemAddress () handleerr runclient + where + go client = ifM (checkNetMonitor client) + ( do + listenNMConnections client <~> handleconn + listenWicdConnections client <~> handleconn + , do + liftAnnex $ + warning "No known network monitor available through dbus; falling back to polling" + ) + handleconn = do + debug ["detected network connection"] + notifyNetMessagerRestart + handleConnection + onerr e _ = do + liftAnnex $ + warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" + {- Wait, in hope that dbus will come back -} + liftIO $ threadDelaySeconds (Seconds 60) + +{- Examine the list of services connected to dbus, to see if there + - are any we can use to monitor network connections. -} +checkNetMonitor :: Client -> Assistant Bool +checkNetMonitor client = do + running <- liftIO $ filter (`elem` [networkmanager, wicd]) + <$> listServiceNames client + case running of + [] -> return False + (service:_) -> do + debug [ "Using running DBUS service" + , service + , "to monitor network connection events." + ] + return True + where + networkmanager = "org.freedesktop.NetworkManager" + wicd = "org.wicd.daemon" + +{- Listens for new NetworkManager connections. -} +listenNMConnections :: Client -> IO () -> IO () +listenNMConnections client callback = + listen client matcher $ \event -> + when (Just True == anyM activeconnection (signalBody event)) $ + callback + where + matcher = matchAny + { matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active" + , matchMember = Just "PropertiesChanged" + } + nm_connection_activated = toVariant (2 :: Word32) + nm_state_key = toVariant ("State" :: String) + activeconnection v = do + m <- fromVariant v + vstate <- lookup nm_state_key $ dictionaryItems m + state <- fromVariant vstate + return $ state == nm_connection_activated + +{- Listens for new Wicd connections. -} +listenWicdConnections :: Client -> IO () -> IO () +listenWicdConnections client callback = + listen client matcher $ \event -> + when (any (== wicd_success) (signalBody event)) $ + callback + where + matcher = matchAny + { matchInterface = Just "org.wicd.daemon" + , matchMember = Just "ConnectResultsSent" + } + wicd_success = toVariant ("success" :: String) + +#endif + +handleConnection :: Assistant () +handleConnection = do + liftIO . sendNotification . networkConnectedNotifier =<< getDaemonStatus + reconnectRemotes True =<< networkRemotes + +{- Network remotes to sync with. -} +networkRemotes :: Assistant [Remote] +networkRemotes = filter (isNothing . Remote.localpath) . syncRemotes + <$> getDaemonStatus diff --git a/Assistant/Threads/PairListener.hs b/Assistant/Threads/PairListener.hs new file mode 100644 index 000000000..cd95ab5a4 --- /dev/null +++ b/Assistant/Threads/PairListener.hs @@ -0,0 +1,160 @@ +{- git-annex assistant thread to listen for incoming pairing traffic + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PairListener where + +import Assistant.Common +import Assistant.Pairing +import Assistant.Pairing.Network +import Assistant.Pairing.MakeRemote +import Assistant.WebApp (UrlRenderer) +import Assistant.WebApp.Types +import Assistant.Alert +import Assistant.DaemonStatus +import Utility.ThreadScheduler +import Utility.Format +import Git + +import Network.Multicast +import Network.Socket +import qualified Data.Text as T +import Data.Char + +pairListenerThread :: UrlRenderer -> NamedThread +pairListenerThread urlrenderer = namedThread "PairListener" $ do + listener <- asIO1 $ go [] [] + liftIO $ withSocketsDo $ + runEvery (Seconds 60) $ void $ tryIO $ + listener =<< getsock + where + {- Note this can crash if there's no network interface, + - or only one like lo that doesn't support multicast. -} + getsock = multicastReceiver (multicastAddress $ IPv4Addr undefined) pairingPort + + go reqs cache sock = liftIO (getmsg sock []) >>= \msg -> case readish msg of + Nothing -> go reqs cache sock + Just m -> do + debug ["received", show msg] + sane <- checkSane msg + (pip, verified) <- verificationCheck m + =<< (pairingInProgress <$> getDaemonStatus) + let wrongstage = maybe False (\p -> pairMsgStage m <= inProgressPairStage p) pip + let fromus = maybe False (\p -> remoteSshPubKey (pairMsgData m) == remoteSshPubKey (inProgressPairData p)) pip + case (wrongstage, fromus, sane, pairMsgStage m) of + (_, True, _, _) -> do + debug ["ignoring message that looped back"] + go reqs cache sock + (_, _, False, _) -> go reqs cache sock + -- PairReq starts a pairing process, so a + -- new one is always heeded, even if + -- some other pairing is in process. + (_, _, _, PairReq) -> if m `elem` reqs + then go reqs (invalidateCache m cache) sock + else do + pairReqReceived verified urlrenderer m + go (m:take 10 reqs) (invalidateCache m cache) sock + (True, _, _, _) -> do + debug + ["ignoring out of order message" + , show (pairMsgStage m) + , "expected" + , show (succ . inProgressPairStage <$> pip) + ] + go reqs cache sock + (_, _, _, PairAck) -> do + cache' <- pairAckReceived verified pip m cache + go reqs cache' sock + (_,_ , _, PairDone) -> do + pairDoneReceived verified pip m + go reqs cache sock + + {- As well as verifying the message using the shared secret, + - check its UUID against the UUID we have stored. If + - they're the same, someone is sending bogus messages, + - which could be an attempt to brute force the shared secret. -} + verificationCheck _ Nothing = return (Nothing, False) + verificationCheck m (Just pip) + | not verified && sameuuid = do + liftAnnex $ warning + "detected possible pairing brute force attempt; disabled pairing" + stopSending pip + return (Nothing, False) + |otherwise = return (Just pip, verified && sameuuid) + where + verified = verifiedPairMsg m pip + sameuuid = pairUUID (inProgressPairData pip) == pairUUID (pairMsgData m) + + checkSane msg + {- Control characters could be used in a + - console poisoning attack. -} + | any isControl (filter (/= '\n') (decode_c msg)) = do + liftAnnex $ warning + "illegal control characters in pairing message; ignoring" + return False + | otherwise = return True + + {- PairReqs invalidate the cache of recently finished pairings. + - This is so that, if a new pairing is started with the + - same secret used before, a bogus PairDone is not sent. -} + invalidateCache msg = filter (not . verifiedPairMsg msg) + + getmsg sock c = do + (msg, n, _) <- recvFrom sock chunksz + if n < chunksz + then return $ c ++ msg + else getmsg sock $ c ++ msg + where + chunksz = 1024 + +{- Show an alert when a PairReq is seen. -} +pairReqReceived :: Bool -> UrlRenderer -> PairMsg -> Assistant () +pairReqReceived True _ _ = noop -- ignore our own PairReq +pairReqReceived False urlrenderer msg = do + button <- mkAlertButton True (T.pack "Respond") urlrenderer (FinishLocalPairR msg) + void $ addAlert $ pairRequestReceivedAlert repo button + where + repo = pairRepo msg + +{- When a verified PairAck is seen, a host is ready to pair with us, and has + - already configured our ssh key. Stop sending PairReqs, finish the pairing, + - and send a single PairDone. -} +pairAckReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> [PairingInProgress] -> Assistant [PairingInProgress] +pairAckReceived True (Just pip) msg cache = do + stopSending pip + repodir <- repoPath <$> liftAnnex gitRepo + liftIO $ setupAuthorizedKeys msg repodir + finishedLocalPairing msg (inProgressSshKeyPair pip) + startSending pip PairDone $ multicastPairMsg + (Just 1) (inProgressSecret pip) (inProgressPairData pip) + return $ pip : take 10 cache +{- A stale PairAck might also be seen, after we've finished pairing. + - Perhaps our PairDone was not received. To handle this, we keep + - a cache of recently finished pairings, and re-send PairDone in + - response to stale PairAcks for them. -} +pairAckReceived _ _ msg cache = do + let pips = filter (verifiedPairMsg msg) cache + unless (null pips) $ + forM_ pips $ \pip -> + startSending pip PairDone $ multicastPairMsg + (Just 1) (inProgressSecret pip) (inProgressPairData pip) + return cache + +{- If we get a verified PairDone, the host has accepted our PairAck, and + - has paired with us. Stop sending PairAcks, and finish pairing with them. + - + - TODO: Should third-party hosts remove their pair request alert when they + - see a PairDone? + - Complication: The user could have already clicked on the alert and be + - entering the secret. Would be better to start a fresh pair request in this + - situation. + -} +pairDoneReceived :: Bool -> Maybe PairingInProgress -> PairMsg -> Assistant () +pairDoneReceived False _ _ = noop -- not verified +pairDoneReceived True Nothing _ = noop -- not in progress +pairDoneReceived True (Just pip) msg = do + stopSending pip + finishedLocalPairing msg (inProgressSshKeyPair pip) diff --git a/Assistant/Threads/ProblemFixer.hs b/Assistant/Threads/ProblemFixer.hs new file mode 100644 index 000000000..8095581a6 --- /dev/null +++ b/Assistant/Threads/ProblemFixer.hs @@ -0,0 +1,70 @@ +{- git-annex assistant thread to handle fixing problems with repositories + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.ProblemFixer ( + problemFixerThread +) where + +import Assistant.Common +import Assistant.Types.RepoProblem +import Assistant.RepoProblem +import Assistant.Types.UrlRenderer +import Assistant.Alert +import Remote +import qualified Types.Remote as Remote +import qualified Git.Fsck +import Assistant.Repair +import qualified Git +import Annex.UUID +import Utility.ThreadScheduler + +{- Waits for problems with a repo, and tries to fsck the repo and repair + - the problem. -} +problemFixerThread :: UrlRenderer -> NamedThread +problemFixerThread urlrenderer = namedThread "ProblemFixer" $ + go =<< getRepoProblems + where + go problems = do + mapM_ (handleProblem urlrenderer) problems + liftIO $ threadDelaySeconds (Seconds 60) + -- Problems may have been re-reported while they were being + -- fixed, so ignore those. If a new unique problem happened + -- 60 seconds after the last was fixed, we're unlikely + -- to do much good anyway. + go =<< filter (\p -> not (any (sameRepoProblem p) problems)) + <$> getRepoProblems + +handleProblem :: UrlRenderer -> RepoProblem -> Assistant () +handleProblem urlrenderer repoproblem = do + fixed <- ifM ((==) (problemUUID repoproblem) <$> liftAnnex getUUID) + ( handleLocalRepoProblem urlrenderer + , maybe (return False) (handleRemoteProblem urlrenderer) + =<< liftAnnex (remoteFromUUID $ problemUUID repoproblem) + ) + when fixed $ + liftIO $ afterFix repoproblem + +handleRemoteProblem :: UrlRenderer -> Remote -> Assistant Bool +handleRemoteProblem urlrenderer rmt + | Git.repoIsLocal r && not (Git.repoIsLocalUnknown r) = + ifM (liftIO $ checkAvailable True rmt) + ( do + fixedlocks <- repairStaleGitLocks r + fsckresults <- showFscking urlrenderer (Just rmt) $ tryNonAsync $ + Git.Fsck.findBroken True r + repaired <- repairWhenNecessary urlrenderer (Remote.uuid rmt) (Just rmt) fsckresults + return $ fixedlocks || repaired + , return False + ) + | otherwise = return False + where + r = Remote.repo rmt + +{- This is not yet used, and should probably do a fsck. -} +handleLocalRepoProblem :: UrlRenderer -> Assistant Bool +handleLocalRepoProblem _urlrenderer = do + repairStaleGitLocks =<< liftAnnex gitRepo diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs new file mode 100644 index 000000000..3ec922fe4 --- /dev/null +++ b/Assistant/Threads/Pusher.hs @@ -0,0 +1,49 @@ +{- git-annex assistant git pushing thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Pusher where + +import Assistant.Common +import Assistant.Commits +import Assistant.Pushes +import Assistant.DaemonStatus +import Assistant.Sync +import Utility.ThreadScheduler +import qualified Remote +import qualified Types.Remote as Remote + +{- This thread retries pushes that failed before. -} +pushRetryThread :: NamedThread +pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do + -- We already waited half an hour, now wait until there are failed + -- pushes to retry. + topush <- getFailedPushesBefore (fromIntegral halfhour) + unless (null topush) $ do + debug ["retrying", show (length topush), "failed pushes"] + void $ pushToRemotes True topush + where + halfhour = 1800 + +{- This thread pushes git commits out to remotes soon after they are made. -} +pushThread :: NamedThread +pushThread = namedThread "Pusher" $ runEvery (Seconds 2) <~> do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + void getCommits + -- Now see if now's a good time to push. + void $ pushToRemotes True =<< pushTargets + +{- We want to avoid pushing to remotes that are marked readonly. + - + - Also, avoid pushing to local remotes we can easily tell are not available, + - to avoid ugly messages when a removable drive is not attached. + -} +pushTargets :: Assistant [Remote] +pushTargets = liftIO . filterM (Remote.checkAvailable True) + =<< candidates <$> getDaemonStatus + where + candidates = filter (not . Remote.readonly) . syncGitRemotes diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs new file mode 100644 index 000000000..6946e8b3a --- /dev/null +++ b/Assistant/Threads/SanityChecker.hs @@ -0,0 +1,175 @@ +{- git-annex assistant sanity checker + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.SanityChecker ( + sanityCheckerStartupThread, + sanityCheckerDailyThread, + sanityCheckerHourlyThread +) where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.Alert +import Assistant.Repair +import qualified Git.LsFiles +import qualified Git.Command +import qualified Git.Config +import Utility.ThreadScheduler +import qualified Assistant.Threads.Watcher as Watcher +import Utility.LogFile +import Utility.Batch +import Utility.NotificationBroadcaster +import Config +import Utility.HumanTime +import Git.Repair + +import Data.Time.Clock.POSIX +import qualified Data.Set as S + +{- This thread runs once at startup, and most other threads wait for it + - to finish. (However, the webapp thread does not, to prevent the UI + - being nonresponsive.) -} +sanityCheckerStartupThread :: Maybe Duration -> NamedThread +sanityCheckerStartupThread startupdelay = namedThreadUnchecked "SanityCheckerStartup" $ do + {- Stale git locks can prevent commits from happening, etc. -} + void $ repairStaleGitLocks =<< liftAnnex gitRepo + + {- A corrupt index file can prevent the assistant from working at + - all, so detect and repair. -} + ifM (not <$> liftAnnex (inRepo (checkIndex S.empty))) + ( do + notice ["corrupt index file found at startup; removing and restaging"] + liftAnnex $ inRepo nukeIndex + {- Normally the startup scan avoids re-staging files, + - but with the index deleted, everything needs to be + - restaged. -} + modifyDaemonStatus_ $ \s -> s { forceRestage = True } + , whenM (liftAnnex $ inRepo missingIndex) $ do + debug ["no index file; restaging"] + modifyDaemonStatus_ $ \s -> s { forceRestage = True } + ) + + {- If there's a startup delay, it's done here. -} + liftIO $ maybe noop (threadDelaySeconds . Seconds . fromIntegral . durationSeconds) startupdelay + + {- Notify other threads that the startup sanity check is done. -} + status <- getDaemonStatus + liftIO $ sendNotification $ startupSanityCheckNotifier status + +{- This thread wakes up hourly for inxepensive frequent sanity checks. -} +sanityCheckerHourlyThread :: NamedThread +sanityCheckerHourlyThread = namedThread "SanityCheckerHourly" $ forever $ do + liftIO $ threadDelaySeconds $ Seconds oneHour + hourlyCheck + +{- This thread wakes up daily to make sure the tree is in good shape. -} +sanityCheckerDailyThread :: NamedThread +sanityCheckerDailyThread = namedThread "SanityCheckerDaily" $ forever $ do + waitForNextCheck + + debug ["starting sanity check"] + void $ alertWhile sanityCheckAlert go + debug ["sanity check complete"] + where + go = do + modifyDaemonStatus_ $ \s -> s { sanityCheckRunning = True } + + now <- liftIO getPOSIXTime -- before check started + r <- either showerr return =<< (tryIO . batch) <~> dailyCheck + + modifyDaemonStatus_ $ \s -> s + { sanityCheckRunning = False + , lastSanityCheck = Just now + } + + return r + + showerr e = do + liftAnnex $ warning $ show e + return False + +{- Only run one check per day, from the time of the last check. -} +waitForNextCheck :: Assistant () +waitForNextCheck = do + v <- lastSanityCheck <$> getDaemonStatus + now <- liftIO getPOSIXTime + liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v + where + calcdelay _ Nothing = oneDay + calcdelay now (Just lastcheck) + | lastcheck < now = max oneDay $ + oneDay - truncate (now - lastcheck) + | otherwise = oneDay + +{- It's important to stay out of the Annex monad as much as possible while + - running potentially expensive parts of this check, since remaining in it + - will block the watcher. -} +dailyCheck :: Assistant Bool +dailyCheck = do + g <- liftAnnex gitRepo + + -- Find old unstaged symlinks, and add them to git. + (unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g + now <- liftIO getPOSIXTime + forM_ unstaged $ \file -> do + ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file + case ms of + Just s | toonew (statusChangeTime s) now -> noop + | isSymbolicLink s -> addsymlink file ms + _ -> noop + liftIO $ void cleanup + + {- Allow git-gc to run once per day. More frequent gc is avoided + - by default to avoid slowing things down. Only run repacks when 100x + - the usual number of loose objects are present; we tend + - to have a lot of small objects and they should not be a + - significant size. -} + when (Git.Config.getMaybe "gc.auto" g == Just "0") $ + liftIO $ void $ Git.Command.runBool + [ Param "-c", Param "gc.auto=670000" + , Param "gc" + , Param "--auto" + ] g + + return True + where + toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) + slop = fromIntegral tenMinutes + insanity msg = do + liftAnnex $ warning msg + void $ addAlert $ sanityCheckFixAlert msg + addsymlink file s = do + isdirect <- liftAnnex isDirect + Watcher.runHandler (Watcher.onAddSymlink isdirect) file s + insanity $ "found unstaged symlink: " ++ file + +hourlyCheck :: Assistant () +hourlyCheck = checkLogSize 0 + +{- Rotate logs until log file size is < 1 mb. -} +checkLogSize :: Int -> Assistant () +checkLogSize n = do + f <- liftAnnex $ fromRepo gitAnnexLogFile + logs <- liftIO $ listLogs f + totalsize <- liftIO $ sum <$> mapM filesize logs + when (totalsize > oneMegabyte) $ do + notice ["Rotated logs due to size:", show totalsize] + liftIO $ openLog f >>= redirLog + when (n < maxLogs + 1) $ + checkLogSize $ n + 1 + where + filesize f = fromIntegral . fileSize <$> liftIO (getFileStatus f) + +oneMegabyte :: Int +oneMegabyte = 1000000 + +oneHour :: Int +oneHour = 60 * 60 + +oneDay :: Int +oneDay = 24 * oneHour + diff --git a/Assistant/Threads/TransferPoller.hs b/Assistant/Threads/TransferPoller.hs new file mode 100644 index 000000000..68075cac8 --- /dev/null +++ b/Assistant/Threads/TransferPoller.hs @@ -0,0 +1,56 @@ +{- git-annex assistant transfer polling thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.TransferPoller where + +import Assistant.Common +import Assistant.DaemonStatus +import Logs.Transfer +import Utility.NotificationBroadcaster +import qualified Assistant.Threads.TransferWatcher as TransferWatcher + +import Control.Concurrent +import qualified Data.Map as M + +{- This thread polls the status of ongoing transfers, determining how much + - of each transfer is complete. -} +transferPollerThread :: NamedThread +transferPollerThread = namedThread "TransferPoller" $ do + g <- liftAnnex gitRepo + tn <- liftIO . newNotificationHandle True =<< + transferNotifier <$> getDaemonStatus + forever $ do + liftIO $ threadDelay 500000 -- 0.5 seconds + ts <- currentTransfers <$> getDaemonStatus + if M.null ts + -- block until transfers running + then liftIO $ waitNotification tn + else mapM_ (poll g) $ M.toList ts + where + poll g (t, info) + {- Downloads are polled by checking the size of the + - temp file being used for the transfer. -} + | transferDirection t == Download = do + let f = gitAnnexTmpLocation (transferKey t) g + sz <- liftIO $ catchMaybeIO $ + fromIntegral . fileSize <$> getFileStatus f + newsize t info sz + {- Uploads don't need to be polled for when the TransferWatcher + - thread can track file modifications. -} + | TransferWatcher.watchesTransferSize = noop + {- Otherwise, this code polls the upload progress + - by reading the transfer info file. -} + | otherwise = do + let f = transferFile t g + mi <- liftIO $ catchDefaultIO Nothing $ + readTransferInfoFile Nothing f + maybe noop (newsize t info . bytesComplete) mi + + newsize t info sz + | bytesComplete info /= sz && isJust sz = + alterTransferInfo t $ \i -> i { bytesComplete = sz } + | otherwise = noop diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs new file mode 100644 index 000000000..ba302d6bb --- /dev/null +++ b/Assistant/Threads/TransferScanner.hs @@ -0,0 +1,183 @@ +{- git-annex assistant thread to scan remotes to find needed transfers + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.TransferScanner where + +import Assistant.Common +import Assistant.Types.ScanRemotes +import Assistant.ScanRemotes +import Assistant.TransferQueue +import Assistant.DaemonStatus +import Assistant.Drop +import Assistant.Sync +import Assistant.DeleteRemote +import Assistant.Types.UrlRenderer +import Logs.Transfer +import Logs.Location +import Logs.Group +import Logs.Web (webUUID) +import qualified Remote +import qualified Types.Remote as Remote +import Utility.ThreadScheduler +import Utility.NotificationBroadcaster +import Utility.Batch +import qualified Git.LsFiles as LsFiles +import qualified Backend +import Annex.Content +import Annex.Wanted + +import qualified Data.Set as S + +{- This thread waits until a remote needs to be scanned, to find transfers + - that need to be made, to keep data in sync. + -} +transferScannerThread :: UrlRenderer -> NamedThread +transferScannerThread urlrenderer = namedThread "TransferScanner" $ do + startupScan + go S.empty + where + go scanned = do + scanrunning False + liftIO $ threadDelaySeconds (Seconds 2) + (rs, infos) <- unzip <$> getScanRemote + scanrunning True + if any fullScan infos || any (`S.notMember` scanned) rs + then do + expensiveScan urlrenderer rs + go $ scanned `S.union` S.fromList rs + else do + mapM_ failedTransferScan rs + go scanned + scanrunning b = do + ds <- modifyDaemonStatus $ \s -> + (s { transferScanRunning = b }, s) + liftIO $ sendNotification $ transferNotifier ds + + {- All git remotes are synced, and all available remotes + - are scanned in full on startup, for multiple reasons, including: + - + - * This may be the first run, and there may be remotes + - already in place, that need to be synced. + - * Changes may have been made last time we run, but remotes were + - not available to be synced with. + - * Changes may have been made to remotes while we were down. + - * We may have run before, and scanned a remote, but + - only been in a subdirectory of the git remote, and so + - not synced it all. + - * We may have run before, and had transfers queued, + - and then the system (or us) crashed, and that info was + - lost. + - * A remote may be in the unwanted group, and this is a chance + - to determine if the remote has been emptied. + -} + startupScan = do + reconnectRemotes True =<< syncGitRemotes <$> getDaemonStatus + addScanRemotes True =<< syncDataRemotes <$> getDaemonStatus + +{- This is a cheap scan for failed transfers involving a remote. -} +failedTransferScan :: Remote -> Assistant () +failedTransferScan r = do + failed <- liftAnnex $ clearFailedTransfers (Remote.uuid r) + mapM_ retry failed + where + retry (t, info) + | transferDirection t == Download = + {- Check if the remote still has the key. + - If not, relies on the expensiveScan to + - get it queued from some other remote. -} + whenM (liftAnnex $ remoteHas r $ transferKey t) $ + requeue t info + | otherwise = + {- The Transferrer checks when uploading + - that the remote doesn't already have the + - key, so it's not redundantly checked here. -} + requeue t info + requeue t info = queueTransferWhenSmall "retrying failed transfer" (associatedFile info) t r + +{- This is a expensive scan through the full git work tree, finding + - files to transfer. The scan is blocked when the transfer queue gets + - too large. + - + - This also finds files that are present either here or on a remote + - but that are not preferred content, and drops them. Searching for files + - to drop is done concurrently with the scan for transfers. + - + - TODO: It would be better to first drop as much as we can, before + - transferring much, to minimise disk use. + - + - During the scan, we'll also check if any unwanted repositories are empty, + - and can be removed. While unrelated, this is a cheap place to do it, + - since we need to look at the locations of all keys anyway. + -} +expensiveScan :: UrlRenderer -> [Remote] -> Assistant () +expensiveScan urlrenderer rs = unless onlyweb $ batch <~> do + debug ["starting scan of", show visiblers] + + let us = map Remote.uuid rs + + mapM_ (liftAnnex . clearFailedTransfers) us + + unwantedrs <- liftAnnex $ S.fromList + <$> filterM inUnwantedGroup us + + g <- liftAnnex gitRepo + (files, cleanup) <- liftIO $ LsFiles.inRepo [] g + removablers <- scan unwantedrs files + void $ liftIO cleanup + + debug ["finished scan of", show visiblers] + + remove <- asIO1 $ removableRemote urlrenderer + liftIO $ mapM_ (void . tryNonAsync . remove) $ S.toList removablers + where + onlyweb = all (== webUUID) $ map Remote.uuid rs + visiblers = let rs' = filter (not . Remote.readonly) rs + in if null rs' then rs else rs' + + scan unwanted [] = return unwanted + scan unwanted (f:fs) = do + (unwanted', ts) <- maybe + (return (unwanted, [])) + (findtransfers f unwanted) + =<< liftAnnex (Backend.lookupFile f) + mapM_ (enqueue f) ts + scan unwanted' fs + + enqueue f (r, t) = + queueTransferWhenSmall "expensive scan found missing object" + (Just f) t r + findtransfers f unwanted (key, _) = do + {- The syncable remotes may have changed since this + - scan began. -} + syncrs <- syncDataRemotes <$> getDaemonStatus + locs <- liftAnnex $ loggedLocations key + present <- liftAnnex $ inAnnex key + handleDropsFrom locs syncrs + "expensive scan found too many copies of object" + present key (Just f) Nothing + liftAnnex $ do + let slocs = S.fromList locs + let use a = return $ mapMaybe (a key slocs) syncrs + ts <- if present + then filterM (wantSend True (Just f) . Remote.uuid . fst) + =<< use (genTransfer Upload False) + else ifM (wantGet True $ Just f) + ( use (genTransfer Download True) , return [] ) + let unwanted' = S.difference unwanted slocs + return (unwanted', ts) + +genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer) +genTransfer direction want key slocs r + | direction == Upload && Remote.readonly r = Nothing + | S.member (Remote.uuid r) slocs == want = Just + (r, Transfer direction (Remote.uuid r) key) + | otherwise = Nothing + +remoteHas :: Remote -> Key -> Annex Bool +remoteHas r key = elem + <$> pure (Remote.uuid r) + <*> loggedLocations key diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs new file mode 100644 index 000000000..cd7282865 --- /dev/null +++ b/Assistant/Threads/TransferWatcher.hs @@ -0,0 +1,104 @@ +{- git-annex assistant transfer watching thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.TransferWatcher where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.TransferSlots +import Logs.Transfer +import Utility.DirWatcher +import Utility.DirWatcher.Types +import qualified Remote + +import Control.Concurrent +import qualified Data.Map as M + +{- This thread watches for changes to the gitAnnexTransferDir, + - and updates the DaemonStatus's map of ongoing transfers. -} +transferWatcherThread :: NamedThread +transferWatcherThread = namedThread "TransferWatcher" $ do + dir <- liftAnnex $ gitAnnexTransferDir <$> gitRepo + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + addhook <- hook onAdd + delhook <- hook onDel + modifyhook <- hook onModify + errhook <- hook onErr + let hooks = mkWatchHooks + { addHook = addhook + , delHook = delhook + , modifyHook = modifyhook + , errHook = errhook + } + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching for transfers"] + +type Handler = FilePath -> Assistant () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr = error + +{- Called when a new transfer information file is written. -} +onAdd :: Handler +onAdd file = case parseTransferFile file of + Nothing -> noop + Just t -> go t =<< liftAnnex (checkTransfer t) + where + go _ Nothing = noop -- transfer already finished + go t (Just info) = do + debug [ "transfer starting:", describeTransfer t info ] + r <- liftAnnex $ Remote.remoteFromUUID $ transferUUID t + updateTransferInfo t info { transferRemote = r } + +{- Called when a transfer information file is updated. + - + - The only thing that should change in the transfer info is the + - bytesComplete, so that's the only thing updated in the DaemonStatus. -} +onModify :: Handler +onModify file = case parseTransferFile file of + Nothing -> noop + Just t -> go t =<< liftIO (readTransferInfoFile Nothing file) + where + go _ Nothing = noop + go t (Just newinfo) = alterTransferInfo t $ + \i -> i { bytesComplete = bytesComplete newinfo } + +{- This thread can only watch transfer sizes when the DirWatcher supports + - tracking modificatons to files. -} +watchesTransferSize :: Bool +watchesTransferSize = modifyTracked + +{- Called when a transfer information file is removed. -} +onDel :: Handler +onDel file = case parseTransferFile file of + Nothing -> noop + Just t -> do + debug [ "transfer finishing:", show t] + minfo <- removeTransfer t + + -- Run transfer hook. + m <- transferHook <$> getDaemonStatus + maybe noop (\hook -> void $ liftIO $ forkIO $ hook t) + (M.lookup (transferKey t) m) + + finished <- asIO2 finishedTransfer + void $ liftIO $ forkIO $ do + {- XXX race workaround delay. The location + - log needs to be updated before finishedTransfer + - runs. -} + threadDelay 10000000 -- 10 seconds + finished t minfo diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs new file mode 100644 index 000000000..0bc419e15 --- /dev/null +++ b/Assistant/Threads/Transferrer.hs @@ -0,0 +1,25 @@ +{- git-annex assistant data transferrer thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Transferrer where + +import Assistant.Common +import Assistant.TransferQueue +import Assistant.TransferSlots +import Logs.Transfer +import Config.Files + +{- Dispatches transfers from the queue. -} +transfererThread :: NamedThread +transfererThread = namedThread "Transferrer" $ do + program <- liftIO readProgramFile + forever $ inTransferSlot program $ + maybe (return Nothing) (uncurry genTransfer) + =<< getNextTransfer notrunning + where + {- Skip transfers that are already running. -} + notrunning = isNothing . startedTime diff --git a/Assistant/Threads/UpgradeWatcher.hs b/Assistant/Threads/UpgradeWatcher.hs new file mode 100644 index 000000000..80f2040a0 --- /dev/null +++ b/Assistant/Threads/UpgradeWatcher.hs @@ -0,0 +1,109 @@ +{- git-annex assistant thread to detect when git-annex is upgraded + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Threads.UpgradeWatcher ( + upgradeWatcherThread +) where + +import Assistant.Common +import Assistant.Upgrade +import Utility.DirWatcher +import Utility.DirWatcher.Types +import Utility.ThreadScheduler +import Assistant.Types.UrlRenderer +import Assistant.Alert +import Assistant.DaemonStatus +#ifdef WITH_WEBAPP +import Assistant.WebApp.Types +import qualified Build.SysConfig +#endif + +import Control.Concurrent.MVar +import qualified Data.Text as T + +data WatcherState = InStartupScan | Started | Upgrading + deriving (Eq) + +upgradeWatcherThread :: UrlRenderer -> NamedThread +upgradeWatcherThread urlrenderer = namedThread "UpgradeWatcher" $ do + whenM (liftIO checkSuccessfulUpgrade) $ + showSuccessfulUpgrade urlrenderer + go =<< liftIO upgradeFlagFile + where + go Nothing = debug [ "cannot determine program path" ] + go (Just flagfile) = do + mvar <- liftIO $ newMVar InStartupScan + changed <- Just <$> asIO2 (changedFile urlrenderer mvar flagfile) + let hooks = mkWatchHooks + { addHook = changed + , delHook = changed + , addSymlinkHook = changed + , modifyHook = changed + , delDirHook = changed + } + let dir = parentDir flagfile + let depth = length (splitPath dir) + 1 + let nosubdirs f = length (splitPath f) == depth + void $ liftIO $ watchDir dir nosubdirs hooks (startup mvar) + -- Ignore bogus events generated during the startup scan. + startup mvar scanner = do + r <- scanner + void $ swapMVar mvar Started + return r + +changedFile :: UrlRenderer -> MVar WatcherState -> FilePath -> FilePath -> Maybe FileStatus -> Assistant () +changedFile urlrenderer mvar flagfile file _status + | flagfile /= file = noop + | otherwise = do + state <- liftIO $ readMVar mvar + when (state == Started) $ do + setstate Upgrading + ifM (liftIO upgradeSanityCheck) + ( handleUpgrade urlrenderer + , do + debug ["new version failed sanity check; not using"] + setstate Started + ) + where + setstate = void . liftIO . swapMVar mvar + +handleUpgrade :: UrlRenderer -> Assistant () +handleUpgrade urlrenderer = do + -- Wait 2 minutes for any final upgrade changes to settle. + -- (For example, other associated files may be being put into + -- place.) Not needed when using a distribution bundle, because + -- in that case git-annex handles the upgrade in a non-racy way. + liftIO $ unlessM usingDistribution $ + threadDelaySeconds (Seconds 120) + ifM autoUpgradeEnabled + ( do + debug ["starting automatic upgrade"] + unattendedUpgrade +#ifdef WITH_WEBAPP + , do + button <- mkAlertButton True (T.pack "Finish Upgrade") urlrenderer ConfigFinishUpgradeR + void $ addAlert $ upgradeReadyAlert button +#else + , noop +#endif + ) + +showSuccessfulUpgrade :: UrlRenderer -> Assistant () +showSuccessfulUpgrade urlrenderer = do +#ifdef WITH_WEBAPP + button <- ifM autoUpgradeEnabled + ( pure Nothing + , Just <$> mkAlertButton True + (T.pack "Enable Automatic Upgrades") + urlrenderer ConfigEnableAutomaticUpgradeR + ) + void $ addAlert $ upgradeFinishedAlert button Build.SysConfig.packageversion +#else + noop +#endif diff --git a/Assistant/Threads/Upgrader.hs b/Assistant/Threads/Upgrader.hs new file mode 100644 index 000000000..f0c47e844 --- /dev/null +++ b/Assistant/Threads/Upgrader.hs @@ -0,0 +1,101 @@ +{- git-annex assistant thread to detect when upgrade is available + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Threads.Upgrader ( + upgraderThread +) where + +import Assistant.Common +import Assistant.Upgrade + +import Assistant.Types.UrlRenderer +import Assistant.DaemonStatus +import Assistant.Alert +import Utility.NotificationBroadcaster +import Utility.Tmp +import qualified Annex +import qualified Build.SysConfig +import qualified Utility.Url as Url +import qualified Annex.Url as Url +import qualified Git.Version +import Types.Distribution +#ifdef WITH_WEBAPP +import Assistant.WebApp.Types +#endif + +import Data.Time.Clock +import qualified Data.Text as T + +upgraderThread :: UrlRenderer -> NamedThread +upgraderThread urlrenderer = namedThread "Upgrader" $ + when (isJust Build.SysConfig.upgradelocation) $ do + {- Check for upgrade on startup, unless it was just + - upgraded. -} + unlessM (liftIO checkSuccessfulUpgrade) $ + checkUpgrade urlrenderer + h <- liftIO . newNotificationHandle False . networkConnectedNotifier =<< getDaemonStatus + go h =<< liftIO getCurrentTime + where + {- Wait for a network connection event. Then see if it's been + - half a day since the last upgrade check. If so, proceed with + - check. -} + go h lastchecked = do + liftIO $ waitNotification h + autoupgrade <- liftAnnex $ annexAutoUpgrade <$> Annex.getGitConfig + if autoupgrade == NoAutoUpgrade + then go h lastchecked + else do + now <- liftIO getCurrentTime + if diffUTCTime now lastchecked > halfday + then do + checkUpgrade urlrenderer + go h =<< liftIO getCurrentTime + else go h lastchecked + halfday = 12 * 60 * 60 + +checkUpgrade :: UrlRenderer -> Assistant () +checkUpgrade urlrenderer = do + debug [ "Checking if an upgrade is available." ] + go =<< getDistributionInfo + where + go Nothing = debug [ "Failed to check if upgrade is available." ] + go (Just d) = do + let installed = Git.Version.normalize Build.SysConfig.packageversion + let avail = Git.Version.normalize $ distributionVersion d + let old = Git.Version.normalize <$> distributionUrgentUpgrade d + if Just installed <= old + then canUpgrade High urlrenderer d + else if installed < avail + then canUpgrade Low urlrenderer d + else debug [ "No new version found." ] + +canUpgrade :: AlertPriority -> UrlRenderer -> GitAnnexDistribution -> Assistant () +canUpgrade urgency urlrenderer d = ifM autoUpgradeEnabled + ( startDistributionDownload d + , do +#ifdef WITH_WEBAPP + button <- mkAlertButton True (T.pack "Upgrade") urlrenderer (ConfigStartUpgradeR d) + void $ addAlert (canUpgradeAlert urgency (distributionVersion d) button) +#else + noop +#endif + ) + +getDistributionInfo :: Assistant (Maybe GitAnnexDistribution) +getDistributionInfo = do + ua <- liftAnnex Url.getUserAgent + liftIO $ withTmpFile "git-annex.tmp" $ \tmpfile h -> do + hClose h + ifM (Url.downloadQuiet distributionInfoUrl [] [] tmpfile ua) + ( readish <$> readFileStrict tmpfile + , return Nothing + ) + +distributionInfoUrl :: String +distributionInfoUrl = fromJust Build.SysConfig.upgradelocation ++ ".info" diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs new file mode 100644 index 000000000..6a56eadbb --- /dev/null +++ b/Assistant/Threads/Watcher.hs @@ -0,0 +1,355 @@ +{- git-annex assistant tree watcher + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE DeriveDataTypeable, CPP #-} + +module Assistant.Threads.Watcher ( + watchThread, + WatcherControl(..), + checkCanWatch, + needLsof, + onAddSymlink, + runHandler, +) where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.Changes +import Assistant.Types.Changes +import Assistant.Alert +import Utility.DirWatcher +import Utility.DirWatcher.Types +import qualified Utility.Lsof as Lsof +import qualified Annex +import qualified Annex.Queue +import qualified Git +import qualified Git.UpdateIndex +import qualified Git.LsFiles as LsFiles +import qualified Backend +import Annex.Direct +import Annex.Content.Direct +import Annex.CatFile +import Annex.CheckIgnore +import Annex.Link +import Annex.FileMatcher +import Annex.ReplaceFile +import Git.Types +import Config +import Utility.ThreadScheduler + +import Data.Bits.Utils +import Data.Typeable +import qualified Data.ByteString.Lazy as L +import qualified Control.Exception as E +import Data.Time.Clock + +checkCanWatch :: Annex () +checkCanWatch + | canWatch = do + liftIO Lsof.setup + unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) + needLsof + | otherwise = error "watch mode is not available on this system" + +needLsof :: Annex () +needLsof = error $ unlines + [ "The lsof command is needed for watch mode to be safe, and is not in PATH." + , "To override lsof checks to ensure that files are not open for writing" + , "when added to the annex, you can use --force" + , "Be warned: This can corrupt data in the annex, and make fsck complain." + ] + +{- A special exception that can be thrown to pause or resume the watcher. -} +data WatcherControl = PauseWatcher | ResumeWatcher + deriving (Show, Eq, Typeable) + +instance E.Exception WatcherControl + +watchThread :: NamedThread +watchThread = namedThread "Watcher" $ + ifM (liftAnnex $ annexAutoCommit <$> Annex.getGitConfig) + ( runWatcher + , waitFor ResumeWatcher runWatcher + ) + +runWatcher :: Assistant () +runWatcher = do + startup <- asIO1 startupScan + matcher <- liftAnnex largeFilesMatcher + direct <- liftAnnex isDirect + symlinkssupported <- liftAnnex $ coreSymlinks <$> Annex.getGitConfig + addhook <- hook $ if direct + then onAddDirect symlinkssupported matcher + else onAdd matcher + delhook <- hook onDel + addsymlinkhook <- hook $ onAddSymlink direct + deldirhook <- hook onDelDir + errhook <- hook onErr + let hooks = mkWatchHooks + { addHook = addhook + , delHook = delhook + , addSymlinkHook = addsymlinkhook + , delDirHook = deldirhook + , errHook = errhook + } + handle <- liftIO $ watchDir "." ignored hooks startup + debug [ "watching", "."] + + {- Let the DirWatcher thread run until signalled to pause it, + - then wait for a resume signal, and restart. -} + waitFor PauseWatcher $ do + liftIO $ stopWatchDir handle + waitFor ResumeWatcher runWatcher + where + hook a = Just <$> asIO2 (runHandler a) + +waitFor :: WatcherControl -> Assistant () -> Assistant () +waitFor sig next = do + r <- liftIO (E.try pause :: IO (Either E.SomeException ())) + case r of + Left e -> case E.fromException e of + Just s + | s == sig -> next + _ -> noop + _ -> noop + where + pause = runEvery (Seconds 86400) noop + +{- Initial scartup scan. The action should return once the scan is complete. -} +startupScan :: IO a -> Assistant a +startupScan scanner = do + liftAnnex $ showAction "scanning" + alertWhile' startupScanAlert $ do + r <- liftIO scanner + + -- Notice any files that were deleted before + -- watching was started. + top <- liftAnnex $ fromRepo Git.repoPath + (fs, cleanup) <- liftAnnex $ inRepo $ LsFiles.deleted [top] + forM_ fs $ \f -> do + liftAnnex $ onDel' f + maybe noop recordChange =<< madeChange f RmChange + void $ liftIO cleanup + + liftAnnex $ showAction "started" + liftIO $ putStrLn "" + + modifyDaemonStatus_ $ \s -> s { scanComplete = True } + + return (True, r) + +{- Hardcoded ignores, passed to the DirWatcher so it can avoid looking + - at the entire .git directory. Does not include .gitignores. -} +ignored :: FilePath -> Bool +ignored = ig . takeFileName + where + ig ".git" = True + ig ".gitignore" = True + ig ".gitattributes" = True +#ifdef darwin_HOST_OS + ig ".DS_Store" = True +#endif + ig _ = False + +unlessIgnored :: FilePath -> Assistant (Maybe Change) -> Assistant (Maybe Change) +unlessIgnored file a = ifM (liftAnnex $ checkIgnored file) + ( noChange + , a + ) + +type Handler = FilePath -> Maybe FileStatus -> Assistant (Maybe Change) + +{- Runs an action handler, and if there was a change, adds it to the ChangeChan. + - + - Exceptions are ignored, otherwise a whole watcher thread could be crashed. + -} +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file filestatus = void $ do + r <- tryIO <~> handler (normalize file) filestatus + case r of + Left e -> liftIO $ print e + Right Nothing -> noop + Right (Just change) -> do + -- Just in case the commit thread is not + -- flushing the queue fast enough. + liftAnnex Annex.Queue.flushWhenFull + recordChange change + where + normalize f + | "./" `isPrefixOf` file = drop 2 f + | otherwise = f + +{- Small files are added to git as-is, while large ones go into the annex. -} +add :: FileMatcher -> FilePath -> Assistant (Maybe Change) +add bigfilematcher file = ifM (liftAnnex $ checkFileMatcher bigfilematcher file) + ( pendingAddChange file + , do + liftAnnex $ Annex.Queue.addCommand "add" + [Params "--force --"] [file] + madeChange file AddFileChange + ) + +onAdd :: FileMatcher -> Handler +onAdd matcher file filestatus + | maybe False isRegularFile filestatus = + unlessIgnored file $ + add matcher file + | otherwise = noChange + +shouldRestage :: DaemonStatus -> Bool +shouldRestage ds = scanComplete ds || forceRestage ds + +{- In direct mode, add events are received for both new files, and + - modified existing files. + -} +onAddDirect :: Bool -> FileMatcher -> Handler +onAddDirect symlinkssupported matcher file fs = do + v <- liftAnnex $ catKeyFile file + case (v, fs) of + (Just key, Just filestatus) -> + ifM (liftAnnex $ sameFileStatus key filestatus) + {- It's possible to get an add event for + - an existing file that is not + - really modified, but it might have + - just been deleted and been put back, + - so it symlink is restaged to make sure. -} + ( ifM (shouldRestage <$> getDaemonStatus) + ( do + link <- liftAnnex $ inRepo $ gitAnnexLink file key + addLink file link (Just key) + , noChange + ) + , guardSymlinkStandin (Just key) $ do + debug ["changed direct", file] + liftAnnex $ changedDirect key file + add matcher file + ) + _ -> unlessIgnored file $ + guardSymlinkStandin Nothing $ do + debug ["add direct", file] + add matcher file + where + {- On a filesystem without symlinks, we'll get changes for regular + - files that git uses to stand-in for symlinks. Detect when + - this happens, and stage the symlink, rather than annexing the + - file. -} + guardSymlinkStandin mk a + | symlinkssupported = a + | otherwise = do + linktarget <- liftAnnex $ getAnnexLinkTarget file + case linktarget of + Nothing -> a + Just lt -> do + case fileKey $ takeFileName lt of + Nothing -> noop + Just key -> void $ liftAnnex $ + addAssociatedFile key file + onAddSymlink' linktarget mk True file fs + +{- A symlink might be an arbitrary symlink, which is just added. + - Or, if it is a git-annex symlink, ensure it points to the content + - before adding it. + -} +onAddSymlink :: Bool -> Handler +onAddSymlink isdirect file filestatus = unlessIgnored file $ do + linktarget <- liftIO (catchMaybeIO $ readSymbolicLink file) + kv <- liftAnnex (Backend.lookupFile file) + onAddSymlink' linktarget (fmap fst kv) isdirect file filestatus + +onAddSymlink' :: Maybe String -> Maybe Key -> Bool -> Handler +onAddSymlink' linktarget mk isdirect file filestatus = go mk + where + go (Just key) = do + when isdirect $ + liftAnnex $ void $ addAssociatedFile key file + link <- liftAnnex $ inRepo $ gitAnnexLink file key + if linktarget == Just link + then ensurestaged (Just link) =<< getDaemonStatus + else do + unless isdirect $ + liftAnnex $ replaceFile file $ + makeAnnexLink link + addLink file link (Just key) + -- other symlink, not git-annex + go Nothing = ensurestaged linktarget =<< getDaemonStatus + + {- This is often called on symlinks that are already + - staged correctly. A symlink may have been deleted + - and being re-added, or added when the watcher was + - not running. So they're normally restaged to make sure. + - + - As an optimisation, during the startup scan, avoid + - restaging everything. Only links that were created since + - the last time the daemon was running are staged. + - (If the daemon has never ran before, avoid staging + - links too.) + -} + ensurestaged (Just link) daemonstatus + | shouldRestage daemonstatus = addLink file link mk + | otherwise = case filestatus of + Just s + | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange + _ -> addLink file link mk + ensurestaged Nothing _ = noChange + +{- For speed, tries to reuse the existing blob for symlink target. -} +addLink :: FilePath -> FilePath -> Maybe Key -> Assistant (Maybe Change) +addLink file link mk = do + debug ["add symlink", file] + liftAnnex $ do + v <- catObjectDetails $ Ref $ ':':file + case v of + Just (currlink, sha, _type) + | s2w8 link == L.unpack currlink -> + stageSymlink file sha + _ -> stageSymlink file =<< hashSymlink link + madeChange file $ LinkChange mk + +onDel :: Handler +onDel file _ = do + debug ["file deleted", file] + liftAnnex $ onDel' file + madeChange file RmChange + +onDel' :: FilePath -> Annex () +onDel' file = do + whenM isDirect $ do + mkey <- catKeyFile file + case mkey of + Nothing -> noop + Just key -> void $ removeAssociatedFile key file + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + +{- A directory has been deleted, or moved, so tell git to remove anything + - that was inside it from its cache. Since it could reappear at any time, + - use --cached to only delete it from the index. + - + - This queues up a lot of RmChanges, which assists the Committer in + - pairing up renamed files when the directory was renamed. -} +onDelDir :: Handler +onDelDir dir _ = do + debug ["directory deleted", dir] + (fs, clean) <- liftAnnex $ inRepo $ LsFiles.deleted [dir] + + liftAnnex $ mapM_ onDel' fs + + -- Get the events queued up as fast as possible, so the + -- committer sees them all in one block. + now <- liftIO getCurrentTime + recordChanges $ map (\f -> Change now f RmChange) fs + + void $ liftIO clean + liftAnnex Annex.Queue.flushWhenFull + noChange + +{- Called when there's an error with inotify or kqueue. -} +onErr :: Handler +onErr msg _ = do + liftAnnex $ warning msg + void $ addAlert $ warningAlert "watcher" msg + noChange diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs new file mode 100644 index 000000000..2ad61168e --- /dev/null +++ b/Assistant/Threads/WebApp.hs @@ -0,0 +1,109 @@ +{- git-annex assistant webapp thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE TemplateHaskell, MultiParamTypeClasses #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} + +module Assistant.Threads.WebApp where + +import Assistant.Common +import Assistant.WebApp +import Assistant.WebApp.Types +import Assistant.WebApp.DashBoard +import Assistant.WebApp.SideBar +import Assistant.WebApp.Notifications +import Assistant.WebApp.RepoList +import Assistant.WebApp.Configurators +import Assistant.WebApp.Configurators.Local +import Assistant.WebApp.Configurators.Ssh +import Assistant.WebApp.Configurators.Pairing +import Assistant.WebApp.Configurators.AWS +import Assistant.WebApp.Configurators.IA +import Assistant.WebApp.Configurators.WebDAV +import Assistant.WebApp.Configurators.XMPP +import Assistant.WebApp.Configurators.Preferences +import Assistant.WebApp.Configurators.Edit +import Assistant.WebApp.Configurators.Delete +import Assistant.WebApp.Configurators.Fsck +import Assistant.WebApp.Configurators.Upgrade +import Assistant.WebApp.Documentation +import Assistant.WebApp.Control +import Assistant.WebApp.OtherRepos +import Assistant.WebApp.Repair +import Assistant.Types.ThreadedMonad +import Utility.WebApp +import Utility.Tmp +import Utility.FileMode +import Git + +import Yesod +import Network.Socket (SockAddr, HostName) +import Data.Text (pack, unpack) + +mkYesodDispatch "WebApp" $(parseRoutesFile "Assistant/WebApp/routes") + +type Url = String + +webAppThread + :: AssistantData + -> UrlRenderer + -> Bool + -> Maybe String + -> Maybe HostName + -> Maybe (IO Url) + -> Maybe (Url -> FilePath -> IO ()) + -> NamedThread +webAppThread assistantdata urlrenderer noannex cannotrun listenhost postfirstrun onstartup = thread $ liftIO $ do +#ifdef __ANDROID__ + when (isJust listenhost) $ + -- See Utility.WebApp + error "Sorry, --listen is not currently supported on Android" +#endif + webapp <- WebApp + <$> pure assistantdata + <*> (pack <$> genRandomToken) + <*> getreldir + <*> pure staticRoutes + <*> pure postfirstrun + <*> pure cannotrun + <*> pure noannex + <*> pure listenhost + setUrlRenderer urlrenderer $ yesodRender webapp (pack "") + app <- toWaiAppPlain webapp + app' <- ifM debugEnabled + ( return $ httpDebugLogger app + , return app + ) + runWebApp listenhost app' $ \addr -> if noannex + then withTmpFile "webapp.html" $ \tmpfile _ -> + go addr webapp tmpfile Nothing + else do + let st = threadState assistantdata + htmlshim <- runThreadState st $ fromRepo gitAnnexHtmlShim + urlfile <- runThreadState st $ fromRepo gitAnnexUrlFile + go addr webapp htmlshim (Just urlfile) + where + -- The webapp thread does not wait for the startupSanityCheckThread + -- to finish, so that the user interface remains responsive while + -- that's going on. + thread = namedThreadUnchecked "WebApp" + getreldir + | noannex = return Nothing + | otherwise = Just <$> + (relHome =<< absPath + =<< runThreadState (threadState assistantdata) (fromRepo repoPath)) + go addr webapp htmlshim urlfile = do + let url = myUrl webapp addr + maybe noop (`writeFileProtected` url) urlfile + writeHtmlShim "Starting webapp..." url htmlshim + maybe noop (\a -> a url htmlshim) onstartup + +myUrl :: WebApp -> SockAddr -> Url +myUrl webapp addr = unpack $ yesodRender webapp urlbase DashboardR [] + where + urlbase = pack $ "http://" ++ show addr diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs new file mode 100644 index 000000000..8eb469939 --- /dev/null +++ b/Assistant/Threads/XMPPClient.hs @@ -0,0 +1,368 @@ +{- git-annex XMPP client + - + - Copyright 2012, 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.XMPPClient where + +import Assistant.Common +import Assistant.XMPP +import Assistant.XMPP.Client +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.Types.Buddies +import Assistant.XMPP.Buddies +import Assistant.Sync +import Assistant.DaemonStatus +import qualified Remote +import Utility.ThreadScheduler +import Assistant.WebApp (UrlRenderer) +import Assistant.WebApp.Types hiding (liftAssistant) +import Assistant.Alert +import Assistant.Pairing +import Assistant.XMPP.Git +import Annex.UUID +import Logs.UUID + +import Network.Protocol.XMPP +import Control.Concurrent +import Control.Concurrent.STM.TMVar +import Control.Concurrent.STM (atomically) +import qualified Data.Text as T +import qualified Data.Set as S +import qualified Data.Map as M +import qualified Git.Branch +import Data.Time.Clock +import Control.Concurrent.Async + +xmppClientThread :: UrlRenderer -> NamedThread +xmppClientThread urlrenderer = namedThread "XMPPClient" $ + restartableClient . xmppClient urlrenderer =<< getAssistant id + +{- Runs the client, handing restart events. -} +restartableClient :: (XMPPCreds -> IO ()) -> Assistant () +restartableClient a = forever $ go =<< liftAnnex getXMPPCreds + where + go Nothing = waitNetMessagerRestart + go (Just creds) = do + tid <- liftIO $ forkIO $ a creds + waitNetMessagerRestart + liftIO $ killThread tid + +xmppClient :: UrlRenderer -> AssistantData -> XMPPCreds -> IO () +xmppClient urlrenderer d creds = + retry (runclient creds) =<< getCurrentTime + where + liftAssistant = runAssistant d + inAssistant = liftIO . liftAssistant + + {- When the client exits, it's restarted; + - if it keeps failing, back off to wait 5 minutes before + - trying it again. -} + retry client starttime = do + {- The buddy list starts empty each time + - the client connects, so that stale info + - is not retained. -} + liftAssistant $ + updateBuddyList (const noBuddies) <<~ buddyList + void client + liftAssistant $ modifyDaemonStatus_ $ \s -> s + { xmppClientID = Nothing } + now <- getCurrentTime + if diffUTCTime now starttime > 300 + then do + liftAssistant $ debug ["connection lost; reconnecting"] + retry client now + else do + liftAssistant $ debug ["connection failed; will retry"] + threadDelaySeconds (Seconds 300) + retry client =<< getCurrentTime + + runclient c = liftIO $ connectXMPP c $ \jid -> do + selfjid <- bindJID jid + putStanza gitAnnexSignature + + inAssistant $ do + modifyDaemonStatus_ $ \s -> s + { xmppClientID = Just $ xmppJID creds } + debug ["connected", logJid selfjid] + + lasttraffic <- liftIO $ atomically . newTMVar =<< getCurrentTime + + sender <- xmppSession $ sendnotifications selfjid + receiver <- xmppSession $ receivenotifications selfjid lasttraffic + pinger <- xmppSession $ sendpings selfjid lasttraffic + {- Run all 3 threads concurrently, until + - any of them throw an exception. + - Then kill all 3 threads, and rethrow the + - exception. + - + - If this thread gets an exception, the 3 threads + - will also be killed. -} + liftIO $ pinger `concurrently` sender `concurrently` receiver + + sendnotifications selfjid = forever $ + join $ inAssistant $ relayNetMessage selfjid + receivenotifications selfjid lasttraffic = forever $ do + l <- decodeStanza selfjid <$> getStanza + void $ liftIO $ atomically . swapTMVar lasttraffic =<< getCurrentTime + inAssistant $ debug + ["received:", show $ map logXMPPEvent l] + mapM_ (handle selfjid) l + sendpings selfjid lasttraffic = forever $ do + putStanza pingstanza + + startping <- liftIO getCurrentTime + liftIO $ threadDelaySeconds (Seconds 120) + t <- liftIO $ atomically $ readTMVar lasttraffic + when (t < startping) $ do + inAssistant $ debug ["ping timeout"] + error "ping timeout" + where + {- XEP-0199 says that the server will respond with either + - a ping response or an error message. Either will + - cause traffic, so good enough. -} + pingstanza = xmppPing selfjid + + handle selfjid (PresenceMessage p) = do + void $ inAssistant $ + updateBuddyList (updateBuddies p) <<~ buddyList + resendImportantMessages selfjid p + handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature + handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us + handle selfjid (GotNetMessage (PairingNotification stage c u)) = + maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) + handle _ (GotNetMessage m@(Pushing _ pushstage)) + | isPushNotice pushstage = inAssistant $ handlePushNotice m + | isPushInitiation pushstage = inAssistant $ queuePushInitiation m + | otherwise = inAssistant $ storeInbox m + handle _ (Ignorable _) = noop + handle _ (Unknown _) = noop + handle _ (ProtocolError _) = noop + + resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do + let c = formatJID jid + (stored, sent) <- inAssistant $ + checkImportantNetMessages (formatJID (baseJID jid), c) + forM_ (S.toList $ S.difference stored sent) $ \msg -> do + let msg' = readdressNetMessage msg c + inAssistant $ debug + [ "sending to new client:" + , logJid jid + , show $ logNetMessage msg' + ] + join $ inAssistant $ convertNetMsg msg' selfjid + inAssistant $ sentImportantNetMessage msg c + resendImportantMessages _ _ = noop + +data XMPPEvent + = GotNetMessage NetMessage + | PresenceMessage Presence + | Ignorable ReceivedStanza + | Unknown ReceivedStanza + | ProtocolError ReceivedStanza + deriving Show + +logXMPPEvent :: XMPPEvent -> String +logXMPPEvent (GotNetMessage m) = logNetMessage m +logXMPPEvent (PresenceMessage p) = logPresence p +logXMPPEvent (Ignorable (ReceivedPresence p)) = "Ignorable " ++ logPresence p +logXMPPEvent (Ignorable _) = "Ignorable message" +logXMPPEvent (Unknown _) = "Unknown message" +logXMPPEvent (ProtocolError _) = "Protocol error message" + +logPresence :: Presence -> String +logPresence (p@Presence { presenceFrom = Just jid }) = unwords + [ "Presence from" + , logJid jid + , show $ extractGitAnnexTag p + ] +logPresence _ = "Presence from unknown" + +logJid :: JID -> String +logJid jid = + let name = T.unpack (buddyName jid) + resource = maybe "" (T.unpack . strResource) (jidResource jid) + in take 1 name ++ show (length name) ++ "/" ++ resource + +logClient :: Client -> String +logClient (Client jid) = logJid jid + +{- Decodes an XMPP stanza into one or more events. -} +decodeStanza :: JID -> ReceivedStanza -> [XMPPEvent] +decodeStanza selfjid s@(ReceivedPresence p) + | presenceType p == PresenceError = [ProtocolError s] + | isNothing (presenceFrom p) = [Ignorable s] + | presenceFrom p == Just selfjid = [Ignorable s] + | otherwise = maybe [PresenceMessage p] decode (gitAnnexTagInfo p) + where + decode i + | tagAttr i == pushAttr = impliedp $ GotNetMessage $ NotifyPush $ + decodePushNotification (tagValue i) + | tagAttr i == queryAttr = impliedp $ GotNetMessage QueryPresence + | otherwise = [Unknown s] + {- Things sent via presence imply a presence message, + - along with their real meaning. -} + impliedp v = [PresenceMessage p, v] +decodeStanza selfjid s@(ReceivedMessage m) + | isNothing (messageFrom m) = [Ignorable s] + | messageFrom m == Just selfjid = [Ignorable s] + | messageType m == MessageError = [ProtocolError s] + | otherwise = [fromMaybe (Unknown s) (GotNetMessage <$> decodeMessage m)] +decodeStanza _ s = [Unknown s] + +{- Waits for a NetMessager message to be sent, and relays it to XMPP. + - + - Chat messages must be directed to specific clients, not a base + - account JID, due to git-annex clients using a negative presence priority. + - PairingNotification messages are always directed at specific + - clients, but Pushing messages are sometimes not, and need to be exploded + - out to specific clients. + - + - Important messages, not directed at any specific client, + - are cached to be sent later when additional clients connect. + -} +relayNetMessage :: JID -> Assistant (XMPP ()) +relayNetMessage selfjid = do + msg <- waitNetMessage + debug ["sending:", logNetMessage msg] + a1 <- handleImportant msg + a2 <- convert msg + return (a1 >> a2) + where + handleImportant msg = case parseJID =<< isImportantNetMessage msg of + Just tojid + | tojid == baseJID tojid -> do + storeImportantNetMessage msg (formatJID tojid) $ + \c -> (baseJID <$> parseJID c) == Just tojid + return $ putStanza presenceQuery + _ -> return noop + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> + if tojid == baseJID tojid + then do + clients <- maybe [] (S.toList . buddyAssistants) + <$> getBuddy (genBuddyKey tojid) <<~ buddyList + debug ["exploded undirected message to clients", unwords $ map logClient clients] + return $ forM_ clients $ \(Client jid) -> + putStanza $ pushMessage pushstage jid selfjid + else do + debug ["to client:", logJid tojid] + return $ putStanza $ pushMessage pushstage tojid selfjid + convert msg = convertNetMsg msg selfjid + +{- Converts a NetMessage to an XMPP action. -} +convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ()) +convertNetMsg msg selfjid = convert msg + where + convert (NotifyPush us) = return $ putStanza $ pushNotification us + convert QueryPresence = return $ putStanza presenceQuery + convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do + changeBuddyPairing tojid True + return $ putStanza $ pairingNotification stage u tojid selfjid + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> + return $ putStanza $ pushMessage pushstage tojid selfjid + +withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> Assistant (XMPP ()) +withOtherClient selfjid c a = case parseJID c of + Nothing -> return noop + Just tojid + | tojid == selfjid -> return noop + | otherwise -> a tojid + +withClient :: ClientID -> (JID -> XMPP ()) -> XMPP () +withClient c a = maybe noop a $ parseJID c + +{- Returns an IO action that runs a XMPP action in a separate thread, + - using a session to allow it to access the same XMPP client. -} +xmppSession :: XMPP () -> XMPP (IO ()) +xmppSession a = do + s <- getSession + return $ void $ runXMPP s a + +{- We only pull from one remote out of the set listed in the push + - notification, as an optimisation. + - + - Note that it might be possible (though very unlikely) for the push + - notification to take a while to be sent, and multiple pushes happen + - before it is sent, so it includes multiple remotes that were pushed + - to at different times. + - + - It could then be the case that the remote we choose had the earlier + - push sent to it, but then failed to get the later push, and so is not + - fully up-to-date. If that happens, the pushRetryThread will come along + - and retry the push, and we'll get another notification once it succeeds, + - and pull again. -} +pull :: [UUID] -> Assistant () +pull [] = noop +pull us = do + rs <- filter matching . syncGitRemotes <$> getDaemonStatus + debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs + pullone rs =<< liftAnnex (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us + + pullone [] _ = noop + pullone (r:rs) branch = + unlessM (null . fst <$> manualPull branch [r]) $ + pullone rs branch + +{- PairReq from another client using our JID is automatically + - accepted. This is so pairing devices all using the same XMPP + - account works without confirmations. + - + - Also, autoaccept PairReq from the same JID of any repo we've + - already paired with, as long as the UUID in the PairReq is + - one we know about. +-} +pairMsgReceived :: UrlRenderer -> PairStage -> UUID -> JID -> JID -> Assistant () +pairMsgReceived urlrenderer PairReq theiruuid selfjid theirjid + | baseJID selfjid == baseJID theirjid = autoaccept + | otherwise = do + knownjids <- mapMaybe (parseJID . getXMPPClientID) + . filter isXMPPRemote . syncRemotes <$> getDaemonStatus + um <- liftAnnex uuidMap + if elem (baseJID theirjid) knownjids && M.member theiruuid um + then autoaccept + else showalert + + where + autoaccept = do + selfuuid <- liftAnnex getUUID + sendNetMessage $ + PairingNotification PairAck (formatJID theirjid) selfuuid + finishXMPPPairing theirjid theiruuid + -- Show an alert to let the user decide if they want to pair. + showalert = do + button <- mkAlertButton True (T.pack "Respond") urlrenderer $ + ConfirmXMPPPairFriendR $ + PairKey theiruuid $ formatJID theirjid + void $ addAlert $ pairRequestReceivedAlert + (T.unpack $ buddyName theirjid) + button + +{- PairAck must come from one of the buddies we are pairing with; + - don't pair with just anyone. -} +pairMsgReceived _ PairAck theiruuid _selfjid theirjid = + whenM (isBuddyPairing theirjid) $ do + changeBuddyPairing theirjid False + selfuuid <- liftAnnex getUUID + sendNetMessage $ + PairingNotification PairDone (formatJID theirjid) selfuuid + finishXMPPPairing theirjid theiruuid + +pairMsgReceived _ PairDone _theiruuid _selfjid theirjid = + changeBuddyPairing theirjid False + +isBuddyPairing :: JID -> Assistant Bool +isBuddyPairing jid = maybe False buddyPairing <$> + getBuddy (genBuddyKey jid) <<~ buddyList + +changeBuddyPairing :: JID -> Bool -> Assistant () +changeBuddyPairing jid ispairing = + updateBuddyList (M.adjust set key) <<~ buddyList + where + key = genBuddyKey jid + set b = b { buddyPairing = ispairing } diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs new file mode 100644 index 000000000..30c91c7f0 --- /dev/null +++ b/Assistant/Threads/XMPPPusher.hs @@ -0,0 +1,81 @@ +{- git-annex XMPP pusher threads + - + - This is a pair of threads. One handles git send-pack, + - and the other git receive-pack. Each thread can be running at most + - one such operation at a time. + - + - Why not use a single thread? Consider two clients A and B. + - If both decide to run a receive-pack at the same time to the other, + - they would deadlock with only one thread. For larger numbers of + - clients, the two threads are also sufficient. + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.XMPPPusher where + +import Assistant.Common +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.WebApp (UrlRenderer) +import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) +import Assistant.XMPP.Git + +import Control.Exception as E + +xmppSendPackThread :: UrlRenderer -> NamedThread +xmppSendPackThread = pusherThread "XMPPSendPack" SendPack + +xmppReceivePackThread :: UrlRenderer -> NamedThread +xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack + +pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread +pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing + where + go lastpushedto = do + msg <- waitPushInitiation side $ selectNextPush lastpushedto + debug ["started running push", logNetMessage msg] + + runpush <- asIO $ runPush checker msg + r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID))) + let successful = case r of + Right (Just _) -> True + _ -> False + + {- Empty the inbox, because stuff may have + - been left in it if the push failed. -} + let justpushedto = getclient msg + maybe noop (`emptyInbox` side) justpushedto + + debug ["finished running push", logNetMessage msg, show successful] + go $ if successful then justpushedto else lastpushedto + + checker = checkCloudRepos urlrenderer + + getclient (Pushing cid _) = Just cid + getclient _ = Nothing + +{- Select the next push to run from the queue. + - The queue cannot be empty! + - + - We prefer to select the most recently added push, because its requestor + - is more likely to still be connected. + - + - When passed the ID of a client we just pushed to, we prefer to not + - immediately push again to that same client. This avoids one client + - drowing out others. So pushes from the client we just pushed to are + - relocated to the beginning of the list, to be processed later. + -} +selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage]) +selectNextPush _ (m:[]) = (m, []) -- common case +selectNextPush _ [] = error "selectNextPush: empty list" +selectNextPush lastpushedto l = go [] l + where + go (r:ejected) [] = (r, ejected) + go rejected (m:ms) = case m of + (Pushing clientid _) + | Just clientid /= lastpushedto -> (m, rejected ++ ms) + _ -> go (m:rejected) ms + go [] [] = undefined diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs new file mode 100644 index 000000000..f94e73c2b --- /dev/null +++ b/Assistant/TransferQueue.hs @@ -0,0 +1,223 @@ +{- git-annex assistant pending transfer queue + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.TransferQueue ( + TransferQueue, + Schedule(..), + newTransferQueue, + getTransferQueue, + queueTransfers, + queueTransfersMatching, + queueDeferredDownloads, + queueTransfer, + queueTransferAt, + queueTransferWhenSmall, + getNextTransfer, + getMatchingTransfers, + dequeueTransfers, +) where + +import Assistant.Common +import Assistant.DaemonStatus +import Assistant.Types.TransferQueue +import Logs.Transfer +import Types.Remote +import qualified Remote +import qualified Types.Remote as Remote +import Annex.Wanted +import Utility.TList + +import Control.Concurrent.STM +import qualified Data.Map as M +import qualified Data.Set as S + +type Reason = String + +{- Reads the queue's content without blocking or changing it. -} +getTransferQueue :: Assistant [(Transfer, TransferInfo)] +getTransferQueue = (atomically . readTList . queuelist) <<~ transferQueue + +stubInfo :: AssociatedFile -> Remote -> TransferInfo +stubInfo f r = stubTransferInfo + { transferRemote = Just r + , associatedFile = f + } + +{- Adds transfers to queue for some of the known remotes. + - Honors preferred content settings, only transferring wanted files. -} +queueTransfers :: Reason -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant () +queueTransfers = queueTransfersMatching (const True) + +{- Adds transfers to queue for some of the known remotes, that match a + - condition. Honors preferred content settings. -} +queueTransfersMatching :: (UUID -> Bool) -> Reason -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant () +queueTransfersMatching matching reason schedule k f direction + | direction == Download = whenM (liftAnnex $ wantGet True f) go + | otherwise = go + where + go = do + + rs <- liftAnnex . selectremotes + =<< syncDataRemotes <$> getDaemonStatus + let matchingrs = filter (matching . Remote.uuid) rs + if null matchingrs + then defer + else forM_ matchingrs $ \r -> + enqueue reason schedule (gentransfer r) (stubInfo f r) + selectremotes rs + {- Queue downloads from all remotes that + - have the key. The list of remotes is ordered with + - cheapest first. More expensive ones will only be tried + - if downloading from a cheap one fails. -} + | direction == Download = do + s <- locs + return $ filter (inset s) rs + {- Upload to all remotes that want the content and don't + - already have it. -} + | otherwise = do + s <- locs + filterM (wantSend True f . Remote.uuid) $ + filter (\r -> not (inset s r || Remote.readonly r)) rs + where + locs = S.fromList <$> Remote.keyLocations k + inset s r = S.member (Remote.uuid r) s + gentransfer r = Transfer + { transferDirection = direction + , transferKey = k + , transferUUID = Remote.uuid r + } + defer + {- Defer this download, as no known remote has the key. -} + | direction == Download = do + q <- getAssistant transferQueue + void $ liftIO $ atomically $ + consTList (deferreddownloads q) (k, f) + | otherwise = noop + +{- Queues any deferred downloads that can now be accomplished, leaving + - any others in the list to try again later. -} +queueDeferredDownloads :: Reason -> Schedule -> Assistant () +queueDeferredDownloads reason schedule = do + q <- getAssistant transferQueue + l <- liftIO $ atomically $ readTList (deferreddownloads q) + rs <- syncDataRemotes <$> getDaemonStatus + left <- filterM (queue rs) l + unless (null left) $ + liftIO $ atomically $ appendTList (deferreddownloads q) left + where + queue rs (k, f) = do + uuids <- liftAnnex $ Remote.keyLocations k + let sources = filter (\r -> uuid r `elem` uuids) rs + unless (null sources) $ + forM_ sources $ \r -> + enqueue reason schedule + (gentransfer r) (stubInfo f r) + return $ null sources + where + gentransfer r = Transfer + { transferDirection = Download + , transferKey = k + , transferUUID = Remote.uuid r + } + +enqueue :: Reason -> Schedule -> Transfer -> TransferInfo -> Assistant () +enqueue reason schedule t info + | schedule == Next = go consTList + | otherwise = go snocTList + where + go modlist = whenM (add modlist) $ do + debug [ "queued", describeTransfer t info, ": " ++ reason ] + notifyTransfer + add modlist = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ ifM (checkRunningTransferSTM dstatus t) + ( return False + , do + l <- readTList (queuelist q) + if (t `notElem` map fst l) + then do + void $ modifyTVar' (queuesize q) succ + void $ modlist (queuelist q) (t, info) + return True + else return False + ) + +{- Adds a transfer to the queue. -} +queueTransfer :: Reason -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransfer reason schedule f t remote = + enqueue reason schedule t (stubInfo f remote) + +{- Blocks until the queue is no larger than a given size, and then adds a + - transfer to the queue. -} +queueTransferAt :: Int -> Reason -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransferAt wantsz reason schedule f t remote = do + q <- getAssistant transferQueue + liftIO $ atomically $ do + sz <- readTVar (queuesize q) + unless (sz <= wantsz) $ + retry -- blocks until queuesize changes + enqueue reason schedule t (stubInfo f remote) + +queueTransferWhenSmall :: Reason -> AssociatedFile -> Transfer -> Remote -> Assistant () +queueTransferWhenSmall reason = queueTransferAt 10 reason Later + +{- Blocks until a pending transfer is available in the queue, + - and removes it. + - + - Checks that it's acceptable, before adding it to the + - currentTransfers map. If it's not acceptable, it's discarded. + - + - This is done in a single STM transaction, so there is no window + - where an observer sees an inconsistent status. -} +getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo)) +getNextTransfer acceptable = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + sz <- readTVar (queuesize q) + if sz < 1 + then retry -- blocks until queuesize changes + else do + (r@(t,info):rest) <- readTList (queuelist q) + void $ modifyTVar' (queuesize q) pred + setTList (queuelist q) rest + if acceptable info + then do + adjustTransfersSTM dstatus $ + M.insertWith' const t info + return $ Just r + else return Nothing + +{- Moves transfers matching a condition from the queue, to the + - currentTransfers map. -} +getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +getMatchingTransfers c = do + q <- getAssistant transferQueue + dstatus <- getAssistant daemonStatusHandle + liftIO $ atomically $ do + ts <- dequeueTransfersSTM q c + unless (null ts) $ + adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts + return ts + +{- Removes transfers matching a condition from the queue, and returns the + - removed transfers. -} +dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)] +dequeueTransfers c = do + q <- getAssistant transferQueue + removed <- liftIO $ atomically $ dequeueTransfersSTM q c + unless (null removed) $ + notifyTransfer + return removed + +dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)] +dequeueTransfersSTM q c = do + (removed, ts) <- partition (c . fst) <$> readTList (queuelist q) + void $ writeTVar (queuesize q) (length ts) + setTList (queuelist q) ts + return removed diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs new file mode 100644 index 000000000..cb66e845a --- /dev/null +++ b/Assistant/TransferSlots.hs @@ -0,0 +1,286 @@ +{- git-annex assistant transfer slots + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.TransferSlots where + +import Assistant.Common +import Utility.ThreadScheduler +import Assistant.Types.TransferSlots +import Assistant.DaemonStatus +import Assistant.TransferrerPool +import Assistant.Types.TransferrerPool +import Assistant.Types.TransferQueue +import Assistant.TransferQueue +import Assistant.Alert +import Assistant.Alert.Utility +import Assistant.Commits +import Assistant.Drop +import Logs.Transfer +import Logs.Location +import qualified Git +import qualified Remote +import qualified Types.Remote as Remote +import Annex.Content +import Annex.Wanted +import Config.Files + +import qualified Data.Map as M +import qualified Control.Exception as E +import Control.Concurrent +import qualified Control.Concurrent.MSemN as MSemN +#ifndef mingw32_HOST_OS +import System.Posix.Process (getProcessGroupIDOf) +import System.Posix.Signals (signalProcessGroup, sigTERM, sigKILL) +#endif + +type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ())) + +{- Waits until a transfer slot becomes available, then runs a + - TransferGenerator, and then runs the transfer action in its own thread. + -} +inTransferSlot :: FilePath -> TransferGenerator -> Assistant () +inTransferSlot program gen = do + flip MSemN.wait 1 <<~ transferSlots + runTransferThread program =<< gen + +{- Runs a TransferGenerator, and its transfer action, + - without waiting for a slot to become available. -} +inImmediateTransferSlot :: FilePath -> TransferGenerator -> Assistant () +inImmediateTransferSlot program gen = do + flip MSemN.signal (-1) <<~ transferSlots + runTransferThread program =<< gen + +{- Runs a transfer action, in an already allocated transfer slot. + - Once it finishes, frees the transfer slot. + - + - Note that the action is subject to being killed when the transfer + - is canceled or paused. + - + - A PauseTransfer exception is handled by letting the action be killed, + - then pausing the thread until a ResumeTransfer exception is raised, + - then rerunning the action. + -} +runTransferThread :: FilePath -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant () +runTransferThread _ Nothing = flip MSemN.signal 1 <<~ transferSlots +runTransferThread program (Just (t, info, a)) = do + d <- getAssistant id + aio <- asIO1 a + tid <- liftIO $ forkIO $ runTransferThread' program d aio + updateTransferInfo t $ info { transferTid = Just tid } + +runTransferThread' :: FilePath -> AssistantData -> (Transferrer -> IO ()) -> IO () +runTransferThread' program d run = go + where + go = catchPauseResume $ + withTransferrer program (transferrerPool d) + run + pause = catchPauseResume $ + runEvery (Seconds 86400) noop + {- Note: This must use E.try, rather than E.catch. + - When E.catch is used, and has called go in its exception + - handler, Control.Concurrent.throwTo will block sometimes + - when signaling. Using E.try avoids the problem. -} + catchPauseResume a' = do + r <- E.try a' :: IO (Either E.SomeException ()) + case r of + Left e -> case E.fromException e of + Just PauseTransfer -> pause + Just ResumeTransfer -> go + _ -> done + _ -> done + done = runAssistant d $ + flip MSemN.signal 1 <<~ transferSlots + +{- By the time this is called, the daemonstatus's currentTransfers map should + - already have been updated to include the transfer. -} +genTransfer :: Transfer -> TransferInfo -> TransferGenerator +genTransfer t info = case (transferRemote info, associatedFile info) of + (Just remote, Just file) + | Git.repoIsLocalUnknown (Remote.repo remote) -> do + -- optimisation for removable drives not plugged in + liftAnnex $ recordFailedTransfer t info + void $ removeTransfer t + return Nothing + | otherwise -> ifM (liftAnnex $ shouldTransfer t info) + ( do + debug [ "Transferring:" , describeTransfer t info ] + notifyTransfer + return $ Just (t, info, go remote file) + , do + debug [ "Skipping unnecessary transfer:", + describeTransfer t info ] + void $ removeTransfer t + finishedTransfer t (Just info) + return Nothing + ) + _ -> return Nothing + where + direction = transferDirection t + isdownload = direction == Download + + {- Alerts are only shown for successful transfers. + - Transfers can temporarily fail for many reasons, + - so there's no point in bothering the user about + - those. The assistant should recover. + - + - After a successful upload, handle dropping it from + - here, if desired. In this case, the remote it was + - uploaded to is known to have it. + - + - Also, after a successful transfer, the location + - log has changed. Indicate that a commit has been + - made, in order to queue a push of the git-annex + - branch out to remotes that did not participate + - in the transfer. + - + - If the process failed, it could have crashed, + - so remove the transfer from the list of current + - transfers, just in case it didn't stop + - in a way that lets the TransferWatcher do its + - usual cleanup. However, first check if something else is + - running the transfer, to avoid removing active transfers. + -} + go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info) + ( do + void $ addAlert $ makeAlertFiller True $ + transferFileAlert direction True file + unless isdownload $ + handleDrops + ("object uploaded to " ++ show remote) + True (transferKey t) + (associatedFile info) + (Just remote) + void recordCommit + , whenM (liftAnnex $ isNothing <$> checkTransfer t) $ + void $ removeTransfer t + ) + +{- Called right before a transfer begins, this is a last chance to avoid + - unnecessary transfers. + - + - For downloads, we obviously don't need to download if the already + - have the object. + - + - Smilarly, for uploads, check if the remote is known to already have + - the object. + - + - Also, uploads get queued to all remotes, in order of cost. + - This may mean, for example, that an object is uploaded over the LAN + - to a locally paired client, and once that upload is done, a more + - expensive transfer remote no longer wants the object. (Since + - all the clients have it already.) So do one last check if this is still + - preferred content. + - + - We'll also do one last preferred content check for downloads. An + - example of a case where this could be needed is if a download is queued + - for a file that gets moved out of an archive directory -- but before + - that download can happen, the file is put back in the archive. + -} +shouldTransfer :: Transfer -> TransferInfo -> Annex Bool +shouldTransfer t info + | transferDirection t == Download = + (not <$> inAnnex key) <&&> wantGet True file + | transferDirection t == Upload = case transferRemote info of + Nothing -> return False + Just r -> notinremote r + <&&> wantSend True file (Remote.uuid r) + | otherwise = return False + where + key = transferKey t + file = associatedFile info + + {- Trust the location log to check if the remote already has + - the key. This avoids a roundtrip to the remote. -} + notinremote r = notElem (Remote.uuid r) <$> loggedLocations key + +{- Queue uploads of files downloaded to us, spreading them + - out to other reachable remotes. + - + - Downloading a file may have caused a remote to not want it; + - so check for drops from remotes. + - + - Uploading a file may cause the local repo, or some other remote to not + - want it; handle that too. + -} +finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () +finishedTransfer t (Just info) + | transferDirection t == Download = + whenM (liftAnnex $ inAnnex $ transferKey t) $ do + dodrops False + queueTransfersMatching (/= transferUUID t) + "newly received object" + Later (transferKey t) (associatedFile info) Upload + | otherwise = dodrops True + where + dodrops fromhere = handleDrops + ("drop wanted after " ++ describeTransfer t info) + fromhere (transferKey t) (associatedFile info) Nothing +finishedTransfer _ _ = noop + +{- Pause a running transfer. -} +pauseTransfer :: Transfer -> Assistant () +pauseTransfer = cancelTransfer True + +{- Cancel a running transfer. -} +cancelTransfer :: Bool -> Transfer -> Assistant () +cancelTransfer pause t = do + m <- getCurrentTransfers + unless pause $ + {- remove queued transfer -} + void $ dequeueTransfers $ equivilantTransfer t + {- stop running transfer -} + maybe noop stop (M.lookup t m) + where + stop info = do + {- When there's a thread associated with the + - transfer, it's signaled first, to avoid it + - displaying any alert about the transfer having + - failed when the transfer process is killed. -} + liftIO $ maybe noop signalthread $ transferTid info + liftIO $ maybe noop killproc $ transferPid info + if pause + then void $ alterTransferInfo t $ + \i -> i { transferPaused = True } + else void $ removeTransfer t + signalthread tid + | pause = throwTo tid PauseTransfer + | otherwise = killThread tid + killproc pid = void $ tryIO $ do +#ifndef mingw32_HOST_OS + {- In order to stop helper processes like rsync, + - kill the whole process group of the process + - running the transfer. -} + g <- getProcessGroupIDOf pid + void $ tryIO $ signalProcessGroup sigTERM g + threadDelay 50000 -- 0.05 second grace period + void $ tryIO $ signalProcessGroup sigKILL g +#else + error "TODO: cancelTransfer not implemented on Windows" +#endif + +{- Start or resume a transfer. -} +startTransfer :: Transfer -> Assistant () +startTransfer t = do + m <- getCurrentTransfers + maybe startqueued go (M.lookup t m) + where + go info = maybe (start info) resume $ transferTid info + startqueued = do + is <- map snd <$> getMatchingTransfers (== t) + maybe noop start $ headMaybe is + resume tid = do + alterTransferInfo t $ \i -> i { transferPaused = False } + liftIO $ throwTo tid ResumeTransfer + start info = do + program <- liftIO readProgramFile + inImmediateTransferSlot program $ + genTransfer t info + +getCurrentTransfers :: Assistant TransferMap +getCurrentTransfers = currentTransfers <$> getDaemonStatus diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs new file mode 100644 index 000000000..bb4648731 --- /dev/null +++ b/Assistant/TransferrerPool.hs @@ -0,0 +1,95 @@ +{- A pool of "git-annex transferkeys" processes + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.TransferrerPool where + +import Assistant.Common +import Assistant.Types.TransferrerPool +import Logs.Transfer + +#ifndef mingw32_HOST_OS +import qualified Command.TransferKeys as T +#endif + +import Control.Concurrent.STM +import System.Process (create_group) +import Control.Exception (throw) +import Control.Concurrent + +{- Runs an action with a Transferrer from the pool. -} +withTransferrer :: FilePath -> TransferrerPool -> (Transferrer -> IO a) -> IO a +withTransferrer program pool a = do + t <- maybe (mkTransferrer program) (checkTransferrer program) + =<< atomically (tryReadTChan pool) + v <- tryNonAsync $ a t + unlessM (putback t) $ + void $ forkIO $ stopTransferrer t + either throw return v + where + putback t = atomically $ ifM (isEmptyTChan pool) + ( do + writeTChan pool t + return True + , return False + ) + +{- Requests that a Transferrer perform a Transfer, and waits for it to + - finish. -} +performTransfer :: Transferrer -> Transfer -> AssociatedFile -> IO Bool +performTransfer transferrer t f = catchBoolIO $ do +#ifndef mingw32_HOST_OS + T.sendRequest t f (transferrerWrite transferrer) + T.readResponse (transferrerRead transferrer) +#else + error "TODO performTransfer not implemented on Windows" +#endif + +{- Starts a new git-annex transferkeys process, setting up a pipe + - that will be used to communicate with it. -} +mkTransferrer :: FilePath -> IO Transferrer +mkTransferrer program = do +#ifndef mingw32_HOST_OS + (myread, twrite) <- createPipe + (tread, mywrite) <- createPipe + mapM_ (\fd -> setFdOption fd CloseOnExec True) [myread, mywrite] + let params = + [ Param "transferkeys" + , Param "--readfd", Param $ show tread + , Param "--writefd", Param $ show twrite + ] + {- It's put into its own group so that the whole group can be + - killed to stop a transfer. -} + (_, _, _, pid) <- createProcess (proc program $ toCommand params) + { create_group = True } + closeFd twrite + closeFd tread + myreadh <- fdToHandle myread + mywriteh <- fdToHandle mywrite + fileEncoding myreadh + fileEncoding mywriteh + return $ Transferrer + { transferrerRead = myreadh + , transferrerWrite = mywriteh + , transferrerHandle = pid + } +#else + error "TODO mkTransferrer not implemented on Windows" +#endif + +{- Checks if a Transferrer is still running. If not, makes a new one. -} +checkTransferrer :: FilePath -> Transferrer -> IO Transferrer +checkTransferrer program t = maybe (return t) (const $ mkTransferrer program) + =<< getProcessExitCode (transferrerHandle t) + +{- Closing the fds will stop the transferrer. -} +stopTransferrer :: Transferrer -> IO () +stopTransferrer t = do + hClose $ transferrerRead t + hClose $ transferrerWrite t + void $ waitForProcess $ transferrerHandle t diff --git a/Assistant/Types/Alert.hs b/Assistant/Types/Alert.hs new file mode 100644 index 000000000..e6fbe86d3 --- /dev/null +++ b/Assistant/Types/Alert.hs @@ -0,0 +1,78 @@ +{- git-annex assistant alert types + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.Alert where + +import Utility.Tense + +import Data.Text (Text) +import qualified Data.Map as M + +{- Different classes of alerts are displayed differently. -} +data AlertClass = Success | Message | Activity | Warning | Error + deriving (Eq, Ord) + +data AlertPriority = Filler | Low | Medium | High | Pinned + deriving (Eq, Ord) + +{- An alert can have an name, which is used to combine it with other similar + - alerts. -} +data AlertName + = FileAlert TenseChunk + | SanityCheckFixAlert + | WarningAlert String + | PairAlert String + | XMPPNeededAlert + | RemoteRemovalAlert String + | CloudRepoNeededAlert + | SyncAlert + | NotFsckedAlert + | UpgradeAlert + deriving (Eq) + +{- The first alert is the new alert, the second is an old alert. + - Should return a modified version of the old alert. -} +type AlertCombiner = Alert -> Alert -> Maybe Alert + +data Alert = Alert + { alertClass :: AlertClass + , alertHeader :: Maybe TenseText + , alertMessageRender :: Alert -> TenseText + , alertData :: [TenseChunk] + , alertCounter :: Int + , alertBlockDisplay :: Bool + , alertClosable :: Bool + , alertPriority :: AlertPriority + , alertIcon :: Maybe AlertIcon + , alertCombiner :: Maybe AlertCombiner + , alertName :: Maybe AlertName + , alertButtons :: [AlertButton] + } + +data AlertIcon = ActivityIcon | SyncIcon | SuccessIcon | ErrorIcon | InfoIcon | UpgradeIcon | TheCloud + +type AlertMap = M.Map AlertId Alert + +{- Higher AlertId indicates a more recent alert. -} +newtype AlertId = AlertId Integer + deriving (Read, Show, Eq, Ord) + +firstAlertId :: AlertId +firstAlertId = AlertId 0 + +nextAlertId :: AlertId -> AlertId +nextAlertId (AlertId i) = AlertId $ succ i + +{- When clicked, a button always redirects to a URL + - It may also run an IO action in the background, which is useful + - to make the button close or otherwise change the alert. -} +data AlertButton = AlertButton + { buttonLabel :: Text + , buttonUrl :: Text + , buttonAction :: Maybe (AlertId -> IO ()) + , buttonPrimary :: Bool + } diff --git a/Assistant/Types/BranchChange.hs b/Assistant/Types/BranchChange.hs new file mode 100644 index 000000000..399abee54 --- /dev/null +++ b/Assistant/Types/BranchChange.hs @@ -0,0 +1,19 @@ +{- git-annex assistant git-annex branch change tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.BranchChange where + +import Control.Concurrent.MSampleVar +import Common.Annex + +newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) + +newBranchChangeHandle :: IO BranchChangeHandle +newBranchChangeHandle = BranchChangeHandle <$> newEmptySV + +fromBranchChangeHandle :: BranchChangeHandle -> MSampleVar () +fromBranchChangeHandle (BranchChangeHandle v) = v diff --git a/Assistant/Types/Buddies.hs b/Assistant/Types/Buddies.hs new file mode 100644 index 000000000..36d8a4fed --- /dev/null +++ b/Assistant/Types/Buddies.hs @@ -0,0 +1,80 @@ +{- git-annex assistant buddies + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Types.Buddies where + +import Common.Annex + +import qualified Data.Map as M +import Control.Concurrent.STM +import Utility.NotificationBroadcaster +import Data.Text as T + +{- For simplicity, dummy types are defined even when XMPP is disabled. -} +#ifdef WITH_XMPP +import Network.Protocol.XMPP +import Data.Set as S +import Data.Ord + +newtype Client = Client JID + deriving (Eq, Show) + +instance Ord Client where + compare = comparing show + +data Buddy = Buddy + { buddyPresent :: S.Set Client + , buddyAway :: S.Set Client + , buddyAssistants :: S.Set Client + , buddyPairing :: Bool + } +#else +data Buddy = Buddy +#endif + deriving (Eq, Show) + +data BuddyKey = BuddyKey T.Text + deriving (Eq, Ord, Show, Read) + +data PairKey = PairKey UUID T.Text + deriving (Eq, Ord, Show, Read) + +type Buddies = M.Map BuddyKey Buddy + +{- A list of buddies, and a way to notify when it changes. -} +type BuddyList = (TMVar Buddies, NotificationBroadcaster) + +noBuddies :: Buddies +noBuddies = M.empty + +newBuddyList :: IO BuddyList +newBuddyList = (,) + <$> atomically (newTMVar noBuddies) + <*> newNotificationBroadcaster + +getBuddyList :: BuddyList -> IO [Buddy] +getBuddyList (v, _) = M.elems <$> atomically (readTMVar v) + +getBuddy :: BuddyKey -> BuddyList -> IO (Maybe Buddy) +getBuddy k (v, _) = M.lookup k <$> atomically (readTMVar v) + +getBuddyBroadcaster :: BuddyList -> NotificationBroadcaster +getBuddyBroadcaster (_, h) = h + +{- Applies a function to modify the buddy list, and if it's changed, + - sends notifications to any listeners. -} +updateBuddyList :: (Buddies -> Buddies) -> BuddyList -> IO () +updateBuddyList a (v, caster) = do + changed <- atomically $ do + buds <- takeTMVar v + let buds' = a buds + putTMVar v buds' + return $ buds /= buds' + when changed $ + sendNotification caster diff --git a/Assistant/Types/Changes.hs b/Assistant/Types/Changes.hs new file mode 100644 index 000000000..e8ecc6e48 --- /dev/null +++ b/Assistant/Types/Changes.hs @@ -0,0 +1,77 @@ +{- git-annex assistant change tracking + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.Changes where + +import Types.KeySource +import Types.Key +import Utility.TList + +import Control.Concurrent.STM +import Data.Time.Clock + +{- An un-ordered pool of Changes that have been noticed and should be + - staged and committed. Changes will typically be in order, but ordering + - may be lost. In any case, order should not matter, as any given Change + - may later be reverted by a later Change (ie, a file is added and then + - deleted). Code that processes the changes needs to deal with such + - scenarios. + -} +type ChangePool = TList Change + +newChangePool :: IO ChangePool +newChangePool = atomically newTList + +data Change + = Change + { changeTime :: UTCTime + , _changeFile :: FilePath + , changeInfo :: ChangeInfo + } + | PendingAddChange + { changeTime ::UTCTime + , _changeFile :: FilePath + } + | InProcessAddChange + { changeTime ::UTCTime + , keySource :: KeySource + } + deriving (Show) + +data ChangeInfo = AddKeyChange Key | AddFileChange | LinkChange (Maybe Key) | RmChange + deriving (Show, Eq, Ord) + +changeInfoKey :: ChangeInfo -> Maybe Key +changeInfoKey (AddKeyChange k) = Just k +changeInfoKey (LinkChange (Just k)) = Just k +changeInfoKey _ = Nothing + +changeFile :: Change -> FilePath +changeFile (Change _ f _) = f +changeFile (PendingAddChange _ f) = f +changeFile (InProcessAddChange _ ks) = keyFilename ks + +isPendingAddChange :: Change -> Bool +isPendingAddChange (PendingAddChange {}) = True +isPendingAddChange _ = False + +isInProcessAddChange :: Change -> Bool +isInProcessAddChange (InProcessAddChange {}) = True +isInProcessAddChange _ = False + +retryChange :: Change -> Change +retryChange (InProcessAddChange time ks) = + PendingAddChange time (keyFilename ks) +retryChange c = c + +finishedChange :: Change -> Key -> Change +finishedChange c@(InProcessAddChange { keySource = ks }) k = Change + { changeTime = changeTime c + , _changeFile = keyFilename ks + , changeInfo = AddKeyChange k + } +finishedChange c _ = c diff --git a/Assistant/Types/Commits.hs b/Assistant/Types/Commits.hs new file mode 100644 index 000000000..500faa901 --- /dev/null +++ b/Assistant/Types/Commits.hs @@ -0,0 +1,19 @@ +{- git-annex assistant commit tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.Commits where + +import Utility.TList + +import Control.Concurrent.STM + +type CommitChan = TList Commit + +data Commit = Commit + +newCommitChan :: IO CommitChan +newCommitChan = atomically newTList diff --git a/Assistant/Types/DaemonStatus.hs b/Assistant/Types/DaemonStatus.hs new file mode 100644 index 000000000..a618c700d --- /dev/null +++ b/Assistant/Types/DaemonStatus.hs @@ -0,0 +1,119 @@ +{- git-annex assistant daemon status + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.DaemonStatus where + +import Common.Annex +import Assistant.Pairing +import Utility.NotificationBroadcaster +import Logs.Transfer +import Assistant.Types.ThreadName +import Assistant.Types.NetMessager +import Assistant.Types.Alert +import Utility.Url + +import Control.Concurrent.STM +import Control.Concurrent.MVar +import Control.Concurrent.Async +import Data.Time.Clock.POSIX +import qualified Data.Map as M +import qualified Data.Set as S + +data DaemonStatus = DaemonStatus + -- All the named threads that comprise the daemon, + -- and actions to run to restart them. + { startedThreads :: M.Map ThreadName (Async (), IO ()) + -- False when the daemon is performing its startup scan + , scanComplete :: Bool + -- True when all files should be restaged. + , forceRestage :: Bool + -- Time when a previous process of the daemon was running ok + , lastRunning :: Maybe POSIXTime + -- True when the daily sanity checker is running + , sanityCheckRunning :: Bool + -- Last time the daily sanity checker ran + , lastSanityCheck :: Maybe POSIXTime + -- True when a scan for file transfers is running + , transferScanRunning :: Bool + -- Currently running file content transfers + , currentTransfers :: TransferMap + -- Messages to display to the user. + , alertMap :: AlertMap + , lastAlertId :: AlertId + -- Ordered list of all remotes that can be synced with + , syncRemotes :: [Remote] + -- Ordered list of remotes to sync git with + , syncGitRemotes :: [Remote] + -- Ordered list of remotes to sync data with + , syncDataRemotes :: [Remote] + -- Are we syncing to any cloud remotes? + , syncingToCloudRemote :: Bool + -- List of uuids of remotes that we may have gotten out of sync with. + , desynced :: S.Set UUID + -- Pairing request that is in progress. + , pairingInProgress :: Maybe PairingInProgress + -- Broadcasts notifications about all changes to the DaemonStatus. + , changeNotifier :: NotificationBroadcaster + -- Broadcasts notifications when queued or current transfers change. + , transferNotifier :: NotificationBroadcaster + -- Broadcasts notifications when there's a change to the alerts. + , alertNotifier :: NotificationBroadcaster + -- Broadcasts notifications when the syncRemotes change. + , syncRemotesNotifier :: NotificationBroadcaster + -- Broadcasts notifications when the scheduleLog changes. + , scheduleLogNotifier :: NotificationBroadcaster + -- Broadcasts a notification once the startup sanity check has run. + , startupSanityCheckNotifier :: NotificationBroadcaster + -- Broadcasts notifications when the network is connected. + , networkConnectedNotifier :: NotificationBroadcaster + -- Broadcasts notifications when a global redirect is needed. + , globalRedirNotifier :: NotificationBroadcaster + , globalRedirUrl :: Maybe URLString + -- Actions to run after a Key is transferred. + , transferHook :: M.Map Key (Transfer -> IO ()) + -- When the XMPP client is connected, this will contain the XMPP + -- address. + , xmppClientID :: Maybe ClientID + -- MVars to signal when a remote gets connected. + , connectRemoteNotifiers :: M.Map UUID [MVar ()] + } + +type TransferMap = M.Map Transfer TransferInfo + +{- This TMVar is never left empty, so accessing it will never block. -} +type DaemonStatusHandle = TMVar DaemonStatus + +newDaemonStatus :: IO DaemonStatus +newDaemonStatus = DaemonStatus + <$> pure M.empty + <*> pure False + <*> pure False + <*> pure Nothing + <*> pure False + <*> pure Nothing + <*> pure False + <*> pure M.empty + <*> pure M.empty + <*> pure firstAlertId + <*> pure [] + <*> pure [] + <*> pure [] + <*> pure False + <*> pure S.empty + <*> pure Nothing + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> newNotificationBroadcaster + <*> pure Nothing + <*> pure M.empty + <*> pure Nothing + <*> pure M.empty diff --git a/Assistant/Types/NamedThread.hs b/Assistant/Types/NamedThread.hs new file mode 100644 index 000000000..5dd1364ad --- /dev/null +++ b/Assistant/Types/NamedThread.hs @@ -0,0 +1,21 @@ +{- named threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.NamedThread where + +import Assistant.Monad +import Assistant.Types.ThreadName + +{- Information about a named thread that can be run. -} +data NamedThread = NamedThread Bool ThreadName (Assistant ()) + +namedThread :: String -> Assistant () -> NamedThread +namedThread = NamedThread True . ThreadName + +{- A named thread that can start running before the startup sanity check. -} +namedThreadUnchecked :: String -> Assistant () -> NamedThread +namedThreadUnchecked = NamedThread False . ThreadName diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs new file mode 100644 index 000000000..0af262e9a --- /dev/null +++ b/Assistant/Types/NetMessager.hs @@ -0,0 +1,155 @@ +{- git-annex assistant out of band network messager types + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.NetMessager where + +import Common.Annex +import Assistant.Pairing +import Git.Types + +import qualified Data.Text as T +import qualified Data.Set as S +import qualified Data.Map as M +import qualified Data.DList as D +import Control.Concurrent.STM +import Control.Concurrent.MSampleVar +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as B8 +import Data.Text (Text) + +{- Messages that can be sent out of band by a network messager. -} +data NetMessage + -- indicate that pushes have been made to the repos with these uuids + = NotifyPush [UUID] + -- requests other clients to inform us of their presence + | QueryPresence + -- notification about a stage in the pairing process, + -- involving a client, and a UUID. + | PairingNotification PairStage ClientID UUID + -- used for git push over the network messager + | Pushing ClientID PushStage + deriving (Show, Eq, Ord) + +{- Something used to identify the client, or clients to send the message to. -} +type ClientID = Text + +data PushStage + -- indicates that we have data to push over the out of band network + = CanPush UUID [Sha] + -- request that a git push be sent over the out of band network + | PushRequest UUID + -- indicates that a push is starting + | StartingPush UUID + -- a chunk of output of git receive-pack + | ReceivePackOutput SequenceNum ByteString + -- a chuck of output of git send-pack + | SendPackOutput SequenceNum ByteString + -- sent when git receive-pack exits, with its exit code + | ReceivePackDone ExitCode + deriving (Show, Eq, Ord) + +{- A sequence number. Incremented by one per packet in a sequence, + - starting with 1 for the first packet. 0 means sequence numbers are + - not being used. -} +type SequenceNum = Int + +{- NetMessages that are important (and small), and should be stored to be + - resent when new clients are seen. -} +isImportantNetMessage :: NetMessage -> Maybe ClientID +isImportantNetMessage (Pushing c (CanPush _ _)) = Just c +isImportantNetMessage (Pushing c (PushRequest _)) = Just c +isImportantNetMessage _ = Nothing + +{- Checks if two important NetMessages are equivilant. + - That is to say, assuming they were sent to the same client, + - would it do the same thing for one as for the other? -} +equivilantImportantNetMessages :: NetMessage -> NetMessage -> Bool +equivilantImportantNetMessages (Pushing _ (CanPush _ _)) (Pushing _ (CanPush _ _)) = True +equivilantImportantNetMessages (Pushing _ (PushRequest _)) (Pushing _ (PushRequest _)) = True +equivilantImportantNetMessages _ _ = False + +readdressNetMessage :: NetMessage -> ClientID -> NetMessage +readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid +readdressNetMessage (Pushing _ stage) c = Pushing c stage +readdressNetMessage m _ = m + +{- Convert a NetMessage to something that can be logged. -} +logNetMessage :: NetMessage -> String +logNetMessage (Pushing c stage) = show $ Pushing (logClientID c) $ + case stage of + ReceivePackOutput n _ -> ReceivePackOutput n elided + SendPackOutput n _ -> SendPackOutput n elided + s -> s + where + elided = B8.pack "<elided>" +logNetMessage (PairingNotification stage c uuid) = + show $ PairingNotification stage (logClientID c) uuid +logNetMessage m = show m + +logClientID :: ClientID -> ClientID +logClientID c = T.concat [T.take 1 c, T.pack $ show $ T.length c] + +{- Things that initiate either side of a push, but do not actually send data. -} +isPushInitiation :: PushStage -> Bool +isPushInitiation (PushRequest _) = True +isPushInitiation (StartingPush _) = True +isPushInitiation _ = False + +isPushNotice :: PushStage -> Bool +isPushNotice (CanPush _ _) = True +isPushNotice _ = False + +data PushSide = SendPack | ReceivePack + deriving (Eq, Ord, Show) + +pushDestinationSide :: PushStage -> PushSide +pushDestinationSide (CanPush _ _) = ReceivePack +pushDestinationSide (PushRequest _) = SendPack +pushDestinationSide (StartingPush _) = ReceivePack +pushDestinationSide (ReceivePackOutput _ _) = SendPack +pushDestinationSide (SendPackOutput _ _) = ReceivePack +pushDestinationSide (ReceivePackDone _) = SendPack + +type SideMap a = PushSide -> a + +mkSideMap :: STM a -> IO (SideMap a) +mkSideMap gen = do + (sp, rp) <- atomically $ (,) <$> gen <*> gen + return $ lookupside sp rp + where + lookupside sp _ SendPack = sp + lookupside _ rp ReceivePack = rp + +getSide :: PushSide -> SideMap a -> a +getSide side m = m side + +type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage)) + +data NetMessager = NetMessager + -- outgoing messages + { netMessages :: TChan NetMessage + -- important messages for each client + , importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) + -- important messages that are believed to have been sent to a client + , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) + -- write to this to restart the net messager + , netMessagerRestart :: MSampleVar () + -- queue of incoming messages that request the initiation of pushes + , netMessagerPushInitiations :: SideMap (TMVar [NetMessage]) + -- incoming messages containing data for a running + -- (or not yet started) push + , netMessagerInboxes :: SideMap Inboxes + } + +newNetMessager :: IO NetMessager +newNetMessager = NetMessager + <$> atomically newTChan + <*> atomically (newTMVar M.empty) + <*> atomically (newTMVar M.empty) + <*> newEmptySV + <*> mkSideMap newEmptyTMVar + <*> mkSideMap (newTVar M.empty) diff --git a/Assistant/Types/Pushes.hs b/Assistant/Types/Pushes.hs new file mode 100644 index 000000000..99e0ee162 --- /dev/null +++ b/Assistant/Types/Pushes.hs @@ -0,0 +1,24 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.Pushes where + +import Common.Annex + +import Control.Concurrent.STM +import Data.Time.Clock +import qualified Data.Map as M + +{- Track the most recent push failure for each remote. -} +type PushMap = M.Map Remote UTCTime +type FailedPushMap = TMVar PushMap + +{- The TMVar starts empty, and is left empty when there are no + - failed pushes. This way we can block until there are some failed pushes. + -} +newFailedPushMap :: IO FailedPushMap +newFailedPushMap = atomically newEmptyTMVar diff --git a/Assistant/Types/RepoProblem.hs b/Assistant/Types/RepoProblem.hs new file mode 100644 index 000000000..ece5a5286 --- /dev/null +++ b/Assistant/Types/RepoProblem.hs @@ -0,0 +1,28 @@ +{- git-annex assistant repository problem tracking + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.RepoProblem where + +import Types +import Utility.TList + +import Control.Concurrent.STM +import Data.Function + +data RepoProblem = RepoProblem + { problemUUID :: UUID + , afterFix :: IO () + } + +{- The afterFix actions are assumed to all be equivilant. -} +sameRepoProblem :: RepoProblem -> RepoProblem -> Bool +sameRepoProblem = (==) `on` problemUUID + +type RepoProblemChan = TList RepoProblem + +newRepoProblemChan :: IO RepoProblemChan +newRepoProblemChan = atomically newTList diff --git a/Assistant/Types/ScanRemotes.hs b/Assistant/Types/ScanRemotes.hs new file mode 100644 index 000000000..8219f9baf --- /dev/null +++ b/Assistant/Types/ScanRemotes.hs @@ -0,0 +1,25 @@ +{- git-annex assistant remotes needing scanning + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.ScanRemotes where + +import Common.Annex + +import Control.Concurrent.STM +import qualified Data.Map as M + +data ScanInfo = ScanInfo + { scanPriority :: Float + , fullScan :: Bool + } + +type ScanRemoteMap = TMVar (M.Map Remote ScanInfo) + +{- The TMVar starts empty, and is left empty when there are no remotes + - to scan. -} +newScanRemoteMap :: IO ScanRemoteMap +newScanRemoteMap = atomically newEmptyTMVar diff --git a/Assistant/Types/ThreadName.hs b/Assistant/Types/ThreadName.hs new file mode 100644 index 000000000..c8d264a38 --- /dev/null +++ b/Assistant/Types/ThreadName.hs @@ -0,0 +1,14 @@ +{- name of a thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.ThreadName where + +newtype ThreadName = ThreadName String + deriving (Eq, Read, Show, Ord) + +fromThreadName :: ThreadName -> String +fromThreadName (ThreadName n) = n diff --git a/Assistant/Types/ThreadedMonad.hs b/Assistant/Types/ThreadedMonad.hs new file mode 100644 index 000000000..1a2aa7eb7 --- /dev/null +++ b/Assistant/Types/ThreadedMonad.hs @@ -0,0 +1,38 @@ +{- making the Annex monad available across threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.ThreadedMonad where + +import Common.Annex +import qualified Annex + +import Control.Concurrent +import Data.Tuple + +{- The Annex state is stored in a MVar, so that threaded actions can access + - it. -} +type ThreadState = MVar Annex.AnnexState + +{- Stores the Annex state in a MVar. + - + - Once the action is finished, retrieves the state from the MVar. + -} +withThreadState :: (ThreadState -> Annex a) -> Annex a +withThreadState a = do + state <- Annex.getState id + mvar <- liftIO $ newMVar state + r <- a mvar + newstate <- liftIO $ takeMVar mvar + Annex.changeState (const newstate) + return r + +{- Runs an Annex action, using the state from the MVar. + - + - This serializes calls by threads; only one thread can run in Annex at a + - time. -} +runThreadState :: ThreadState -> Annex a -> IO a +runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a diff --git a/Assistant/Types/TransferQueue.hs b/Assistant/Types/TransferQueue.hs new file mode 100644 index 000000000..e4bf2ae92 --- /dev/null +++ b/Assistant/Types/TransferQueue.hs @@ -0,0 +1,29 @@ +{- git-annex assistant pending transfer queue + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.TransferQueue where + +import Common.Annex +import Logs.Transfer + +import Control.Concurrent.STM +import Utility.TList + +data TransferQueue = TransferQueue + { queuesize :: TVar Int + , queuelist :: TList (Transfer, TransferInfo) + , deferreddownloads :: TList (Key, AssociatedFile) + } + +data Schedule = Next | Later + deriving (Eq) + +newTransferQueue :: IO TransferQueue +newTransferQueue = atomically $ TransferQueue + <$> newTVar 0 + <*> newTList + <*> newTList diff --git a/Assistant/Types/TransferSlots.hs b/Assistant/Types/TransferSlots.hs new file mode 100644 index 000000000..5140995a3 --- /dev/null +++ b/Assistant/Types/TransferSlots.hs @@ -0,0 +1,34 @@ +{- git-annex assistant transfer slots + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE DeriveDataTypeable #-} + +module Assistant.Types.TransferSlots where + +import qualified Control.Exception as E +import qualified Control.Concurrent.MSemN as MSemN +import Data.Typeable + +type TransferSlots = MSemN.MSemN Int + +{- A special exception that can be thrown to pause or resume a transfer, while + - keeping its slot in use. -} +data TransferException = PauseTransfer | ResumeTransfer + deriving (Show, Eq, Typeable) + +instance E.Exception TransferException + +{- Number of concurrent transfers allowed to be run from the assistant. + - + - Transfers launched by other means, including by remote assistants, + - do not currently take up slots. + -} +numSlots :: Int +numSlots = 1 + +newTransferSlots :: IO TransferSlots +newTransferSlots = MSemN.new numSlots diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs new file mode 100644 index 000000000..2727a6919 --- /dev/null +++ b/Assistant/Types/TransferrerPool.hs @@ -0,0 +1,23 @@ +{- A pool of "git-annex transferkeys" processes + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.TransferrerPool where + +import Common.Annex + +import Control.Concurrent.STM + +type TransferrerPool = TChan Transferrer + +data Transferrer = Transferrer + { transferrerRead :: Handle + , transferrerWrite :: Handle + , transferrerHandle :: ProcessHandle + } + +newTransferrerPool :: IO TransferrerPool +newTransferrerPool = newTChanIO diff --git a/Assistant/Types/UrlRenderer.hs b/Assistant/Types/UrlRenderer.hs new file mode 100644 index 000000000..521905bf3 --- /dev/null +++ b/Assistant/Types/UrlRenderer.hs @@ -0,0 +1,26 @@ +{- webapp url renderer access from the assistant + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Types.UrlRenderer ( + UrlRenderer, + newUrlRenderer +) where + +#ifdef WITH_WEBAPP + +import Assistant.WebApp (UrlRenderer, newUrlRenderer) + +#else + +data UrlRenderer = UrlRenderer -- dummy type + +newUrlRenderer :: IO UrlRenderer +newUrlRenderer = return UrlRenderer + +#endif diff --git a/Assistant/XMPP.hs b/Assistant/XMPP.hs new file mode 100644 index 000000000..09b7daf4e --- /dev/null +++ b/Assistant/XMPP.hs @@ -0,0 +1,273 @@ +{- core xmpp support + - + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE OverloadedStrings #-} + +module Assistant.XMPP where + +import Assistant.Common +import Assistant.Types.NetMessager +import Assistant.Pairing +import Git.Sha (extractSha) + +import Network.Protocol.XMPP hiding (Node) +import Data.Text (Text) +import qualified Data.Text as T +import qualified Data.Map as M +import Data.ByteString (ByteString) +import qualified Data.ByteString as B +import Data.XML.Types +import qualified "dataenc" Codec.Binary.Base64 as B64 + +{- Name of the git-annex tag, in our own XML namespace. + - (Not using a namespace URL to avoid unnecessary bloat.) -} +gitAnnexTagName :: Name +gitAnnexTagName = "{git-annex}git-annex" + +{- Creates a git-annex tag containing a particular attribute and value. -} +gitAnnexTag :: Name -> Text -> Element +gitAnnexTag attr val = gitAnnexTagContent attr val [] + +{- Also with some content. -} +gitAnnexTagContent :: Name -> Text -> [Node] -> Element +gitAnnexTagContent attr val = Element gitAnnexTagName [(attr, [ContentText val])] + +isGitAnnexTag :: Element -> Bool +isGitAnnexTag t = elementName t == gitAnnexTagName + +{- Things that a git-annex tag can inserted into. -} +class GitAnnexTaggable a where + insertGitAnnexTag :: a -> Element -> a + + extractGitAnnexTag :: a -> Maybe Element + + hasGitAnnexTag :: a -> Bool + hasGitAnnexTag = isJust . extractGitAnnexTag + +instance GitAnnexTaggable Message where + insertGitAnnexTag m elt = m { messagePayloads = elt : messagePayloads m } + extractGitAnnexTag = headMaybe . filter isGitAnnexTag . messagePayloads + +instance GitAnnexTaggable Presence where + -- always mark extended away and set presence priority to negative + insertGitAnnexTag p elt = p + { presencePayloads = extendedAway : negativePriority : elt : presencePayloads p } + extractGitAnnexTag = headMaybe . filter isGitAnnexTag . presencePayloads + +data GitAnnexTagInfo = GitAnnexTagInfo + { tagAttr :: Name + , tagValue :: Text + , tagElement :: Element + } + +type Decoder = Message -> GitAnnexTagInfo -> Maybe NetMessage + +gitAnnexTagInfo :: GitAnnexTaggable a => a -> Maybe GitAnnexTagInfo +gitAnnexTagInfo v = case extractGitAnnexTag v of + {- Each git-annex tag has a single attribute. -} + Just (tag@(Element _ [(attr, _)] _)) -> GitAnnexTagInfo + <$> pure attr + <*> attributeText attr tag + <*> pure tag + _ -> Nothing + +{- A presence with a git-annex tag in it. + - Also includes a status tag, which may be visible in XMPP clients. -} +gitAnnexPresence :: Element -> Presence +gitAnnexPresence = insertGitAnnexTag $ addStatusTag $ emptyPresence PresenceAvailable + where + addStatusTag p = p + { presencePayloads = status : presencePayloads p } + status = Element "status" [] [statusMessage] + statusMessage = NodeContent $ ContentText $ T.pack "git-annex" + +{- A presence with an empty git-annex tag in it, used for letting other + - clients know we're around and are a git-annex client. -} +gitAnnexSignature :: Presence +gitAnnexSignature = gitAnnexPresence $ Element gitAnnexTagName [] [] + +{- XMPP client to server ping -} +xmppPing :: JID -> IQ +xmppPing selfjid = (emptyIQ IQGet) + { iqID = Just "c2s1" + , iqFrom = Just selfjid + , iqTo = Just $ JID Nothing (jidDomain selfjid) Nothing + , iqPayload = Just $ Element xmppPingTagName [] [] + } + +xmppPingTagName :: Name +xmppPingTagName = "{urn:xmpp}ping" + +{- A message with a git-annex tag in it. -} +gitAnnexMessage :: Element -> JID -> JID -> Message +gitAnnexMessage elt tojid fromjid = (insertGitAnnexTag silentMessage elt) + { messageTo = Just tojid + , messageFrom = Just fromjid + } + +{- A notification that we've pushed to some repositories, listing their + - UUIDs. -} +pushNotification :: [UUID] -> Presence +pushNotification = gitAnnexPresence . gitAnnexTag pushAttr . encodePushNotification + +encodePushNotification :: [UUID] -> Text +encodePushNotification = T.intercalate uuidSep . map (T.pack . fromUUID) + +decodePushNotification :: Text -> [UUID] +decodePushNotification = map (toUUID . T.unpack) . T.splitOn uuidSep + +uuidSep :: Text +uuidSep = "," + +{- A request for other git-annex clients to send presence. -} +presenceQuery :: Presence +presenceQuery = gitAnnexPresence $ gitAnnexTag queryAttr T.empty + +{- A notification about a stage of pairing. -} +pairingNotification :: PairStage -> UUID -> JID -> JID -> Message +pairingNotification pairstage u = gitAnnexMessage $ + gitAnnexTag pairAttr $ encodePairingNotification pairstage u + +encodePairingNotification :: PairStage -> UUID -> Text +encodePairingNotification pairstage u = T.unwords $ map T.pack + [ show pairstage + , fromUUID u + ] + +decodePairingNotification :: Decoder +decodePairingNotification m = parse . words . T.unpack . tagValue + where + parse [stage, u] = PairingNotification + <$> readish stage + <*> (formatJID <$> messageFrom m) + <*> pure (toUUID u) + parse _ = Nothing + +pushMessage :: PushStage -> JID -> JID -> Message +pushMessage = gitAnnexMessage . encode + where + encode (CanPush u shas) = + gitAnnexTag canPushAttr $ T.pack $ unwords $ + fromUUID u : map show shas + encode (PushRequest u) = + gitAnnexTag pushRequestAttr $ T.pack $ fromUUID u + encode (StartingPush u) = + gitAnnexTag startingPushAttr $ T.pack $ fromUUID u + encode (ReceivePackOutput n b) = + gitAnnexTagContent receivePackAttr (val n) $ encodeTagContent b + encode (SendPackOutput n b) = + gitAnnexTagContent sendPackAttr (val n) $ encodeTagContent b + encode (ReceivePackDone code) = + gitAnnexTag receivePackDoneAttr $ val $ encodeExitCode code + val = T.pack . show + +decodeMessage :: Message -> Maybe NetMessage +decodeMessage m = decode =<< gitAnnexTagInfo m + where + decode i = M.lookup (tagAttr i) decoders >>= rundecoder i + rundecoder i d = d m i + decoders = M.fromList $ zip + [ pairAttr + , canPushAttr + , pushRequestAttr + , startingPushAttr + , receivePackAttr + , sendPackAttr + , receivePackDoneAttr + ] + [ decodePairingNotification + , pushdecoder $ shasgen CanPush + , pushdecoder $ gen PushRequest + , pushdecoder $ gen StartingPush + , pushdecoder $ seqgen ReceivePackOutput + , pushdecoder $ seqgen SendPackOutput + , pushdecoder $ + fmap (ReceivePackDone . decodeExitCode) . readish . + T.unpack . tagValue + ] + pushdecoder a m' i = Pushing + <$> (formatJID <$> messageFrom m') + <*> a i + gen c i = c . toUUID <$> headMaybe (words (T.unpack (tagValue i))) + seqgen c i = do + packet <- decodeTagContent $ tagElement i + let seqnum = fromMaybe 0 $ readish $ T.unpack $ tagValue i + return $ c seqnum packet + shasgen c i = do + let (u:shas) = words $ T.unpack $ tagValue i + return $ c (toUUID u) (mapMaybe extractSha shas) + +decodeExitCode :: Int -> ExitCode +decodeExitCode 0 = ExitSuccess +decodeExitCode n = ExitFailure n + +encodeExitCode :: ExitCode -> Int +encodeExitCode ExitSuccess = 0 +encodeExitCode (ExitFailure n) = n + +{- Base 64 encoding a ByteString to use as the content of a tag. -} +encodeTagContent :: ByteString -> [Node] +encodeTagContent b = [NodeContent $ ContentText $ T.pack $ B64.encode $ B.unpack b] + +decodeTagContent :: Element -> Maybe ByteString +decodeTagContent elt = B.pack <$> B64.decode s + where + s = T.unpack $ T.concat $ elementText elt + +{- The JID without the client part. -} +baseJID :: JID -> JID +baseJID j = JID (jidNode j) (jidDomain j) Nothing + +{- An XMPP chat message with an empty body. This should not be displayed + - by clients, but can be used for communications. -} +silentMessage :: Message +silentMessage = (emptyMessage MessageChat) + { messagePayloads = [ emptybody ] } + where + emptybody = Element + { elementName = "body" + , elementAttributes = [] + , elementNodes = [] + } + +{- Add to a presence to mark its client as extended away. -} +extendedAway :: Element +extendedAway = Element "show" [] [NodeContent $ ContentText "xa"] + +{- Add to a presence to give it a negative priority. -} +negativePriority :: Element +negativePriority = Element "priority" [] [NodeContent $ ContentText "-1"] + +pushAttr :: Name +pushAttr = "push" + +queryAttr :: Name +queryAttr = "query" + +pairAttr :: Name +pairAttr = "pair" + +canPushAttr :: Name +canPushAttr = "canpush" + +pushRequestAttr :: Name +pushRequestAttr = "pushrequest" + +startingPushAttr :: Name +startingPushAttr = "startingpush" + +receivePackAttr :: Name +receivePackAttr = "rp" + +sendPackAttr :: Name +sendPackAttr = "sp" + +receivePackDoneAttr :: Name +receivePackDoneAttr = "rpdone" + +shasAttr :: Name +shasAttr = "shas" diff --git a/Assistant/XMPP/Buddies.hs b/Assistant/XMPP/Buddies.hs new file mode 100644 index 000000000..0c466e51c --- /dev/null +++ b/Assistant/XMPP/Buddies.hs @@ -0,0 +1,87 @@ +{- xmpp buddies + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.XMPP.Buddies where + +import Assistant.XMPP +import Common.Annex +import Assistant.Types.Buddies + +import Network.Protocol.XMPP +import qualified Data.Map as M +import qualified Data.Set as S +import Data.Text (Text) +import qualified Data.Text as T + +genBuddyKey :: JID -> BuddyKey +genBuddyKey j = BuddyKey $ formatJID $ baseJID j + +buddyName :: JID -> Text +buddyName j = maybe (T.pack "") strNode (jidNode j) + +ucFirst :: Text -> Text +ucFirst s = let (first, rest) = T.splitAt 1 s + in T.concat [T.toUpper first, rest] + +{- Summary of info about a buddy. + - + - If the buddy has no clients at all anymore, returns Nothing. -} +buddySummary :: [JID] -> Buddy -> Maybe (Text, Bool, Bool, Bool, BuddyKey) +buddySummary pairedwith b = case clients of + ((Client j):_) -> Just (buddyName j, away, canpair, alreadypaired j, genBuddyKey j) + [] -> Nothing + where + away = S.null (buddyPresent b) && S.null (buddyAssistants b) + canpair = not $ S.null (buddyAssistants b) + clients = S.toList $ buddyPresent b `S.union` buddyAway b `S.union` buddyAssistants b + alreadypaired j = baseJID j `elem` pairedwith + +{- Updates the buddies with XMPP presence info. -} +updateBuddies :: Presence -> Buddies -> Buddies +updateBuddies p@(Presence { presenceFrom = Just jid }) = M.alter update key + where + key = genBuddyKey jid + update (Just b) = Just $ applyPresence p b + update Nothing = newBuddy p +updateBuddies _ = id + +{- Creates a new buddy based on XMPP presence info. -} +newBuddy :: Presence -> Maybe Buddy +newBuddy p + | presenceType p == PresenceAvailable = go + | presenceType p == PresenceUnavailable = go + | otherwise = Nothing + where + go = make <$> presenceFrom p + make _jid = applyPresence p $ Buddy + { buddyPresent = S.empty + , buddyAway = S.empty + , buddyAssistants = S.empty + , buddyPairing = False + } + +applyPresence :: Presence -> Buddy -> Buddy +applyPresence p b = fromMaybe b $! go <$> presenceFrom p + where + go jid + | presenceType p == PresenceUnavailable = b + { buddyAway = addto $ buddyAway b + , buddyPresent = removefrom $ buddyPresent b + , buddyAssistants = removefrom $ buddyAssistants b + } + | hasGitAnnexTag p = b + { buddyAssistants = addto $ buddyAssistants b + , buddyAway = removefrom $ buddyAway b } + | presenceType p == PresenceAvailable = b + { buddyPresent = addto $ buddyPresent b + , buddyAway = removefrom $ buddyAway b + } + | otherwise = b + where + client = Client jid + removefrom = S.filter (/= client) + addto = S.insert client diff --git a/Assistant/XMPP/Client.hs b/Assistant/XMPP/Client.hs new file mode 100644 index 000000000..677bb2ff3 --- /dev/null +++ b/Assistant/XMPP/Client.hs @@ -0,0 +1,84 @@ +{- xmpp client support + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.XMPP.Client where + +import Assistant.Common +import Utility.SRV +import Creds + +import Network.Protocol.XMPP +import Network +import Control.Concurrent +import qualified Data.Text as T +import Control.Exception (SomeException) + +{- Everything we need to know to connect to an XMPP server. -} +data XMPPCreds = XMPPCreds + { xmppUsername :: T.Text + , xmppPassword :: T.Text + , xmppHostname :: HostName + , xmppPort :: Int + , xmppJID :: T.Text + } + deriving (Read, Show) + +connectXMPP :: XMPPCreds -> (JID -> XMPP a) -> IO [(HostPort, Either SomeException ())] +connectXMPP c a = case parseJID (xmppJID c) of + Nothing -> error "bad JID" + Just jid -> connectXMPP' jid c a + +{- Do a SRV lookup, but if it fails, fall back to the cached xmppHostname. -} +connectXMPP' :: JID -> XMPPCreds -> (JID -> XMPP a) -> IO [(HostPort, Either SomeException ())] +connectXMPP' jid c a = reverse <$> (handle =<< lookupSRV srvrecord) + where + srvrecord = mkSRVTcp "xmpp-client" $ + T.unpack $ strDomain $ jidDomain jid + serverjid = JID Nothing (jidDomain jid) Nothing + + handle [] = do + let h = xmppHostname c + let p = PortNumber $ fromIntegral $ xmppPort c + r <- run h p $ a jid + return [r] + handle srvs = go [] srvs + + go l [] = return l + go l ((h,p):rest) = do + {- Try each SRV record in turn, until one connects, + - at which point the MVar will be full. -} + mv <- newEmptyMVar + r <- run h p $ do + liftIO $ putMVar mv () + a jid + ifM (isEmptyMVar mv) + ( go (r : l) rest + , return (r : l) + ) + + {- Async exceptions are let through so the XMPP thread can + - be killed. -} + run h p a' = do + r <- tryNonAsync $ + runClientError (Server serverjid h p) jid + (xmppUsername c) (xmppPassword c) (void a') + return ((h, p), r) + +{- XMPP runClient, that throws errors rather than returning an Either -} +runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a +runClientError s j u p x = either (error . show) return =<< runClient s j u p x + +getXMPPCreds :: Annex (Maybe XMPPCreds) +getXMPPCreds = parse <$> readCacheCreds xmppCredsFile + where + parse s = readish =<< s + +setXMPPCreds :: XMPPCreds -> Annex () +setXMPPCreds creds = writeCacheCreds (show creds) xmppCredsFile + +xmppCredsFile :: FilePath +xmppCredsFile = "xmpp" diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs new file mode 100644 index 000000000..97b974f82 --- /dev/null +++ b/Assistant/XMPP/Git.hs @@ -0,0 +1,382 @@ +{- git over XMPP + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.XMPP.Git where + +import Assistant.Common +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.XMPP +import Assistant.XMPP.Buddies +import Assistant.DaemonStatus +import Assistant.Alert +import Assistant.MakeRemote +import Assistant.Sync +import qualified Command.Sync +import qualified Annex.Branch +import Annex.UUID +import Logs.UUID +import Annex.TaggedPush +import Annex.CatFile +import Config +import Git +import qualified Git.Branch +import Config.Files +import qualified Types.Remote as Remote +import qualified Remote as Remote +import Remote.List +import Utility.FileMode +import Utility.Shell +import Utility.Env + +import Network.Protocol.XMPP +import qualified Data.Text as T +import System.Posix.Types +import System.Process (std_in, std_out, std_err) +import Control.Concurrent +import System.Timeout +import qualified Data.ByteString as B +import qualified Data.Map as M + +{- Largest chunk of data to send in a single XMPP message. -} +chunkSize :: Int +chunkSize = 4096 + +{- How long to wait for an expected message before assuming the other side + - has gone away and canceling a push. + - + - This needs to be long enough to allow a message of up to 2+ times + - chunkSize to propigate up to a XMPP server, perhaps across to another + - server, and back down to us. On the other hand, other XMPP pushes can be + - delayed for running until the timeout is reached, so it should not be + - excessive. + -} +xmppTimeout :: Int +xmppTimeout = 120000000 -- 120 seconds + +finishXMPPPairing :: JID -> UUID -> Assistant () +finishXMPPPairing jid u = void $ alertWhile alert $ + makeXMPPGitRemote buddy (baseJID jid) u + where + buddy = T.unpack $ buddyName jid + alert = pairRequestAcknowledgedAlert buddy Nothing + +gitXMPPLocation :: JID -> String +gitXMPPLocation jid = "xmpp::" ++ T.unpack (formatJID $ baseJID jid) + +makeXMPPGitRemote :: String -> JID -> UUID -> Assistant Bool +makeXMPPGitRemote buddyname jid u = do + remote <- liftAnnex $ addRemote $ + makeGitRemote buddyname $ gitXMPPLocation jid + liftAnnex $ storeUUID (remoteConfig (Remote.repo remote) "uuid") u + liftAnnex $ void remoteListRefresh + remote' <- liftAnnex $ fromMaybe (error "failed to add remote") + <$> Remote.byName (Just buddyname) + syncRemote remote' + return True + +{- Pushes over XMPP, communicating with a specific client. + - Runs an arbitrary IO action to push, which should run git-push with + - an xmpp:: url. + - + - To handle xmpp:: urls, git push will run git-remote-xmpp, which is + - injected into its PATH, and in turn runs git-annex xmppgit. The + - dataflow them becomes: + - + - git push <--> git-annex xmppgit <--> xmppPush <-------> xmpp + - | + - git receive-pack <--> xmppReceivePack <---------------> xmpp + - + - The pipe between git-annex xmppgit and us is set up and communicated + - using two environment variables, relayIn and relayOut, that are set + - to the file descriptors to use. Another, relayControl, is used to + - propigate the exit status of git receive-pack. + - + - We listen at the other end of the pipe and relay to and from XMPP. + -} +xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool +xmppPush cid gitpush = do + u <- liftAnnex getUUID + sendNetMessage $ Pushing cid (StartingPush u) + + (Fd inf, writepush) <- liftIO createPipe + (readpush, Fd outf) <- liftIO createPipe + (Fd controlf, writecontrol) <- liftIO createPipe + + tmpdir <- gettmpdir + installwrapper tmpdir + + env <- liftIO getEnvironment + path <- liftIO getSearchPath + let myenv = M.fromList + [ ("PATH", intercalate [searchPathSeparator] $ tmpdir:path) + , (relayIn, show inf) + , (relayOut, show outf) + , (relayControl, show controlf) + ] + `M.union` M.fromList env + + inh <- liftIO $ fdToHandle readpush + outh <- liftIO $ fdToHandle writepush + controlh <- liftIO $ fdToHandle writecontrol + + t1 <- forkIO <~> toxmpp 0 inh + t2 <- forkIO <~> fromxmpp outh controlh + + {- This can take a long time to run, so avoid running it in the + - Annex monad. Also, override environment. -} + g <- liftAnnex gitRepo + r <- liftIO $ gitpush $ g { gitEnv = Just $ M.toList myenv } + + liftIO $ do + mapM_ killThread [t1, t2] + mapM_ hClose [inh, outh, controlh] + mapM_ closeFd [Fd inf, Fd outf, Fd controlf] + + return r + where + toxmpp seqnum inh = do + b <- liftIO $ B.hGetSome inh chunkSize + if B.null b + then liftIO $ killThread =<< myThreadId + else do + let seqnum' = succ seqnum + sendNetMessage $ Pushing cid $ + SendPackOutput seqnum' b + toxmpp seqnum' inh + + fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle + where + handle (Just (Pushing _ (ReceivePackOutput _ b))) = + liftIO $ writeChunk outh b + handle (Just (Pushing _ (ReceivePackDone exitcode))) = + liftIO $ do + hPrint controlh exitcode + hFlush controlh + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git receive-pack output via XMPP"] + -- Send a synthetic exit code to git-annex + -- xmppgit, which will exit and cause git push + -- to die. + liftIO $ do + hPrint controlh (ExitFailure 1) + hFlush controlh + killThread =<< myThreadId + + installwrapper tmpdir = liftIO $ do + createDirectoryIfMissing True tmpdir + let wrapper = tmpdir </> "git-remote-xmpp" + program <- readProgramFile + writeFile wrapper $ unlines + [ shebang_local + , "exec " ++ program ++ " xmppgit" + ] + modifyFileMode wrapper $ addModes executeModes + + {- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp + - dir (ie, not on a crippled filesystem where we can't make + - the wrapper executable). -} + gettmpdir = do + v <- liftIO $ getEnv "GIT_ANNEX_TMP_DIR" + case v of + Nothing -> do + tmp <- liftAnnex $ fromRepo gitAnnexTmpDir + return $ tmp </> "xmppgit" + Just d -> return $ d </> "xmppgit" + +type EnvVar = String + +envVar :: String -> EnvVar +envVar s = "GIT_ANNEX_XMPPGIT_" ++ s + +relayIn :: EnvVar +relayIn = envVar "IN" + +relayOut :: EnvVar +relayOut = envVar "OUT" + +relayControl :: EnvVar +relayControl = envVar "CONTROL" + +relayHandle :: EnvVar -> IO Handle +relayHandle var = do + v <- getEnv var + case readish =<< v of + Nothing -> error $ var ++ " not set" + Just n -> fdToHandle $ Fd n + +{- Called by git-annex xmppgit. + - + - git-push is talking to us on stdin + - we're talking to git-push on stdout + - git-receive-pack is talking to us on relayIn (via XMPP) + - we're talking to git-receive-pack on relayOut (via XMPP) + - git-receive-pack's exit code will be passed to us on relayControl + -} +xmppGitRelay :: IO () +xmppGitRelay = do + flip relay stdout =<< relayHandle relayIn + relay stdin =<< relayHandle relayOut + code <- hGetLine =<< relayHandle relayControl + exitWith $ fromMaybe (ExitFailure 1) $ readish code + where + {- Is it possible to set up pipes and not need to copy the data + - ourselves? See splice(2) -} + relay fromh toh = void $ forkIO $ forever $ do + b <- B.hGetSome fromh chunkSize + when (B.null b) $ do + hClose fromh + hClose toh + killThread =<< myThreadId + writeChunk toh b + +{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating + - its exit status to XMPP. -} +xmppReceivePack :: ClientID -> Assistant Bool +xmppReceivePack cid = do + repodir <- liftAnnex $ fromRepo repoPath + let p = (proc "git" ["receive-pack", repodir]) + { std_in = CreatePipe + , std_out = CreatePipe + , std_err = Inherit + } + (Just inh, Just outh, _, pid) <- liftIO $ createProcess p + readertid <- forkIO <~> relayfromxmpp inh + relaytoxmpp 0 outh + code <- liftIO $ waitForProcess pid + void $ sendNetMessage $ Pushing cid $ ReceivePackDone code + liftIO $ do + killThread readertid + hClose inh + hClose outh + return $ code == ExitSuccess + where + relaytoxmpp seqnum outh = do + b <- liftIO $ B.hGetSome outh chunkSize + -- empty is EOF, so exit + unless (B.null b) $ do + let seqnum' = succ seqnum + sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b + relaytoxmpp seqnum' outh + relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle + where + handle (Just (Pushing _ (SendPackOutput _ b))) = + liftIO $ writeChunk inh b + handle (Just _) = noop + handle Nothing = do + debug ["timeout waiting for git send-pack output via XMPP"] + -- closing the handle will make git receive-pack exit + liftIO $ do + hClose inh + killThread =<< myThreadId + +xmppRemotes :: ClientID -> UUID -> Assistant [Remote] +xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of + Nothing -> return [] + Just jid -> do + let loc = gitXMPPLocation jid + um <- liftAnnex uuidMap + filter (matching loc . Remote.repo) . filter (knownuuid um) . syncGitRemotes + <$> getDaemonStatus + where + matching loc r = repoIsUrl r && repoLocation r == loc + knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um + +{- Returns the ClientID that it pushed to. -} +runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID) +runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) = + go =<< liftAnnex (inRepo Git.Branch.current) + where + go Nothing = return Nothing + go (Just branch) = do + rs <- xmppRemotes cid theiruuid + liftAnnex $ Annex.Branch.commit "update" + (g, u) <- liftAnnex $ (,) + <$> gitRepo + <*> getUUID + liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus + if null rs + then return Nothing + else do + forM_ rs $ \r -> do + void $ alertWhile (syncAlert [r]) $ + xmppPush cid (taggedPush u selfjid branch r) + checkcloudrepos r + return $ Just cid +runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do + rs <- xmppRemotes cid theiruuid + if null rs + then return Nothing + else do + void $ alertWhile (syncAlert rs) $ + xmppReceivePack cid + mapM_ checkcloudrepos rs + return $ Just cid +runPush _ _ = return Nothing + +{- Check if any of the shas that can be pushed are ones we do not + - have. + - + - (Older clients send no shas, so when there are none, always + - request a push.) + -} +handlePushNotice :: NetMessage -> Assistant () +handlePushNotice (Pushing cid (CanPush theiruuid shas)) = + unlessM (null <$> xmppRemotes cid theiruuid) $ + if null shas + then go + else ifM (haveall shas) + ( debug ["ignoring CanPush with known shas"] + , go + ) + where + go = do + u <- liftAnnex getUUID + sendNetMessage $ Pushing cid (PushRequest u) + haveall l = liftAnnex $ not <$> anyM donthave l + donthave sha = isNothing <$> catObjectDetails sha +handlePushNotice _ = noop + +writeChunk :: Handle -> B.ByteString -> IO () +writeChunk h b = do + B.hPut h b + hFlush h + +{- Gets NetMessages for a PushSide, ensures they are in order, + - and runs an action to handle each in turn. The action will be passed + - Nothing on timeout. + - + - Does not currently reorder messages, but does ensure that any + - duplicate messages, or messages not in the sequence, are discarded. + -} +withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant () +withPushMessagesInSequence cid side a = loop 0 + where + loop seqnum = do + m <- timeout xmppTimeout <~> waitInbox cid side + let go s = a m >> loop s + let next = seqnum + 1 + case extractSequence =<< m of + Just seqnum' + | seqnum' == next -> go next + | seqnum' == 0 -> go seqnum + | seqnum' == seqnum -> do + debug ["ignoring duplicate sequence number", show seqnum] + loop seqnum + | otherwise -> do + debug ["ignoring out of order sequence number", show seqnum', "expected", show next] + loop seqnum + Nothing -> go seqnum + +extractSequence :: NetMessage -> Maybe Int +extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum +extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum +extractSequence _ = Nothing |