diff options
author | Joey Hess <joey@kitenet.net> | 2013-10-13 17:14:56 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2013-10-13 17:14:56 -0400 |
commit | 7e226e373b203dce324b9d3b7fc2a82ca1781183 (patch) | |
tree | 50cf3d083441c4a99d4e46f95a66421e1e1d2a4a /Assistant/Threads | |
parent | a0865e6c494ec4bdfb7ab058a5633a8634d48bd4 (diff) |
cronner: run jobs triggered by remotes becoming connected (untested)
Diffstat (limited to 'Assistant/Threads')
-rw-r--r-- | Assistant/Threads/Cronner.hs | 106 |
1 files changed, 76 insertions, 30 deletions
diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs index a1f716882..8eb3d472e 100644 --- a/Assistant/Threads/Cronner.hs +++ b/Assistant/Threads/Cronner.hs @@ -34,6 +34,7 @@ import Assistant.WebApp.Types import Git.Remote (RemoteName) import Control.Concurrent.Async +import Control.Concurrent.MVar import Data.Time.LocalTime import Data.Time.Clock import qualified Data.Map as M @@ -42,8 +43,13 @@ import qualified Control.Exception as E import qualified Data.Text as T {- Loads schedules for this repository, and fires off one thread for each - - scheduled event. These threads sleep until the next time the event - - should run. + - scheduled event that runs on this repository. Each thread sleeps until + - its event is scheduled to run. + - + - To handle events that run on remotes, which need to only run when + - their remote gets connected, threads are also started, and are passed + - a MVar to wait on, which is stored in the DaemonStatus's + - connectRemoteNotifiers. - - In the meantime the main thread waits for any changes to the - schedules. When there's a change, compare the old and new list of @@ -53,51 +59,63 @@ cronnerThread :: UrlRenderer -> NamedThread cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do dstatus <- getDaemonStatus h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus) - go h M.empty + go h M.empty M.empty where - go h m = do + go h amap nmap = do activities <- liftAnnex $ scheduleGet =<< getUUID - let addedactivities = activities `S.difference` M.keysSet m - let removedactivities = M.keysSet m `S.difference` activities + let addedactivities = activities `S.difference` M.keysSet amap + let removedactivities = M.keysSet amap `S.difference` activities forM_ (S.toList removedactivities) $ \activity -> - case M.lookup activity m of + case M.lookup activity amap of Just a -> do debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)] liftIO $ cancel a Nothing -> noop lastruntimes <- liftAnnex getLastRunTimes - addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes + started <- startactivities (S.toList addedactivities) lastruntimes + let addedamap = M.fromList $ map fst started + let addednmap = M.fromList $ catMaybes $ map snd started + + let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities) + let amap' = M.difference (M.union addedamap amap) (removefiltered amap) + let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap) + modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') } liftIO $ waitNotification h debug ["reloading changed activities"] - - let m' = M.difference (M.union addedm m) - (M.filterWithKey (\k _ -> S.member k removedactivities) m) - go h m' - startactivities as lastruntimes = forM as $ \activity -> do - runner <- asIO2 (activityThread urlrenderer) - a <- liftIO $ async $ - runner activity (M.lookup activity lastruntimes) - return (activity, a) + go h amap' nmap' + startactivities as lastruntimes = forM as $ \activity -> + case connectActivityUUID activity of + Nothing -> do + runner <- asIO2 (sleepingActivityThread urlrenderer) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Nothing) + Just u -> do + mvar <- liftIO newEmptyMVar + runner <- asIO2 (remoteActivityThread urlrenderer mvar) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Just (activity, (u, [mvar]))) {- Calculate the next time the activity is scheduled to run, then - sleep until that time, and run it. Then call setLastRunTime, and - loop. -} -activityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () -activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime +sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () +sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime where getnexttime = liftIO . nextTime schedule go _ Nothing = debug ["no scheduled events left for", desc] - go l (Just (NextTimeExactly t)) = runafter l t Nothing run + go l (Just (NextTimeExactly t)) = waitrun l t Nothing go l (Just (NextTimeWindow windowstart windowend)) = - runafter l windowstart (Just windowend) run + waitrun l windowstart (Just windowend) desc = fromScheduledActivity activity schedule = getSchedule activity - runafter l t mmaxt a = do + waitrun l t mmaxt = do seconds <- liftIO $ secondsUntilLocalTime t when (seconds > Seconds 0) $ do debug ["waiting", show seconds, "for next scheduled", desc] @@ -109,7 +127,7 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt then do debug ["too late to run scheduled", desc] go l =<< getnexttime l - else a nowt + else run nowt where tolate nowt tz = case mmaxt of Just maxt -> nowt > maxt @@ -118,12 +136,31 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt (localTimeToUTC tz nowt) (localTimeToUTC tz t) > 600 run nowt = do - debug ["starting", desc] - runActivity urlrenderer activity - debug ["finished", desc] - liftAnnex $ setLastRunTime activity nowt + runActivity urlrenderer activity nowt go (Just nowt) =<< getnexttime (Just nowt) +{- Wait for the remote to become available by waiting on the MVar. + - Then check if the time is within a time window when activity + - is scheduled to run, and if so run it. + - Otherwise, just wait again on the MVar. + -} +remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant () +remoteActivityThread urlrenderer mvar activity lasttime = do + liftIO $ takeMVar mvar + go =<< liftIO (nextTime (getSchedule activity) lasttime) + where + go (Just (NextTimeWindow windowstart windowend)) = do + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend + then do + let nowt = utcToLocalTime tz now + runActivity urlrenderer activity nowt + loop (Just nowt) + else loop lasttime + go _ = noop -- running at exact time not handled here + loop = remoteActivityThread urlrenderer mvar activity + secondsUntilLocalTime :: LocalTime -> IO Seconds secondsUntilLocalTime t = do now <- getCurrentTime @@ -133,15 +170,24 @@ secondsUntilLocalTime t = do then Seconds secs else Seconds 0 -runActivity :: UrlRenderer -> ScheduledActivity -> Assistant () -runActivity urlrenderer (ScheduledSelfFsck _ d) = do +runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant () +runActivity urlrenderer activity nowt = do + debug ["starting", desc] + runActivity' urlrenderer activity + debug ["finished", desc] + liftAnnex $ setLastRunTime activity nowt + where + desc = fromScheduledActivity activity + +runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant () +runActivity' urlrenderer (ScheduledSelfFsck _ d) = do program <- liftIO $ readProgramFile void $ runFsck urlrenderer Nothing $ batchCommand program (Param "fsck" : fsckParams d) mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir) where reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download -runActivity urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u) +runActivity' urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u) where go (Just r) = void $ case Remote.remoteFsck r of Nothing -> void $ runFsck urlrenderer (Just $ Remote.name r) $ do |