diff options
Diffstat (limited to 'Assistant/Threads/Cronner.hs')
-rw-r--r-- | Assistant/Threads/Cronner.hs | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs new file mode 100644 index 000000000..55b3ca2f1 --- /dev/null +++ b/Assistant/Threads/Cronner.hs @@ -0,0 +1,225 @@ +{- git-annex assistant sceduled jobs runner + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE DeriveDataTypeable #-} + +module Assistant.Threads.Cronner ( + cronnerThread +) where + +import Assistant.Common +import Assistant.DaemonStatus +import Utility.NotificationBroadcaster +import Annex.UUID +import Config.Files +import Logs.Schedule +import Utility.Scheduled +import Types.ScheduledActivity +import Utility.ThreadScheduler +import Utility.HumanTime +import Utility.Batch +import Assistant.TransferQueue +import Annex.Content +import Logs.Transfer +import Assistant.Types.UrlRenderer +import Assistant.Alert +import Remote +import qualified Types.Remote as Remote +import qualified Git +import qualified Git.Fsck +import Assistant.Fsck +import Assistant.Repair + +import Control.Concurrent.Async +import Control.Concurrent.MVar +import Data.Time.LocalTime +import Data.Time.Clock +import qualified Data.Map as M +import qualified Data.Set as S + +{- Loads schedules for this repository, and fires off one thread for each + - scheduled event that runs on this repository. Each thread sleeps until + - its event is scheduled to run. + - + - To handle events that run on remotes, which need to only run when + - their remote gets connected, threads are also started, and are passed + - a MVar to wait on, which is stored in the DaemonStatus's + - connectRemoteNotifiers. + - + - In the meantime the main thread waits for any changes to the + - schedules. When there's a change, compare the old and new list of + - schedules to find deleted and added ones. Start new threads for added + - ones, and kill the threads for deleted ones. -} +cronnerThread :: UrlRenderer -> NamedThread +cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do + fsckNudge urlrenderer Nothing + dstatus <- getDaemonStatus + h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus) + go h M.empty M.empty + where + go h amap nmap = do + activities <- liftAnnex $ scheduleGet =<< getUUID + + let addedactivities = activities `S.difference` M.keysSet amap + let removedactivities = M.keysSet amap `S.difference` activities + + forM_ (S.toList removedactivities) $ \activity -> + case M.lookup activity amap of + Just a -> do + debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)] + liftIO $ cancel a + Nothing -> noop + + lastruntimes <- liftAnnex getLastRunTimes + started <- startactivities (S.toList addedactivities) lastruntimes + let addedamap = M.fromList $ map fst started + let addednmap = M.fromList $ catMaybes $ map snd started + + let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities) + let amap' = M.difference (M.union addedamap amap) (removefiltered amap) + let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap) + modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') } + + liftIO $ waitNotification h + debug ["reloading changed activities"] + go h amap' nmap' + startactivities as lastruntimes = forM as $ \activity -> + case connectActivityUUID activity of + Nothing -> do + runner <- asIO2 (sleepingActivityThread urlrenderer) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Nothing) + Just u -> do + mvar <- liftIO newEmptyMVar + runner <- asIO2 (remoteActivityThread urlrenderer mvar) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Just (activity, (u, [mvar]))) + +{- Calculate the next time the activity is scheduled to run, then + - sleep until that time, and run it. Then call setLastRunTime, and + - loop. + -} +sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () +sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime + where + getnexttime = liftIO . nextTime schedule + go _ Nothing = debug ["no scheduled events left for", desc] + go l (Just (NextTimeExactly t)) = waitrun l t Nothing + go l (Just (NextTimeWindow windowstart windowend)) = + waitrun l windowstart (Just windowend) + desc = fromScheduledActivity activity + schedule = getSchedule activity + waitrun l t mmaxt = do + seconds <- liftIO $ secondsUntilLocalTime t + when (seconds > Seconds 0) $ do + debug ["waiting", show seconds, "for next scheduled", desc] + liftIO $ threadDelaySeconds seconds + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + let nowt = utcToLocalTime tz now + if tolate nowt tz + then do + debug ["too late to run scheduled", desc] + go l =<< getnexttime l + else run nowt + where + tolate nowt tz = case mmaxt of + Just maxt -> nowt > maxt + -- allow the job to start 10 minutes late + Nothing ->diffUTCTime + (localTimeToUTC tz nowt) + (localTimeToUTC tz t) > 600 + run nowt = do + runActivity urlrenderer activity nowt + go (Just nowt) =<< getnexttime (Just nowt) + +{- Wait for the remote to become available by waiting on the MVar. + - Then check if the time is within a time window when activity + - is scheduled to run, and if so run it. + - Otherwise, just wait again on the MVar. + -} +remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant () +remoteActivityThread urlrenderer mvar activity lasttime = do + liftIO $ takeMVar mvar + go =<< liftIO (nextTime (getSchedule activity) lasttime) + where + go (Just (NextTimeWindow windowstart windowend)) = do + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend + then do + let nowt = utcToLocalTime tz now + runActivity urlrenderer activity nowt + loop (Just nowt) + else loop lasttime + go _ = noop -- running at exact time not handled here + loop = remoteActivityThread urlrenderer mvar activity + +secondsUntilLocalTime :: LocalTime -> IO Seconds +secondsUntilLocalTime t = do + now <- getCurrentTime + tz <- getTimeZone now + let secs = truncate $ diffUTCTime (localTimeToUTC tz t) now + return $ if secs > 0 + then Seconds secs + else Seconds 0 + +runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant () +runActivity urlrenderer activity nowt = do + debug ["starting", desc] + runActivity' urlrenderer activity + debug ["finished", desc] + liftAnnex $ setLastRunTime activity nowt + where + desc = fromScheduledActivity activity + +runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant () +runActivity' urlrenderer (ScheduledSelfFsck _ d) = do + program <- liftIO $ readProgramFile + g <- liftAnnex gitRepo + fsckresults <- showFscking urlrenderer Nothing $ tryNonAsync $ do + void $ batchCommand program (Param "fsck" : annexFsckParams d) + Git.Fsck.findBroken True g + u <- liftAnnex getUUID + void $ repairWhenNecessary urlrenderer u Nothing fsckresults + mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir) + where + reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download +runActivity' urlrenderer (ScheduledRemoteFsck u s d) = handle =<< liftAnnex (remoteFromUUID u) + where + handle Nothing = debug ["skipping remote fsck of uuid without a configured remote", fromUUID u, fromSchedule s] + handle (Just rmt) = void $ case Remote.remoteFsck rmt of + Nothing -> go rmt $ do + program <- readProgramFile + void $ batchCommand program $ + [ Param "fsck" + -- avoid downloading files + , Param "--fast" + , Param "--from" + , Param $ Remote.name rmt + ] ++ annexFsckParams d + Just mkfscker -> do + {- Note that having mkfsker return an IO action + - avoids running a long duration fsck in the + - Annex monad. -} + go rmt =<< liftAnnex (mkfscker (annexFsckParams d)) + go rmt annexfscker = do + fsckresults <- showFscking urlrenderer (Just rmt) $ tryNonAsync $ do + void annexfscker + let r = Remote.repo rmt + if Git.repoIsLocal r && not (Git.repoIsLocalUnknown r) + then Just <$> Git.Fsck.findBroken True r + else pure Nothing + maybe noop (void . repairWhenNecessary urlrenderer u (Just rmt)) fsckresults + +annexFsckParams :: Duration -> [CommandParam] +annexFsckParams d = + [ Param "--incremental-schedule=1d" + , Param $ "--time-limit=" ++ fromDuration d + ] |