diff options
author | Joey Hess <joey@kitenet.net> | 2013-10-13 17:14:56 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2013-10-13 17:14:56 -0400 |
commit | 7e226e373b203dce324b9d3b7fc2a82ca1781183 (patch) | |
tree | 50cf3d083441c4a99d4e46f95a66421e1e1d2a4a | |
parent | a0865e6c494ec4bdfb7ab058a5633a8634d48bd4 (diff) |
cronner: run jobs triggered by remotes becoming connected (untested)
-rw-r--r-- | Assistant/Sync.hs | 9 | ||||
-rw-r--r-- | Assistant/Threads/Cronner.hs | 106 | ||||
-rw-r--r-- | Assistant/Types/DaemonStatus.hs | 4 | ||||
-rw-r--r-- | Types/ScheduledActivity.hs | 6 |
4 files changed, 94 insertions, 31 deletions
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index fe578ab43..fe4fda896 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -44,13 +44,17 @@ import Control.Concurrent - they push to us. Since XMPP pushes run ansynchronously, any scan of the - XMPP remotes has to be deferred until they're done pushing to us, so - all XMPP remotes are marked as possibly desynced. + - + - Also handles signaling any connectRemoteNotifiers, after the syncing is + - done. -} reconnectRemotes :: Bool -> [Remote] -> Assistant () reconnectRemotes _ [] = noop reconnectRemotes notifypushes rs = void $ do modifyDaemonStatus_ $ \s -> s { desynced = S.union (S.fromList $ map Remote.uuid xmppremotes) (desynced s) } - syncAction rs (const go) + void $ syncAction rs (const go) + mapM_ signal rs where gitremotes = filter (notspecialremote . Remote.repo) rs (xmppremotes, nonxmppremotes) = partition isXMPPRemote rs @@ -73,6 +77,9 @@ reconnectRemotes notifypushes rs = void $ do filter (not . remoteAnnexIgnore . Remote.gitconfig) nonxmppremotes return failed + signal r = liftIO . mapM_ (flip tryPutMVar ()) + =<< fromMaybe [] . M.lookup (Remote.uuid r) . connectRemoteNotifiers + <$> getDaemonStatus {- Updates the local sync branch, then pushes it to all remotes, in - parallel, along with the git-annex branch. This is the same diff --git a/Assistant/Threads/Cronner.hs b/Assistant/Threads/Cronner.hs index a1f716882..8eb3d472e 100644 --- a/Assistant/Threads/Cronner.hs +++ b/Assistant/Threads/Cronner.hs @@ -34,6 +34,7 @@ import Assistant.WebApp.Types import Git.Remote (RemoteName) import Control.Concurrent.Async +import Control.Concurrent.MVar import Data.Time.LocalTime import Data.Time.Clock import qualified Data.Map as M @@ -42,8 +43,13 @@ import qualified Control.Exception as E import qualified Data.Text as T {- Loads schedules for this repository, and fires off one thread for each - - scheduled event. These threads sleep until the next time the event - - should run. + - scheduled event that runs on this repository. Each thread sleeps until + - its event is scheduled to run. + - + - To handle events that run on remotes, which need to only run when + - their remote gets connected, threads are also started, and are passed + - a MVar to wait on, which is stored in the DaemonStatus's + - connectRemoteNotifiers. - - In the meantime the main thread waits for any changes to the - schedules. When there's a change, compare the old and new list of @@ -53,51 +59,63 @@ cronnerThread :: UrlRenderer -> NamedThread cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do dstatus <- getDaemonStatus h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus) - go h M.empty + go h M.empty M.empty where - go h m = do + go h amap nmap = do activities <- liftAnnex $ scheduleGet =<< getUUID - let addedactivities = activities `S.difference` M.keysSet m - let removedactivities = M.keysSet m `S.difference` activities + let addedactivities = activities `S.difference` M.keysSet amap + let removedactivities = M.keysSet amap `S.difference` activities forM_ (S.toList removedactivities) $ \activity -> - case M.lookup activity m of + case M.lookup activity amap of Just a -> do debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)] liftIO $ cancel a Nothing -> noop lastruntimes <- liftAnnex getLastRunTimes - addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes + started <- startactivities (S.toList addedactivities) lastruntimes + let addedamap = M.fromList $ map fst started + let addednmap = M.fromList $ catMaybes $ map snd started + + let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities) + let amap' = M.difference (M.union addedamap amap) (removefiltered amap) + let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap) + modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') } liftIO $ waitNotification h debug ["reloading changed activities"] - - let m' = M.difference (M.union addedm m) - (M.filterWithKey (\k _ -> S.member k removedactivities) m) - go h m' - startactivities as lastruntimes = forM as $ \activity -> do - runner <- asIO2 (activityThread urlrenderer) - a <- liftIO $ async $ - runner activity (M.lookup activity lastruntimes) - return (activity, a) + go h amap' nmap' + startactivities as lastruntimes = forM as $ \activity -> + case connectActivityUUID activity of + Nothing -> do + runner <- asIO2 (sleepingActivityThread urlrenderer) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Nothing) + Just u -> do + mvar <- liftIO newEmptyMVar + runner <- asIO2 (remoteActivityThread urlrenderer mvar) + a <- liftIO $ async $ + runner activity (M.lookup activity lastruntimes) + return ((activity, a), Just (activity, (u, [mvar]))) {- Calculate the next time the activity is scheduled to run, then - sleep until that time, and run it. Then call setLastRunTime, and - loop. -} -activityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () -activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime +sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant () +sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime where getnexttime = liftIO . nextTime schedule go _ Nothing = debug ["no scheduled events left for", desc] - go l (Just (NextTimeExactly t)) = runafter l t Nothing run + go l (Just (NextTimeExactly t)) = waitrun l t Nothing go l (Just (NextTimeWindow windowstart windowend)) = - runafter l windowstart (Just windowend) run + waitrun l windowstart (Just windowend) desc = fromScheduledActivity activity schedule = getSchedule activity - runafter l t mmaxt a = do + waitrun l t mmaxt = do seconds <- liftIO $ secondsUntilLocalTime t when (seconds > Seconds 0) $ do debug ["waiting", show seconds, "for next scheduled", desc] @@ -109,7 +127,7 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt then do debug ["too late to run scheduled", desc] go l =<< getnexttime l - else a nowt + else run nowt where tolate nowt tz = case mmaxt of Just maxt -> nowt > maxt @@ -118,12 +136,31 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt (localTimeToUTC tz nowt) (localTimeToUTC tz t) > 600 run nowt = do - debug ["starting", desc] - runActivity urlrenderer activity - debug ["finished", desc] - liftAnnex $ setLastRunTime activity nowt + runActivity urlrenderer activity nowt go (Just nowt) =<< getnexttime (Just nowt) +{- Wait for the remote to become available by waiting on the MVar. + - Then check if the time is within a time window when activity + - is scheduled to run, and if so run it. + - Otherwise, just wait again on the MVar. + -} +remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant () +remoteActivityThread urlrenderer mvar activity lasttime = do + liftIO $ takeMVar mvar + go =<< liftIO (nextTime (getSchedule activity) lasttime) + where + go (Just (NextTimeWindow windowstart windowend)) = do + now <- liftIO getCurrentTime + tz <- liftIO $ getTimeZone now + if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend + then do + let nowt = utcToLocalTime tz now + runActivity urlrenderer activity nowt + loop (Just nowt) + else loop lasttime + go _ = noop -- running at exact time not handled here + loop = remoteActivityThread urlrenderer mvar activity + secondsUntilLocalTime :: LocalTime -> IO Seconds secondsUntilLocalTime t = do now <- getCurrentTime @@ -133,15 +170,24 @@ secondsUntilLocalTime t = do then Seconds secs else Seconds 0 -runActivity :: UrlRenderer -> ScheduledActivity -> Assistant () -runActivity urlrenderer (ScheduledSelfFsck _ d) = do +runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant () +runActivity urlrenderer activity nowt = do + debug ["starting", desc] + runActivity' urlrenderer activity + debug ["finished", desc] + liftAnnex $ setLastRunTime activity nowt + where + desc = fromScheduledActivity activity + +runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant () +runActivity' urlrenderer (ScheduledSelfFsck _ d) = do program <- liftIO $ readProgramFile void $ runFsck urlrenderer Nothing $ batchCommand program (Param "fsck" : fsckParams d) mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir) where reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download -runActivity urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u) +runActivity' urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u) where go (Just r) = void $ case Remote.remoteFsck r of Nothing -> void $ runFsck urlrenderer (Just $ Remote.name r) $ do diff --git a/Assistant/Types/DaemonStatus.hs b/Assistant/Types/DaemonStatus.hs index afb5f940a..abb857a36 100644 --- a/Assistant/Types/DaemonStatus.hs +++ b/Assistant/Types/DaemonStatus.hs @@ -18,6 +18,7 @@ import Assistant.Types.NetMessager import Assistant.Types.Alert import Control.Concurrent.STM +import Control.Concurrent.MVar import Control.Concurrent.Async import Data.Time.Clock.POSIX import qualified Data.Map as M @@ -69,6 +70,8 @@ data DaemonStatus = DaemonStatus -- When the XMPP client is connected, this will contain the XMPP -- address. , xmppClientID :: Maybe ClientID + -- MVars to signal when a remote gets connected. + , connectRemoteNotifiers :: M.Map UUID [MVar ()] } type TransferMap = M.Map Transfer TransferInfo @@ -100,3 +103,4 @@ newDaemonStatus = DaemonStatus <*> newNotificationBroadcaster <*> newNotificationBroadcaster <*> pure Nothing + <*> pure M.empty diff --git a/Types/ScheduledActivity.hs b/Types/ScheduledActivity.hs index 386f42333..b683409ce 100644 --- a/Types/ScheduledActivity.hs +++ b/Types/ScheduledActivity.hs @@ -19,6 +19,12 @@ data ScheduledActivity | ScheduledRemoteFsck UUID Schedule Duration deriving (Eq, Read, Show, Ord) +{- Activities that run on a remote, within a time window, so + - should be run when the remote gets connected. -} +connectActivityUUID :: ScheduledActivity -> Maybe UUID +connectActivityUUID (ScheduledRemoteFsck u (Schedule _ AnyTime) _) = Just u +connectActivityUUID _ = Nothing + getSchedule :: ScheduledActivity -> Schedule getSchedule (ScheduledSelfFsck s _) = s getSchedule (ScheduledRemoteFsck _ s _) = s |